matrixfederationclient.py 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2014-2016 OpenMarket Ltd
  3. # Copyright 2018 New Vector Ltd
  4. #
  5. # Licensed under the Apache License, Version 2.0 (the "License");
  6. # you may not use this file except in compliance with the License.
  7. # You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing, software
  12. # distributed under the License is distributed on an "AS IS" BASIS,
  13. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. # See the License for the specific language governing permissions and
  15. # limitations under the License.
  16. import cgi
  17. import logging
  18. import random
  19. import sys
  20. from io import BytesIO
  21. from six import PY3, raise_from, string_types
  22. from six.moves import urllib
  23. import attr
  24. import treq
  25. from canonicaljson import encode_canonical_json
  26. from prometheus_client import Counter
  27. from signedjson.sign import sign_json
  28. from twisted.internet import defer, protocol
  29. from twisted.internet.error import DNSLookupError
  30. from twisted.internet.task import _EPSILON, Cooperator
  31. from twisted.web._newclient import ResponseDone
  32. from twisted.web.client import Agent, FileBodyProducer, HTTPConnectionPool
  33. from twisted.web.http_headers import Headers
  34. import synapse.metrics
  35. import synapse.util.retryutils
  36. from synapse.api.errors import (
  37. Codes,
  38. FederationDeniedError,
  39. HttpResponseException,
  40. RequestSendFailed,
  41. SynapseError,
  42. )
  43. from synapse.http.endpoint import matrix_federation_endpoint
  44. from synapse.util.async_helpers import timeout_deferred
  45. from synapse.util.logcontext import make_deferred_yieldable
  46. from synapse.util.metrics import Measure
  47. logger = logging.getLogger(__name__)
  48. outgoing_requests_counter = Counter("synapse_http_matrixfederationclient_requests",
  49. "", ["method"])
  50. incoming_responses_counter = Counter("synapse_http_matrixfederationclient_responses",
  51. "", ["method", "code"])
  52. MAX_LONG_RETRIES = 10
  53. MAX_SHORT_RETRIES = 3
  54. if PY3:
  55. MAXINT = sys.maxsize
  56. else:
  57. MAXINT = sys.maxint
  58. class MatrixFederationEndpointFactory(object):
  59. def __init__(self, hs):
  60. self.reactor = hs.get_reactor()
  61. self.tls_client_options_factory = hs.tls_client_options_factory
  62. def endpointForURI(self, uri):
  63. destination = uri.netloc.decode('ascii')
  64. return matrix_federation_endpoint(
  65. self.reactor, destination, timeout=10,
  66. tls_client_options_factory=self.tls_client_options_factory
  67. )
  68. _next_id = 1
  69. @attr.s
  70. class MatrixFederationRequest(object):
  71. method = attr.ib()
  72. """HTTP method
  73. :type: str
  74. """
  75. path = attr.ib()
  76. """HTTP path
  77. :type: str
  78. """
  79. destination = attr.ib()
  80. """The remote server to send the HTTP request to.
  81. :type: str"""
  82. json = attr.ib(default=None)
  83. """JSON to send in the body.
  84. :type: dict|None
  85. """
  86. json_callback = attr.ib(default=None)
  87. """A callback to generate the JSON.
  88. :type: func|None
  89. """
  90. query = attr.ib(default=None)
  91. """Query arguments.
  92. :type: dict|None
  93. """
  94. txn_id = attr.ib(default=None)
  95. """Unique ID for this request (for logging)
  96. :type: str|None
  97. """
  98. def __attrs_post_init__(self):
  99. global _next_id
  100. self.txn_id = "%s-O-%s" % (self.method, _next_id)
  101. _next_id = (_next_id + 1) % (MAXINT - 1)
  102. def get_json(self):
  103. if self.json_callback:
  104. return self.json_callback()
  105. return self.json
  106. @defer.inlineCallbacks
  107. def _handle_json_response(reactor, timeout_sec, request, response):
  108. """
  109. Reads the JSON body of a response, with a timeout
  110. Args:
  111. reactor (IReactor): twisted reactor, for the timeout
  112. timeout_sec (float): number of seconds to wait for response to complete
  113. request (MatrixFederationRequest): the request that triggered the response
  114. response (IResponse): response to the request
  115. Returns:
  116. dict: parsed JSON response
  117. """
  118. try:
  119. check_content_type_is_json(response.headers)
  120. d = treq.json_content(response)
  121. d = timeout_deferred(
  122. d,
  123. timeout=timeout_sec,
  124. reactor=reactor,
  125. )
  126. body = yield make_deferred_yieldable(d)
  127. except Exception as e:
  128. logger.warn(
  129. "{%s} [%s] Error reading response: %s",
  130. request.txn_id,
  131. request.destination,
  132. e,
  133. )
  134. raise
  135. logger.info(
  136. "{%s} [%s] Completed: %d %s",
  137. request.txn_id,
  138. request.destination,
  139. response.code,
  140. response.phrase.decode('ascii', errors='replace'),
  141. )
  142. defer.returnValue(body)
  143. class MatrixFederationHttpClient(object):
  144. """HTTP client used to talk to other homeservers over the federation
  145. protocol. Send client certificates and signs requests.
  146. Attributes:
  147. agent (twisted.web.client.Agent): The twisted Agent used to send the
  148. requests.
  149. """
  150. def __init__(self, hs):
  151. self.hs = hs
  152. self.signing_key = hs.config.signing_key[0]
  153. self.server_name = hs.hostname
  154. reactor = hs.get_reactor()
  155. pool = HTTPConnectionPool(reactor)
  156. pool.retryAutomatically = False
  157. pool.maxPersistentPerHost = 5
  158. pool.cachedConnectionTimeout = 2 * 60
  159. self.agent = Agent.usingEndpointFactory(
  160. reactor, MatrixFederationEndpointFactory(hs), pool=pool
  161. )
  162. self.clock = hs.get_clock()
  163. self._store = hs.get_datastore()
  164. self.version_string_bytes = hs.version_string.encode('ascii')
  165. self.default_timeout = 60
  166. def schedule(x):
  167. reactor.callLater(_EPSILON, x)
  168. self._cooperator = Cooperator(scheduler=schedule)
  169. @defer.inlineCallbacks
  170. def _send_request(
  171. self,
  172. request,
  173. retry_on_dns_fail=True,
  174. timeout=None,
  175. long_retries=False,
  176. ignore_backoff=False,
  177. backoff_on_404=False
  178. ):
  179. """
  180. Sends a request to the given server.
  181. Args:
  182. request (MatrixFederationRequest): details of request to be sent
  183. timeout (int|None): number of milliseconds to wait for the response headers
  184. (including connecting to the server). 60s by default.
  185. ignore_backoff (bool): true to ignore the historical backoff data
  186. and try the request anyway.
  187. backoff_on_404 (bool): Back off if we get a 404
  188. Returns:
  189. Deferred[twisted.web.client.Response]: resolves with the HTTP
  190. response object on success.
  191. Raises:
  192. HttpResponseException: If we get an HTTP response code >= 300
  193. (except 429).
  194. NotRetryingDestination: If we are not yet ready to retry this
  195. server.
  196. FederationDeniedError: If this destination is not on our
  197. federation whitelist
  198. RequestSendFailed: If there were problems connecting to the
  199. remote, due to e.g. DNS failures, connection timeouts etc.
  200. """
  201. if timeout:
  202. _sec_timeout = timeout / 1000
  203. else:
  204. _sec_timeout = self.default_timeout
  205. if (
  206. self.hs.config.federation_domain_whitelist is not None and
  207. request.destination not in self.hs.config.federation_domain_whitelist
  208. ):
  209. raise FederationDeniedError(request.destination)
  210. limiter = yield synapse.util.retryutils.get_retry_limiter(
  211. request.destination,
  212. self.clock,
  213. self._store,
  214. backoff_on_404=backoff_on_404,
  215. ignore_backoff=ignore_backoff,
  216. )
  217. method_bytes = request.method.encode("ascii")
  218. destination_bytes = request.destination.encode("ascii")
  219. path_bytes = request.path.encode("ascii")
  220. if request.query:
  221. query_bytes = encode_query_args(request.query)
  222. else:
  223. query_bytes = b""
  224. headers_dict = {
  225. b"User-Agent": [self.version_string_bytes],
  226. b"Host": [destination_bytes],
  227. }
  228. with limiter:
  229. # XXX: Would be much nicer to retry only at the transaction-layer
  230. # (once we have reliable transactions in place)
  231. if long_retries:
  232. retries_left = MAX_LONG_RETRIES
  233. else:
  234. retries_left = MAX_SHORT_RETRIES
  235. url_bytes = urllib.parse.urlunparse((
  236. b"matrix", destination_bytes,
  237. path_bytes, None, query_bytes, b"",
  238. ))
  239. url_str = url_bytes.decode('ascii')
  240. url_to_sign_bytes = urllib.parse.urlunparse((
  241. b"", b"",
  242. path_bytes, None, query_bytes, b"",
  243. ))
  244. while True:
  245. try:
  246. json = request.get_json()
  247. if json:
  248. headers_dict[b"Content-Type"] = [b"application/json"]
  249. auth_headers = self.build_auth_headers(
  250. destination_bytes, method_bytes, url_to_sign_bytes,
  251. json,
  252. )
  253. data = encode_canonical_json(json)
  254. producer = FileBodyProducer(
  255. BytesIO(data),
  256. cooperator=self._cooperator,
  257. )
  258. else:
  259. producer = None
  260. auth_headers = self.build_auth_headers(
  261. destination_bytes, method_bytes, url_to_sign_bytes,
  262. )
  263. headers_dict[b"Authorization"] = auth_headers
  264. logger.info(
  265. "{%s} [%s] Sending request: %s %s",
  266. request.txn_id, request.destination, request.method,
  267. url_str,
  268. )
  269. try:
  270. with Measure(self.clock, "outbound_request"):
  271. # we don't want all the fancy cookie and redirect handling
  272. # that treq.request gives: just use the raw Agent.
  273. request_deferred = self.agent.request(
  274. method_bytes,
  275. url_bytes,
  276. headers=Headers(headers_dict),
  277. bodyProducer=producer,
  278. )
  279. request_deferred = timeout_deferred(
  280. request_deferred,
  281. timeout=_sec_timeout,
  282. reactor=self.hs.get_reactor(),
  283. )
  284. response = yield make_deferred_yieldable(
  285. request_deferred,
  286. )
  287. except DNSLookupError as e:
  288. raise_from(RequestSendFailed(e, can_retry=retry_on_dns_fail), e)
  289. except Exception as e:
  290. raise_from(RequestSendFailed(e, can_retry=True), e)
  291. logger.info(
  292. "{%s} [%s] Got response headers: %d %s",
  293. request.txn_id,
  294. request.destination,
  295. response.code,
  296. response.phrase.decode('ascii', errors='replace'),
  297. )
  298. if 200 <= response.code < 300:
  299. pass
  300. else:
  301. # :'(
  302. # Update transactions table?
  303. d = treq.content(response)
  304. d = timeout_deferred(
  305. d,
  306. timeout=_sec_timeout,
  307. reactor=self.hs.get_reactor(),
  308. )
  309. try:
  310. body = yield make_deferred_yieldable(d)
  311. except Exception as e:
  312. # Eh, we're already going to raise an exception so lets
  313. # ignore if this fails.
  314. logger.warn(
  315. "{%s} [%s] Failed to get error response: %s %s: %s",
  316. request.txn_id,
  317. request.destination,
  318. request.method,
  319. url_str,
  320. _flatten_response_never_received(e),
  321. )
  322. body = None
  323. e = HttpResponseException(
  324. response.code, response.phrase, body
  325. )
  326. # Retry if the error is a 429 (Too Many Requests),
  327. # otherwise just raise a standard HttpResponseException
  328. if response.code == 429:
  329. raise_from(RequestSendFailed(e, can_retry=True), e)
  330. else:
  331. raise e
  332. break
  333. except RequestSendFailed as e:
  334. logger.warn(
  335. "{%s} [%s] Request failed: %s %s: %s",
  336. request.txn_id,
  337. request.destination,
  338. request.method,
  339. url_str,
  340. _flatten_response_never_received(e.inner_exception),
  341. )
  342. if not e.can_retry:
  343. raise
  344. if retries_left and not timeout:
  345. if long_retries:
  346. delay = 4 ** (MAX_LONG_RETRIES + 1 - retries_left)
  347. delay = min(delay, 60)
  348. delay *= random.uniform(0.8, 1.4)
  349. else:
  350. delay = 0.5 * 2 ** (MAX_SHORT_RETRIES - retries_left)
  351. delay = min(delay, 2)
  352. delay *= random.uniform(0.8, 1.4)
  353. logger.debug(
  354. "{%s} [%s] Waiting %ss before re-sending...",
  355. request.txn_id,
  356. request.destination,
  357. delay,
  358. )
  359. yield self.clock.sleep(delay)
  360. retries_left -= 1
  361. else:
  362. raise
  363. except Exception as e:
  364. logger.warn(
  365. "{%s} [%s] Request failed: %s %s: %s",
  366. request.txn_id,
  367. request.destination,
  368. request.method,
  369. url_str,
  370. _flatten_response_never_received(e),
  371. )
  372. raise
  373. defer.returnValue(response)
  374. def build_auth_headers(
  375. self, destination, method, url_bytes, content=None, destination_is=None,
  376. ):
  377. """
  378. Builds the Authorization headers for a federation request
  379. Args:
  380. destination (bytes|None): The desination home server of the request.
  381. May be None if the destination is an identity server, in which case
  382. destination_is must be non-None.
  383. method (bytes): The HTTP method of the request
  384. url_bytes (bytes): The URI path of the request
  385. content (object): The body of the request
  386. destination_is (bytes): As 'destination', but if the destination is an
  387. identity server
  388. Returns:
  389. list[bytes]: a list of headers to be added as "Authorization:" headers
  390. """
  391. request = {
  392. "method": method,
  393. "uri": url_bytes,
  394. "origin": self.server_name,
  395. }
  396. if destination is not None:
  397. request["destination"] = destination
  398. if destination_is is not None:
  399. request["destination_is"] = destination_is
  400. if content is not None:
  401. request["content"] = content
  402. request = sign_json(request, self.server_name, self.signing_key)
  403. auth_headers = []
  404. for key, sig in request["signatures"][self.server_name].items():
  405. auth_headers.append((
  406. "X-Matrix origin=%s,key=\"%s\",sig=\"%s\"" % (
  407. self.server_name, key, sig,
  408. )).encode('ascii')
  409. )
  410. return auth_headers
  411. @defer.inlineCallbacks
  412. def put_json(self, destination, path, args={}, data={},
  413. json_data_callback=None,
  414. long_retries=False, timeout=None,
  415. ignore_backoff=False,
  416. backoff_on_404=False):
  417. """ Sends the specifed json data using PUT
  418. Args:
  419. destination (str): The remote server to send the HTTP request
  420. to.
  421. path (str): The HTTP path.
  422. args (dict): query params
  423. data (dict): A dict containing the data that will be used as
  424. the request body. This will be encoded as JSON.
  425. json_data_callback (callable): A callable returning the dict to
  426. use as the request body.
  427. long_retries (bool): A boolean that indicates whether we should
  428. retry for a short or long time.
  429. timeout(int): How long to try (in ms) the destination for before
  430. giving up. None indicates no timeout.
  431. ignore_backoff (bool): true to ignore the historical backoff data
  432. and try the request anyway.
  433. backoff_on_404 (bool): True if we should count a 404 response as
  434. a failure of the server (and should therefore back off future
  435. requests)
  436. Returns:
  437. Deferred[dict|list]: Succeeds when we get a 2xx HTTP response. The
  438. result will be the decoded JSON body.
  439. Raises:
  440. HttpResponseException: If we get an HTTP response code >= 300
  441. (except 429).
  442. NotRetryingDestination: If we are not yet ready to retry this
  443. server.
  444. FederationDeniedError: If this destination is not on our
  445. federation whitelist
  446. RequestSendFailed: If there were problems connecting to the
  447. remote, due to e.g. DNS failures, connection timeouts etc.
  448. """
  449. request = MatrixFederationRequest(
  450. method="PUT",
  451. destination=destination,
  452. path=path,
  453. query=args,
  454. json_callback=json_data_callback,
  455. json=data,
  456. )
  457. response = yield self._send_request(
  458. request,
  459. long_retries=long_retries,
  460. timeout=timeout,
  461. ignore_backoff=ignore_backoff,
  462. backoff_on_404=backoff_on_404,
  463. )
  464. body = yield _handle_json_response(
  465. self.hs.get_reactor(), self.default_timeout, request, response,
  466. )
  467. defer.returnValue(body)
  468. @defer.inlineCallbacks
  469. def post_json(self, destination, path, data={}, long_retries=False,
  470. timeout=None, ignore_backoff=False, args={}):
  471. """ Sends the specifed json data using POST
  472. Args:
  473. destination (str): The remote server to send the HTTP request
  474. to.
  475. path (str): The HTTP path.
  476. data (dict): A dict containing the data that will be used as
  477. the request body. This will be encoded as JSON.
  478. long_retries (bool): A boolean that indicates whether we should
  479. retry for a short or long time.
  480. timeout(int): How long to try (in ms) the destination for before
  481. giving up. None indicates no timeout.
  482. ignore_backoff (bool): true to ignore the historical backoff data and
  483. try the request anyway.
  484. args (dict): query params
  485. Returns:
  486. Deferred[dict|list]: Succeeds when we get a 2xx HTTP response. The
  487. result will be the decoded JSON body.
  488. Raises:
  489. HttpResponseException: If we get an HTTP response code >= 300
  490. (except 429).
  491. NotRetryingDestination: If we are not yet ready to retry this
  492. server.
  493. FederationDeniedError: If this destination is not on our
  494. federation whitelist
  495. RequestSendFailed: If there were problems connecting to the
  496. remote, due to e.g. DNS failures, connection timeouts etc.
  497. """
  498. request = MatrixFederationRequest(
  499. method="POST",
  500. destination=destination,
  501. path=path,
  502. query=args,
  503. json=data,
  504. )
  505. response = yield self._send_request(
  506. request,
  507. long_retries=long_retries,
  508. timeout=timeout,
  509. ignore_backoff=ignore_backoff,
  510. )
  511. if timeout:
  512. _sec_timeout = timeout / 1000
  513. else:
  514. _sec_timeout = self.default_timeout
  515. body = yield _handle_json_response(
  516. self.hs.get_reactor(), _sec_timeout, request, response,
  517. )
  518. defer.returnValue(body)
  519. @defer.inlineCallbacks
  520. def get_json(self, destination, path, args=None, retry_on_dns_fail=True,
  521. timeout=None, ignore_backoff=False):
  522. """ GETs some json from the given host homeserver and path
  523. Args:
  524. destination (str): The remote server to send the HTTP request
  525. to.
  526. path (str): The HTTP path.
  527. args (dict|None): A dictionary used to create query strings, defaults to
  528. None.
  529. timeout (int): How long to try (in ms) the destination for before
  530. giving up. None indicates no timeout and that the request will
  531. be retried.
  532. ignore_backoff (bool): true to ignore the historical backoff data
  533. and try the request anyway.
  534. Returns:
  535. Deferred[dict|list]: Succeeds when we get a 2xx HTTP response. The
  536. result will be the decoded JSON body.
  537. Raises:
  538. HttpResponseException: If we get an HTTP response code >= 300
  539. (except 429).
  540. NotRetryingDestination: If we are not yet ready to retry this
  541. server.
  542. FederationDeniedError: If this destination is not on our
  543. federation whitelist
  544. RequestSendFailed: If there were problems connecting to the
  545. remote, due to e.g. DNS failures, connection timeouts etc.
  546. """
  547. logger.debug("get_json args: %s", args)
  548. logger.debug("Query bytes: %s Retry DNS: %s", args, retry_on_dns_fail)
  549. request = MatrixFederationRequest(
  550. method="GET",
  551. destination=destination,
  552. path=path,
  553. query=args,
  554. )
  555. response = yield self._send_request(
  556. request,
  557. retry_on_dns_fail=retry_on_dns_fail,
  558. timeout=timeout,
  559. ignore_backoff=ignore_backoff,
  560. )
  561. body = yield _handle_json_response(
  562. self.hs.get_reactor(), self.default_timeout, request, response,
  563. )
  564. defer.returnValue(body)
  565. @defer.inlineCallbacks
  566. def delete_json(self, destination, path, long_retries=False,
  567. timeout=None, ignore_backoff=False, args={}):
  568. """Send a DELETE request to the remote expecting some json response
  569. Args:
  570. destination (str): The remote server to send the HTTP request
  571. to.
  572. path (str): The HTTP path.
  573. long_retries (bool): A boolean that indicates whether we should
  574. retry for a short or long time.
  575. timeout(int): How long to try (in ms) the destination for before
  576. giving up. None indicates no timeout.
  577. ignore_backoff (bool): true to ignore the historical backoff data and
  578. try the request anyway.
  579. Returns:
  580. Deferred[dict|list]: Succeeds when we get a 2xx HTTP response. The
  581. result will be the decoded JSON body.
  582. Raises:
  583. HttpResponseException: If we get an HTTP response code >= 300
  584. (except 429).
  585. NotRetryingDestination: If we are not yet ready to retry this
  586. server.
  587. FederationDeniedError: If this destination is not on our
  588. federation whitelist
  589. RequestSendFailed: If there were problems connecting to the
  590. remote, due to e.g. DNS failures, connection timeouts etc.
  591. """
  592. request = MatrixFederationRequest(
  593. method="DELETE",
  594. destination=destination,
  595. path=path,
  596. query=args,
  597. )
  598. response = yield self._send_request(
  599. request,
  600. long_retries=long_retries,
  601. timeout=timeout,
  602. ignore_backoff=ignore_backoff,
  603. )
  604. body = yield _handle_json_response(
  605. self.hs.get_reactor(), self.default_timeout, request, response,
  606. )
  607. defer.returnValue(body)
  608. @defer.inlineCallbacks
  609. def get_file(self, destination, path, output_stream, args={},
  610. retry_on_dns_fail=True, max_size=None,
  611. ignore_backoff=False):
  612. """GETs a file from a given homeserver
  613. Args:
  614. destination (str): The remote server to send the HTTP request to.
  615. path (str): The HTTP path to GET.
  616. output_stream (file): File to write the response body to.
  617. args (dict): Optional dictionary used to create the query string.
  618. ignore_backoff (bool): true to ignore the historical backoff data
  619. and try the request anyway.
  620. Returns:
  621. Deferred[tuple[int, dict]]: Resolves with an (int,dict) tuple of
  622. the file length and a dict of the response headers.
  623. Raises:
  624. HttpResponseException: If we get an HTTP response code >= 300
  625. (except 429).
  626. NotRetryingDestination: If we are not yet ready to retry this
  627. server.
  628. FederationDeniedError: If this destination is not on our
  629. federation whitelist
  630. RequestSendFailed: If there were problems connecting to the
  631. remote, due to e.g. DNS failures, connection timeouts etc.
  632. """
  633. request = MatrixFederationRequest(
  634. method="GET",
  635. destination=destination,
  636. path=path,
  637. query=args,
  638. )
  639. response = yield self._send_request(
  640. request,
  641. retry_on_dns_fail=retry_on_dns_fail,
  642. ignore_backoff=ignore_backoff,
  643. )
  644. headers = dict(response.headers.getAllRawHeaders())
  645. try:
  646. d = _readBodyToFile(response, output_stream, max_size)
  647. d.addTimeout(self.default_timeout, self.hs.get_reactor())
  648. length = yield make_deferred_yieldable(d)
  649. except Exception as e:
  650. logger.warn(
  651. "{%s} [%s] Error reading response: %s",
  652. request.txn_id,
  653. request.destination,
  654. e,
  655. )
  656. raise
  657. logger.info(
  658. "{%s} [%s] Completed: %d %s [%d bytes]",
  659. request.txn_id,
  660. request.destination,
  661. response.code,
  662. response.phrase.decode('ascii', errors='replace'),
  663. length,
  664. )
  665. defer.returnValue((length, headers))
  666. class _ReadBodyToFileProtocol(protocol.Protocol):
  667. def __init__(self, stream, deferred, max_size):
  668. self.stream = stream
  669. self.deferred = deferred
  670. self.length = 0
  671. self.max_size = max_size
  672. def dataReceived(self, data):
  673. self.stream.write(data)
  674. self.length += len(data)
  675. if self.max_size is not None and self.length >= self.max_size:
  676. self.deferred.errback(SynapseError(
  677. 502,
  678. "Requested file is too large > %r bytes" % (self.max_size,),
  679. Codes.TOO_LARGE,
  680. ))
  681. self.deferred = defer.Deferred()
  682. self.transport.loseConnection()
  683. def connectionLost(self, reason):
  684. if reason.check(ResponseDone):
  685. self.deferred.callback(self.length)
  686. else:
  687. self.deferred.errback(reason)
  688. def _readBodyToFile(response, stream, max_size):
  689. d = defer.Deferred()
  690. response.deliverBody(_ReadBodyToFileProtocol(stream, d, max_size))
  691. return d
  692. def _flatten_response_never_received(e):
  693. if hasattr(e, "reasons"):
  694. reasons = ", ".join(
  695. _flatten_response_never_received(f.value)
  696. for f in e.reasons
  697. )
  698. return "%s:[%s]" % (type(e).__name__, reasons)
  699. else:
  700. return repr(e)
  701. def check_content_type_is_json(headers):
  702. """
  703. Check that a set of HTTP headers have a Content-Type header, and that it
  704. is application/json.
  705. Args:
  706. headers (twisted.web.http_headers.Headers): headers to check
  707. Raises:
  708. RequestSendFailed: if the Content-Type header is missing or isn't JSON
  709. """
  710. c_type = headers.getRawHeaders(b"Content-Type")
  711. if c_type is None:
  712. raise RequestSendFailed(RuntimeError(
  713. "No Content-Type header"
  714. ), can_retry=False)
  715. c_type = c_type[0].decode('ascii') # only the first header
  716. val, options = cgi.parse_header(c_type)
  717. if val != "application/json":
  718. raise RequestSendFailed(RuntimeError(
  719. "Content-Type not application/json: was '%s'" % c_type
  720. ), can_retry=False)
  721. def encode_query_args(args):
  722. if args is None:
  723. return b""
  724. encoded_args = {}
  725. for k, vs in args.items():
  726. if isinstance(vs, string_types):
  727. vs = [vs]
  728. encoded_args[k] = [v.encode("UTF-8") for v in vs]
  729. query_bytes = urllib.parse.urlencode(encoded_args, True)
  730. return query_bytes.encode('utf8')