test_fedclient.py 10 KB


  1. # -*- coding: utf-8 -*-
  2. # Copyright 2018 New Vector Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. from mock import Mock
  16. from twisted.internet import defer
  17. from twisted.internet.defer import TimeoutError
  18. from twisted.internet.error import ConnectingCancelledError, DNSLookupError
  19. from twisted.test.proto_helpers import StringTransport
  20. from twisted.web.client import ResponseNeverReceived
  21. from twisted.web.http import HTTPChannel
  22. from synapse.api.errors import RequestSendFailed
  23. from synapse.http.matrixfederationclient import (
  24. MatrixFederationHttpClient,
  25. MatrixFederationRequest,
  26. )
  27. from synapse.util.logcontext import LoggingContext
  28. from tests.server import FakeTransport
  29. from tests.unittest import HomeserverTestCase
  30. def check_logcontext(context):
  31. current = LoggingContext.current_context()
  32. if current is not context:
  33. raise AssertionError(
  34. "Expected logcontext %s but was %s" % (context, current),
  35. )
  36. class FederationClientTests(HomeserverTestCase):
  37. def make_homeserver(self, reactor, clock):
  38. hs = self.setup_test_homeserver(reactor=reactor, clock=clock)
  39. hs.tls_client_options_factory = None
  40. return hs
  41. def prepare(self, reactor, clock, homeserver):
  42. self.cl = MatrixFederationHttpClient(self.hs)
  43. self.reactor.lookups["testserv"] = "1.2.3.4"
  44. def test_client_get(self):
  45. """
  46. happy-path test of a GET request
  47. """
  48. @defer.inlineCallbacks
  49. def do_request():
  50. with LoggingContext("one") as context:
  51. fetch_d = self.cl.get_json("testserv:8008", "foo/bar")
  52. # Nothing happened yet
  53. self.assertNoResult(fetch_d)
  54. # should have reset logcontext to the sentinel
  55. check_logcontext(LoggingContext.sentinel)
  56. try:
  57. fetch_res = yield fetch_d
  58. defer.returnValue(fetch_res)
  59. finally:
  60. check_logcontext(context)
  61. test_d = do_request()
  62. self.pump()
  63. # Nothing happened yet
  64. self.assertNoResult(test_d)
  65. # Make sure treq is trying to connect
  66. clients = self.reactor.tcpClients
  67. self.assertEqual(len(clients), 1)
  68. (host, port, factory, _timeout, _bindAddress) = clients[0]
  69. self.assertEqual(host, '1.2.3.4')
  70. self.assertEqual(port, 8008)
  71. # complete the connection and wire it up to a fake transport
  72. protocol = factory.buildProtocol(None)
  73. transport = StringTransport()
  74. protocol.makeConnection(transport)
  75. # that should have made it send the request to the transport
  76. self.assertRegex(transport.value(), b"^GET /foo/bar")
  77. # Deferred is still without a result
  78. self.assertNoResult(test_d)
  79. # Send it the HTTP response
  80. res_json = '{ "a": 1 }'.encode('ascii')
  81. protocol.dataReceived(
  82. b"HTTP/1.1 200 OK\r\n"
  83. b"Server: Fake\r\n"
  84. b"Content-Type: application/json\r\n"
  85. b"Content-Length: %i\r\n"
  86. b"\r\n"
  87. b"%s" % (len(res_json), res_json)
  88. )
  89. self.pump()
  90. res = self.successResultOf(test_d)
  91. # check the response is as expected
  92. self.assertEqual(res, {"a": 1})
  93. def test_dns_error(self):
  94. """
  95. If the DNS lookup returns an error, it will bubble up.
  96. """
  97. d = self.cl.get_json("testserv2:8008", "foo/bar", timeout=10000)
  98. self.pump()
  99. f = self.failureResultOf(d)
  100. self.assertIsInstance(f.value, RequestSendFailed)
  101. self.assertIsInstance(f.value.inner_exception, DNSLookupError)
  102. def test_client_connection_refused(self):
  103. d = self.cl.get_json("testserv:8008", "foo/bar", timeout=10000)
  104. self.pump()
  105. # Nothing happened yet
  106. self.assertNoResult(d)
  107. clients = self.reactor.tcpClients
  108. self.assertEqual(len(clients), 1)
  109. (host, port, factory, _timeout, _bindAddress) = clients[0]
  110. self.assertEqual(host, '1.2.3.4')
  111. self.assertEqual(port, 8008)
  112. e = Exception("go away")
  113. factory.clientConnectionFailed(None, e)
  114. self.pump(0.5)
  115. f = self.failureResultOf(d)
  116. self.assertIsInstance(f.value, RequestSendFailed)
  117. self.assertIs(f.value.inner_exception, e)
  118. def test_client_never_connect(self):
  119. """
  120. If the HTTP request is not connected and is timed out, it'll give a
  121. ConnectingCancelledError or TimeoutError.
  122. """
  123. d = self.cl.get_json("testserv:8008", "foo/bar", timeout=10000)
  124. self.pump()
  125. # Nothing happened yet
  126. self.assertNoResult(d)
  127. # Make sure treq is trying to connect
  128. clients = self.reactor.tcpClients
  129. self.assertEqual(len(clients), 1)
  130. self.assertEqual(clients[0][0], '1.2.3.4')
  131. self.assertEqual(clients[0][1], 8008)
  132. # Deferred is still without a result
  133. self.assertNoResult(d)
  134. # Push by enough to time it out
  135. self.reactor.advance(10.5)
  136. f = self.failureResultOf(d)
  137. self.assertIsInstance(f.value, RequestSendFailed)
  138. self.assertIsInstance(
  139. f.value.inner_exception,
  140. (ConnectingCancelledError, TimeoutError),
  141. )
  142. def test_client_connect_no_response(self):
  143. """
  144. If the HTTP request is connected, but gets no response before being
  145. timed out, it'll give a ResponseNeverReceived.
  146. """
  147. d = self.cl.get_json("testserv:8008", "foo/bar", timeout=10000)
  148. self.pump()
  149. # Nothing happened yet
  150. self.assertNoResult(d)
  151. # Make sure treq is trying to connect
  152. clients = self.reactor.tcpClients
  153. self.assertEqual(len(clients), 1)
  154. self.assertEqual(clients[0][0], '1.2.3.4')
  155. self.assertEqual(clients[0][1], 8008)
  156. conn = Mock()
  157. client = clients[0][2].buildProtocol(None)
  158. client.makeConnection(conn)
  159. # Deferred is still without a result
  160. self.assertNoResult(d)
  161. # Push by enough to time it out
  162. self.reactor.advance(10.5)
  163. f = self.failureResultOf(d)
  164. self.assertIsInstance(f.value, RequestSendFailed)
  165. self.assertIsInstance(f.value.inner_exception, ResponseNeverReceived)
  166. def test_client_gets_headers(self):
  167. """
  168. Once the client gets the headers, _request returns successfully.
  169. """
  170. request = MatrixFederationRequest(
  171. method="GET",
  172. destination="testserv:8008",
  173. path="foo/bar",
  174. )
  175. d = self.cl._send_request(request, timeout=10000)
  176. self.pump()
  177. conn = Mock()
  178. clients = self.reactor.tcpClients
  179. client = clients[0][2].buildProtocol(None)
  180. client.makeConnection(conn)
  181. # Deferred does not have a result
  182. self.assertNoResult(d)
  183. # Send it the HTTP response
  184. client.dataReceived(b"HTTP/1.1 200 OK\r\nServer: Fake\r\n\r\n")
  185. # We should get a successful response
  186. r = self.successResultOf(d)
  187. self.assertEqual(r.code, 200)
  188. def test_client_headers_no_body(self):
  189. """
  190. If the HTTP request is connected, but gets no response before being
  191. timed out, it'll give a ResponseNeverReceived.
  192. """
  193. d = self.cl.post_json("testserv:8008", "foo/bar", timeout=10000)
  194. self.pump()
  195. conn = Mock()
  196. clients = self.reactor.tcpClients
  197. client = clients[0][2].buildProtocol(None)
  198. client.makeConnection(conn)
  199. # Deferred does not have a result
  200. self.assertNoResult(d)
  201. # Send it the HTTP response
  202. client.dataReceived(
  203. (b"HTTP/1.1 200 OK\r\nContent-Type: application/json\r\n"
  204. b"Server: Fake\r\n\r\n")
  205. )
  206. # Push by enough to time it out
  207. self.reactor.advance(10.5)
  208. f = self.failureResultOf(d)
  209. self.assertIsInstance(f.value, TimeoutError)
  210. def test_client_sends_body(self):
  211. self.cl.post_json(
  212. "testserv:8008", "foo/bar", timeout=10000,
  213. data={"a": "b"}
  214. )
  215. self.pump()
  216. clients = self.reactor.tcpClients
  217. self.assertEqual(len(clients), 1)
  218. client = clients[0][2].buildProtocol(None)
  219. server = HTTPChannel()
  220. client.makeConnection(FakeTransport(server, self.reactor))
  221. server.makeConnection(FakeTransport(client, self.reactor))
  222. self.pump(0.1)
  223. self.assertEqual(len(server.requests), 1)
  224. request = server.requests[0]
  225. content = request.content.read()
  226. self.assertEqual(content, b'{"a":"b"}')
  227. def test_closes_connection(self):
  228. """Check that the client closes unused HTTP connections"""
  229. d = self.cl.get_json("testserv:8008", "foo/bar")
  230. self.pump()
  231. # there should have been a call to connectTCP
  232. clients = self.reactor.tcpClients
  233. self.assertEqual(len(clients), 1)
  234. (_host, _port, factory, _timeout, _bindAddress) = clients[0]
  235. # complete the connection and wire it up to a fake transport
  236. client = factory.buildProtocol(None)
  237. conn = StringTransport()
  238. client.makeConnection(conn)
  239. # that should have made it send the request to the connection
  240. self.assertRegex(conn.value(), b"^GET /foo/bar")
  241. # Send the HTTP response
  242. client.dataReceived(
  243. b"HTTP/1.1 200 OK\r\n"
  244. b"Content-Type: application/json\r\n"
  245. b"Content-Length: 2\r\n"
  246. b"\r\n"
  247. b"{}"
  248. )
  249. # We should get a successful response
  250. r = self.successResultOf(d)
  251. self.assertEqual(r, {})
  252. self.assertFalse(conn.disconnecting)
  253. # wait for a while
  254. self.pump(120)
  255. self.assertTrue(conn.disconnecting)