matrixfederationclient.py 34 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2014-2016 OpenMarket Ltd
  3. # Copyright 2018 New Vector Ltd
  4. #
  5. # Licensed under the Apache License, Version 2.0 (the "License");
  6. # you may not use this file except in compliance with the License.
  7. # You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing, software
  12. # distributed under the License is distributed on an "AS IS" BASIS,
  13. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. # See the License for the specific language governing permissions and
  15. # limitations under the License.
  16. import cgi
  17. import logging
  18. import random
  19. import sys
  20. from io import BytesIO
  21. from six import PY3, raise_from, string_types
  22. from six.moves import urllib
  23. import attr
  24. import treq
  25. from canonicaljson import encode_canonical_json
  26. from prometheus_client import Counter
  27. from signedjson.sign import sign_json
  28. from zope.interface import implementer
  29. from twisted.internet import defer, protocol
  30. from twisted.internet.error import DNSLookupError
  31. from twisted.internet.interfaces import IReactorPluggableNameResolver
  32. from twisted.internet.task import _EPSILON, Cooperator
  33. from twisted.web._newclient import ResponseDone
  34. from twisted.web.http_headers import Headers
  35. import synapse.metrics
  36. import synapse.util.retryutils
  37. from synapse.api.errors import (
  38. Codes,
  39. FederationDeniedError,
  40. HttpResponseException,
  41. RequestSendFailed,
  42. SynapseError,
  43. )
  44. from synapse.http import QuieterFileBodyProducer
  45. from synapse.http.client import BlacklistingAgentWrapper, IPBlacklistingResolver
  46. from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent
  47. from synapse.logging.context import make_deferred_yieldable
  48. from synapse.logging.opentracing import (
  49. inject_active_span_byte_dict,
  50. set_tag,
  51. start_active_span,
  52. tags,
  53. )
  54. from synapse.util.async_helpers import timeout_deferred
  55. from synapse.util.metrics import Measure
  56. logger = logging.getLogger(__name__)
  57. outgoing_requests_counter = Counter(
  58. "synapse_http_matrixfederationclient_requests", "", ["method"]
  59. )
  60. incoming_responses_counter = Counter(
  61. "synapse_http_matrixfederationclient_responses", "", ["method", "code"]
  62. )
  63. MAX_LONG_RETRIES = 10
  64. MAX_SHORT_RETRIES = 3
  65. if PY3:
  66. MAXINT = sys.maxsize
  67. else:
  68. MAXINT = sys.maxint
  69. _next_id = 1
  70. @attr.s
  71. class MatrixFederationRequest(object):
  72. method = attr.ib()
  73. """HTTP method
  74. :type: str
  75. """
  76. path = attr.ib()
  77. """HTTP path
  78. :type: str
  79. """
  80. destination = attr.ib()
  81. """The remote server to send the HTTP request to.
  82. :type: str"""
  83. json = attr.ib(default=None)
  84. """JSON to send in the body.
  85. :type: dict|None
  86. """
  87. json_callback = attr.ib(default=None)
  88. """A callback to generate the JSON.
  89. :type: func|None
  90. """
  91. query = attr.ib(default=None)
  92. """Query arguments.
  93. :type: dict|None
  94. """
  95. txn_id = attr.ib(default=None)
  96. """Unique ID for this request (for logging)
  97. :type: str|None
  98. """
  99. def __attrs_post_init__(self):
  100. global _next_id
  101. self.txn_id = "%s-O-%s" % (self.method, _next_id)
  102. _next_id = (_next_id + 1) % (MAXINT - 1)
  103. def get_json(self):
  104. if self.json_callback:
  105. return self.json_callback()
  106. return self.json
  107. @defer.inlineCallbacks
  108. def _handle_json_response(reactor, timeout_sec, request, response):
  109. """
  110. Reads the JSON body of a response, with a timeout
  111. Args:
  112. reactor (IReactor): twisted reactor, for the timeout
  113. timeout_sec (float): number of seconds to wait for response to complete
  114. request (MatrixFederationRequest): the request that triggered the response
  115. response (IResponse): response to the request
  116. Returns:
  117. dict: parsed JSON response
  118. """
  119. try:
  120. check_content_type_is_json(response.headers)
  121. d = treq.json_content(response)
  122. d = timeout_deferred(d, timeout=timeout_sec, reactor=reactor)
  123. body = yield make_deferred_yieldable(d)
  124. except Exception as e:
  125. logger.warning(
  126. "{%s} [%s] Error reading response: %s",
  127. request.txn_id,
  128. request.destination,
  129. e,
  130. )
  131. raise
  132. logger.info(
  133. "{%s} [%s] Completed: %d %s",
  134. request.txn_id,
  135. request.destination,
  136. response.code,
  137. response.phrase.decode("ascii", errors="replace"),
  138. )
  139. return body
  140. class MatrixFederationHttpClient(object):
  141. """HTTP client used to talk to other homeservers over the federation
  142. protocol. Send client certificates and signs requests.
  143. Attributes:
  144. agent (twisted.web.client.Agent): The twisted Agent used to send the
  145. requests.
  146. """
  147. def __init__(self, hs, tls_client_options_factory):
  148. self.hs = hs
  149. self.signing_key = hs.config.signing_key[0]
  150. self.server_name = hs.hostname
  151. real_reactor = hs.get_reactor()
  152. # We need to use a DNS resolver which filters out blacklisted IP
  153. # addresses, to prevent DNS rebinding.
  154. nameResolver = IPBlacklistingResolver(
  155. real_reactor, None, hs.config.federation_ip_range_blacklist
  156. )
  157. @implementer(IReactorPluggableNameResolver)
  158. class Reactor(object):
  159. def __getattr__(_self, attr):
  160. if attr == "nameResolver":
  161. return nameResolver
  162. else:
  163. return getattr(real_reactor, attr)
  164. self.reactor = Reactor()
  165. self.agent = MatrixFederationAgent(self.reactor, tls_client_options_factory)
  166. # Use a BlacklistingAgentWrapper to prevent circumventing the IP
  167. # blacklist via IP literals in server names
  168. self.agent = BlacklistingAgentWrapper(
  169. self.agent,
  170. self.reactor,
  171. ip_blacklist=hs.config.federation_ip_range_blacklist,
  172. )
  173. self.clock = hs.get_clock()
  174. self._store = hs.get_datastore()
  175. self.version_string_bytes = hs.version_string.encode("ascii")
  176. self.default_timeout = 60
  177. def schedule(x):
  178. self.reactor.callLater(_EPSILON, x)
  179. self._cooperator = Cooperator(scheduler=schedule)
  180. @defer.inlineCallbacks
  181. def _send_request_with_optional_trailing_slash(
  182. self, request, try_trailing_slash_on_400=False, **send_request_args
  183. ):
  184. """Wrapper for _send_request which can optionally retry the request
  185. upon receiving a combination of a 400 HTTP response code and a
  186. 'M_UNRECOGNIZED' errcode. This is a workaround for Synapse <= v0.99.3
  187. due to #3622.
  188. Args:
  189. request (MatrixFederationRequest): details of request to be sent
  190. try_trailing_slash_on_400 (bool): Whether on receiving a 400
  191. 'M_UNRECOGNIZED' from the server to retry the request with a
  192. trailing slash appended to the request path.
  193. send_request_args (Dict): A dictionary of arguments to pass to
  194. `_send_request()`.
  195. Raises:
  196. HttpResponseException: If we get an HTTP response code >= 300
  197. (except 429).
  198. Returns:
  199. Deferred[Dict]: Parsed JSON response body.
  200. """
  201. try:
  202. response = yield self._send_request(request, **send_request_args)
  203. except HttpResponseException as e:
  204. # Received an HTTP error > 300. Check if it meets the requirements
  205. # to retry with a trailing slash
  206. if not try_trailing_slash_on_400:
  207. raise
  208. if e.code != 400 or e.to_synapse_error().errcode != "M_UNRECOGNIZED":
  209. raise
  210. # Retry with a trailing slash if we received a 400 with
  211. # 'M_UNRECOGNIZED' which some endpoints can return when omitting a
  212. # trailing slash on Synapse <= v0.99.3.
  213. logger.info("Retrying request with trailing slash")
  214. request.path += "/"
  215. response = yield self._send_request(request, **send_request_args)
  216. return response
  217. @defer.inlineCallbacks
  218. def _send_request(
  219. self,
  220. request,
  221. retry_on_dns_fail=True,
  222. timeout=None,
  223. long_retries=False,
  224. ignore_backoff=False,
  225. backoff_on_404=False,
  226. ):
  227. """
  228. Sends a request to the given server.
  229. Args:
  230. request (MatrixFederationRequest): details of request to be sent
  231. timeout (int|None): number of milliseconds to wait for the response headers
  232. (including connecting to the server), *for each attempt*.
  233. 60s by default.
  234. long_retries (bool): whether to use the long retry algorithm.
  235. The regular retry algorithm makes 4 attempts, with intervals
  236. [0.5s, 1s, 2s].
  237. The long retry algorithm makes 11 attempts, with intervals
  238. [4s, 16s, 60s, 60s, ...]
  239. Both algorithms add -20%/+40% jitter to the retry intervals.
  240. Note that the above intervals are *in addition* to the time spent
  241. waiting for the request to complete (up to `timeout` ms).
  242. NB: the long retry algorithm takes over 20 minutes to complete, with
  243. a default timeout of 60s!
  244. ignore_backoff (bool): true to ignore the historical backoff data
  245. and try the request anyway.
  246. backoff_on_404 (bool): Back off if we get a 404
  247. Returns:
  248. Deferred[twisted.web.client.Response]: resolves with the HTTP
  249. response object on success.
  250. Raises:
  251. HttpResponseException: If we get an HTTP response code >= 300
  252. (except 429).
  253. NotRetryingDestination: If we are not yet ready to retry this
  254. server.
  255. FederationDeniedError: If this destination is not on our
  256. federation whitelist
  257. RequestSendFailed: If there were problems connecting to the
  258. remote, due to e.g. DNS failures, connection timeouts etc.
  259. """
  260. if timeout:
  261. _sec_timeout = timeout / 1000
  262. else:
  263. _sec_timeout = self.default_timeout
  264. if (
  265. self.hs.config.federation_domain_whitelist is not None
  266. and request.destination not in self.hs.config.federation_domain_whitelist
  267. ):
  268. raise FederationDeniedError(request.destination)
  269. limiter = yield synapse.util.retryutils.get_retry_limiter(
  270. request.destination,
  271. self.clock,
  272. self._store,
  273. backoff_on_404=backoff_on_404,
  274. ignore_backoff=ignore_backoff,
  275. )
  276. method_bytes = request.method.encode("ascii")
  277. destination_bytes = request.destination.encode("ascii")
  278. path_bytes = request.path.encode("ascii")
  279. if request.query:
  280. query_bytes = encode_query_args(request.query)
  281. else:
  282. query_bytes = b""
  283. scope = start_active_span(
  284. "outgoing-federation-request",
  285. tags={
  286. tags.SPAN_KIND: tags.SPAN_KIND_RPC_CLIENT,
  287. tags.PEER_ADDRESS: request.destination,
  288. tags.HTTP_METHOD: request.method,
  289. tags.HTTP_URL: request.path,
  290. },
  291. finish_on_close=True,
  292. )
  293. # Inject the span into the headers
  294. headers_dict = {}
  295. inject_active_span_byte_dict(headers_dict, request.destination)
  296. headers_dict[b"User-Agent"] = [self.version_string_bytes]
  297. with limiter, scope:
  298. # XXX: Would be much nicer to retry only at the transaction-layer
  299. # (once we have reliable transactions in place)
  300. if long_retries:
  301. retries_left = MAX_LONG_RETRIES
  302. else:
  303. retries_left = MAX_SHORT_RETRIES
  304. url_bytes = urllib.parse.urlunparse(
  305. (b"matrix", destination_bytes, path_bytes, None, query_bytes, b"")
  306. )
  307. url_str = url_bytes.decode("ascii")
  308. url_to_sign_bytes = urllib.parse.urlunparse(
  309. (b"", b"", path_bytes, None, query_bytes, b"")
  310. )
  311. while True:
  312. try:
  313. json = request.get_json()
  314. if json:
  315. headers_dict[b"Content-Type"] = [b"application/json"]
  316. auth_headers = self.build_auth_headers(
  317. destination_bytes, method_bytes, url_to_sign_bytes, json
  318. )
  319. data = encode_canonical_json(json)
  320. producer = QuieterFileBodyProducer(
  321. BytesIO(data), cooperator=self._cooperator
  322. )
  323. else:
  324. producer = None
  325. auth_headers = self.build_auth_headers(
  326. destination_bytes, method_bytes, url_to_sign_bytes
  327. )
  328. headers_dict[b"Authorization"] = auth_headers
  329. logger.info(
  330. "{%s} [%s] Sending request: %s %s; timeout %fs",
  331. request.txn_id,
  332. request.destination,
  333. request.method,
  334. url_str,
  335. _sec_timeout,
  336. )
  337. outgoing_requests_counter.labels(method_bytes).inc()
  338. try:
  339. with Measure(self.clock, "outbound_request"):
  340. # we don't want all the fancy cookie and redirect handling
  341. # that treq.request gives: just use the raw Agent.
  342. request_deferred = self.agent.request(
  343. method_bytes,
  344. url_bytes,
  345. headers=Headers(headers_dict),
  346. bodyProducer=producer,
  347. )
  348. request_deferred = timeout_deferred(
  349. request_deferred,
  350. timeout=_sec_timeout,
  351. reactor=self.reactor,
  352. )
  353. response = yield request_deferred
  354. except DNSLookupError as e:
  355. raise_from(RequestSendFailed(e, can_retry=retry_on_dns_fail), e)
  356. except Exception as e:
  357. logger.info("Failed to send request: %s", e)
  358. raise_from(RequestSendFailed(e, can_retry=True), e)
  359. logger.info(
  360. "{%s} [%s] Got response headers: %d %s",
  361. request.txn_id,
  362. request.destination,
  363. response.code,
  364. response.phrase.decode("ascii", errors="replace"),
  365. )
  366. incoming_responses_counter.labels(method_bytes, response.code).inc()
  367. set_tag(tags.HTTP_STATUS_CODE, response.code)
  368. if 200 <= response.code < 300:
  369. pass
  370. else:
  371. # :'(
  372. # Update transactions table?
  373. d = treq.content(response)
  374. d = timeout_deferred(
  375. d, timeout=_sec_timeout, reactor=self.reactor
  376. )
  377. try:
  378. body = yield make_deferred_yieldable(d)
  379. except Exception as e:
  380. # Eh, we're already going to raise an exception so lets
  381. # ignore if this fails.
  382. logger.warning(
  383. "{%s} [%s] Failed to get error response: %s %s: %s",
  384. request.txn_id,
  385. request.destination,
  386. request.method,
  387. url_str,
  388. _flatten_response_never_received(e),
  389. )
  390. body = None
  391. e = HttpResponseException(response.code, response.phrase, body)
  392. # Retry if the error is a 429 (Too Many Requests),
  393. # otherwise just raise a standard HttpResponseException
  394. if response.code == 429:
  395. raise_from(RequestSendFailed(e, can_retry=True), e)
  396. else:
  397. raise e
  398. break
  399. except RequestSendFailed as e:
  400. logger.warning(
  401. "{%s} [%s] Request failed: %s %s: %s",
  402. request.txn_id,
  403. request.destination,
  404. request.method,
  405. url_str,
  406. _flatten_response_never_received(e.inner_exception),
  407. )
  408. if not e.can_retry:
  409. raise
  410. if retries_left and not timeout:
  411. if long_retries:
  412. delay = 4 ** (MAX_LONG_RETRIES + 1 - retries_left)
  413. delay = min(delay, 60)
  414. delay *= random.uniform(0.8, 1.4)
  415. else:
  416. delay = 0.5 * 2 ** (MAX_SHORT_RETRIES - retries_left)
  417. delay = min(delay, 2)
  418. delay *= random.uniform(0.8, 1.4)
  419. logger.debug(
  420. "{%s} [%s] Waiting %ss before re-sending...",
  421. request.txn_id,
  422. request.destination,
  423. delay,
  424. )
  425. yield self.clock.sleep(delay)
  426. retries_left -= 1
  427. else:
  428. raise
  429. except Exception as e:
  430. logger.warning(
  431. "{%s} [%s] Request failed: %s %s: %s",
  432. request.txn_id,
  433. request.destination,
  434. request.method,
  435. url_str,
  436. _flatten_response_never_received(e),
  437. )
  438. raise
  439. return response
  440. def build_auth_headers(
  441. self, destination, method, url_bytes, content=None, destination_is=None
  442. ):
  443. """
  444. Builds the Authorization headers for a federation request
  445. Args:
  446. destination (bytes|None): The desination homeserver of the request.
  447. May be None if the destination is an identity server, in which case
  448. destination_is must be non-None.
  449. method (bytes): The HTTP method of the request
  450. url_bytes (bytes): The URI path of the request
  451. content (object): The body of the request
  452. destination_is (bytes): As 'destination', but if the destination is an
  453. identity server
  454. Returns:
  455. list[bytes]: a list of headers to be added as "Authorization:" headers
  456. """
  457. request = {"method": method, "uri": url_bytes, "origin": self.server_name}
  458. if destination is not None:
  459. request["destination"] = destination
  460. if destination_is is not None:
  461. request["destination_is"] = destination_is
  462. if content is not None:
  463. request["content"] = content
  464. request = sign_json(request, self.server_name, self.signing_key)
  465. auth_headers = []
  466. for key, sig in request["signatures"][self.server_name].items():
  467. auth_headers.append(
  468. (
  469. 'X-Matrix origin=%s,key="%s",sig="%s"'
  470. % (self.server_name, key, sig)
  471. ).encode("ascii")
  472. )
  473. return auth_headers
  474. @defer.inlineCallbacks
  475. def put_json(
  476. self,
  477. destination,
  478. path,
  479. args={},
  480. data={},
  481. json_data_callback=None,
  482. long_retries=False,
  483. timeout=None,
  484. ignore_backoff=False,
  485. backoff_on_404=False,
  486. try_trailing_slash_on_400=False,
  487. ):
  488. """ Sends the specifed json data using PUT
  489. Args:
  490. destination (str): The remote server to send the HTTP request
  491. to.
  492. path (str): The HTTP path.
  493. args (dict): query params
  494. data (dict): A dict containing the data that will be used as
  495. the request body. This will be encoded as JSON.
  496. json_data_callback (callable): A callable returning the dict to
  497. use as the request body.
  498. long_retries (bool): whether to use the long retry algorithm. See
  499. docs on _send_request for details.
  500. timeout (int|None): number of milliseconds to wait for the response headers
  501. (including connecting to the server), *for each attempt*.
  502. self._default_timeout (60s) by default.
  503. ignore_backoff (bool): true to ignore the historical backoff data
  504. and try the request anyway.
  505. backoff_on_404 (bool): True if we should count a 404 response as
  506. a failure of the server (and should therefore back off future
  507. requests).
  508. try_trailing_slash_on_400 (bool): True if on a 400 M_UNRECOGNIZED
  509. response we should try appending a trailing slash to the end
  510. of the request. Workaround for #3622 in Synapse <= v0.99.3. This
  511. will be attempted before backing off if backing off has been
  512. enabled.
  513. Returns:
  514. Deferred[dict|list]: Succeeds when we get a 2xx HTTP response. The
  515. result will be the decoded JSON body.
  516. Raises:
  517. HttpResponseException: If we get an HTTP response code >= 300
  518. (except 429).
  519. NotRetryingDestination: If we are not yet ready to retry this
  520. server.
  521. FederationDeniedError: If this destination is not on our
  522. federation whitelist
  523. RequestSendFailed: If there were problems connecting to the
  524. remote, due to e.g. DNS failures, connection timeouts etc.
  525. """
  526. request = MatrixFederationRequest(
  527. method="PUT",
  528. destination=destination,
  529. path=path,
  530. query=args,
  531. json_callback=json_data_callback,
  532. json=data,
  533. )
  534. response = yield self._send_request_with_optional_trailing_slash(
  535. request,
  536. try_trailing_slash_on_400,
  537. backoff_on_404=backoff_on_404,
  538. ignore_backoff=ignore_backoff,
  539. long_retries=long_retries,
  540. timeout=timeout,
  541. )
  542. body = yield _handle_json_response(
  543. self.reactor, self.default_timeout, request, response
  544. )
  545. return body
  546. @defer.inlineCallbacks
  547. def post_json(
  548. self,
  549. destination,
  550. path,
  551. data={},
  552. long_retries=False,
  553. timeout=None,
  554. ignore_backoff=False,
  555. args={},
  556. ):
  557. """ Sends the specifed json data using POST
  558. Args:
  559. destination (str): The remote server to send the HTTP request
  560. to.
  561. path (str): The HTTP path.
  562. data (dict): A dict containing the data that will be used as
  563. the request body. This will be encoded as JSON.
  564. long_retries (bool): whether to use the long retry algorithm. See
  565. docs on _send_request for details.
  566. timeout (int|None): number of milliseconds to wait for the response headers
  567. (including connecting to the server), *for each attempt*.
  568. self._default_timeout (60s) by default.
  569. ignore_backoff (bool): true to ignore the historical backoff data and
  570. try the request anyway.
  571. args (dict): query params
  572. Returns:
  573. Deferred[dict|list]: Succeeds when we get a 2xx HTTP response. The
  574. result will be the decoded JSON body.
  575. Raises:
  576. HttpResponseException: If we get an HTTP response code >= 300
  577. (except 429).
  578. NotRetryingDestination: If we are not yet ready to retry this
  579. server.
  580. FederationDeniedError: If this destination is not on our
  581. federation whitelist
  582. RequestSendFailed: If there were problems connecting to the
  583. remote, due to e.g. DNS failures, connection timeouts etc.
  584. """
  585. request = MatrixFederationRequest(
  586. method="POST", destination=destination, path=path, query=args, json=data
  587. )
  588. response = yield self._send_request(
  589. request,
  590. long_retries=long_retries,
  591. timeout=timeout,
  592. ignore_backoff=ignore_backoff,
  593. )
  594. if timeout:
  595. _sec_timeout = timeout / 1000
  596. else:
  597. _sec_timeout = self.default_timeout
  598. body = yield _handle_json_response(
  599. self.reactor, _sec_timeout, request, response
  600. )
  601. return body
  602. @defer.inlineCallbacks
  603. def get_json(
  604. self,
  605. destination,
  606. path,
  607. args=None,
  608. retry_on_dns_fail=True,
  609. timeout=None,
  610. ignore_backoff=False,
  611. try_trailing_slash_on_400=False,
  612. ):
  613. """ GETs some json from the given host homeserver and path
  614. Args:
  615. destination (str): The remote server to send the HTTP request
  616. to.
  617. path (str): The HTTP path.
  618. args (dict|None): A dictionary used to create query strings, defaults to
  619. None.
  620. timeout (int|None): number of milliseconds to wait for the response headers
  621. (including connecting to the server), *for each attempt*.
  622. self._default_timeout (60s) by default.
  623. ignore_backoff (bool): true to ignore the historical backoff data
  624. and try the request anyway.
  625. try_trailing_slash_on_400 (bool): True if on a 400 M_UNRECOGNIZED
  626. response we should try appending a trailing slash to the end of
  627. the request. Workaround for #3622 in Synapse <= v0.99.3.
  628. Returns:
  629. Deferred[dict|list]: Succeeds when we get a 2xx HTTP response. The
  630. result will be the decoded JSON body.
  631. Raises:
  632. HttpResponseException: If we get an HTTP response code >= 300
  633. (except 429).
  634. NotRetryingDestination: If we are not yet ready to retry this
  635. server.
  636. FederationDeniedError: If this destination is not on our
  637. federation whitelist
  638. RequestSendFailed: If there were problems connecting to the
  639. remote, due to e.g. DNS failures, connection timeouts etc.
  640. """
  641. request = MatrixFederationRequest(
  642. method="GET", destination=destination, path=path, query=args
  643. )
  644. response = yield self._send_request_with_optional_trailing_slash(
  645. request,
  646. try_trailing_slash_on_400,
  647. backoff_on_404=False,
  648. ignore_backoff=ignore_backoff,
  649. retry_on_dns_fail=retry_on_dns_fail,
  650. timeout=timeout,
  651. )
  652. body = yield _handle_json_response(
  653. self.reactor, self.default_timeout, request, response
  654. )
  655. return body
  656. @defer.inlineCallbacks
  657. def delete_json(
  658. self,
  659. destination,
  660. path,
  661. long_retries=False,
  662. timeout=None,
  663. ignore_backoff=False,
  664. args={},
  665. ):
  666. """Send a DELETE request to the remote expecting some json response
  667. Args:
  668. destination (str): The remote server to send the HTTP request
  669. to.
  670. path (str): The HTTP path.
  671. long_retries (bool): whether to use the long retry algorithm. See
  672. docs on _send_request for details.
  673. timeout (int|None): number of milliseconds to wait for the response headers
  674. (including connecting to the server), *for each attempt*.
  675. self._default_timeout (60s) by default.
  676. ignore_backoff (bool): true to ignore the historical backoff data and
  677. try the request anyway.
  678. args (dict): query params
  679. Returns:
  680. Deferred[dict|list]: Succeeds when we get a 2xx HTTP response. The
  681. result will be the decoded JSON body.
  682. Raises:
  683. HttpResponseException: If we get an HTTP response code >= 300
  684. (except 429).
  685. NotRetryingDestination: If we are not yet ready to retry this
  686. server.
  687. FederationDeniedError: If this destination is not on our
  688. federation whitelist
  689. RequestSendFailed: If there were problems connecting to the
  690. remote, due to e.g. DNS failures, connection timeouts etc.
  691. """
  692. request = MatrixFederationRequest(
  693. method="DELETE", destination=destination, path=path, query=args
  694. )
  695. response = yield self._send_request(
  696. request,
  697. long_retries=long_retries,
  698. timeout=timeout,
  699. ignore_backoff=ignore_backoff,
  700. )
  701. body = yield _handle_json_response(
  702. self.reactor, self.default_timeout, request, response
  703. )
  704. return body
  705. @defer.inlineCallbacks
  706. def get_file(
  707. self,
  708. destination,
  709. path,
  710. output_stream,
  711. args={},
  712. retry_on_dns_fail=True,
  713. max_size=None,
  714. ignore_backoff=False,
  715. ):
  716. """GETs a file from a given homeserver
  717. Args:
  718. destination (str): The remote server to send the HTTP request to.
  719. path (str): The HTTP path to GET.
  720. output_stream (file): File to write the response body to.
  721. args (dict): Optional dictionary used to create the query string.
  722. ignore_backoff (bool): true to ignore the historical backoff data
  723. and try the request anyway.
  724. Returns:
  725. Deferred[tuple[int, dict]]: Resolves with an (int,dict) tuple of
  726. the file length and a dict of the response headers.
  727. Raises:
  728. HttpResponseException: If we get an HTTP response code >= 300
  729. (except 429).
  730. NotRetryingDestination: If we are not yet ready to retry this
  731. server.
  732. FederationDeniedError: If this destination is not on our
  733. federation whitelist
  734. RequestSendFailed: If there were problems connecting to the
  735. remote, due to e.g. DNS failures, connection timeouts etc.
  736. """
  737. request = MatrixFederationRequest(
  738. method="GET", destination=destination, path=path, query=args
  739. )
  740. response = yield self._send_request(
  741. request, retry_on_dns_fail=retry_on_dns_fail, ignore_backoff=ignore_backoff
  742. )
  743. headers = dict(response.headers.getAllRawHeaders())
  744. try:
  745. d = _readBodyToFile(response, output_stream, max_size)
  746. d.addTimeout(self.default_timeout, self.reactor)
  747. length = yield make_deferred_yieldable(d)
  748. except Exception as e:
  749. logger.warning(
  750. "{%s} [%s] Error reading response: %s",
  751. request.txn_id,
  752. request.destination,
  753. e,
  754. )
  755. raise
  756. logger.info(
  757. "{%s} [%s] Completed: %d %s [%d bytes]",
  758. request.txn_id,
  759. request.destination,
  760. response.code,
  761. response.phrase.decode("ascii", errors="replace"),
  762. length,
  763. )
  764. return (length, headers)
  765. class _ReadBodyToFileProtocol(protocol.Protocol):
  766. def __init__(self, stream, deferred, max_size):
  767. self.stream = stream
  768. self.deferred = deferred
  769. self.length = 0
  770. self.max_size = max_size
  771. def dataReceived(self, data):
  772. self.stream.write(data)
  773. self.length += len(data)
  774. if self.max_size is not None and self.length >= self.max_size:
  775. self.deferred.errback(
  776. SynapseError(
  777. 502,
  778. "Requested file is too large > %r bytes" % (self.max_size,),
  779. Codes.TOO_LARGE,
  780. )
  781. )
  782. self.deferred = defer.Deferred()
  783. self.transport.loseConnection()
  784. def connectionLost(self, reason):
  785. if reason.check(ResponseDone):
  786. self.deferred.callback(self.length)
  787. else:
  788. self.deferred.errback(reason)
  789. def _readBodyToFile(response, stream, max_size):
  790. d = defer.Deferred()
  791. response.deliverBody(_ReadBodyToFileProtocol(stream, d, max_size))
  792. return d
  793. def _flatten_response_never_received(e):
  794. if hasattr(e, "reasons"):
  795. reasons = ", ".join(
  796. _flatten_response_never_received(f.value) for f in e.reasons
  797. )
  798. return "%s:[%s]" % (type(e).__name__, reasons)
  799. else:
  800. return repr(e)
  801. def check_content_type_is_json(headers):
  802. """
  803. Check that a set of HTTP headers have a Content-Type header, and that it
  804. is application/json.
  805. Args:
  806. headers (twisted.web.http_headers.Headers): headers to check
  807. Raises:
  808. RequestSendFailed: if the Content-Type header is missing or isn't JSON
  809. """
  810. c_type = headers.getRawHeaders(b"Content-Type")
  811. if c_type is None:
  812. raise RequestSendFailed(RuntimeError("No Content-Type header"), can_retry=False)
  813. c_type = c_type[0].decode("ascii") # only the first header
  814. val, options = cgi.parse_header(c_type)
  815. if val != "application/json":
  816. raise RequestSendFailed(
  817. RuntimeError("Content-Type not application/json: was '%s'" % c_type),
  818. can_retry=False,
  819. )
  820. def encode_query_args(args):
  821. if args is None:
  822. return b""
  823. encoded_args = {}
  824. for k, vs in args.items():
  825. if isinstance(vs, string_types):
  826. vs = [vs]
  827. encoded_args[k] = [v.encode("UTF-8") for v in vs]
  828. query_bytes = urllib.parse.urlencode(encoded_args, True)
  829. return query_bytes.encode("utf8")