test_matrixfederationclient.py 21 KB


  1. # Copyright 2018 New Vector Ltd
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. from typing import Generator
  15. from unittest.mock import Mock
  16. from netaddr import IPSet
  17. from parameterized import parameterized
  18. from twisted.internet import defer
  19. from twisted.internet.defer import Deferred, TimeoutError
  20. from twisted.internet.error import ConnectingCancelledError, DNSLookupError
  21. from twisted.test.proto_helpers import MemoryReactor, StringTransport
  22. from twisted.web.client import ResponseNeverReceived
  23. from twisted.web.http import HTTPChannel
  24. from synapse.api.errors import RequestSendFailed
  25. from synapse.http.matrixfederationclient import (
  26. ByteParser,
  27. MatrixFederationHttpClient,
  28. MatrixFederationRequest,
  29. )
  30. from synapse.logging.context import (
  31. SENTINEL_CONTEXT,
  32. LoggingContext,
  33. LoggingContextOrSentinel,
  34. current_context,
  35. )
  36. from synapse.server import HomeServer
  37. from synapse.util import Clock
  38. from tests.server import FakeTransport
  39. from tests.unittest import HomeserverTestCase
  40. def check_logcontext(context: LoggingContextOrSentinel) -> None:
  41. current = current_context()
  42. if current is not context:
  43. raise AssertionError("Expected logcontext %s but was %s" % (context, current))
  44. class FederationClientTests(HomeserverTestCase):
  45. def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
  46. hs = self.setup_test_homeserver(reactor=reactor, clock=clock)
  47. return hs
  48. def prepare(
  49. self, reactor: MemoryReactor, clock: Clock, homeserver: HomeServer
  50. ) -> None:
  51. self.cl = MatrixFederationHttpClient(self.hs, None)
  52. self.reactor.lookups["testserv"] = "1.2.3.4"
  53. def test_client_get(self) -> None:
  54. """
  55. happy-path test of a GET request
  56. """
  57. @defer.inlineCallbacks
  58. def do_request() -> Generator["Deferred[object]", object, object]:
  59. with LoggingContext("one") as context:
  60. fetch_d = defer.ensureDeferred(
  61. self.cl.get_json("testserv:8008", "foo/bar")
  62. )
  63. # Nothing happened yet
  64. self.assertNoResult(fetch_d)
  65. # should have reset logcontext to the sentinel
  66. check_logcontext(SENTINEL_CONTEXT)
  67. try:
  68. fetch_res = yield fetch_d
  69. return fetch_res
  70. finally:
  71. check_logcontext(context)
  72. test_d = do_request()
  73. self.pump()
  74. # Nothing happened yet
  75. self.assertNoResult(test_d)
  76. # Make sure treq is trying to connect
  77. clients = self.reactor.tcpClients
  78. self.assertEqual(len(clients), 1)
  79. (host, port, factory, _timeout, _bindAddress) = clients[0]
  80. self.assertEqual(host, "1.2.3.4")
  81. self.assertEqual(port, 8008)
  82. # complete the connection and wire it up to a fake transport
  83. protocol = factory.buildProtocol(None)
  84. transport = StringTransport()
  85. protocol.makeConnection(transport)
  86. # that should have made it send the request to the transport
  87. self.assertRegex(transport.value(), b"^GET /foo/bar")
  88. self.assertRegex(transport.value(), b"Host: testserv:8008")
  89. # Deferred is still without a result
  90. self.assertNoResult(test_d)
  91. # Send it the HTTP response
  92. res_json = b'{ "a": 1 }'
  93. protocol.dataReceived(
  94. b"HTTP/1.1 200 OK\r\n"
  95. b"Server: Fake\r\n"
  96. b"Content-Type: application/json\r\n"
  97. b"Content-Length: %i\r\n"
  98. b"\r\n"
  99. b"%s" % (len(res_json), res_json)
  100. )
  101. self.pump()
  102. res = self.successResultOf(test_d)
  103. # check the response is as expected
  104. self.assertEqual(res, {"a": 1})
  105. def test_dns_error(self) -> None:
  106. """
  107. If the DNS lookup returns an error, it will bubble up.
  108. """
  109. d = defer.ensureDeferred(
  110. self.cl.get_json("testserv2:8008", "foo/bar", timeout=10000)
  111. )
  112. self.pump()
  113. f = self.failureResultOf(d)
  114. self.assertIsInstance(f.value, RequestSendFailed)
  115. self.assertIsInstance(f.value.inner_exception, DNSLookupError)
  116. def test_client_connection_refused(self) -> None:
  117. d = defer.ensureDeferred(
  118. self.cl.get_json("testserv:8008", "foo/bar", timeout=10000)
  119. )
  120. self.pump()
  121. # Nothing happened yet
  122. self.assertNoResult(d)
  123. clients = self.reactor.tcpClients
  124. self.assertEqual(len(clients), 1)
  125. (host, port, factory, _timeout, _bindAddress) = clients[0]
  126. self.assertEqual(host, "1.2.3.4")
  127. self.assertEqual(port, 8008)
  128. e = Exception("go away")
  129. factory.clientConnectionFailed(None, e)
  130. self.pump(0.5)
  131. f = self.failureResultOf(d)
  132. self.assertIsInstance(f.value, RequestSendFailed)
  133. self.assertIs(f.value.inner_exception, e)
  134. def test_client_never_connect(self) -> None:
  135. """
  136. If the HTTP request is not connected and is timed out, it'll give a
  137. ConnectingCancelledError or TimeoutError.
  138. """
  139. d = defer.ensureDeferred(
  140. self.cl.get_json("testserv:8008", "foo/bar", timeout=10000)
  141. )
  142. self.pump()
  143. # Nothing happened yet
  144. self.assertNoResult(d)
  145. # Make sure treq is trying to connect
  146. clients = self.reactor.tcpClients
  147. self.assertEqual(len(clients), 1)
  148. self.assertEqual(clients[0][0], "1.2.3.4")
  149. self.assertEqual(clients[0][1], 8008)
  150. # Deferred is still without a result
  151. self.assertNoResult(d)
  152. # Push by enough to time it out
  153. self.reactor.advance(10.5)
  154. f = self.failureResultOf(d)
  155. self.assertIsInstance(f.value, RequestSendFailed)
  156. self.assertIsInstance(
  157. f.value.inner_exception, (ConnectingCancelledError, TimeoutError)
  158. )
  159. def test_client_connect_no_response(self) -> None:
  160. """
  161. If the HTTP request is connected, but gets no response before being
  162. timed out, it'll give a ResponseNeverReceived.
  163. """
  164. d = defer.ensureDeferred(
  165. self.cl.get_json("testserv:8008", "foo/bar", timeout=10000)
  166. )
  167. self.pump()
  168. # Nothing happened yet
  169. self.assertNoResult(d)
  170. # Make sure treq is trying to connect
  171. clients = self.reactor.tcpClients
  172. self.assertEqual(len(clients), 1)
  173. self.assertEqual(clients[0][0], "1.2.3.4")
  174. self.assertEqual(clients[0][1], 8008)
  175. conn = Mock()
  176. client = clients[0][2].buildProtocol(None)
  177. client.makeConnection(conn)
  178. # Deferred is still without a result
  179. self.assertNoResult(d)
  180. # Push by enough to time it out
  181. self.reactor.advance(10.5)
  182. f = self.failureResultOf(d)
  183. self.assertIsInstance(f.value, RequestSendFailed)
  184. self.assertIsInstance(f.value.inner_exception, ResponseNeverReceived)
  185. def test_client_ip_range_blocklist(self) -> None:
  186. """Ensure that Synapse does not try to connect to blocked IPs"""
  187. # Set up the ip_range blocklist
  188. self.hs.config.server.federation_ip_range_blocklist = IPSet(
  189. ["127.0.0.0/8", "fe80::/64"]
  190. )
  191. self.reactor.lookups["internal"] = "127.0.0.1"
  192. self.reactor.lookups["internalv6"] = "fe80:0:0:0:0:8a2e:370:7337"
  193. self.reactor.lookups["fine"] = "10.20.30.40"
  194. cl = MatrixFederationHttpClient(self.hs, None)
  195. # Try making a GET request to a blocked IPv4 address
  196. # ------------------------------------------------------
  197. # Make the request
  198. d = defer.ensureDeferred(cl.get_json("internal:8008", "foo/bar", timeout=10000))
  199. # Nothing happened yet
  200. self.assertNoResult(d)
  201. self.pump(1)
  202. # Check that it was unable to resolve the address
  203. clients = self.reactor.tcpClients
  204. self.assertEqual(len(clients), 0)
  205. f = self.failureResultOf(d)
  206. self.assertIsInstance(f.value, RequestSendFailed)
  207. self.assertIsInstance(f.value.inner_exception, DNSLookupError)
  208. # Try making a POST request to a blocked IPv6 address
  209. # -------------------------------------------------------
  210. # Make the request
  211. d = defer.ensureDeferred(
  212. cl.post_json("internalv6:8008", "foo/bar", timeout=10000)
  213. )
  214. # Nothing has happened yet
  215. self.assertNoResult(d)
  216. # Move the reactor forwards
  217. self.pump(1)
  218. # Check that it was unable to resolve the address
  219. clients = self.reactor.tcpClients
  220. self.assertEqual(len(clients), 0)
  221. # Check that it was due to a blocked DNS lookup
  222. f = self.failureResultOf(d, RequestSendFailed)
  223. self.assertIsInstance(f.value.inner_exception, DNSLookupError)
  224. # Try making a GET request to an allowed IPv4 address
  225. # ----------------------------------------------------------
  226. # Make the request
  227. d = defer.ensureDeferred(cl.post_json("fine:8008", "foo/bar", timeout=10000))
  228. # Nothing has happened yet
  229. self.assertNoResult(d)
  230. # Move the reactor forwards
  231. self.pump(1)
  232. # Check that it was able to resolve the address
  233. clients = self.reactor.tcpClients
  234. self.assertNotEqual(len(clients), 0)
  235. # Connection will still fail as this IP address does not resolve to anything
  236. f = self.failureResultOf(d, RequestSendFailed)
  237. self.assertIsInstance(f.value.inner_exception, ConnectingCancelledError)
  238. def test_client_gets_headers(self) -> None:
  239. """
  240. Once the client gets the headers, _request returns successfully.
  241. """
  242. request = MatrixFederationRequest(
  243. method="GET", destination="testserv:8008", path="foo/bar"
  244. )
  245. d = defer.ensureDeferred(self.cl._send_request(request, timeout=10000))
  246. self.pump()
  247. conn = Mock()
  248. clients = self.reactor.tcpClients
  249. client = clients[0][2].buildProtocol(None)
  250. client.makeConnection(conn)
  251. # Deferred does not have a result
  252. self.assertNoResult(d)
  253. # Send it the HTTP response
  254. client.dataReceived(b"HTTP/1.1 200 OK\r\nServer: Fake\r\n\r\n")
  255. # We should get a successful response
  256. r = self.successResultOf(d)
  257. self.assertEqual(r.code, 200)
  258. @parameterized.expand(["get_json", "post_json", "delete_json", "put_json"])
  259. def test_timeout_reading_body(self, method_name: str) -> None:
  260. """
  261. If the HTTP request is connected, but gets no response before being
  262. timed out, it'll give a RequestSendFailed with can_retry.
  263. """
  264. method = getattr(self.cl, method_name)
  265. d = defer.ensureDeferred(method("testserv:8008", "foo/bar", timeout=10000))
  266. self.pump()
  267. conn = Mock()
  268. clients = self.reactor.tcpClients
  269. client = clients[0][2].buildProtocol(None)
  270. client.makeConnection(conn)
  271. # Deferred does not have a result
  272. self.assertNoResult(d)
  273. # Send it the HTTP response
  274. client.dataReceived(
  275. b"HTTP/1.1 200 OK\r\nContent-Type: application/json\r\n"
  276. b"Server: Fake\r\n\r\n"
  277. )
  278. # Push by enough to time it out
  279. self.reactor.advance(10.5)
  280. f = self.failureResultOf(d)
  281. self.assertIsInstance(f.value, RequestSendFailed)
  282. self.assertTrue(f.value.can_retry)
  283. self.assertIsInstance(f.value.inner_exception, defer.TimeoutError)
  284. def test_client_requires_trailing_slashes(self) -> None:
  285. """
  286. If a connection is made to a client but the client rejects it due to
  287. requiring a trailing slash. We need to retry the request with a
  288. trailing slash. Workaround for Synapse <= v0.99.3, explained in #3622.
  289. """
  290. d = defer.ensureDeferred(
  291. self.cl.get_json("testserv:8008", "foo/bar", try_trailing_slash_on_400=True)
  292. )
  293. # Send the request
  294. self.pump()
  295. # there should have been a call to connectTCP
  296. clients = self.reactor.tcpClients
  297. self.assertEqual(len(clients), 1)
  298. (_host, _port, factory, _timeout, _bindAddress) = clients[0]
  299. # complete the connection and wire it up to a fake transport
  300. client = factory.buildProtocol(None)
  301. conn = StringTransport()
  302. client.makeConnection(conn)
  303. # that should have made it send the request to the connection
  304. self.assertRegex(conn.value(), b"^GET /foo/bar")
  305. # Clear the original request data before sending a response
  306. conn.clear()
  307. # Send the HTTP response
  308. client.dataReceived(
  309. b"HTTP/1.1 400 Bad Request\r\n"
  310. b"Content-Type: application/json\r\n"
  311. b"Content-Length: 59\r\n"
  312. b"\r\n"
  313. b'{"errcode":"M_UNRECOGNIZED","error":"Unrecognized request"}'
  314. )
  315. # We should get another request with a trailing slash
  316. self.assertRegex(conn.value(), b"^GET /foo/bar/")
  317. # Send a happy response this time
  318. client.dataReceived(
  319. b"HTTP/1.1 200 OK\r\n"
  320. b"Content-Type: application/json\r\n"
  321. b"Content-Length: 2\r\n"
  322. b"\r\n"
  323. b"{}"
  324. )
  325. # We should get a successful response
  326. r = self.successResultOf(d)
  327. self.assertEqual(r, {})
  328. def test_client_does_not_retry_on_400_plus(self) -> None:
  329. """
  330. Another test for trailing slashes but now test that we don't retry on
  331. trailing slashes on a non-400/M_UNRECOGNIZED response.
  332. See test_client_requires_trailing_slashes() for context.
  333. """
  334. d = defer.ensureDeferred(
  335. self.cl.get_json("testserv:8008", "foo/bar", try_trailing_slash_on_400=True)
  336. )
  337. # Send the request
  338. self.pump()
  339. # there should have been a call to connectTCP
  340. clients = self.reactor.tcpClients
  341. self.assertEqual(len(clients), 1)
  342. (_host, _port, factory, _timeout, _bindAddress) = clients[0]
  343. # complete the connection and wire it up to a fake transport
  344. client = factory.buildProtocol(None)
  345. conn = StringTransport()
  346. client.makeConnection(conn)
  347. # that should have made it send the request to the connection
  348. self.assertRegex(conn.value(), b"^GET /foo/bar")
  349. # Clear the original request data before sending a response
  350. conn.clear()
  351. # Send the HTTP response
  352. client.dataReceived(
  353. b"HTTP/1.1 404 Not Found\r\n"
  354. b"Content-Type: application/json\r\n"
  355. b"Content-Length: 2\r\n"
  356. b"\r\n"
  357. b"{}"
  358. )
  359. # We should not get another request
  360. self.assertEqual(conn.value(), b"")
  361. # We should get a 404 failure response
  362. self.failureResultOf(d)
  363. def test_client_sends_body(self) -> None:
  364. defer.ensureDeferred(
  365. self.cl.post_json(
  366. "testserv:8008", "foo/bar", timeout=10000, data={"a": "b"}
  367. )
  368. )
  369. self.pump()
  370. clients = self.reactor.tcpClients
  371. self.assertEqual(len(clients), 1)
  372. client = clients[0][2].buildProtocol(None)
  373. server = HTTPChannel()
  374. client.makeConnection(FakeTransport(server, self.reactor))
  375. server.makeConnection(FakeTransport(client, self.reactor))
  376. self.pump(0.1)
  377. self.assertEqual(len(server.requests), 1)
  378. request = server.requests[0]
  379. content = request.content.read()
  380. self.assertEqual(content, b'{"a":"b"}')
  381. def test_closes_connection(self) -> None:
  382. """Check that the client closes unused HTTP connections"""
  383. d = defer.ensureDeferred(self.cl.get_json("testserv:8008", "foo/bar"))
  384. self.pump()
  385. # there should have been a call to connectTCP
  386. clients = self.reactor.tcpClients
  387. self.assertEqual(len(clients), 1)
  388. (_host, _port, factory, _timeout, _bindAddress) = clients[0]
  389. # complete the connection and wire it up to a fake transport
  390. client = factory.buildProtocol(None)
  391. conn = StringTransport()
  392. client.makeConnection(conn)
  393. # that should have made it send the request to the connection
  394. self.assertRegex(conn.value(), b"^GET /foo/bar")
  395. # Send the HTTP response
  396. client.dataReceived(
  397. b"HTTP/1.1 200 OK\r\n"
  398. b"Content-Type: application/json\r\n"
  399. b"Content-Length: 2\r\n"
  400. b"\r\n"
  401. b"{}"
  402. )
  403. # We should get a successful response
  404. r = self.successResultOf(d)
  405. self.assertEqual(r, {})
  406. self.assertFalse(conn.disconnecting)
  407. # wait for a while
  408. self.reactor.advance(120)
  409. self.assertTrue(conn.disconnecting)
  410. @parameterized.expand([(b"",), (b"foo",), (b'{"a": Infinity}',)])
  411. def test_json_error(self, return_value: bytes) -> None:
  412. """
  413. Test what happens if invalid JSON is returned from the remote endpoint.
  414. """
  415. test_d = defer.ensureDeferred(self.cl.get_json("testserv:8008", "foo/bar"))
  416. self.pump()
  417. # Nothing happened yet
  418. self.assertNoResult(test_d)
  419. # Make sure treq is trying to connect
  420. clients = self.reactor.tcpClients
  421. self.assertEqual(len(clients), 1)
  422. (host, port, factory, _timeout, _bindAddress) = clients[0]
  423. self.assertEqual(host, "1.2.3.4")
  424. self.assertEqual(port, 8008)
  425. # complete the connection and wire it up to a fake transport
  426. protocol = factory.buildProtocol(None)
  427. transport = StringTransport()
  428. protocol.makeConnection(transport)
  429. # that should have made it send the request to the transport
  430. self.assertRegex(transport.value(), b"^GET /foo/bar")
  431. self.assertRegex(transport.value(), b"Host: testserv:8008")
  432. # Deferred is still without a result
  433. self.assertNoResult(test_d)
  434. # Send it the HTTP response
  435. protocol.dataReceived(
  436. b"HTTP/1.1 200 OK\r\n"
  437. b"Server: Fake\r\n"
  438. b"Content-Type: application/json\r\n"
  439. b"Content-Length: %i\r\n"
  440. b"\r\n"
  441. b"%s" % (len(return_value), return_value)
  442. )
  443. self.pump()
  444. f = self.failureResultOf(test_d)
  445. self.assertIsInstance(f.value, RequestSendFailed)
  446. def test_too_big(self) -> None:
  447. """
  448. Test what happens if a huge response is returned from the remote endpoint.
  449. """
  450. test_d = defer.ensureDeferred(self.cl.get_json("testserv:8008", "foo/bar"))
  451. self.pump()
  452. # Nothing happened yet
  453. self.assertNoResult(test_d)
  454. # Make sure treq is trying to connect
  455. clients = self.reactor.tcpClients
  456. self.assertEqual(len(clients), 1)
  457. (host, port, factory, _timeout, _bindAddress) = clients[0]
  458. self.assertEqual(host, "1.2.3.4")
  459. self.assertEqual(port, 8008)
  460. # complete the connection and wire it up to a fake transport
  461. protocol = factory.buildProtocol(None)
  462. transport = StringTransport()
  463. protocol.makeConnection(transport)
  464. # that should have made it send the request to the transport
  465. self.assertRegex(transport.value(), b"^GET /foo/bar")
  466. self.assertRegex(transport.value(), b"Host: testserv:8008")
  467. # Deferred is still without a result
  468. self.assertNoResult(test_d)
  469. # Send it a huge HTTP response
  470. protocol.dataReceived(
  471. b"HTTP/1.1 200 OK\r\n"
  472. b"Server: Fake\r\n"
  473. b"Content-Type: application/json\r\n"
  474. b"\r\n"
  475. )
  476. self.pump()
  477. # should still be waiting
  478. self.assertNoResult(test_d)
  479. sent = 0
  480. chunk_size = 1024 * 512
  481. while not test_d.called:
  482. protocol.dataReceived(b"a" * chunk_size)
  483. sent += chunk_size
  484. self.assertLessEqual(sent, ByteParser.MAX_RESPONSE_SIZE)
  485. self.assertEqual(sent, ByteParser.MAX_RESPONSE_SIZE)
  486. f = self.failureResultOf(test_d)
  487. self.assertIsInstance(f.value, RequestSendFailed)
  488. self.assertTrue(transport.disconnecting)
  489. def test_build_auth_headers_rejects_falsey_destinations(self) -> None:
  490. with self.assertRaises(ValueError):
  491. self.cl.build_auth_headers(None, b"GET", b"https://example.com")
  492. with self.assertRaises(ValueError):
  493. self.cl.build_auth_headers(b"", b"GET", b"https://example.com")
  494. with self.assertRaises(ValueError):
  495. self.cl.build_auth_headers(
  496. None, b"GET", b"https://example.com", destination_is=b""
  497. )
  498. with self.assertRaises(ValueError):
  499. self.cl.build_auth_headers(
  500. b"", b"GET", b"https://example.com", destination_is=b""
  501. )