matrixfederationclient.py 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2014-2016 OpenMarket Ltd
  3. # Copyright 2018 New Vector Ltd
  4. #
  5. # Licensed under the Apache License, Version 2.0 (the "License");
  6. # you may not use this file except in compliance with the License.
  7. # You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing, software
  12. # distributed under the License is distributed on an "AS IS" BASIS,
  13. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. # See the License for the specific language governing permissions and
  15. # limitations under the License.
  16. import cgi
  17. import logging
  18. import random
  19. import sys
  20. from io import BytesIO
  21. from six import PY3, raise_from, string_types
  22. from six.moves import urllib
  23. import attr
  24. import treq
  25. from canonicaljson import encode_canonical_json
  26. from prometheus_client import Counter
  27. from signedjson.sign import sign_json
  28. from twisted.internet import defer, protocol
  29. from twisted.internet.error import DNSLookupError
  30. from twisted.internet.task import _EPSILON, Cooperator
  31. from twisted.web._newclient import ResponseDone
  32. from twisted.web.client import Agent, FileBodyProducer, HTTPConnectionPool
  33. from twisted.web.http_headers import Headers
  34. import synapse.metrics
  35. import synapse.util.retryutils
  36. from synapse.api.errors import (
  37. Codes,
  38. FederationDeniedError,
  39. HttpResponseException,
  40. RequestSendFailed,
  41. SynapseError,
  42. )
  43. from synapse.http.endpoint import matrix_federation_endpoint
  44. from synapse.util.async_helpers import timeout_deferred
  45. from synapse.util.logcontext import make_deferred_yieldable
  46. from synapse.util.metrics import Measure
  47. logger = logging.getLogger(__name__)
  48. outgoing_requests_counter = Counter("synapse_http_matrixfederationclient_requests",
  49. "", ["method"])
  50. incoming_responses_counter = Counter("synapse_http_matrixfederationclient_responses",
  51. "", ["method", "code"])
  52. MAX_LONG_RETRIES = 10
  53. MAX_SHORT_RETRIES = 3
  54. if PY3:
  55. MAXINT = sys.maxsize
  56. else:
  57. MAXINT = sys.maxint
  58. class MatrixFederationEndpointFactory(object):
  59. def __init__(self, hs):
  60. self.reactor = hs.get_reactor()
  61. self.tls_client_options_factory = hs.tls_client_options_factory
  62. def endpointForURI(self, uri):
  63. destination = uri.netloc.decode('ascii')
  64. return matrix_federation_endpoint(
  65. self.reactor, destination, timeout=10,
  66. tls_client_options_factory=self.tls_client_options_factory
  67. )
  68. _next_id = 1
  69. @attr.s
  70. class MatrixFederationRequest(object):
  71. method = attr.ib()
  72. """HTTP method
  73. :type: str
  74. """
  75. path = attr.ib()
  76. """HTTP path
  77. :type: str
  78. """
  79. destination = attr.ib()
  80. """The remote server to send the HTTP request to.
  81. :type: str"""
  82. json = attr.ib(default=None)
  83. """JSON to send in the body.
  84. :type: dict|None
  85. """
  86. json_callback = attr.ib(default=None)
  87. """A callback to generate the JSON.
  88. :type: func|None
  89. """
  90. query = attr.ib(default=None)
  91. """Query arguments.
  92. :type: dict|None
  93. """
  94. txn_id = attr.ib(default=None)
  95. """Unique ID for this request (for logging)
  96. :type: str|None
  97. """
  98. def __attrs_post_init__(self):
  99. global _next_id
  100. self.txn_id = "%s-O-%s" % (self.method, _next_id)
  101. _next_id = (_next_id + 1) % (MAXINT - 1)
  102. def get_json(self):
  103. if self.json_callback:
  104. return self.json_callback()
  105. return self.json
  106. @defer.inlineCallbacks
  107. def _handle_json_response(reactor, timeout_sec, request, response):
  108. """
  109. Reads the JSON body of a response, with a timeout
  110. Args:
  111. reactor (IReactor): twisted reactor, for the timeout
  112. timeout_sec (float): number of seconds to wait for response to complete
  113. request (MatrixFederationRequest): the request that triggered the response
  114. response (IResponse): response to the request
  115. Returns:
  116. dict: parsed JSON response
  117. """
  118. try:
  119. check_content_type_is_json(response.headers)
  120. d = treq.json_content(response)
  121. d = timeout_deferred(
  122. d,
  123. timeout=timeout_sec,
  124. reactor=reactor,
  125. )
  126. body = yield make_deferred_yieldable(d)
  127. except Exception as e:
  128. logger.warn(
  129. "{%s} [%s] Error reading response: %s",
  130. request.txn_id,
  131. request.destination,
  132. e,
  133. )
  134. raise
  135. logger.info(
  136. "{%s} [%s] Completed: %d %s",
  137. request.txn_id,
  138. request.destination,
  139. response.code,
  140. response.phrase.decode('ascii', errors='replace'),
  141. )
  142. defer.returnValue(body)
  143. class MatrixFederationHttpClient(object):
  144. """HTTP client used to talk to other homeservers over the federation
  145. protocol. Send client certificates and signs requests.
  146. Attributes:
  147. agent (twisted.web.client.Agent): The twisted Agent used to send the
  148. requests.
  149. """
  150. def __init__(self, hs):
  151. self.hs = hs
  152. self.signing_key = hs.config.signing_key[0]
  153. self.server_name = hs.hostname
  154. reactor = hs.get_reactor()
  155. pool = HTTPConnectionPool(reactor)
  156. pool.retryAutomatically = False
  157. pool.maxPersistentPerHost = 5
  158. pool.cachedConnectionTimeout = 2 * 60
  159. self.agent = Agent.usingEndpointFactory(
  160. reactor, MatrixFederationEndpointFactory(hs), pool=pool
  161. )
  162. self.clock = hs.get_clock()
  163. self._store = hs.get_datastore()
  164. self.version_string_bytes = hs.version_string.encode('ascii')
  165. self.default_timeout = 60
  166. def schedule(x):
  167. reactor.callLater(_EPSILON, x)
  168. self._cooperator = Cooperator(scheduler=schedule)
  169. @defer.inlineCallbacks
  170. def _send_request(
  171. self,
  172. request,
  173. retry_on_dns_fail=True,
  174. timeout=None,
  175. long_retries=False,
  176. ignore_backoff=False,
  177. backoff_on_404=False
  178. ):
  179. """
  180. Sends a request to the given server.
  181. Args:
  182. request (MatrixFederationRequest): details of request to be sent
  183. timeout (int|None): number of milliseconds to wait for the response headers
  184. (including connecting to the server). 60s by default.
  185. ignore_backoff (bool): true to ignore the historical backoff data
  186. and try the request anyway.
  187. backoff_on_404 (bool): Back off if we get a 404
  188. Returns:
  189. Deferred: resolves with the http response object on success.
  190. Fails with ``HttpResponseException``: if we get an HTTP response
  191. code >= 300 (except 429).
  192. Fails with ``NotRetryingDestination`` if we are not yet ready
  193. to retry this server.
  194. Fails with ``FederationDeniedError`` if this destination
  195. is not on our federation whitelist
  196. Fails with ``RequestSendFailed`` if there were problems connecting to
  197. the remote, due to e.g. DNS failures, connection timeouts etc.
  198. """
  199. if timeout:
  200. _sec_timeout = timeout / 1000
  201. else:
  202. _sec_timeout = self.default_timeout
  203. if (
  204. self.hs.config.federation_domain_whitelist is not None and
  205. request.destination not in self.hs.config.federation_domain_whitelist
  206. ):
  207. raise FederationDeniedError(request.destination)
  208. limiter = yield synapse.util.retryutils.get_retry_limiter(
  209. request.destination,
  210. self.clock,
  211. self._store,
  212. backoff_on_404=backoff_on_404,
  213. ignore_backoff=ignore_backoff,
  214. )
  215. method_bytes = request.method.encode("ascii")
  216. destination_bytes = request.destination.encode("ascii")
  217. path_bytes = request.path.encode("ascii")
  218. if request.query:
  219. query_bytes = encode_query_args(request.query)
  220. else:
  221. query_bytes = b""
  222. headers_dict = {
  223. b"User-Agent": [self.version_string_bytes],
  224. b"Host": [destination_bytes],
  225. }
  226. with limiter:
  227. # XXX: Would be much nicer to retry only at the transaction-layer
  228. # (once we have reliable transactions in place)
  229. if long_retries:
  230. retries_left = MAX_LONG_RETRIES
  231. else:
  232. retries_left = MAX_SHORT_RETRIES
  233. url_bytes = urllib.parse.urlunparse((
  234. b"matrix", destination_bytes,
  235. path_bytes, None, query_bytes, b"",
  236. ))
  237. url_str = url_bytes.decode('ascii')
  238. url_to_sign_bytes = urllib.parse.urlunparse((
  239. b"", b"",
  240. path_bytes, None, query_bytes, b"",
  241. ))
  242. while True:
  243. try:
  244. json = request.get_json()
  245. if json:
  246. headers_dict[b"Content-Type"] = [b"application/json"]
  247. self.sign_request(
  248. destination_bytes, method_bytes, url_to_sign_bytes,
  249. headers_dict, json,
  250. )
  251. data = encode_canonical_json(json)
  252. producer = FileBodyProducer(
  253. BytesIO(data),
  254. cooperator=self._cooperator,
  255. )
  256. else:
  257. producer = None
  258. self.sign_request(
  259. destination_bytes, method_bytes, url_to_sign_bytes,
  260. headers_dict,
  261. )
  262. logger.info(
  263. "{%s} [%s] Sending request: %s %s",
  264. request.txn_id, request.destination, request.method,
  265. url_str,
  266. )
  267. # we don't want all the fancy cookie and redirect handling that
  268. # treq.request gives: just use the raw Agent.
  269. request_deferred = self.agent.request(
  270. method_bytes,
  271. url_bytes,
  272. headers=Headers(headers_dict),
  273. bodyProducer=producer,
  274. )
  275. request_deferred = timeout_deferred(
  276. request_deferred,
  277. timeout=_sec_timeout,
  278. reactor=self.hs.get_reactor(),
  279. )
  280. try:
  281. with Measure(self.clock, "outbound_request"):
  282. response = yield make_deferred_yieldable(
  283. request_deferred,
  284. )
  285. except DNSLookupError as e:
  286. raise_from(RequestSendFailed(e, can_retry=retry_on_dns_fail), e)
  287. except Exception as e:
  288. raise_from(RequestSendFailed(e, can_retry=True), e)
  289. logger.info(
  290. "{%s} [%s] Got response headers: %d %s",
  291. request.txn_id,
  292. request.destination,
  293. response.code,
  294. response.phrase.decode('ascii', errors='replace'),
  295. )
  296. if 200 <= response.code < 300:
  297. pass
  298. else:
  299. # :'(
  300. # Update transactions table?
  301. d = treq.content(response)
  302. d = timeout_deferred(
  303. d,
  304. timeout=_sec_timeout,
  305. reactor=self.hs.get_reactor(),
  306. )
  307. try:
  308. body = yield make_deferred_yieldable(d)
  309. except Exception as e:
  310. # Eh, we're already going to raise an exception so lets
  311. # ignore if this fails.
  312. logger.warn(
  313. "{%s} [%s] Failed to get error response: %s %s: %s",
  314. request.txn_id,
  315. request.destination,
  316. request.method,
  317. url_str,
  318. _flatten_response_never_received(e),
  319. )
  320. body = None
  321. e = HttpResponseException(
  322. response.code, response.phrase, body
  323. )
  324. # Retry if the error is a 429 (Too Many Requests),
  325. # otherwise just raise a standard HttpResponseException
  326. if response.code == 429:
  327. raise_from(RequestSendFailed(e, can_retry=True), e)
  328. else:
  329. raise e
  330. break
  331. except RequestSendFailed as e:
  332. logger.warn(
  333. "{%s} [%s] Request failed: %s %s: %s",
  334. request.txn_id,
  335. request.destination,
  336. request.method,
  337. url_str,
  338. _flatten_response_never_received(e.inner_exception),
  339. )
  340. if not e.can_retry:
  341. raise
  342. if retries_left and not timeout:
  343. if long_retries:
  344. delay = 4 ** (MAX_LONG_RETRIES + 1 - retries_left)
  345. delay = min(delay, 60)
  346. delay *= random.uniform(0.8, 1.4)
  347. else:
  348. delay = 0.5 * 2 ** (MAX_SHORT_RETRIES - retries_left)
  349. delay = min(delay, 2)
  350. delay *= random.uniform(0.8, 1.4)
  351. logger.debug(
  352. "{%s} [%s] Waiting %ss before re-sending...",
  353. request.txn_id,
  354. request.destination,
  355. delay,
  356. )
  357. yield self.clock.sleep(delay)
  358. retries_left -= 1
  359. else:
  360. raise
  361. except Exception as e:
  362. logger.warn(
  363. "{%s} [%s] Request failed: %s %s: %s",
  364. request.txn_id,
  365. request.destination,
  366. request.method,
  367. url_str,
  368. _flatten_response_never_received(e),
  369. )
  370. raise
  371. defer.returnValue(response)
  372. def sign_request(self, destination, method, url_bytes, headers_dict,
  373. content=None, destination_is=None):
  374. """
  375. Signs a request by adding an Authorization header to headers_dict
  376. Args:
  377. destination (bytes|None): The desination home server of the request.
  378. May be None if the destination is an identity server, in which case
  379. destination_is must be non-None.
  380. method (bytes): The HTTP method of the request
  381. url_bytes (bytes): The URI path of the request
  382. headers_dict (dict[bytes, list[bytes]]): Dictionary of request headers to
  383. append to
  384. content (object): The body of the request
  385. destination_is (bytes): As 'destination', but if the destination is an
  386. identity server
  387. Returns:
  388. None
  389. """
  390. request = {
  391. "method": method,
  392. "uri": url_bytes,
  393. "origin": self.server_name,
  394. }
  395. if destination is not None:
  396. request["destination"] = destination
  397. if destination_is is not None:
  398. request["destination_is"] = destination_is
  399. if content is not None:
  400. request["content"] = content
  401. request = sign_json(request, self.server_name, self.signing_key)
  402. auth_headers = []
  403. for key, sig in request["signatures"][self.server_name].items():
  404. auth_headers.append((
  405. "X-Matrix origin=%s,key=\"%s\",sig=\"%s\"" % (
  406. self.server_name, key, sig,
  407. )).encode('ascii')
  408. )
  409. headers_dict[b"Authorization"] = auth_headers
  410. @defer.inlineCallbacks
  411. def put_json(self, destination, path, args={}, data={},
  412. json_data_callback=None,
  413. long_retries=False, timeout=None,
  414. ignore_backoff=False,
  415. backoff_on_404=False):
  416. """ Sends the specifed json data using PUT
  417. Args:
  418. destination (str): The remote server to send the HTTP request
  419. to.
  420. path (str): The HTTP path.
  421. args (dict): query params
  422. data (dict): A dict containing the data that will be used as
  423. the request body. This will be encoded as JSON.
  424. json_data_callback (callable): A callable returning the dict to
  425. use as the request body.
  426. long_retries (bool): A boolean that indicates whether we should
  427. retry for a short or long time.
  428. timeout(int): How long to try (in ms) the destination for before
  429. giving up. None indicates no timeout.
  430. ignore_backoff (bool): true to ignore the historical backoff data
  431. and try the request anyway.
  432. backoff_on_404 (bool): True if we should count a 404 response as
  433. a failure of the server (and should therefore back off future
  434. requests)
  435. Returns:
  436. Deferred: Succeeds when we get a 2xx HTTP response. The result
  437. will be the decoded JSON body.
  438. Fails with ``HttpResponseException`` if we get an HTTP response
  439. code >= 300.
  440. Fails with ``NotRetryingDestination`` if we are not yet ready
  441. to retry this server.
  442. Fails with ``FederationDeniedError`` if this destination
  443. is not on our federation whitelist
  444. """
  445. request = MatrixFederationRequest(
  446. method="PUT",
  447. destination=destination,
  448. path=path,
  449. query=args,
  450. json_callback=json_data_callback,
  451. json=data,
  452. )
  453. response = yield self._send_request(
  454. request,
  455. long_retries=long_retries,
  456. timeout=timeout,
  457. ignore_backoff=ignore_backoff,
  458. backoff_on_404=backoff_on_404,
  459. )
  460. body = yield _handle_json_response(
  461. self.hs.get_reactor(), self.default_timeout, request, response,
  462. )
  463. defer.returnValue(body)
  464. @defer.inlineCallbacks
  465. def post_json(self, destination, path, data={}, long_retries=False,
  466. timeout=None, ignore_backoff=False, args={}):
  467. """ Sends the specifed json data using POST
  468. Args:
  469. destination (str): The remote server to send the HTTP request
  470. to.
  471. path (str): The HTTP path.
  472. data (dict): A dict containing the data that will be used as
  473. the request body. This will be encoded as JSON.
  474. long_retries (bool): A boolean that indicates whether we should
  475. retry for a short or long time.
  476. timeout(int): How long to try (in ms) the destination for before
  477. giving up. None indicates no timeout.
  478. ignore_backoff (bool): true to ignore the historical backoff data and
  479. try the request anyway.
  480. args (dict): query params
  481. Returns:
  482. Deferred: Succeeds when we get a 2xx HTTP response. The result
  483. will be the decoded JSON body.
  484. Fails with ``HttpResponseException`` if we get an HTTP response
  485. code >= 300.
  486. Fails with ``NotRetryingDestination`` if we are not yet ready
  487. to retry this server.
  488. Fails with ``FederationDeniedError`` if this destination
  489. is not on our federation whitelist
  490. """
  491. request = MatrixFederationRequest(
  492. method="POST",
  493. destination=destination,
  494. path=path,
  495. query=args,
  496. json=data,
  497. )
  498. response = yield self._send_request(
  499. request,
  500. long_retries=long_retries,
  501. timeout=timeout,
  502. ignore_backoff=ignore_backoff,
  503. )
  504. if timeout:
  505. _sec_timeout = timeout / 1000
  506. else:
  507. _sec_timeout = self.default_timeout
  508. body = yield _handle_json_response(
  509. self.hs.get_reactor(), _sec_timeout, request, response,
  510. )
  511. defer.returnValue(body)
  512. @defer.inlineCallbacks
  513. def get_json(self, destination, path, args=None, retry_on_dns_fail=True,
  514. timeout=None, ignore_backoff=False):
  515. """ GETs some json from the given host homeserver and path
  516. Args:
  517. destination (str): The remote server to send the HTTP request
  518. to.
  519. path (str): The HTTP path.
  520. args (dict|None): A dictionary used to create query strings, defaults to
  521. None.
  522. timeout (int): How long to try (in ms) the destination for before
  523. giving up. None indicates no timeout and that the request will
  524. be retried.
  525. ignore_backoff (bool): true to ignore the historical backoff data
  526. and try the request anyway.
  527. Returns:
  528. Deferred: Succeeds when we get a 2xx HTTP response. The result
  529. will be the decoded JSON body.
  530. Fails with ``HttpResponseException`` if we get an HTTP response
  531. code >= 300.
  532. Fails with ``NotRetryingDestination`` if we are not yet ready
  533. to retry this server.
  534. Fails with ``FederationDeniedError`` if this destination
  535. is not on our federation whitelist
  536. """
  537. logger.debug("get_json args: %s", args)
  538. logger.debug("Query bytes: %s Retry DNS: %s", args, retry_on_dns_fail)
  539. request = MatrixFederationRequest(
  540. method="GET",
  541. destination=destination,
  542. path=path,
  543. query=args,
  544. )
  545. response = yield self._send_request(
  546. request,
  547. retry_on_dns_fail=retry_on_dns_fail,
  548. timeout=timeout,
  549. ignore_backoff=ignore_backoff,
  550. )
  551. body = yield _handle_json_response(
  552. self.hs.get_reactor(), self.default_timeout, request, response,
  553. )
  554. defer.returnValue(body)
  555. @defer.inlineCallbacks
  556. def delete_json(self, destination, path, long_retries=False,
  557. timeout=None, ignore_backoff=False, args={}):
  558. """Send a DELETE request to the remote expecting some json response
  559. Args:
  560. destination (str): The remote server to send the HTTP request
  561. to.
  562. path (str): The HTTP path.
  563. long_retries (bool): A boolean that indicates whether we should
  564. retry for a short or long time.
  565. timeout(int): How long to try (in ms) the destination for before
  566. giving up. None indicates no timeout.
  567. ignore_backoff (bool): true to ignore the historical backoff data and
  568. try the request anyway.
  569. Returns:
  570. Deferred: Succeeds when we get a 2xx HTTP response. The result
  571. will be the decoded JSON body.
  572. Fails with ``HttpResponseException`` if we get an HTTP response
  573. code >= 300.
  574. Fails with ``NotRetryingDestination`` if we are not yet ready
  575. to retry this server.
  576. Fails with ``FederationDeniedError`` if this destination
  577. is not on our federation whitelist
  578. """
  579. request = MatrixFederationRequest(
  580. method="DELETE",
  581. destination=destination,
  582. path=path,
  583. query=args,
  584. )
  585. response = yield self._send_request(
  586. request,
  587. long_retries=long_retries,
  588. timeout=timeout,
  589. ignore_backoff=ignore_backoff,
  590. )
  591. body = yield _handle_json_response(
  592. self.hs.get_reactor(), self.default_timeout, request, response,
  593. )
  594. defer.returnValue(body)
  595. @defer.inlineCallbacks
  596. def get_file(self, destination, path, output_stream, args={},
  597. retry_on_dns_fail=True, max_size=None,
  598. ignore_backoff=False):
  599. """GETs a file from a given homeserver
  600. Args:
  601. destination (str): The remote server to send the HTTP request to.
  602. path (str): The HTTP path to GET.
  603. output_stream (file): File to write the response body to.
  604. args (dict): Optional dictionary used to create the query string.
  605. ignore_backoff (bool): true to ignore the historical backoff data
  606. and try the request anyway.
  607. Returns:
  608. Deferred: resolves with an (int,dict) tuple of the file length and
  609. a dict of the response headers.
  610. Fails with ``HttpResponseException`` if we get an HTTP response code
  611. >= 300
  612. Fails with ``NotRetryingDestination`` if we are not yet ready
  613. to retry this server.
  614. Fails with ``FederationDeniedError`` if this destination
  615. is not on our federation whitelist
  616. """
  617. request = MatrixFederationRequest(
  618. method="GET",
  619. destination=destination,
  620. path=path,
  621. query=args,
  622. )
  623. response = yield self._send_request(
  624. request,
  625. retry_on_dns_fail=retry_on_dns_fail,
  626. ignore_backoff=ignore_backoff,
  627. )
  628. headers = dict(response.headers.getAllRawHeaders())
  629. try:
  630. d = _readBodyToFile(response, output_stream, max_size)
  631. d.addTimeout(self.default_timeout, self.hs.get_reactor())
  632. length = yield make_deferred_yieldable(d)
  633. except Exception as e:
  634. logger.warn(
  635. "{%s} [%s] Error reading response: %s",
  636. request.txn_id,
  637. request.destination,
  638. e,
  639. )
  640. raise
  641. logger.info(
  642. "{%s} [%s] Completed: %d %s [%d bytes]",
  643. request.txn_id,
  644. request.destination,
  645. response.code,
  646. response.phrase.decode('ascii', errors='replace'),
  647. length,
  648. )
  649. defer.returnValue((length, headers))
  650. class _ReadBodyToFileProtocol(protocol.Protocol):
  651. def __init__(self, stream, deferred, max_size):
  652. self.stream = stream
  653. self.deferred = deferred
  654. self.length = 0
  655. self.max_size = max_size
  656. def dataReceived(self, data):
  657. self.stream.write(data)
  658. self.length += len(data)
  659. if self.max_size is not None and self.length >= self.max_size:
  660. self.deferred.errback(SynapseError(
  661. 502,
  662. "Requested file is too large > %r bytes" % (self.max_size,),
  663. Codes.TOO_LARGE,
  664. ))
  665. self.deferred = defer.Deferred()
  666. self.transport.loseConnection()
  667. def connectionLost(self, reason):
  668. if reason.check(ResponseDone):
  669. self.deferred.callback(self.length)
  670. else:
  671. self.deferred.errback(reason)
  672. def _readBodyToFile(response, stream, max_size):
  673. d = defer.Deferred()
  674. response.deliverBody(_ReadBodyToFileProtocol(stream, d, max_size))
  675. return d
  676. def _flatten_response_never_received(e):
  677. if hasattr(e, "reasons"):
  678. reasons = ", ".join(
  679. _flatten_response_never_received(f.value)
  680. for f in e.reasons
  681. )
  682. return "%s:[%s]" % (type(e).__name__, reasons)
  683. else:
  684. return repr(e)
  685. def check_content_type_is_json(headers):
  686. """
  687. Check that a set of HTTP headers have a Content-Type header, and that it
  688. is application/json.
  689. Args:
  690. headers (twisted.web.http_headers.Headers): headers to check
  691. Raises:
  692. RuntimeError if the
  693. """
  694. c_type = headers.getRawHeaders(b"Content-Type")
  695. if c_type is None:
  696. raise RuntimeError(
  697. "No Content-Type header"
  698. )
  699. c_type = c_type[0].decode('ascii') # only the first header
  700. val, options = cgi.parse_header(c_type)
  701. if val != "application/json":
  702. raise RuntimeError(
  703. "Content-Type not application/json: was '%s'" % c_type
  704. )
  705. def encode_query_args(args):
  706. if args is None:
  707. return b""
  708. encoded_args = {}
  709. for k, vs in args.items():
  710. if isinstance(vs, string_types):
  711. vs = [vs]
  712. encoded_args[k] = [v.encode("UTF-8") for v in vs]
  713. query_bytes = urllib.parse.urlencode(encoded_args, True)
  714. return query_bytes.encode('utf8')