matrixfederationclient.py 35 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2014-2016 OpenMarket Ltd
  3. # Copyright 2018 New Vector Ltd
  4. #
  5. # Licensed under the Apache License, Version 2.0 (the "License");
  6. # you may not use this file except in compliance with the License.
  7. # You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing, software
  12. # distributed under the License is distributed on an "AS IS" BASIS,
  13. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. # See the License for the specific language governing permissions and
  15. # limitations under the License.
  16. import cgi
  17. import logging
  18. import random
  19. import sys
  20. import urllib
  21. from io import BytesIO
  22. import attr
  23. import treq
  24. from canonicaljson import encode_canonical_json
  25. from prometheus_client import Counter
  26. from signedjson.sign import sign_json
  27. from zope.interface import implementer
  28. from twisted.internet import defer, protocol
  29. from twisted.internet.error import DNSLookupError
  30. from twisted.internet.interfaces import IReactorPluggableNameResolver
  31. from twisted.internet.task import _EPSILON, Cooperator
  32. from twisted.web._newclient import ResponseDone
  33. from twisted.web.http_headers import Headers
  34. import synapse.metrics
  35. import synapse.util.retryutils
  36. from synapse.api.errors import (
  37. Codes,
  38. FederationDeniedError,
  39. HttpResponseException,
  40. RequestSendFailed,
  41. SynapseError,
  42. )
  43. from synapse.http import QuieterFileBodyProducer
  44. from synapse.http.client import BlacklistingAgentWrapper, IPBlacklistingResolver
  45. from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent
  46. from synapse.logging.context import make_deferred_yieldable
  47. from synapse.logging.opentracing import (
  48. inject_active_span_byte_dict,
  49. set_tag,
  50. start_active_span,
  51. tags,
  52. )
  53. from synapse.util.async_helpers import timeout_deferred
  54. from synapse.util.metrics import Measure
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Prometheus counter bumped once per outgoing request attempt in
# MatrixFederationHttpClient._send_request, labelled by HTTP method.
outgoing_requests_counter = Counter(
    "synapse_http_matrixfederationclient_requests", "", ["method"]
)
# Prometheus counter bumped once per response received in
# MatrixFederationHttpClient._send_request, labelled by HTTP method and
# response status code.
incoming_responses_counter = Counter(
    "synapse_http_matrixfederationclient_responses", "", ["method", "code"]
)

# Initial retry budgets for the "long" and the "short" retry algorithms used
# by MatrixFederationHttpClient._send_request.
MAX_LONG_RETRIES = 10
MAX_SHORT_RETRIES = 3
# Used as the wrap-around bound for the request sequence number below.
MAXINT = sys.maxsize

# Sequence number embedded in the txn_id of the next MatrixFederationRequest;
# advanced each time a request object is created.
_next_id = 1
  66. @attr.s
  67. class MatrixFederationRequest(object):
  68. method = attr.ib()
  69. """HTTP method
  70. :type: str
  71. """
  72. path = attr.ib()
  73. """HTTP path
  74. :type: str
  75. """
  76. destination = attr.ib()
  77. """The remote server to send the HTTP request to.
  78. :type: str"""
  79. json = attr.ib(default=None)
  80. """JSON to send in the body.
  81. :type: dict|None
  82. """
  83. json_callback = attr.ib(default=None)
  84. """A callback to generate the JSON.
  85. :type: func|None
  86. """
  87. query = attr.ib(default=None)
  88. """Query arguments.
  89. :type: dict|None
  90. """
  91. txn_id = attr.ib(default=None)
  92. """Unique ID for this request (for logging)
  93. :type: str|None
  94. """
  95. def __attrs_post_init__(self):
  96. global _next_id
  97. self.txn_id = "%s-O-%s" % (self.method, _next_id)
  98. _next_id = (_next_id + 1) % (MAXINT - 1)
  99. def get_json(self):
  100. if self.json_callback:
  101. return self.json_callback()
  102. return self.json
  103. @defer.inlineCallbacks
  104. def _handle_json_response(reactor, timeout_sec, request, response):
  105. """
  106. Reads the JSON body of a response, with a timeout
  107. Args:
  108. reactor (IReactor): twisted reactor, for the timeout
  109. timeout_sec (float): number of seconds to wait for response to complete
  110. request (MatrixFederationRequest): the request that triggered the response
  111. response (IResponse): response to the request
  112. Returns:
  113. dict: parsed JSON response
  114. """
  115. try:
  116. check_content_type_is_json(response.headers)
  117. d = treq.json_content(response)
  118. d = timeout_deferred(d, timeout=timeout_sec, reactor=reactor)
  119. body = yield make_deferred_yieldable(d)
  120. except TimeoutError as e:
  121. logger.warning(
  122. "{%s} [%s] Timed out reading response", request.txn_id, request.destination,
  123. )
  124. raise RequestSendFailed(e, can_retry=True) from e
  125. except Exception as e:
  126. logger.warning(
  127. "{%s} [%s] Error reading response: %s",
  128. request.txn_id,
  129. request.destination,
  130. e,
  131. )
  132. raise
  133. logger.info(
  134. "{%s} [%s] Completed: %d %s",
  135. request.txn_id,
  136. request.destination,
  137. response.code,
  138. response.phrase.decode("ascii", errors="replace"),
  139. )
  140. return body
  141. class MatrixFederationHttpClient(object):
  142. """HTTP client used to talk to other homeservers over the federation
  143. protocol. Send client certificates and signs requests.
  144. Attributes:
  145. agent (twisted.web.client.Agent): The twisted Agent used to send the
  146. requests.
  147. """
  148. def __init__(self, hs, tls_client_options_factory):
  149. self.hs = hs
  150. self.signing_key = hs.config.signing_key[0]
  151. self.server_name = hs.hostname
  152. real_reactor = hs.get_reactor()
  153. # We need to use a DNS resolver which filters out blacklisted IP
  154. # addresses, to prevent DNS rebinding.
  155. nameResolver = IPBlacklistingResolver(
  156. real_reactor, None, hs.config.federation_ip_range_blacklist
  157. )
  158. @implementer(IReactorPluggableNameResolver)
  159. class Reactor(object):
  160. def __getattr__(_self, attr):
  161. if attr == "nameResolver":
  162. return nameResolver
  163. else:
  164. return getattr(real_reactor, attr)
  165. self.reactor = Reactor()
  166. self.agent = MatrixFederationAgent(self.reactor, tls_client_options_factory)
  167. # Use a BlacklistingAgentWrapper to prevent circumventing the IP
  168. # blacklist via IP literals in server names
  169. self.agent = BlacklistingAgentWrapper(
  170. self.agent,
  171. self.reactor,
  172. ip_blacklist=hs.config.federation_ip_range_blacklist,
  173. )
  174. self.clock = hs.get_clock()
  175. self._store = hs.get_datastore()
  176. self.version_string_bytes = hs.version_string.encode("ascii")
  177. self.default_timeout = 60
  178. def schedule(x):
  179. self.reactor.callLater(_EPSILON, x)
  180. self._cooperator = Cooperator(scheduler=schedule)
  181. @defer.inlineCallbacks
  182. def _send_request_with_optional_trailing_slash(
  183. self, request, try_trailing_slash_on_400=False, **send_request_args
  184. ):
  185. """Wrapper for _send_request which can optionally retry the request
  186. upon receiving a combination of a 400 HTTP response code and a
  187. 'M_UNRECOGNIZED' errcode. This is a workaround for Synapse <= v0.99.3
  188. due to #3622.
  189. Args:
  190. request (MatrixFederationRequest): details of request to be sent
  191. try_trailing_slash_on_400 (bool): Whether on receiving a 400
  192. 'M_UNRECOGNIZED' from the server to retry the request with a
  193. trailing slash appended to the request path.
  194. send_request_args (Dict): A dictionary of arguments to pass to
  195. `_send_request()`.
  196. Raises:
  197. HttpResponseException: If we get an HTTP response code >= 300
  198. (except 429).
  199. Returns:
  200. Deferred[Dict]: Parsed JSON response body.
  201. """
  202. try:
  203. response = yield self._send_request(request, **send_request_args)
  204. except HttpResponseException as e:
  205. # Received an HTTP error > 300. Check if it meets the requirements
  206. # to retry with a trailing slash
  207. if not try_trailing_slash_on_400:
  208. raise
  209. if e.code != 400 or e.to_synapse_error().errcode != "M_UNRECOGNIZED":
  210. raise
  211. # Retry with a trailing slash if we received a 400 with
  212. # 'M_UNRECOGNIZED' which some endpoints can return when omitting a
  213. # trailing slash on Synapse <= v0.99.3.
  214. logger.info("Retrying request with trailing slash")
  215. request.path += "/"
  216. response = yield self._send_request(request, **send_request_args)
  217. return response
  218. @defer.inlineCallbacks
  219. def _send_request(
  220. self,
  221. request,
  222. retry_on_dns_fail=True,
  223. timeout=None,
  224. long_retries=False,
  225. ignore_backoff=False,
  226. backoff_on_404=False,
  227. ):
  228. """
  229. Sends a request to the given server.
  230. Args:
  231. request (MatrixFederationRequest): details of request to be sent
  232. timeout (int|None): number of milliseconds to wait for the response headers
  233. (including connecting to the server), *for each attempt*.
  234. 60s by default.
  235. long_retries (bool): whether to use the long retry algorithm.
  236. The regular retry algorithm makes 4 attempts, with intervals
  237. [0.5s, 1s, 2s].
  238. The long retry algorithm makes 11 attempts, with intervals
  239. [4s, 16s, 60s, 60s, ...]
  240. Both algorithms add -20%/+40% jitter to the retry intervals.
  241. Note that the above intervals are *in addition* to the time spent
  242. waiting for the request to complete (up to `timeout` ms).
  243. NB: the long retry algorithm takes over 20 minutes to complete, with
  244. a default timeout of 60s!
  245. ignore_backoff (bool): true to ignore the historical backoff data
  246. and try the request anyway.
  247. backoff_on_404 (bool): Back off if we get a 404
  248. Returns:
  249. Deferred[twisted.web.client.Response]: resolves with the HTTP
  250. response object on success.
  251. Raises:
  252. HttpResponseException: If we get an HTTP response code >= 300
  253. (except 429).
  254. NotRetryingDestination: If we are not yet ready to retry this
  255. server.
  256. FederationDeniedError: If this destination is not on our
  257. federation whitelist
  258. RequestSendFailed: If there were problems connecting to the
  259. remote, due to e.g. DNS failures, connection timeouts etc.
  260. """
  261. if timeout:
  262. _sec_timeout = timeout / 1000
  263. else:
  264. _sec_timeout = self.default_timeout
  265. if (
  266. self.hs.config.federation_domain_whitelist is not None
  267. and request.destination not in self.hs.config.federation_domain_whitelist
  268. ):
  269. raise FederationDeniedError(request.destination)
  270. limiter = yield synapse.util.retryutils.get_retry_limiter(
  271. request.destination,
  272. self.clock,
  273. self._store,
  274. backoff_on_404=backoff_on_404,
  275. ignore_backoff=ignore_backoff,
  276. )
  277. method_bytes = request.method.encode("ascii")
  278. destination_bytes = request.destination.encode("ascii")
  279. path_bytes = request.path.encode("ascii")
  280. if request.query:
  281. query_bytes = encode_query_args(request.query)
  282. else:
  283. query_bytes = b""
  284. scope = start_active_span(
  285. "outgoing-federation-request",
  286. tags={
  287. tags.SPAN_KIND: tags.SPAN_KIND_RPC_CLIENT,
  288. tags.PEER_ADDRESS: request.destination,
  289. tags.HTTP_METHOD: request.method,
  290. tags.HTTP_URL: request.path,
  291. },
  292. finish_on_close=True,
  293. )
  294. # Inject the span into the headers
  295. headers_dict = {}
  296. inject_active_span_byte_dict(headers_dict, request.destination)
  297. headers_dict[b"User-Agent"] = [self.version_string_bytes]
  298. with limiter, scope:
  299. # XXX: Would be much nicer to retry only at the transaction-layer
  300. # (once we have reliable transactions in place)
  301. if long_retries:
  302. retries_left = MAX_LONG_RETRIES
  303. else:
  304. retries_left = MAX_SHORT_RETRIES
  305. url_bytes = urllib.parse.urlunparse(
  306. (b"matrix", destination_bytes, path_bytes, None, query_bytes, b"")
  307. )
  308. url_str = url_bytes.decode("ascii")
  309. url_to_sign_bytes = urllib.parse.urlunparse(
  310. (b"", b"", path_bytes, None, query_bytes, b"")
  311. )
  312. while True:
  313. try:
  314. json = request.get_json()
  315. if json:
  316. headers_dict[b"Content-Type"] = [b"application/json"]
  317. auth_headers = self.build_auth_headers(
  318. destination_bytes, method_bytes, url_to_sign_bytes, json
  319. )
  320. data = encode_canonical_json(json)
  321. producer = QuieterFileBodyProducer(
  322. BytesIO(data), cooperator=self._cooperator
  323. )
  324. else:
  325. producer = None
  326. auth_headers = self.build_auth_headers(
  327. destination_bytes, method_bytes, url_to_sign_bytes
  328. )
  329. headers_dict[b"Authorization"] = auth_headers
  330. logger.info(
  331. "{%s} [%s] Sending request: %s %s; timeout %fs",
  332. request.txn_id,
  333. request.destination,
  334. request.method,
  335. url_str,
  336. _sec_timeout,
  337. )
  338. outgoing_requests_counter.labels(request.method).inc()
  339. try:
  340. with Measure(self.clock, "outbound_request"):
  341. # we don't want all the fancy cookie and redirect handling
  342. # that treq.request gives: just use the raw Agent.
  343. request_deferred = self.agent.request(
  344. method_bytes,
  345. url_bytes,
  346. headers=Headers(headers_dict),
  347. bodyProducer=producer,
  348. )
  349. request_deferred = timeout_deferred(
  350. request_deferred,
  351. timeout=_sec_timeout,
  352. reactor=self.reactor,
  353. )
  354. response = yield request_deferred
  355. except TimeoutError as e:
  356. raise RequestSendFailed(e, can_retry=True) from e
  357. except DNSLookupError as e:
  358. raise RequestSendFailed(e, can_retry=retry_on_dns_fail) from e
  359. except Exception as e:
  360. logger.info("Failed to send request: %s", e)
  361. raise RequestSendFailed(e, can_retry=True) from e
  362. incoming_responses_counter.labels(
  363. request.method, response.code
  364. ).inc()
  365. set_tag(tags.HTTP_STATUS_CODE, response.code)
  366. if 200 <= response.code < 300:
  367. logger.debug(
  368. "{%s} [%s] Got response headers: %d %s",
  369. request.txn_id,
  370. request.destination,
  371. response.code,
  372. response.phrase.decode("ascii", errors="replace"),
  373. )
  374. pass
  375. else:
  376. logger.info(
  377. "{%s} [%s] Got response headers: %d %s",
  378. request.txn_id,
  379. request.destination,
  380. response.code,
  381. response.phrase.decode("ascii", errors="replace"),
  382. )
  383. # :'(
  384. # Update transactions table?
  385. d = treq.content(response)
  386. d = timeout_deferred(
  387. d, timeout=_sec_timeout, reactor=self.reactor
  388. )
  389. try:
  390. body = yield make_deferred_yieldable(d)
  391. except Exception as e:
  392. # Eh, we're already going to raise an exception so lets
  393. # ignore if this fails.
  394. logger.warning(
  395. "{%s} [%s] Failed to get error response: %s %s: %s",
  396. request.txn_id,
  397. request.destination,
  398. request.method,
  399. url_str,
  400. _flatten_response_never_received(e),
  401. )
  402. body = None
  403. e = HttpResponseException(response.code, response.phrase, body)
  404. # Retry if the error is a 429 (Too Many Requests),
  405. # otherwise just raise a standard HttpResponseException
  406. if response.code == 429:
  407. raise RequestSendFailed(e, can_retry=True) from e
  408. else:
  409. raise e
  410. break
  411. except RequestSendFailed as e:
  412. logger.warning(
  413. "{%s} [%s] Request failed: %s %s: %s",
  414. request.txn_id,
  415. request.destination,
  416. request.method,
  417. url_str,
  418. _flatten_response_never_received(e.inner_exception),
  419. )
  420. if not e.can_retry:
  421. raise
  422. if retries_left and not timeout:
  423. if long_retries:
  424. delay = 4 ** (MAX_LONG_RETRIES + 1 - retries_left)
  425. delay = min(delay, 60)
  426. delay *= random.uniform(0.8, 1.4)
  427. else:
  428. delay = 0.5 * 2 ** (MAX_SHORT_RETRIES - retries_left)
  429. delay = min(delay, 2)
  430. delay *= random.uniform(0.8, 1.4)
  431. logger.debug(
  432. "{%s} [%s] Waiting %ss before re-sending...",
  433. request.txn_id,
  434. request.destination,
  435. delay,
  436. )
  437. yield self.clock.sleep(delay)
  438. retries_left -= 1
  439. else:
  440. raise
  441. except Exception as e:
  442. logger.warning(
  443. "{%s} [%s] Request failed: %s %s: %s",
  444. request.txn_id,
  445. request.destination,
  446. request.method,
  447. url_str,
  448. _flatten_response_never_received(e),
  449. )
  450. raise
  451. return response
  452. def build_auth_headers(
  453. self, destination, method, url_bytes, content=None, destination_is=None
  454. ):
  455. """
  456. Builds the Authorization headers for a federation request
  457. Args:
  458. destination (bytes|None): The desination homeserver of the request.
  459. May be None if the destination is an identity server, in which case
  460. destination_is must be non-None.
  461. method (bytes): The HTTP method of the request
  462. url_bytes (bytes): The URI path of the request
  463. content (object): The body of the request
  464. destination_is (bytes): As 'destination', but if the destination is an
  465. identity server
  466. Returns:
  467. list[bytes]: a list of headers to be added as "Authorization:" headers
  468. """
  469. request = {"method": method, "uri": url_bytes, "origin": self.server_name}
  470. if destination is not None:
  471. request["destination"] = destination
  472. if destination_is is not None:
  473. request["destination_is"] = destination_is
  474. if content is not None:
  475. request["content"] = content
  476. request = sign_json(request, self.server_name, self.signing_key)
  477. auth_headers = []
  478. for key, sig in request["signatures"][self.server_name].items():
  479. auth_headers.append(
  480. (
  481. 'X-Matrix origin=%s,key="%s",sig="%s"'
  482. % (self.server_name, key, sig)
  483. ).encode("ascii")
  484. )
  485. return auth_headers
  486. @defer.inlineCallbacks
  487. def put_json(
  488. self,
  489. destination,
  490. path,
  491. args={},
  492. data={},
  493. json_data_callback=None,
  494. long_retries=False,
  495. timeout=None,
  496. ignore_backoff=False,
  497. backoff_on_404=False,
  498. try_trailing_slash_on_400=False,
  499. ):
  500. """ Sends the specifed json data using PUT
  501. Args:
  502. destination (str): The remote server to send the HTTP request
  503. to.
  504. path (str): The HTTP path.
  505. args (dict): query params
  506. data (dict): A dict containing the data that will be used as
  507. the request body. This will be encoded as JSON.
  508. json_data_callback (callable): A callable returning the dict to
  509. use as the request body.
  510. long_retries (bool): whether to use the long retry algorithm. See
  511. docs on _send_request for details.
  512. timeout (int|None): number of milliseconds to wait for the response headers
  513. (including connecting to the server), *for each attempt*.
  514. self._default_timeout (60s) by default.
  515. ignore_backoff (bool): true to ignore the historical backoff data
  516. and try the request anyway.
  517. backoff_on_404 (bool): True if we should count a 404 response as
  518. a failure of the server (and should therefore back off future
  519. requests).
  520. try_trailing_slash_on_400 (bool): True if on a 400 M_UNRECOGNIZED
  521. response we should try appending a trailing slash to the end
  522. of the request. Workaround for #3622 in Synapse <= v0.99.3. This
  523. will be attempted before backing off if backing off has been
  524. enabled.
  525. Returns:
  526. Deferred[dict|list]: Succeeds when we get a 2xx HTTP response. The
  527. result will be the decoded JSON body.
  528. Raises:
  529. HttpResponseException: If we get an HTTP response code >= 300
  530. (except 429).
  531. NotRetryingDestination: If we are not yet ready to retry this
  532. server.
  533. FederationDeniedError: If this destination is not on our
  534. federation whitelist
  535. RequestSendFailed: If there were problems connecting to the
  536. remote, due to e.g. DNS failures, connection timeouts etc.
  537. """
  538. request = MatrixFederationRequest(
  539. method="PUT",
  540. destination=destination,
  541. path=path,
  542. query=args,
  543. json_callback=json_data_callback,
  544. json=data,
  545. )
  546. response = yield self._send_request_with_optional_trailing_slash(
  547. request,
  548. try_trailing_slash_on_400,
  549. backoff_on_404=backoff_on_404,
  550. ignore_backoff=ignore_backoff,
  551. long_retries=long_retries,
  552. timeout=timeout,
  553. )
  554. body = yield _handle_json_response(
  555. self.reactor, self.default_timeout, request, response
  556. )
  557. return body
  558. @defer.inlineCallbacks
  559. def post_json(
  560. self,
  561. destination,
  562. path,
  563. data={},
  564. long_retries=False,
  565. timeout=None,
  566. ignore_backoff=False,
  567. args={},
  568. ):
  569. """ Sends the specifed json data using POST
  570. Args:
  571. destination (str): The remote server to send the HTTP request
  572. to.
  573. path (str): The HTTP path.
  574. data (dict): A dict containing the data that will be used as
  575. the request body. This will be encoded as JSON.
  576. long_retries (bool): whether to use the long retry algorithm. See
  577. docs on _send_request for details.
  578. timeout (int|None): number of milliseconds to wait for the response headers
  579. (including connecting to the server), *for each attempt*.
  580. self._default_timeout (60s) by default.
  581. ignore_backoff (bool): true to ignore the historical backoff data and
  582. try the request anyway.
  583. args (dict): query params
  584. Returns:
  585. Deferred[dict|list]: Succeeds when we get a 2xx HTTP response. The
  586. result will be the decoded JSON body.
  587. Raises:
  588. HttpResponseException: If we get an HTTP response code >= 300
  589. (except 429).
  590. NotRetryingDestination: If we are not yet ready to retry this
  591. server.
  592. FederationDeniedError: If this destination is not on our
  593. federation whitelist
  594. RequestSendFailed: If there were problems connecting to the
  595. remote, due to e.g. DNS failures, connection timeouts etc.
  596. """
  597. request = MatrixFederationRequest(
  598. method="POST", destination=destination, path=path, query=args, json=data
  599. )
  600. response = yield self._send_request(
  601. request,
  602. long_retries=long_retries,
  603. timeout=timeout,
  604. ignore_backoff=ignore_backoff,
  605. )
  606. if timeout:
  607. _sec_timeout = timeout / 1000
  608. else:
  609. _sec_timeout = self.default_timeout
  610. body = yield _handle_json_response(
  611. self.reactor, _sec_timeout, request, response
  612. )
  613. return body
  614. @defer.inlineCallbacks
  615. def get_json(
  616. self,
  617. destination,
  618. path,
  619. args=None,
  620. retry_on_dns_fail=True,
  621. timeout=None,
  622. ignore_backoff=False,
  623. try_trailing_slash_on_400=False,
  624. ):
  625. """ GETs some json from the given host homeserver and path
  626. Args:
  627. destination (str): The remote server to send the HTTP request
  628. to.
  629. path (str): The HTTP path.
  630. args (dict|None): A dictionary used to create query strings, defaults to
  631. None.
  632. timeout (int|None): number of milliseconds to wait for the response headers
  633. (including connecting to the server), *for each attempt*.
  634. self._default_timeout (60s) by default.
  635. ignore_backoff (bool): true to ignore the historical backoff data
  636. and try the request anyway.
  637. try_trailing_slash_on_400 (bool): True if on a 400 M_UNRECOGNIZED
  638. response we should try appending a trailing slash to the end of
  639. the request. Workaround for #3622 in Synapse <= v0.99.3.
  640. Returns:
  641. Deferred[dict|list]: Succeeds when we get a 2xx HTTP response. The
  642. result will be the decoded JSON body.
  643. Raises:
  644. HttpResponseException: If we get an HTTP response code >= 300
  645. (except 429).
  646. NotRetryingDestination: If we are not yet ready to retry this
  647. server.
  648. FederationDeniedError: If this destination is not on our
  649. federation whitelist
  650. RequestSendFailed: If there were problems connecting to the
  651. remote, due to e.g. DNS failures, connection timeouts etc.
  652. """
  653. request = MatrixFederationRequest(
  654. method="GET", destination=destination, path=path, query=args
  655. )
  656. response = yield self._send_request_with_optional_trailing_slash(
  657. request,
  658. try_trailing_slash_on_400,
  659. backoff_on_404=False,
  660. ignore_backoff=ignore_backoff,
  661. retry_on_dns_fail=retry_on_dns_fail,
  662. timeout=timeout,
  663. )
  664. body = yield _handle_json_response(
  665. self.reactor, self.default_timeout, request, response
  666. )
  667. return body
  668. @defer.inlineCallbacks
  669. def delete_json(
  670. self,
  671. destination,
  672. path,
  673. long_retries=False,
  674. timeout=None,
  675. ignore_backoff=False,
  676. args={},
  677. ):
  678. """Send a DELETE request to the remote expecting some json response
  679. Args:
  680. destination (str): The remote server to send the HTTP request
  681. to.
  682. path (str): The HTTP path.
  683. long_retries (bool): whether to use the long retry algorithm. See
  684. docs on _send_request for details.
  685. timeout (int|None): number of milliseconds to wait for the response headers
  686. (including connecting to the server), *for each attempt*.
  687. self._default_timeout (60s) by default.
  688. ignore_backoff (bool): true to ignore the historical backoff data and
  689. try the request anyway.
  690. args (dict): query params
  691. Returns:
  692. Deferred[dict|list]: Succeeds when we get a 2xx HTTP response. The
  693. result will be the decoded JSON body.
  694. Raises:
  695. HttpResponseException: If we get an HTTP response code >= 300
  696. (except 429).
  697. NotRetryingDestination: If we are not yet ready to retry this
  698. server.
  699. FederationDeniedError: If this destination is not on our
  700. federation whitelist
  701. RequestSendFailed: If there were problems connecting to the
  702. remote, due to e.g. DNS failures, connection timeouts etc.
  703. """
  704. request = MatrixFederationRequest(
  705. method="DELETE", destination=destination, path=path, query=args
  706. )
  707. response = yield self._send_request(
  708. request,
  709. long_retries=long_retries,
  710. timeout=timeout,
  711. ignore_backoff=ignore_backoff,
  712. )
  713. body = yield _handle_json_response(
  714. self.reactor, self.default_timeout, request, response
  715. )
  716. return body
  717. @defer.inlineCallbacks
  718. def get_file(
  719. self,
  720. destination,
  721. path,
  722. output_stream,
  723. args={},
  724. retry_on_dns_fail=True,
  725. max_size=None,
  726. ignore_backoff=False,
  727. ):
  728. """GETs a file from a given homeserver
  729. Args:
  730. destination (str): The remote server to send the HTTP request to.
  731. path (str): The HTTP path to GET.
  732. output_stream (file): File to write the response body to.
  733. args (dict): Optional dictionary used to create the query string.
  734. ignore_backoff (bool): true to ignore the historical backoff data
  735. and try the request anyway.
  736. Returns:
  737. Deferred[tuple[int, dict]]: Resolves with an (int,dict) tuple of
  738. the file length and a dict of the response headers.
  739. Raises:
  740. HttpResponseException: If we get an HTTP response code >= 300
  741. (except 429).
  742. NotRetryingDestination: If we are not yet ready to retry this
  743. server.
  744. FederationDeniedError: If this destination is not on our
  745. federation whitelist
  746. RequestSendFailed: If there were problems connecting to the
  747. remote, due to e.g. DNS failures, connection timeouts etc.
  748. """
  749. request = MatrixFederationRequest(
  750. method="GET", destination=destination, path=path, query=args
  751. )
  752. response = yield self._send_request(
  753. request, retry_on_dns_fail=retry_on_dns_fail, ignore_backoff=ignore_backoff
  754. )
  755. headers = dict(response.headers.getAllRawHeaders())
  756. try:
  757. d = _readBodyToFile(response, output_stream, max_size)
  758. d.addTimeout(self.default_timeout, self.reactor)
  759. length = yield make_deferred_yieldable(d)
  760. except Exception as e:
  761. logger.warning(
  762. "{%s} [%s] Error reading response: %s",
  763. request.txn_id,
  764. request.destination,
  765. e,
  766. )
  767. raise
  768. logger.info(
  769. "{%s} [%s] Completed: %d %s [%d bytes]",
  770. request.txn_id,
  771. request.destination,
  772. response.code,
  773. response.phrase.decode("ascii", errors="replace"),
  774. length,
  775. )
  776. return (length, headers)
  777. class _ReadBodyToFileProtocol(protocol.Protocol):
  778. def __init__(self, stream, deferred, max_size):
  779. self.stream = stream
  780. self.deferred = deferred
  781. self.length = 0
  782. self.max_size = max_size
  783. def dataReceived(self, data):
  784. self.stream.write(data)
  785. self.length += len(data)
  786. if self.max_size is not None and self.length >= self.max_size:
  787. self.deferred.errback(
  788. SynapseError(
  789. 502,
  790. "Requested file is too large > %r bytes" % (self.max_size,),
  791. Codes.TOO_LARGE,
  792. )
  793. )
  794. self.deferred = defer.Deferred()
  795. self.transport.loseConnection()
  796. def connectionLost(self, reason):
  797. if reason.check(ResponseDone):
  798. self.deferred.callback(self.length)
  799. else:
  800. self.deferred.errback(reason)
  801. def _readBodyToFile(response, stream, max_size):
  802. d = defer.Deferred()
  803. response.deliverBody(_ReadBodyToFileProtocol(stream, d, max_size))
  804. return d
  805. def _flatten_response_never_received(e):
  806. if hasattr(e, "reasons"):
  807. reasons = ", ".join(
  808. _flatten_response_never_received(f.value) for f in e.reasons
  809. )
  810. return "%s:[%s]" % (type(e).__name__, reasons)
  811. else:
  812. return repr(e)
  813. def check_content_type_is_json(headers):
  814. """
  815. Check that a set of HTTP headers have a Content-Type header, and that it
  816. is application/json.
  817. Args:
  818. headers (twisted.web.http_headers.Headers): headers to check
  819. Raises:
  820. RequestSendFailed: if the Content-Type header is missing or isn't JSON
  821. """
  822. c_type = headers.getRawHeaders(b"Content-Type")
  823. if c_type is None:
  824. raise RequestSendFailed(RuntimeError("No Content-Type header"), can_retry=False)
  825. c_type = c_type[0].decode("ascii") # only the first header
  826. val, options = cgi.parse_header(c_type)
  827. if val != "application/json":
  828. raise RequestSendFailed(
  829. RuntimeError("Content-Type not application/json: was '%s'" % c_type),
  830. can_retry=False,
  831. )
  832. def encode_query_args(args):
  833. if args is None:
  834. return b""
  835. encoded_args = {}
  836. for k, vs in args.items():
  837. if isinstance(vs, str):
  838. vs = [vs]
  839. encoded_args[k] = [v.encode("UTF-8") for v in vs]
  840. query_bytes = urllib.parse.urlencode(encoded_args, True)
  841. return query_bytes.encode("utf8")