# matrixfederationclient.py
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2014-2016 OpenMarket Ltd
  3. # Copyright 2018 New Vector Ltd
  4. #
  5. # Licensed under the Apache License, Version 2.0 (the "License");
  6. # you may not use this file except in compliance with the License.
  7. # You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing, software
  12. # distributed under the License is distributed on an "AS IS" BASIS,
  13. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. # See the License for the specific language governing permissions and
  15. # limitations under the License.
  16. import cgi
  17. import logging
  18. import random
  19. import sys
  20. from io import BytesIO
  21. from six import PY3, raise_from, string_types
  22. from six.moves import urllib
  23. import attr
  24. import treq
  25. from canonicaljson import encode_canonical_json
  26. from prometheus_client import Counter
  27. from signedjson.sign import sign_json
  28. from zope.interface import implementer
  29. from twisted.internet import defer, protocol
  30. from twisted.internet.error import DNSLookupError
  31. from twisted.internet.interfaces import IReactorPluggableNameResolver
  32. from twisted.internet.task import _EPSILON, Cooperator
  33. from twisted.web._newclient import ResponseDone
  34. from twisted.web.http_headers import Headers
  35. import synapse.metrics
  36. import synapse.util.retryutils
  37. from synapse.api.errors import (
  38. Codes,
  39. FederationDeniedError,
  40. HttpResponseException,
  41. RequestSendFailed,
  42. SynapseError,
  43. )
  44. from synapse.http import QuieterFileBodyProducer
  45. from synapse.http.client import BlacklistingAgentWrapper, IPBlacklistingResolver
  46. from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent
  47. from synapse.logging.context import make_deferred_yieldable
  48. from synapse.util.async_helpers import timeout_deferred
  49. from synapse.util.metrics import Measure
logger = logging.getLogger(__name__)

# Prometheus counter for outgoing requests, labelled by HTTP method.
outgoing_requests_counter = Counter(
    "synapse_http_matrixfederationclient_requests", "", ["method"]
)
# Prometheus counter for incoming responses, labelled by HTTP method and
# response code.
incoming_responses_counter = Counter(
    "synapse_http_matrixfederationclient_responses", "", ["method", "code"]
)

# Initial retry budgets used by MatrixFederationHttpClient._send_request for
# the "long" and the regular ("short") retry algorithms respectively.
MAX_LONG_RETRIES = 10
MAX_SHORT_RETRIES = 3

# Python 2/3 compatibility: sys.maxint does not exist on Python 3.
if PY3:
    MAXINT = sys.maxsize
else:
    MAXINT = sys.maxint

# Sequence number embedded in the txn_id of the next MatrixFederationRequest;
# incremented (modulo MAXINT - 1) every time a request object is created.
_next_id = 1
  64. @attr.s
  65. class MatrixFederationRequest(object):
  66. method = attr.ib()
  67. """HTTP method
  68. :type: str
  69. """
  70. path = attr.ib()
  71. """HTTP path
  72. :type: str
  73. """
  74. destination = attr.ib()
  75. """The remote server to send the HTTP request to.
  76. :type: str"""
  77. json = attr.ib(default=None)
  78. """JSON to send in the body.
  79. :type: dict|None
  80. """
  81. json_callback = attr.ib(default=None)
  82. """A callback to generate the JSON.
  83. :type: func|None
  84. """
  85. query = attr.ib(default=None)
  86. """Query arguments.
  87. :type: dict|None
  88. """
  89. txn_id = attr.ib(default=None)
  90. """Unique ID for this request (for logging)
  91. :type: str|None
  92. """
  93. def __attrs_post_init__(self):
  94. global _next_id
  95. self.txn_id = "%s-O-%s" % (self.method, _next_id)
  96. _next_id = (_next_id + 1) % (MAXINT - 1)
  97. def get_json(self):
  98. if self.json_callback:
  99. return self.json_callback()
  100. return self.json
  101. @defer.inlineCallbacks
  102. def _handle_json_response(reactor, timeout_sec, request, response):
  103. """
  104. Reads the JSON body of a response, with a timeout
  105. Args:
  106. reactor (IReactor): twisted reactor, for the timeout
  107. timeout_sec (float): number of seconds to wait for response to complete
  108. request (MatrixFederationRequest): the request that triggered the response
  109. response (IResponse): response to the request
  110. Returns:
  111. dict: parsed JSON response
  112. """
  113. try:
  114. check_content_type_is_json(response.headers)
  115. d = treq.json_content(response)
  116. d = timeout_deferred(d, timeout=timeout_sec, reactor=reactor)
  117. body = yield make_deferred_yieldable(d)
  118. except Exception as e:
  119. logger.warn(
  120. "{%s} [%s] Error reading response: %s",
  121. request.txn_id,
  122. request.destination,
  123. e,
  124. )
  125. raise
  126. logger.info(
  127. "{%s} [%s] Completed: %d %s",
  128. request.txn_id,
  129. request.destination,
  130. response.code,
  131. response.phrase.decode("ascii", errors="replace"),
  132. )
  133. defer.returnValue(body)
  134. class MatrixFederationHttpClient(object):
    """HTTP client used to talk to other homeservers over the federation
    protocol. Sends client certificates and signs requests.

    Attributes:
        agent (twisted.web.client.Agent): The twisted Agent used to send the
            requests.
    """
  141. def __init__(self, hs, tls_client_options_factory):
  142. self.hs = hs
  143. self.signing_key = hs.config.signing_key[0]
  144. self.server_name = hs.hostname
  145. real_reactor = hs.get_reactor()
  146. # We need to use a DNS resolver which filters out blacklisted IP
  147. # addresses, to prevent DNS rebinding.
  148. nameResolver = IPBlacklistingResolver(
  149. real_reactor, None, hs.config.federation_ip_range_blacklist
  150. )
  151. @implementer(IReactorPluggableNameResolver)
  152. class Reactor(object):
  153. def __getattr__(_self, attr):
  154. if attr == "nameResolver":
  155. return nameResolver
  156. else:
  157. return getattr(real_reactor, attr)
  158. self.reactor = Reactor()
  159. self.agent = MatrixFederationAgent(self.reactor, tls_client_options_factory)
  160. # Use a BlacklistingAgentWrapper to prevent circumventing the IP
  161. # blacklist via IP literals in server names
  162. self.agent = BlacklistingAgentWrapper(
  163. self.agent,
  164. self.reactor,
  165. ip_blacklist=hs.config.federation_ip_range_blacklist,
  166. )
  167. self.clock = hs.get_clock()
  168. self._store = hs.get_datastore()
  169. self.version_string_bytes = hs.version_string.encode("ascii")
  170. self.default_timeout = 60
  171. def schedule(x):
  172. self.reactor.callLater(_EPSILON, x)
  173. self._cooperator = Cooperator(scheduler=schedule)
  174. @defer.inlineCallbacks
  175. def _send_request_with_optional_trailing_slash(
  176. self, request, try_trailing_slash_on_400=False, **send_request_args
  177. ):
  178. """Wrapper for _send_request which can optionally retry the request
  179. upon receiving a combination of a 400 HTTP response code and a
  180. 'M_UNRECOGNIZED' errcode. This is a workaround for Synapse <= v0.99.3
  181. due to #3622.
  182. Args:
  183. request (MatrixFederationRequest): details of request to be sent
  184. try_trailing_slash_on_400 (bool): Whether on receiving a 400
  185. 'M_UNRECOGNIZED' from the server to retry the request with a
  186. trailing slash appended to the request path.
  187. send_request_args (Dict): A dictionary of arguments to pass to
  188. `_send_request()`.
  189. Raises:
  190. HttpResponseException: If we get an HTTP response code >= 300
  191. (except 429).
  192. Returns:
  193. Deferred[Dict]: Parsed JSON response body.
  194. """
  195. try:
  196. response = yield self._send_request(request, **send_request_args)
  197. except HttpResponseException as e:
  198. # Received an HTTP error > 300. Check if it meets the requirements
  199. # to retry with a trailing slash
  200. if not try_trailing_slash_on_400:
  201. raise
  202. if e.code != 400 or e.to_synapse_error().errcode != "M_UNRECOGNIZED":
  203. raise
  204. # Retry with a trailing slash if we received a 400 with
  205. # 'M_UNRECOGNIZED' which some endpoints can return when omitting a
  206. # trailing slash on Synapse <= v0.99.3.
  207. logger.info("Retrying request with trailing slash")
  208. request.path += "/"
  209. response = yield self._send_request(request, **send_request_args)
  210. defer.returnValue(response)
  211. @defer.inlineCallbacks
  212. def _send_request(
  213. self,
  214. request,
  215. retry_on_dns_fail=True,
  216. timeout=None,
  217. long_retries=False,
  218. ignore_backoff=False,
  219. backoff_on_404=False,
  220. ):
  221. """
  222. Sends a request to the given server.
  223. Args:
  224. request (MatrixFederationRequest): details of request to be sent
  225. timeout (int|None): number of milliseconds to wait for the response headers
  226. (including connecting to the server), *for each attempt*.
  227. 60s by default.
  228. long_retries (bool): whether to use the long retry algorithm.
  229. The regular retry algorithm makes 4 attempts, with intervals
  230. [0.5s, 1s, 2s].
  231. The long retry algorithm makes 11 attempts, with intervals
  232. [4s, 16s, 60s, 60s, ...]
  233. Both algorithms add -20%/+40% jitter to the retry intervals.
  234. Note that the above intervals are *in addition* to the time spent
  235. waiting for the request to complete (up to `timeout` ms).
  236. NB: the long retry algorithm takes over 20 minutes to complete, with
  237. a default timeout of 60s!
  238. ignore_backoff (bool): true to ignore the historical backoff data
  239. and try the request anyway.
  240. backoff_on_404 (bool): Back off if we get a 404
  241. Returns:
  242. Deferred[twisted.web.client.Response]: resolves with the HTTP
  243. response object on success.
  244. Raises:
  245. HttpResponseException: If we get an HTTP response code >= 300
  246. (except 429).
  247. NotRetryingDestination: If we are not yet ready to retry this
  248. server.
  249. FederationDeniedError: If this destination is not on our
  250. federation whitelist
  251. RequestSendFailed: If there were problems connecting to the
  252. remote, due to e.g. DNS failures, connection timeouts etc.
  253. """
  254. if timeout:
  255. _sec_timeout = timeout / 1000
  256. else:
  257. _sec_timeout = self.default_timeout
  258. if (
  259. self.hs.config.federation_domain_whitelist is not None
  260. and request.destination not in self.hs.config.federation_domain_whitelist
  261. ):
  262. raise FederationDeniedError(request.destination)
  263. limiter = yield synapse.util.retryutils.get_retry_limiter(
  264. request.destination,
  265. self.clock,
  266. self._store,
  267. backoff_on_404=backoff_on_404,
  268. ignore_backoff=ignore_backoff,
  269. )
  270. method_bytes = request.method.encode("ascii")
  271. destination_bytes = request.destination.encode("ascii")
  272. path_bytes = request.path.encode("ascii")
  273. if request.query:
  274. query_bytes = encode_query_args(request.query)
  275. else:
  276. query_bytes = b""
  277. headers_dict = {b"User-Agent": [self.version_string_bytes]}
  278. with limiter:
  279. # XXX: Would be much nicer to retry only at the transaction-layer
  280. # (once we have reliable transactions in place)
  281. if long_retries:
  282. retries_left = MAX_LONG_RETRIES
  283. else:
  284. retries_left = MAX_SHORT_RETRIES
  285. url_bytes = urllib.parse.urlunparse(
  286. (b"matrix", destination_bytes, path_bytes, None, query_bytes, b"")
  287. )
  288. url_str = url_bytes.decode("ascii")
  289. url_to_sign_bytes = urllib.parse.urlunparse(
  290. (b"", b"", path_bytes, None, query_bytes, b"")
  291. )
  292. while True:
  293. try:
  294. json = request.get_json()
  295. if json:
  296. headers_dict[b"Content-Type"] = [b"application/json"]
  297. auth_headers = self.build_auth_headers(
  298. destination_bytes, method_bytes, url_to_sign_bytes, json
  299. )
  300. data = encode_canonical_json(json)
  301. producer = QuieterFileBodyProducer(
  302. BytesIO(data), cooperator=self._cooperator
  303. )
  304. else:
  305. producer = None
  306. auth_headers = self.build_auth_headers(
  307. destination_bytes, method_bytes, url_to_sign_bytes
  308. )
  309. headers_dict[b"Authorization"] = auth_headers
  310. logger.info(
  311. "{%s} [%s] Sending request: %s %s; timeout %fs",
  312. request.txn_id,
  313. request.destination,
  314. request.method,
  315. url_str,
  316. _sec_timeout,
  317. )
  318. try:
  319. with Measure(self.clock, "outbound_request"):
  320. # we don't want all the fancy cookie and redirect handling
  321. # that treq.request gives: just use the raw Agent.
  322. request_deferred = self.agent.request(
  323. method_bytes,
  324. url_bytes,
  325. headers=Headers(headers_dict),
  326. bodyProducer=producer,
  327. )
  328. request_deferred = timeout_deferred(
  329. request_deferred,
  330. timeout=_sec_timeout,
  331. reactor=self.reactor,
  332. )
  333. response = yield request_deferred
  334. except DNSLookupError as e:
  335. raise_from(RequestSendFailed(e, can_retry=retry_on_dns_fail), e)
  336. except Exception as e:
  337. logger.info("Failed to send request: %s", e)
  338. raise_from(RequestSendFailed(e, can_retry=True), e)
  339. logger.info(
  340. "{%s} [%s] Got response headers: %d %s",
  341. request.txn_id,
  342. request.destination,
  343. response.code,
  344. response.phrase.decode("ascii", errors="replace"),
  345. )
  346. if 200 <= response.code < 300:
  347. pass
  348. else:
  349. # :'(
  350. # Update transactions table?
  351. d = treq.content(response)
  352. d = timeout_deferred(
  353. d, timeout=_sec_timeout, reactor=self.reactor
  354. )
  355. try:
  356. body = yield make_deferred_yieldable(d)
  357. except Exception as e:
  358. # Eh, we're already going to raise an exception so lets
  359. # ignore if this fails.
  360. logger.warn(
  361. "{%s} [%s] Failed to get error response: %s %s: %s",
  362. request.txn_id,
  363. request.destination,
  364. request.method,
  365. url_str,
  366. _flatten_response_never_received(e),
  367. )
  368. body = None
  369. e = HttpResponseException(response.code, response.phrase, body)
  370. # Retry if the error is a 429 (Too Many Requests),
  371. # otherwise just raise a standard HttpResponseException
  372. if response.code == 429:
  373. raise_from(RequestSendFailed(e, can_retry=True), e)
  374. else:
  375. raise e
  376. break
  377. except RequestSendFailed as e:
  378. logger.warn(
  379. "{%s} [%s] Request failed: %s %s: %s",
  380. request.txn_id,
  381. request.destination,
  382. request.method,
  383. url_str,
  384. _flatten_response_never_received(e.inner_exception),
  385. )
  386. if not e.can_retry:
  387. raise
  388. if retries_left and not timeout:
  389. if long_retries:
  390. delay = 4 ** (MAX_LONG_RETRIES + 1 - retries_left)
  391. delay = min(delay, 60)
  392. delay *= random.uniform(0.8, 1.4)
  393. else:
  394. delay = 0.5 * 2 ** (MAX_SHORT_RETRIES - retries_left)
  395. delay = min(delay, 2)
  396. delay *= random.uniform(0.8, 1.4)
  397. logger.debug(
  398. "{%s} [%s] Waiting %ss before re-sending...",
  399. request.txn_id,
  400. request.destination,
  401. delay,
  402. )
  403. yield self.clock.sleep(delay)
  404. retries_left -= 1
  405. else:
  406. raise
  407. except Exception as e:
  408. logger.warn(
  409. "{%s} [%s] Request failed: %s %s: %s",
  410. request.txn_id,
  411. request.destination,
  412. request.method,
  413. url_str,
  414. _flatten_response_never_received(e),
  415. )
  416. raise
  417. defer.returnValue(response)
  418. def build_auth_headers(
  419. self, destination, method, url_bytes, content=None, destination_is=None
  420. ):
  421. """
  422. Builds the Authorization headers for a federation request
  423. Args:
  424. destination (bytes|None): The desination home server of the request.
  425. May be None if the destination is an identity server, in which case
  426. destination_is must be non-None.
  427. method (bytes): The HTTP method of the request
  428. url_bytes (bytes): The URI path of the request
  429. content (object): The body of the request
  430. destination_is (bytes): As 'destination', but if the destination is an
  431. identity server
  432. Returns:
  433. list[bytes]: a list of headers to be added as "Authorization:" headers
  434. """
  435. request = {"method": method, "uri": url_bytes, "origin": self.server_name}
  436. if destination is not None:
  437. request["destination"] = destination
  438. if destination_is is not None:
  439. request["destination_is"] = destination_is
  440. if content is not None:
  441. request["content"] = content
  442. request = sign_json(request, self.server_name, self.signing_key)
  443. auth_headers = []
  444. for key, sig in request["signatures"][self.server_name].items():
  445. auth_headers.append(
  446. (
  447. 'X-Matrix origin=%s,key="%s",sig="%s"'
  448. % (self.server_name, key, sig)
  449. ).encode("ascii")
  450. )
  451. return auth_headers
  452. @defer.inlineCallbacks
  453. def put_json(
  454. self,
  455. destination,
  456. path,
  457. args={},
  458. data={},
  459. json_data_callback=None,
  460. long_retries=False,
  461. timeout=None,
  462. ignore_backoff=False,
  463. backoff_on_404=False,
  464. try_trailing_slash_on_400=False,
  465. ):
  466. """ Sends the specifed json data using PUT
  467. Args:
  468. destination (str): The remote server to send the HTTP request
  469. to.
  470. path (str): The HTTP path.
  471. args (dict): query params
  472. data (dict): A dict containing the data that will be used as
  473. the request body. This will be encoded as JSON.
  474. json_data_callback (callable): A callable returning the dict to
  475. use as the request body.
  476. long_retries (bool): whether to use the long retry algorithm. See
  477. docs on _send_request for details.
  478. timeout (int|None): number of milliseconds to wait for the response headers
  479. (including connecting to the server), *for each attempt*.
  480. self._default_timeout (60s) by default.
  481. ignore_backoff (bool): true to ignore the historical backoff data
  482. and try the request anyway.
  483. backoff_on_404 (bool): True if we should count a 404 response as
  484. a failure of the server (and should therefore back off future
  485. requests).
  486. try_trailing_slash_on_400 (bool): True if on a 400 M_UNRECOGNIZED
  487. response we should try appending a trailing slash to the end
  488. of the request. Workaround for #3622 in Synapse <= v0.99.3. This
  489. will be attempted before backing off if backing off has been
  490. enabled.
  491. Returns:
  492. Deferred[dict|list]: Succeeds when we get a 2xx HTTP response. The
  493. result will be the decoded JSON body.
  494. Raises:
  495. HttpResponseException: If we get an HTTP response code >= 300
  496. (except 429).
  497. NotRetryingDestination: If we are not yet ready to retry this
  498. server.
  499. FederationDeniedError: If this destination is not on our
  500. federation whitelist
  501. RequestSendFailed: If there were problems connecting to the
  502. remote, due to e.g. DNS failures, connection timeouts etc.
  503. """
  504. request = MatrixFederationRequest(
  505. method="PUT",
  506. destination=destination,
  507. path=path,
  508. query=args,
  509. json_callback=json_data_callback,
  510. json=data,
  511. )
  512. response = yield self._send_request_with_optional_trailing_slash(
  513. request,
  514. try_trailing_slash_on_400,
  515. backoff_on_404=backoff_on_404,
  516. ignore_backoff=ignore_backoff,
  517. long_retries=long_retries,
  518. timeout=timeout,
  519. )
  520. body = yield _handle_json_response(
  521. self.reactor, self.default_timeout, request, response
  522. )
  523. defer.returnValue(body)
  524. @defer.inlineCallbacks
  525. def post_json(
  526. self,
  527. destination,
  528. path,
  529. data={},
  530. long_retries=False,
  531. timeout=None,
  532. ignore_backoff=False,
  533. args={},
  534. ):
  535. """ Sends the specifed json data using POST
  536. Args:
  537. destination (str): The remote server to send the HTTP request
  538. to.
  539. path (str): The HTTP path.
  540. data (dict): A dict containing the data that will be used as
  541. the request body. This will be encoded as JSON.
  542. long_retries (bool): whether to use the long retry algorithm. See
  543. docs on _send_request for details.
  544. timeout (int|None): number of milliseconds to wait for the response headers
  545. (including connecting to the server), *for each attempt*.
  546. self._default_timeout (60s) by default.
  547. ignore_backoff (bool): true to ignore the historical backoff data and
  548. try the request anyway.
  549. args (dict): query params
  550. Returns:
  551. Deferred[dict|list]: Succeeds when we get a 2xx HTTP response. The
  552. result will be the decoded JSON body.
  553. Raises:
  554. HttpResponseException: If we get an HTTP response code >= 300
  555. (except 429).
  556. NotRetryingDestination: If we are not yet ready to retry this
  557. server.
  558. FederationDeniedError: If this destination is not on our
  559. federation whitelist
  560. RequestSendFailed: If there were problems connecting to the
  561. remote, due to e.g. DNS failures, connection timeouts etc.
  562. """
  563. request = MatrixFederationRequest(
  564. method="POST", destination=destination, path=path, query=args, json=data
  565. )
  566. response = yield self._send_request(
  567. request,
  568. long_retries=long_retries,
  569. timeout=timeout,
  570. ignore_backoff=ignore_backoff,
  571. )
  572. if timeout:
  573. _sec_timeout = timeout / 1000
  574. else:
  575. _sec_timeout = self.default_timeout
  576. body = yield _handle_json_response(
  577. self.reactor, _sec_timeout, request, response
  578. )
  579. defer.returnValue(body)
  580. @defer.inlineCallbacks
  581. def get_json(
  582. self,
  583. destination,
  584. path,
  585. args=None,
  586. retry_on_dns_fail=True,
  587. timeout=None,
  588. ignore_backoff=False,
  589. try_trailing_slash_on_400=False,
  590. ):
  591. """ GETs some json from the given host homeserver and path
  592. Args:
  593. destination (str): The remote server to send the HTTP request
  594. to.
  595. path (str): The HTTP path.
  596. args (dict|None): A dictionary used to create query strings, defaults to
  597. None.
  598. timeout (int|None): number of milliseconds to wait for the response headers
  599. (including connecting to the server), *for each attempt*.
  600. self._default_timeout (60s) by default.
  601. ignore_backoff (bool): true to ignore the historical backoff data
  602. and try the request anyway.
  603. try_trailing_slash_on_400 (bool): True if on a 400 M_UNRECOGNIZED
  604. response we should try appending a trailing slash to the end of
  605. the request. Workaround for #3622 in Synapse <= v0.99.3.
  606. Returns:
  607. Deferred[dict|list]: Succeeds when we get a 2xx HTTP response. The
  608. result will be the decoded JSON body.
  609. Raises:
  610. HttpResponseException: If we get an HTTP response code >= 300
  611. (except 429).
  612. NotRetryingDestination: If we are not yet ready to retry this
  613. server.
  614. FederationDeniedError: If this destination is not on our
  615. federation whitelist
  616. RequestSendFailed: If there were problems connecting to the
  617. remote, due to e.g. DNS failures, connection timeouts etc.
  618. """
  619. request = MatrixFederationRequest(
  620. method="GET", destination=destination, path=path, query=args
  621. )
  622. response = yield self._send_request_with_optional_trailing_slash(
  623. request,
  624. try_trailing_slash_on_400,
  625. backoff_on_404=False,
  626. ignore_backoff=ignore_backoff,
  627. retry_on_dns_fail=retry_on_dns_fail,
  628. timeout=timeout,
  629. )
  630. body = yield _handle_json_response(
  631. self.reactor, self.default_timeout, request, response
  632. )
  633. defer.returnValue(body)
  634. @defer.inlineCallbacks
  635. def delete_json(
  636. self,
  637. destination,
  638. path,
  639. long_retries=False,
  640. timeout=None,
  641. ignore_backoff=False,
  642. args={},
  643. ):
  644. """Send a DELETE request to the remote expecting some json response
  645. Args:
  646. destination (str): The remote server to send the HTTP request
  647. to.
  648. path (str): The HTTP path.
  649. long_retries (bool): whether to use the long retry algorithm. See
  650. docs on _send_request for details.
  651. timeout (int|None): number of milliseconds to wait for the response headers
  652. (including connecting to the server), *for each attempt*.
  653. self._default_timeout (60s) by default.
  654. ignore_backoff (bool): true to ignore the historical backoff data and
  655. try the request anyway.
  656. args (dict): query params
  657. Returns:
  658. Deferred[dict|list]: Succeeds when we get a 2xx HTTP response. The
  659. result will be the decoded JSON body.
  660. Raises:
  661. HttpResponseException: If we get an HTTP response code >= 300
  662. (except 429).
  663. NotRetryingDestination: If we are not yet ready to retry this
  664. server.
  665. FederationDeniedError: If this destination is not on our
  666. federation whitelist
  667. RequestSendFailed: If there were problems connecting to the
  668. remote, due to e.g. DNS failures, connection timeouts etc.
  669. """
  670. request = MatrixFederationRequest(
  671. method="DELETE", destination=destination, path=path, query=args
  672. )
  673. response = yield self._send_request(
  674. request,
  675. long_retries=long_retries,
  676. timeout=timeout,
  677. ignore_backoff=ignore_backoff,
  678. )
  679. body = yield _handle_json_response(
  680. self.reactor, self.default_timeout, request, response
  681. )
  682. defer.returnValue(body)
  683. @defer.inlineCallbacks
  684. def get_file(
  685. self,
  686. destination,
  687. path,
  688. output_stream,
  689. args={},
  690. retry_on_dns_fail=True,
  691. max_size=None,
  692. ignore_backoff=False,
  693. ):
  694. """GETs a file from a given homeserver
  695. Args:
  696. destination (str): The remote server to send the HTTP request to.
  697. path (str): The HTTP path to GET.
  698. output_stream (file): File to write the response body to.
  699. args (dict): Optional dictionary used to create the query string.
  700. ignore_backoff (bool): true to ignore the historical backoff data
  701. and try the request anyway.
  702. Returns:
  703. Deferred[tuple[int, dict]]: Resolves with an (int,dict) tuple of
  704. the file length and a dict of the response headers.
  705. Raises:
  706. HttpResponseException: If we get an HTTP response code >= 300
  707. (except 429).
  708. NotRetryingDestination: If we are not yet ready to retry this
  709. server.
  710. FederationDeniedError: If this destination is not on our
  711. federation whitelist
  712. RequestSendFailed: If there were problems connecting to the
  713. remote, due to e.g. DNS failures, connection timeouts etc.
  714. """
  715. request = MatrixFederationRequest(
  716. method="GET", destination=destination, path=path, query=args
  717. )
  718. response = yield self._send_request(
  719. request, retry_on_dns_fail=retry_on_dns_fail, ignore_backoff=ignore_backoff
  720. )
  721. headers = dict(response.headers.getAllRawHeaders())
  722. try:
  723. d = _readBodyToFile(response, output_stream, max_size)
  724. d.addTimeout(self.default_timeout, self.reactor)
  725. length = yield make_deferred_yieldable(d)
  726. except Exception as e:
  727. logger.warn(
  728. "{%s} [%s] Error reading response: %s",
  729. request.txn_id,
  730. request.destination,
  731. e,
  732. )
  733. raise
  734. logger.info(
  735. "{%s} [%s] Completed: %d %s [%d bytes]",
  736. request.txn_id,
  737. request.destination,
  738. response.code,
  739. response.phrase.decode("ascii", errors="replace"),
  740. length,
  741. )
  742. defer.returnValue((length, headers))
  743. class _ReadBodyToFileProtocol(protocol.Protocol):
  744. def __init__(self, stream, deferred, max_size):
  745. self.stream = stream
  746. self.deferred = deferred
  747. self.length = 0
  748. self.max_size = max_size
  749. def dataReceived(self, data):
  750. self.stream.write(data)
  751. self.length += len(data)
  752. if self.max_size is not None and self.length >= self.max_size:
  753. self.deferred.errback(
  754. SynapseError(
  755. 502,
  756. "Requested file is too large > %r bytes" % (self.max_size,),
  757. Codes.TOO_LARGE,
  758. )
  759. )
  760. self.deferred = defer.Deferred()
  761. self.transport.loseConnection()
  762. def connectionLost(self, reason):
  763. if reason.check(ResponseDone):
  764. self.deferred.callback(self.length)
  765. else:
  766. self.deferred.errback(reason)
  767. def _readBodyToFile(response, stream, max_size):
  768. d = defer.Deferred()
  769. response.deliverBody(_ReadBodyToFileProtocol(stream, d, max_size))
  770. return d
  771. def _flatten_response_never_received(e):
  772. if hasattr(e, "reasons"):
  773. reasons = ", ".join(
  774. _flatten_response_never_received(f.value) for f in e.reasons
  775. )
  776. return "%s:[%s]" % (type(e).__name__, reasons)
  777. else:
  778. return repr(e)
  779. def check_content_type_is_json(headers):
  780. """
  781. Check that a set of HTTP headers have a Content-Type header, and that it
  782. is application/json.
  783. Args:
  784. headers (twisted.web.http_headers.Headers): headers to check
  785. Raises:
  786. RequestSendFailed: if the Content-Type header is missing or isn't JSON
  787. """
  788. c_type = headers.getRawHeaders(b"Content-Type")
  789. if c_type is None:
  790. raise RequestSendFailed(RuntimeError("No Content-Type header"), can_retry=False)
  791. c_type = c_type[0].decode("ascii") # only the first header
  792. val, options = cgi.parse_header(c_type)
  793. if val != "application/json":
  794. raise RequestSendFailed(
  795. RuntimeError("Content-Type not application/json: was '%s'" % c_type),
  796. can_retry=False,
  797. )
  798. def encode_query_args(args):
  799. if args is None:
  800. return b""
  801. encoded_args = {}
  802. for k, vs in args.items():
  803. if isinstance(vs, string_types):
  804. vs = [vs]
  805. encoded_args[k] = [v.encode("UTF-8") for v in vs]
  806. query_bytes = urllib.parse.urlencode(encoded_args, True)
  807. return query_bytes.encode("utf8")