matrixfederationclient.py 34 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2014-2016 OpenMarket Ltd
  3. # Copyright 2018 New Vector Ltd
  4. #
  5. # Licensed under the Apache License, Version 2.0 (the "License");
  6. # you may not use this file except in compliance with the License.
  7. # You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing, software
  12. # distributed under the License is distributed on an "AS IS" BASIS,
  13. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. # See the License for the specific language governing permissions and
  15. # limitations under the License.
  16. import cgi
  17. import logging
  18. import random
  19. import sys
  20. from io import BytesIO
  21. from six import PY3, raise_from, string_types
  22. from six.moves import urllib
  23. import attr
  24. import treq
  25. from canonicaljson import encode_canonical_json
  26. from prometheus_client import Counter
  27. from signedjson.sign import sign_json
  28. from zope.interface import implementer
  29. from twisted.internet import defer, protocol
  30. from twisted.internet.error import DNSLookupError
  31. from twisted.internet.interfaces import IReactorPluggableNameResolver
  32. from twisted.internet.task import _EPSILON, Cooperator
  33. from twisted.web._newclient import ResponseDone
  34. from twisted.web.http_headers import Headers
  35. import synapse.logging.opentracing as opentracing
  36. import synapse.metrics
  37. import synapse.util.retryutils
  38. from synapse.api.errors import (
  39. Codes,
  40. FederationDeniedError,
  41. HttpResponseException,
  42. RequestSendFailed,
  43. SynapseError,
  44. )
  45. from synapse.http import QuieterFileBodyProducer
  46. from synapse.http.client import BlacklistingAgentWrapper, IPBlacklistingResolver
  47. from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent
  48. from synapse.logging.context import make_deferred_yieldable
  49. from synapse.util.async_helpers import timeout_deferred
  50. from synapse.util.metrics import Measure
  51. logger = logging.getLogger(__name__)
  52. outgoing_requests_counter = Counter(
  53. "synapse_http_matrixfederationclient_requests", "", ["method"]
  54. )
  55. incoming_responses_counter = Counter(
  56. "synapse_http_matrixfederationclient_responses", "", ["method", "code"]
  57. )
  58. MAX_LONG_RETRIES = 10
  59. MAX_SHORT_RETRIES = 3
  60. if PY3:
  61. MAXINT = sys.maxsize
  62. else:
  63. MAXINT = sys.maxint
  64. _next_id = 1
  65. @attr.s
  66. class MatrixFederationRequest(object):
  67. method = attr.ib()
  68. """HTTP method
  69. :type: str
  70. """
  71. path = attr.ib()
  72. """HTTP path
  73. :type: str
  74. """
  75. destination = attr.ib()
  76. """The remote server to send the HTTP request to.
  77. :type: str"""
  78. json = attr.ib(default=None)
  79. """JSON to send in the body.
  80. :type: dict|None
  81. """
  82. json_callback = attr.ib(default=None)
  83. """A callback to generate the JSON.
  84. :type: func|None
  85. """
  86. query = attr.ib(default=None)
  87. """Query arguments.
  88. :type: dict|None
  89. """
  90. txn_id = attr.ib(default=None)
  91. """Unique ID for this request (for logging)
  92. :type: str|None
  93. """
  94. def __attrs_post_init__(self):
  95. global _next_id
  96. self.txn_id = "%s-O-%s" % (self.method, _next_id)
  97. _next_id = (_next_id + 1) % (MAXINT - 1)
  98. def get_json(self):
  99. if self.json_callback:
  100. return self.json_callback()
  101. return self.json
  102. @defer.inlineCallbacks
  103. def _handle_json_response(reactor, timeout_sec, request, response):
  104. """
  105. Reads the JSON body of a response, with a timeout
  106. Args:
  107. reactor (IReactor): twisted reactor, for the timeout
  108. timeout_sec (float): number of seconds to wait for response to complete
  109. request (MatrixFederationRequest): the request that triggered the response
  110. response (IResponse): response to the request
  111. Returns:
  112. dict: parsed JSON response
  113. """
  114. try:
  115. check_content_type_is_json(response.headers)
  116. d = treq.json_content(response)
  117. d = timeout_deferred(d, timeout=timeout_sec, reactor=reactor)
  118. body = yield make_deferred_yieldable(d)
  119. except Exception as e:
  120. logger.warn(
  121. "{%s} [%s] Error reading response: %s",
  122. request.txn_id,
  123. request.destination,
  124. e,
  125. )
  126. raise
  127. logger.info(
  128. "{%s} [%s] Completed: %d %s",
  129. request.txn_id,
  130. request.destination,
  131. response.code,
  132. response.phrase.decode("ascii", errors="replace"),
  133. )
  134. return body
  135. class MatrixFederationHttpClient(object):
  136. """HTTP client used to talk to other homeservers over the federation
  137. protocol. Send client certificates and signs requests.
  138. Attributes:
  139. agent (twisted.web.client.Agent): The twisted Agent used to send the
  140. requests.
  141. """
  142. def __init__(self, hs, tls_client_options_factory):
  143. self.hs = hs
  144. self.signing_key = hs.config.signing_key[0]
  145. self.server_name = hs.hostname
  146. real_reactor = hs.get_reactor()
  147. # We need to use a DNS resolver which filters out blacklisted IP
  148. # addresses, to prevent DNS rebinding.
  149. nameResolver = IPBlacklistingResolver(
  150. real_reactor, None, hs.config.federation_ip_range_blacklist
  151. )
  152. @implementer(IReactorPluggableNameResolver)
  153. class Reactor(object):
  154. def __getattr__(_self, attr):
  155. if attr == "nameResolver":
  156. return nameResolver
  157. else:
  158. return getattr(real_reactor, attr)
  159. self.reactor = Reactor()
  160. self.agent = MatrixFederationAgent(self.reactor, tls_client_options_factory)
  161. # Use a BlacklistingAgentWrapper to prevent circumventing the IP
  162. # blacklist via IP literals in server names
  163. self.agent = BlacklistingAgentWrapper(
  164. self.agent,
  165. self.reactor,
  166. ip_blacklist=hs.config.federation_ip_range_blacklist,
  167. )
  168. self.clock = hs.get_clock()
  169. self._store = hs.get_datastore()
  170. self.version_string_bytes = hs.version_string.encode("ascii")
  171. self.default_timeout = 60
  172. def schedule(x):
  173. self.reactor.callLater(_EPSILON, x)
  174. self._cooperator = Cooperator(scheduler=schedule)
  175. @defer.inlineCallbacks
  176. def _send_request_with_optional_trailing_slash(
  177. self, request, try_trailing_slash_on_400=False, **send_request_args
  178. ):
  179. """Wrapper for _send_request which can optionally retry the request
  180. upon receiving a combination of a 400 HTTP response code and a
  181. 'M_UNRECOGNIZED' errcode. This is a workaround for Synapse <= v0.99.3
  182. due to #3622.
  183. Args:
  184. request (MatrixFederationRequest): details of request to be sent
  185. try_trailing_slash_on_400 (bool): Whether on receiving a 400
  186. 'M_UNRECOGNIZED' from the server to retry the request with a
  187. trailing slash appended to the request path.
  188. send_request_args (Dict): A dictionary of arguments to pass to
  189. `_send_request()`.
  190. Raises:
  191. HttpResponseException: If we get an HTTP response code >= 300
  192. (except 429).
  193. Returns:
  194. Deferred[Dict]: Parsed JSON response body.
  195. """
  196. try:
  197. response = yield self._send_request(request, **send_request_args)
  198. except HttpResponseException as e:
  199. # Received an HTTP error > 300. Check if it meets the requirements
  200. # to retry with a trailing slash
  201. if not try_trailing_slash_on_400:
  202. raise
  203. if e.code != 400 or e.to_synapse_error().errcode != "M_UNRECOGNIZED":
  204. raise
  205. # Retry with a trailing slash if we received a 400 with
  206. # 'M_UNRECOGNIZED' which some endpoints can return when omitting a
  207. # trailing slash on Synapse <= v0.99.3.
  208. logger.info("Retrying request with trailing slash")
  209. request.path += "/"
  210. response = yield self._send_request(request, **send_request_args)
  211. return response
  212. @defer.inlineCallbacks
  213. def _send_request(
  214. self,
  215. request,
  216. retry_on_dns_fail=True,
  217. timeout=None,
  218. long_retries=False,
  219. ignore_backoff=False,
  220. backoff_on_404=False,
  221. ):
  222. """
  223. Sends a request to the given server.
  224. Args:
  225. request (MatrixFederationRequest): details of request to be sent
  226. timeout (int|None): number of milliseconds to wait for the response headers
  227. (including connecting to the server), *for each attempt*.
  228. 60s by default.
  229. long_retries (bool): whether to use the long retry algorithm.
  230. The regular retry algorithm makes 4 attempts, with intervals
  231. [0.5s, 1s, 2s].
  232. The long retry algorithm makes 11 attempts, with intervals
  233. [4s, 16s, 60s, 60s, ...]
  234. Both algorithms add -20%/+40% jitter to the retry intervals.
  235. Note that the above intervals are *in addition* to the time spent
  236. waiting for the request to complete (up to `timeout` ms).
  237. NB: the long retry algorithm takes over 20 minutes to complete, with
  238. a default timeout of 60s!
  239. ignore_backoff (bool): true to ignore the historical backoff data
  240. and try the request anyway.
  241. backoff_on_404 (bool): Back off if we get a 404
  242. Returns:
  243. Deferred[twisted.web.client.Response]: resolves with the HTTP
  244. response object on success.
  245. Raises:
  246. HttpResponseException: If we get an HTTP response code >= 300
  247. (except 429).
  248. NotRetryingDestination: If we are not yet ready to retry this
  249. server.
  250. FederationDeniedError: If this destination is not on our
  251. federation whitelist
  252. RequestSendFailed: If there were problems connecting to the
  253. remote, due to e.g. DNS failures, connection timeouts etc.
  254. """
  255. if timeout:
  256. _sec_timeout = timeout / 1000
  257. else:
  258. _sec_timeout = self.default_timeout
  259. if (
  260. self.hs.config.federation_domain_whitelist is not None
  261. and request.destination not in self.hs.config.federation_domain_whitelist
  262. ):
  263. raise FederationDeniedError(request.destination)
  264. limiter = yield synapse.util.retryutils.get_retry_limiter(
  265. request.destination,
  266. self.clock,
  267. self._store,
  268. backoff_on_404=backoff_on_404,
  269. ignore_backoff=ignore_backoff,
  270. )
  271. method_bytes = request.method.encode("ascii")
  272. destination_bytes = request.destination.encode("ascii")
  273. path_bytes = request.path.encode("ascii")
  274. if request.query:
  275. query_bytes = encode_query_args(request.query)
  276. else:
  277. query_bytes = b""
  278. # Retreive current span
  279. scope = opentracing.start_active_span(
  280. "outgoing-federation-request",
  281. tags={
  282. opentracing.tags.SPAN_KIND: opentracing.tags.SPAN_KIND_RPC_CLIENT,
  283. opentracing.tags.PEER_ADDRESS: request.destination,
  284. opentracing.tags.HTTP_METHOD: request.method,
  285. opentracing.tags.HTTP_URL: request.path,
  286. },
  287. finish_on_close=True,
  288. )
  289. # Inject the span into the headers
  290. headers_dict = {}
  291. opentracing.inject_active_span_byte_dict(headers_dict, request.destination)
  292. headers_dict[b"User-Agent"] = [self.version_string_bytes]
  293. with limiter, scope:
  294. # XXX: Would be much nicer to retry only at the transaction-layer
  295. # (once we have reliable transactions in place)
  296. if long_retries:
  297. retries_left = MAX_LONG_RETRIES
  298. else:
  299. retries_left = MAX_SHORT_RETRIES
  300. url_bytes = urllib.parse.urlunparse(
  301. (b"matrix", destination_bytes, path_bytes, None, query_bytes, b"")
  302. )
  303. url_str = url_bytes.decode("ascii")
  304. url_to_sign_bytes = urllib.parse.urlunparse(
  305. (b"", b"", path_bytes, None, query_bytes, b"")
  306. )
  307. while True:
  308. try:
  309. json = request.get_json()
  310. if json:
  311. headers_dict[b"Content-Type"] = [b"application/json"]
  312. auth_headers = self.build_auth_headers(
  313. destination_bytes, method_bytes, url_to_sign_bytes, json
  314. )
  315. data = encode_canonical_json(json)
  316. producer = QuieterFileBodyProducer(
  317. BytesIO(data), cooperator=self._cooperator
  318. )
  319. else:
  320. producer = None
  321. auth_headers = self.build_auth_headers(
  322. destination_bytes, method_bytes, url_to_sign_bytes
  323. )
  324. headers_dict[b"Authorization"] = auth_headers
  325. logger.info(
  326. "{%s} [%s] Sending request: %s %s; timeout %fs",
  327. request.txn_id,
  328. request.destination,
  329. request.method,
  330. url_str,
  331. _sec_timeout,
  332. )
  333. try:
  334. with Measure(self.clock, "outbound_request"):
  335. # we don't want all the fancy cookie and redirect handling
  336. # that treq.request gives: just use the raw Agent.
  337. request_deferred = self.agent.request(
  338. method_bytes,
  339. url_bytes,
  340. headers=Headers(headers_dict),
  341. bodyProducer=producer,
  342. )
  343. request_deferred = timeout_deferred(
  344. request_deferred,
  345. timeout=_sec_timeout,
  346. reactor=self.reactor,
  347. )
  348. response = yield request_deferred
  349. except DNSLookupError as e:
  350. raise_from(RequestSendFailed(e, can_retry=retry_on_dns_fail), e)
  351. except Exception as e:
  352. logger.info("Failed to send request: %s", e)
  353. raise_from(RequestSendFailed(e, can_retry=True), e)
  354. logger.info(
  355. "{%s} [%s] Got response headers: %d %s",
  356. request.txn_id,
  357. request.destination,
  358. response.code,
  359. response.phrase.decode("ascii", errors="replace"),
  360. )
  361. opentracing.set_tag(
  362. opentracing.tags.HTTP_STATUS_CODE, response.code
  363. )
  364. if 200 <= response.code < 300:
  365. pass
  366. else:
  367. # :'(
  368. # Update transactions table?
  369. d = treq.content(response)
  370. d = timeout_deferred(
  371. d, timeout=_sec_timeout, reactor=self.reactor
  372. )
  373. try:
  374. body = yield make_deferred_yieldable(d)
  375. except Exception as e:
  376. # Eh, we're already going to raise an exception so lets
  377. # ignore if this fails.
  378. logger.warn(
  379. "{%s} [%s] Failed to get error response: %s %s: %s",
  380. request.txn_id,
  381. request.destination,
  382. request.method,
  383. url_str,
  384. _flatten_response_never_received(e),
  385. )
  386. body = None
  387. e = HttpResponseException(response.code, response.phrase, body)
  388. # Retry if the error is a 429 (Too Many Requests),
  389. # otherwise just raise a standard HttpResponseException
  390. if response.code == 429:
  391. raise_from(RequestSendFailed(e, can_retry=True), e)
  392. else:
  393. raise e
  394. break
  395. except RequestSendFailed as e:
  396. logger.warn(
  397. "{%s} [%s] Request failed: %s %s: %s",
  398. request.txn_id,
  399. request.destination,
  400. request.method,
  401. url_str,
  402. _flatten_response_never_received(e.inner_exception),
  403. )
  404. if not e.can_retry:
  405. raise
  406. if retries_left and not timeout:
  407. if long_retries:
  408. delay = 4 ** (MAX_LONG_RETRIES + 1 - retries_left)
  409. delay = min(delay, 60)
  410. delay *= random.uniform(0.8, 1.4)
  411. else:
  412. delay = 0.5 * 2 ** (MAX_SHORT_RETRIES - retries_left)
  413. delay = min(delay, 2)
  414. delay *= random.uniform(0.8, 1.4)
  415. logger.debug(
  416. "{%s} [%s] Waiting %ss before re-sending...",
  417. request.txn_id,
  418. request.destination,
  419. delay,
  420. )
  421. yield self.clock.sleep(delay)
  422. retries_left -= 1
  423. else:
  424. raise
  425. except Exception as e:
  426. logger.warn(
  427. "{%s} [%s] Request failed: %s %s: %s",
  428. request.txn_id,
  429. request.destination,
  430. request.method,
  431. url_str,
  432. _flatten_response_never_received(e),
  433. )
  434. raise
  435. return response
  436. def build_auth_headers(
  437. self, destination, method, url_bytes, content=None, destination_is=None
  438. ):
  439. """
  440. Builds the Authorization headers for a federation request
  441. Args:
  442. destination (bytes|None): The desination home server of the request.
  443. May be None if the destination is an identity server, in which case
  444. destination_is must be non-None.
  445. method (bytes): The HTTP method of the request
  446. url_bytes (bytes): The URI path of the request
  447. content (object): The body of the request
  448. destination_is (bytes): As 'destination', but if the destination is an
  449. identity server
  450. Returns:
  451. list[bytes]: a list of headers to be added as "Authorization:" headers
  452. """
  453. request = {"method": method, "uri": url_bytes, "origin": self.server_name}
  454. if destination is not None:
  455. request["destination"] = destination
  456. if destination_is is not None:
  457. request["destination_is"] = destination_is
  458. if content is not None:
  459. request["content"] = content
  460. request = sign_json(request, self.server_name, self.signing_key)
  461. auth_headers = []
  462. for key, sig in request["signatures"][self.server_name].items():
  463. auth_headers.append(
  464. (
  465. 'X-Matrix origin=%s,key="%s",sig="%s"'
  466. % (self.server_name, key, sig)
  467. ).encode("ascii")
  468. )
  469. return auth_headers
  470. @defer.inlineCallbacks
  471. def put_json(
  472. self,
  473. destination,
  474. path,
  475. args={},
  476. data={},
  477. json_data_callback=None,
  478. long_retries=False,
  479. timeout=None,
  480. ignore_backoff=False,
  481. backoff_on_404=False,
  482. try_trailing_slash_on_400=False,
  483. ):
  484. """ Sends the specifed json data using PUT
  485. Args:
  486. destination (str): The remote server to send the HTTP request
  487. to.
  488. path (str): The HTTP path.
  489. args (dict): query params
  490. data (dict): A dict containing the data that will be used as
  491. the request body. This will be encoded as JSON.
  492. json_data_callback (callable): A callable returning the dict to
  493. use as the request body.
  494. long_retries (bool): whether to use the long retry algorithm. See
  495. docs on _send_request for details.
  496. timeout (int|None): number of milliseconds to wait for the response headers
  497. (including connecting to the server), *for each attempt*.
  498. self._default_timeout (60s) by default.
  499. ignore_backoff (bool): true to ignore the historical backoff data
  500. and try the request anyway.
  501. backoff_on_404 (bool): True if we should count a 404 response as
  502. a failure of the server (and should therefore back off future
  503. requests).
  504. try_trailing_slash_on_400 (bool): True if on a 400 M_UNRECOGNIZED
  505. response we should try appending a trailing slash to the end
  506. of the request. Workaround for #3622 in Synapse <= v0.99.3. This
  507. will be attempted before backing off if backing off has been
  508. enabled.
  509. Returns:
  510. Deferred[dict|list]: Succeeds when we get a 2xx HTTP response. The
  511. result will be the decoded JSON body.
  512. Raises:
  513. HttpResponseException: If we get an HTTP response code >= 300
  514. (except 429).
  515. NotRetryingDestination: If we are not yet ready to retry this
  516. server.
  517. FederationDeniedError: If this destination is not on our
  518. federation whitelist
  519. RequestSendFailed: If there were problems connecting to the
  520. remote, due to e.g. DNS failures, connection timeouts etc.
  521. """
  522. request = MatrixFederationRequest(
  523. method="PUT",
  524. destination=destination,
  525. path=path,
  526. query=args,
  527. json_callback=json_data_callback,
  528. json=data,
  529. )
  530. response = yield self._send_request_with_optional_trailing_slash(
  531. request,
  532. try_trailing_slash_on_400,
  533. backoff_on_404=backoff_on_404,
  534. ignore_backoff=ignore_backoff,
  535. long_retries=long_retries,
  536. timeout=timeout,
  537. )
  538. body = yield _handle_json_response(
  539. self.reactor, self.default_timeout, request, response
  540. )
  541. return body
  542. @defer.inlineCallbacks
  543. def post_json(
  544. self,
  545. destination,
  546. path,
  547. data={},
  548. long_retries=False,
  549. timeout=None,
  550. ignore_backoff=False,
  551. args={},
  552. ):
  553. """ Sends the specifed json data using POST
  554. Args:
  555. destination (str): The remote server to send the HTTP request
  556. to.
  557. path (str): The HTTP path.
  558. data (dict): A dict containing the data that will be used as
  559. the request body. This will be encoded as JSON.
  560. long_retries (bool): whether to use the long retry algorithm. See
  561. docs on _send_request for details.
  562. timeout (int|None): number of milliseconds to wait for the response headers
  563. (including connecting to the server), *for each attempt*.
  564. self._default_timeout (60s) by default.
  565. ignore_backoff (bool): true to ignore the historical backoff data and
  566. try the request anyway.
  567. args (dict): query params
  568. Returns:
  569. Deferred[dict|list]: Succeeds when we get a 2xx HTTP response. The
  570. result will be the decoded JSON body.
  571. Raises:
  572. HttpResponseException: If we get an HTTP response code >= 300
  573. (except 429).
  574. NotRetryingDestination: If we are not yet ready to retry this
  575. server.
  576. FederationDeniedError: If this destination is not on our
  577. federation whitelist
  578. RequestSendFailed: If there were problems connecting to the
  579. remote, due to e.g. DNS failures, connection timeouts etc.
  580. """
  581. request = MatrixFederationRequest(
  582. method="POST", destination=destination, path=path, query=args, json=data
  583. )
  584. response = yield self._send_request(
  585. request,
  586. long_retries=long_retries,
  587. timeout=timeout,
  588. ignore_backoff=ignore_backoff,
  589. )
  590. if timeout:
  591. _sec_timeout = timeout / 1000
  592. else:
  593. _sec_timeout = self.default_timeout
  594. body = yield _handle_json_response(
  595. self.reactor, _sec_timeout, request, response
  596. )
  597. return body
  598. @defer.inlineCallbacks
  599. def get_json(
  600. self,
  601. destination,
  602. path,
  603. args=None,
  604. retry_on_dns_fail=True,
  605. timeout=None,
  606. ignore_backoff=False,
  607. try_trailing_slash_on_400=False,
  608. ):
  609. """ GETs some json from the given host homeserver and path
  610. Args:
  611. destination (str): The remote server to send the HTTP request
  612. to.
  613. path (str): The HTTP path.
  614. args (dict|None): A dictionary used to create query strings, defaults to
  615. None.
  616. timeout (int|None): number of milliseconds to wait for the response headers
  617. (including connecting to the server), *for each attempt*.
  618. self._default_timeout (60s) by default.
  619. ignore_backoff (bool): true to ignore the historical backoff data
  620. and try the request anyway.
  621. try_trailing_slash_on_400 (bool): True if on a 400 M_UNRECOGNIZED
  622. response we should try appending a trailing slash to the end of
  623. the request. Workaround for #3622 in Synapse <= v0.99.3.
  624. Returns:
  625. Deferred[dict|list]: Succeeds when we get a 2xx HTTP response. The
  626. result will be the decoded JSON body.
  627. Raises:
  628. HttpResponseException: If we get an HTTP response code >= 300
  629. (except 429).
  630. NotRetryingDestination: If we are not yet ready to retry this
  631. server.
  632. FederationDeniedError: If this destination is not on our
  633. federation whitelist
  634. RequestSendFailed: If there were problems connecting to the
  635. remote, due to e.g. DNS failures, connection timeouts etc.
  636. """
  637. request = MatrixFederationRequest(
  638. method="GET", destination=destination, path=path, query=args
  639. )
  640. response = yield self._send_request_with_optional_trailing_slash(
  641. request,
  642. try_trailing_slash_on_400,
  643. backoff_on_404=False,
  644. ignore_backoff=ignore_backoff,
  645. retry_on_dns_fail=retry_on_dns_fail,
  646. timeout=timeout,
  647. )
  648. body = yield _handle_json_response(
  649. self.reactor, self.default_timeout, request, response
  650. )
  651. return body
  652. @defer.inlineCallbacks
  653. def delete_json(
  654. self,
  655. destination,
  656. path,
  657. long_retries=False,
  658. timeout=None,
  659. ignore_backoff=False,
  660. args={},
  661. ):
  662. """Send a DELETE request to the remote expecting some json response
  663. Args:
  664. destination (str): The remote server to send the HTTP request
  665. to.
  666. path (str): The HTTP path.
  667. long_retries (bool): whether to use the long retry algorithm. See
  668. docs on _send_request for details.
  669. timeout (int|None): number of milliseconds to wait for the response headers
  670. (including connecting to the server), *for each attempt*.
  671. self._default_timeout (60s) by default.
  672. ignore_backoff (bool): true to ignore the historical backoff data and
  673. try the request anyway.
  674. args (dict): query params
  675. Returns:
  676. Deferred[dict|list]: Succeeds when we get a 2xx HTTP response. The
  677. result will be the decoded JSON body.
  678. Raises:
  679. HttpResponseException: If we get an HTTP response code >= 300
  680. (except 429).
  681. NotRetryingDestination: If we are not yet ready to retry this
  682. server.
  683. FederationDeniedError: If this destination is not on our
  684. federation whitelist
  685. RequestSendFailed: If there were problems connecting to the
  686. remote, due to e.g. DNS failures, connection timeouts etc.
  687. """
  688. request = MatrixFederationRequest(
  689. method="DELETE", destination=destination, path=path, query=args
  690. )
  691. response = yield self._send_request(
  692. request,
  693. long_retries=long_retries,
  694. timeout=timeout,
  695. ignore_backoff=ignore_backoff,
  696. )
  697. body = yield _handle_json_response(
  698. self.reactor, self.default_timeout, request, response
  699. )
  700. return body
  701. @defer.inlineCallbacks
  702. def get_file(
  703. self,
  704. destination,
  705. path,
  706. output_stream,
  707. args={},
  708. retry_on_dns_fail=True,
  709. max_size=None,
  710. ignore_backoff=False,
  711. ):
  712. """GETs a file from a given homeserver
  713. Args:
  714. destination (str): The remote server to send the HTTP request to.
  715. path (str): The HTTP path to GET.
  716. output_stream (file): File to write the response body to.
  717. args (dict): Optional dictionary used to create the query string.
  718. ignore_backoff (bool): true to ignore the historical backoff data
  719. and try the request anyway.
  720. Returns:
  721. Deferred[tuple[int, dict]]: Resolves with an (int,dict) tuple of
  722. the file length and a dict of the response headers.
  723. Raises:
  724. HttpResponseException: If we get an HTTP response code >= 300
  725. (except 429).
  726. NotRetryingDestination: If we are not yet ready to retry this
  727. server.
  728. FederationDeniedError: If this destination is not on our
  729. federation whitelist
  730. RequestSendFailed: If there were problems connecting to the
  731. remote, due to e.g. DNS failures, connection timeouts etc.
  732. """
  733. request = MatrixFederationRequest(
  734. method="GET", destination=destination, path=path, query=args
  735. )
  736. response = yield self._send_request(
  737. request, retry_on_dns_fail=retry_on_dns_fail, ignore_backoff=ignore_backoff
  738. )
  739. headers = dict(response.headers.getAllRawHeaders())
  740. try:
  741. d = _readBodyToFile(response, output_stream, max_size)
  742. d.addTimeout(self.default_timeout, self.reactor)
  743. length = yield make_deferred_yieldable(d)
  744. except Exception as e:
  745. logger.warn(
  746. "{%s} [%s] Error reading response: %s",
  747. request.txn_id,
  748. request.destination,
  749. e,
  750. )
  751. raise
  752. logger.info(
  753. "{%s} [%s] Completed: %d %s [%d bytes]",
  754. request.txn_id,
  755. request.destination,
  756. response.code,
  757. response.phrase.decode("ascii", errors="replace"),
  758. length,
  759. )
  760. return (length, headers)
  761. class _ReadBodyToFileProtocol(protocol.Protocol):
  762. def __init__(self, stream, deferred, max_size):
  763. self.stream = stream
  764. self.deferred = deferred
  765. self.length = 0
  766. self.max_size = max_size
  767. def dataReceived(self, data):
  768. self.stream.write(data)
  769. self.length += len(data)
  770. if self.max_size is not None and self.length >= self.max_size:
  771. self.deferred.errback(
  772. SynapseError(
  773. 502,
  774. "Requested file is too large > %r bytes" % (self.max_size,),
  775. Codes.TOO_LARGE,
  776. )
  777. )
  778. self.deferred = defer.Deferred()
  779. self.transport.loseConnection()
  780. def connectionLost(self, reason):
  781. if reason.check(ResponseDone):
  782. self.deferred.callback(self.length)
  783. else:
  784. self.deferred.errback(reason)
  785. def _readBodyToFile(response, stream, max_size):
  786. d = defer.Deferred()
  787. response.deliverBody(_ReadBodyToFileProtocol(stream, d, max_size))
  788. return d
  789. def _flatten_response_never_received(e):
  790. if hasattr(e, "reasons"):
  791. reasons = ", ".join(
  792. _flatten_response_never_received(f.value) for f in e.reasons
  793. )
  794. return "%s:[%s]" % (type(e).__name__, reasons)
  795. else:
  796. return repr(e)
  797. def check_content_type_is_json(headers):
  798. """
  799. Check that a set of HTTP headers have a Content-Type header, and that it
  800. is application/json.
  801. Args:
  802. headers (twisted.web.http_headers.Headers): headers to check
  803. Raises:
  804. RequestSendFailed: if the Content-Type header is missing or isn't JSON
  805. """
  806. c_type = headers.getRawHeaders(b"Content-Type")
  807. if c_type is None:
  808. raise RequestSendFailed(RuntimeError("No Content-Type header"), can_retry=False)
  809. c_type = c_type[0].decode("ascii") # only the first header
  810. val, options = cgi.parse_header(c_type)
  811. if val != "application/json":
  812. raise RequestSendFailed(
  813. RuntimeError("Content-Type not application/json: was '%s'" % c_type),
  814. can_retry=False,
  815. )
  816. def encode_query_args(args):
  817. if args is None:
  818. return b""
  819. encoded_args = {}
  820. for k, vs in args.items():
  821. if isinstance(vs, string_types):
  822. vs = [vs]
  823. encoded_args[k] = [v.encode("UTF-8") for v in vs]
  824. query_bytes = urllib.parse.urlencode(encoded_args, True)
  825. return query_bytes.encode("utf8")