matrixfederationclient.py 36 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2014-2016 OpenMarket Ltd
  3. # Copyright 2018 New Vector Ltd
  4. #
  5. # Licensed under the Apache License, Version 2.0 (the "License");
  6. # you may not use this file except in compliance with the License.
  7. # You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing, software
  12. # distributed under the License is distributed on an "AS IS" BASIS,
  13. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. # See the License for the specific language governing permissions and
  15. # limitations under the License.
  16. import cgi
  17. import logging
  18. import random
  19. import sys
  20. import urllib
  21. from io import BytesIO
  22. import attr
  23. import treq
  24. from canonicaljson import encode_canonical_json
  25. from prometheus_client import Counter
  26. from signedjson.sign import sign_json
  27. from zope.interface import implementer
  28. from twisted.internet import defer, protocol
  29. from twisted.internet.error import DNSLookupError
  30. from twisted.internet.interfaces import IReactorPluggableNameResolver, IReactorTime
  31. from twisted.internet.task import _EPSILON, Cooperator
  32. from twisted.web._newclient import ResponseDone
  33. from twisted.web.http_headers import Headers
  34. from twisted.web.iweb import IResponse
  35. import synapse.metrics
  36. import synapse.util.retryutils
  37. from synapse.api.errors import (
  38. Codes,
  39. FederationDeniedError,
  40. HttpResponseException,
  41. RequestSendFailed,
  42. SynapseError,
  43. )
  44. from synapse.http import QuieterFileBodyProducer
  45. from synapse.http.client import BlacklistingAgentWrapper, IPBlacklistingResolver
  46. from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent
  47. from synapse.logging.context import make_deferred_yieldable
  48. from synapse.logging.opentracing import (
  49. inject_active_span_byte_dict,
  50. set_tag,
  51. start_active_span,
  52. tags,
  53. )
  54. from synapse.util.async_helpers import timeout_deferred
  55. from synapse.util.metrics import Measure
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Prometheus counter incremented once per request attempt in
# MatrixFederationHttpClient._send_request, labelled by HTTP method.
outgoing_requests_counter = Counter(
    "synapse_http_matrixfederationclient_requests", "", ["method"]
)
# Prometheus counter incremented once per response received in
# MatrixFederationHttpClient._send_request, labelled by HTTP method and
# response status code.
incoming_responses_counter = Counter(
    "synapse_http_matrixfederationclient_responses", "", ["method", "code"]
)

# Initial retry budgets used by MatrixFederationHttpClient._send_request:
# the long one when `long_retries` is set, the short one otherwise.
MAX_LONG_RETRIES = 10
MAX_SHORT_RETRIES = 3
# Used as the wrap-around bound for `_next_id`.
MAXINT = sys.maxsize

# Sequence number used to build the `txn_id` of the next
# MatrixFederationRequest; bumped each time a request object is created.
_next_id = 1
  67. @attr.s(frozen=True)
  68. class MatrixFederationRequest:
  69. method = attr.ib()
  70. """HTTP method
  71. :type: str
  72. """
  73. path = attr.ib()
  74. """HTTP path
  75. :type: str
  76. """
  77. destination = attr.ib()
  78. """The remote server to send the HTTP request to.
  79. :type: str"""
  80. json = attr.ib(default=None)
  81. """JSON to send in the body.
  82. :type: dict|None
  83. """
  84. json_callback = attr.ib(default=None)
  85. """A callback to generate the JSON.
  86. :type: func|None
  87. """
  88. query = attr.ib(default=None)
  89. """Query arguments.
  90. :type: dict|None
  91. """
  92. txn_id = attr.ib(default=None)
  93. """Unique ID for this request (for logging)
  94. :type: str|None
  95. """
  96. uri = attr.ib(init=False, type=bytes)
  97. """The URI of this request
  98. """
  99. def __attrs_post_init__(self):
  100. global _next_id
  101. txn_id = "%s-O-%s" % (self.method, _next_id)
  102. _next_id = (_next_id + 1) % (MAXINT - 1)
  103. object.__setattr__(self, "txn_id", txn_id)
  104. destination_bytes = self.destination.encode("ascii")
  105. path_bytes = self.path.encode("ascii")
  106. if self.query:
  107. query_bytes = encode_query_args(self.query)
  108. else:
  109. query_bytes = b""
  110. # The object is frozen so we can pre-compute this.
  111. uri = urllib.parse.urlunparse(
  112. (b"matrix", destination_bytes, path_bytes, None, query_bytes, b"")
  113. )
  114. object.__setattr__(self, "uri", uri)
  115. def get_json(self):
  116. if self.json_callback:
  117. return self.json_callback()
  118. return self.json
  119. async def _handle_json_response(
  120. reactor: IReactorTime,
  121. timeout_sec: float,
  122. request: MatrixFederationRequest,
  123. response: IResponse,
  124. start_ms: int,
  125. ):
  126. """
  127. Reads the JSON body of a response, with a timeout
  128. Args:
  129. reactor: twisted reactor, for the timeout
  130. timeout_sec: number of seconds to wait for response to complete
  131. request: the request that triggered the response
  132. response: response to the request
  133. start_ms: Timestamp when request was made
  134. Returns:
  135. dict: parsed JSON response
  136. """
  137. try:
  138. check_content_type_is_json(response.headers)
  139. d = treq.json_content(response)
  140. d = timeout_deferred(d, timeout=timeout_sec, reactor=reactor)
  141. body = await make_deferred_yieldable(d)
  142. except TimeoutError as e:
  143. logger.warning(
  144. "{%s} [%s] Timed out reading response - %s %s",
  145. request.txn_id,
  146. request.destination,
  147. request.method,
  148. request.uri.decode("ascii"),
  149. )
  150. raise RequestSendFailed(e, can_retry=True) from e
  151. except Exception as e:
  152. logger.warning(
  153. "{%s} [%s] Error reading response %s %s: %s",
  154. request.txn_id,
  155. request.destination,
  156. request.method,
  157. request.uri.decode("ascii"),
  158. e,
  159. )
  160. raise
  161. time_taken_secs = reactor.seconds() - start_ms / 1000
  162. logger.info(
  163. "{%s} [%s] Completed request: %d %s in %.2f secs - %s %s",
  164. request.txn_id,
  165. request.destination,
  166. response.code,
  167. response.phrase.decode("ascii", errors="replace"),
  168. time_taken_secs,
  169. request.method,
  170. request.uri.decode("ascii"),
  171. )
  172. return body
  173. class MatrixFederationHttpClient:
  174. """HTTP client used to talk to other homeservers over the federation
  175. protocol. Send client certificates and signs requests.
  176. Attributes:
  177. agent (twisted.web.client.Agent): The twisted Agent used to send the
  178. requests.
  179. """
  180. def __init__(self, hs, tls_client_options_factory):
  181. self.hs = hs
  182. self.signing_key = hs.signing_key
  183. self.server_name = hs.hostname
  184. real_reactor = hs.get_reactor()
  185. # We need to use a DNS resolver which filters out blacklisted IP
  186. # addresses, to prevent DNS rebinding.
  187. nameResolver = IPBlacklistingResolver(
  188. real_reactor, None, hs.config.federation_ip_range_blacklist
  189. )
  190. @implementer(IReactorPluggableNameResolver)
  191. class Reactor:
  192. def __getattr__(_self, attr):
  193. if attr == "nameResolver":
  194. return nameResolver
  195. else:
  196. return getattr(real_reactor, attr)
  197. self.reactor = Reactor()
  198. user_agent = hs.version_string
  199. if hs.config.user_agent_suffix:
  200. user_agent = "%s %s" % (user_agent, hs.config.user_agent_suffix)
  201. user_agent = user_agent.encode("ascii")
  202. self.agent = MatrixFederationAgent(
  203. self.reactor, tls_client_options_factory, user_agent
  204. )
  205. # Use a BlacklistingAgentWrapper to prevent circumventing the IP
  206. # blacklist via IP literals in server names
  207. self.agent = BlacklistingAgentWrapper(
  208. self.agent,
  209. self.reactor,
  210. ip_blacklist=hs.config.federation_ip_range_blacklist,
  211. )
  212. self.clock = hs.get_clock()
  213. self._store = hs.get_datastore()
  214. self.version_string_bytes = hs.version_string.encode("ascii")
  215. self.default_timeout = 60
  216. def schedule(x):
  217. self.reactor.callLater(_EPSILON, x)
  218. self._cooperator = Cooperator(scheduler=schedule)
  219. async def _send_request_with_optional_trailing_slash(
  220. self, request, try_trailing_slash_on_400=False, **send_request_args
  221. ):
  222. """Wrapper for _send_request which can optionally retry the request
  223. upon receiving a combination of a 400 HTTP response code and a
  224. 'M_UNRECOGNIZED' errcode. This is a workaround for Synapse <= v0.99.3
  225. due to #3622.
  226. Args:
  227. request (MatrixFederationRequest): details of request to be sent
  228. try_trailing_slash_on_400 (bool): Whether on receiving a 400
  229. 'M_UNRECOGNIZED' from the server to retry the request with a
  230. trailing slash appended to the request path.
  231. send_request_args (Dict): A dictionary of arguments to pass to
  232. `_send_request()`.
  233. Raises:
  234. HttpResponseException: If we get an HTTP response code >= 300
  235. (except 429).
  236. Returns:
  237. Dict: Parsed JSON response body.
  238. """
  239. try:
  240. response = await self._send_request(request, **send_request_args)
  241. except HttpResponseException as e:
  242. # Received an HTTP error > 300. Check if it meets the requirements
  243. # to retry with a trailing slash
  244. if not try_trailing_slash_on_400:
  245. raise
  246. if e.code != 400 or e.to_synapse_error().errcode != "M_UNRECOGNIZED":
  247. raise
  248. # Retry with a trailing slash if we received a 400 with
  249. # 'M_UNRECOGNIZED' which some endpoints can return when omitting a
  250. # trailing slash on Synapse <= v0.99.3.
  251. logger.info("Retrying request with trailing slash")
  252. # Request is frozen so we create a new instance
  253. request = attr.evolve(request, path=request.path + "/")
  254. response = await self._send_request(request, **send_request_args)
  255. return response
    async def _send_request(
        self,
        request,
        retry_on_dns_fail=True,
        timeout=None,
        long_retries=False,
        ignore_backoff=False,
        backoff_on_404=False,
    ):
        """
        Sends a request to the given server.

        Args:
            request (MatrixFederationRequest): details of request to be sent

            retry_on_dns_fail (bool): whether a DNSLookupError should be
                treated as retryable; it is passed as `can_retry` on the
                resulting RequestSendFailed.

            timeout (int|None): number of milliseconds to wait for the response headers
                (including connecting to the server), *for each attempt*.
                60s by default. Note that retries are only attempted when no
                timeout is given (see the retry condition in the loop below).

            long_retries (bool): whether to use the long retry algorithm.

                The regular retry algorithm makes 4 attempts, with intervals
                [0.5s, 1s, 2s].

                The long retry algorithm makes 11 attempts, with intervals
                [4s, 16s, 60s, 60s, ...]

                Both algorithms add -20%/+40% jitter to the retry intervals.

                Note that the above intervals are *in addition* to the time spent
                waiting for the request to complete (up to `timeout` ms).

                NB: the long retry algorithm takes over 20 minutes to complete, with
                a default timeout of 60s!

            ignore_backoff (bool): true to ignore the historical backoff data
                and try the request anyway.

            backoff_on_404 (bool): Back off if we get a 404

        Returns:
            twisted.web.client.Response: resolves with the HTTP
            response object on success.

        Raises:
            HttpResponseException: If we get an HTTP response code >= 300
                (except 429).
            NotRetryingDestination: If we are not yet ready to retry this
                server.
            FederationDeniedError: If this destination is not on our
                federation whitelist
            RequestSendFailed: If there were problems connecting to the
                remote, due to e.g. DNS failures, connection timeouts etc.
        """
        # `timeout` is in milliseconds; everything below works in seconds.
        # A falsy timeout (None or 0) selects the default.
        if timeout:
            _sec_timeout = timeout / 1000
        else:
            _sec_timeout = self.default_timeout

        # Refuse destinations outside the whitelist, if one is configured.
        if (
            self.hs.config.federation_domain_whitelist is not None
            and request.destination not in self.hs.config.federation_domain_whitelist
        ):
            raise FederationDeniedError(request.destination)

        limiter = await synapse.util.retryutils.get_retry_limiter(
            request.destination,
            self.clock,
            self._store,
            backoff_on_404=backoff_on_404,
            ignore_backoff=ignore_backoff,
        )

        method_bytes = request.method.encode("ascii")
        destination_bytes = request.destination.encode("ascii")
        path_bytes = request.path.encode("ascii")
        if request.query:
            query_bytes = encode_query_args(request.query)
        else:
            query_bytes = b""

        scope = start_active_span(
            "outgoing-federation-request",
            tags={
                tags.SPAN_KIND: tags.SPAN_KIND_RPC_CLIENT,
                tags.PEER_ADDRESS: request.destination,
                tags.HTTP_METHOD: request.method,
                tags.HTTP_URL: request.path,
            },
            finish_on_close=True,
        )

        # Inject the span into the headers
        headers_dict = {}
        inject_active_span_byte_dict(headers_dict, request.destination)

        headers_dict[b"User-Agent"] = [self.version_string_bytes]

        with limiter, scope:
            # XXX: Would be much nicer to retry only at the transaction-layer
            # (once we have reliable transactions in place)
            if long_retries:
                retries_left = MAX_LONG_RETRIES
            else:
                retries_left = MAX_SHORT_RETRIES

            url_bytes = request.uri
            url_str = url_bytes.decode("ascii")

            # Only the path and query are signed, not the scheme or host.
            url_to_sign_bytes = urllib.parse.urlunparse(
                (b"", b"", path_bytes, None, query_bytes, b"")
            )

            # Each iteration is one attempt. The loop is left either by the
            # `break` after a 2xx response or by an exception.
            while True:
                try:
                    # Re-evaluated on every attempt, so a json_callback is
                    # called again for each retry.
                    json = request.get_json()
                    if json:
                        headers_dict[b"Content-Type"] = [b"application/json"]
                        auth_headers = self.build_auth_headers(
                            destination_bytes, method_bytes, url_to_sign_bytes, json
                        )
                        data = encode_canonical_json(json)
                        producer = QuieterFileBodyProducer(
                            BytesIO(data), cooperator=self._cooperator
                        )
                    else:
                        # Falsy JSON (None or empty): send no body at all.
                        producer = None
                        auth_headers = self.build_auth_headers(
                            destination_bytes, method_bytes, url_to_sign_bytes
                        )

                    headers_dict[b"Authorization"] = auth_headers

                    logger.debug(
                        "{%s} [%s] Sending request: %s %s; timeout %fs",
                        request.txn_id,
                        request.destination,
                        request.method,
                        url_str,
                        _sec_timeout,
                    )

                    outgoing_requests_counter.labels(request.method).inc()

                    try:
                        with Measure(self.clock, "outbound_request"):
                            # we don't want all the fancy cookie and redirect handling
                            # that treq.request gives: just use the raw Agent.
                            request_deferred = self.agent.request(
                                method_bytes,
                                url_bytes,
                                headers=Headers(headers_dict),
                                bodyProducer=producer,
                            )

                            request_deferred = timeout_deferred(
                                request_deferred,
                                timeout=_sec_timeout,
                                reactor=self.reactor,
                            )

                            response = await request_deferred
                    # NOTE(review): this names the builtin TimeoutError; if
                    # timeout_deferred raises twisted's defer.TimeoutError
                    # instead, it is still handled by the generic
                    # `except Exception` branch below with the same
                    # can_retry=True - confirm.
                    except TimeoutError as e:
                        raise RequestSendFailed(e, can_retry=True) from e
                    except DNSLookupError as e:
                        raise RequestSendFailed(e, can_retry=retry_on_dns_fail) from e
                    except Exception as e:
                        raise RequestSendFailed(e, can_retry=True) from e

                    incoming_responses_counter.labels(
                        request.method, response.code
                    ).inc()

                    set_tag(tags.HTTP_STATUS_CODE, response.code)
                    response_phrase = response.phrase.decode("ascii", errors="replace")

                    if 200 <= response.code < 300:
                        logger.debug(
                            "{%s} [%s] Got response headers: %d %s",
                            request.txn_id,
                            request.destination,
                            response.code,
                            response_phrase,
                        )
                        pass
                    else:
                        logger.info(
                            "{%s} [%s] Got response headers: %d %s",
                            request.txn_id,
                            request.destination,
                            response.code,
                            response_phrase,
                        )
                        # :'(
                        # Update transactions table?
                        # Read the error body (with a timeout) so it can be
                        # attached to the HttpResponseException.
                        d = treq.content(response)
                        d = timeout_deferred(
                            d, timeout=_sec_timeout, reactor=self.reactor
                        )

                        try:
                            body = await make_deferred_yieldable(d)
                        except Exception as e:
                            # Eh, we're already going to raise an exception so lets
                            # ignore if this fails.
                            logger.warning(
                                "{%s} [%s] Failed to get error response: %s %s: %s",
                                request.txn_id,
                                request.destination,
                                request.method,
                                url_str,
                                _flatten_response_never_received(e),
                            )
                            body = None

                        e = HttpResponseException(response.code, response_phrase, body)

                        # Retry if the error is a 429 (Too Many Requests),
                        # otherwise just raise a standard HttpResponseException
                        if response.code == 429:
                            raise RequestSendFailed(e, can_retry=True) from e
                        else:
                            raise e

                    # 2xx response: stop retrying.
                    break
                except RequestSendFailed as e:
                    logger.info(
                        "{%s} [%s] Request failed: %s %s: %s",
                        request.txn_id,
                        request.destination,
                        request.method,
                        url_str,
                        _flatten_response_never_received(e.inner_exception),
                    )

                    if not e.can_retry:
                        raise

                    # Retry only while attempts remain AND the caller did not
                    # pass an explicit timeout; otherwise give up.
                    if retries_left and not timeout:
                        if long_retries:
                            # 4s, 16s, then capped at 60s, with -20%/+40% jitter.
                            delay = 4 ** (MAX_LONG_RETRIES + 1 - retries_left)
                            delay = min(delay, 60)
                            delay *= random.uniform(0.8, 1.4)
                        else:
                            # 0.5s, 1s, 2s (capped at 2s), with -20%/+40% jitter.
                            delay = 0.5 * 2 ** (MAX_SHORT_RETRIES - retries_left)
                            delay = min(delay, 2)
                            delay *= random.uniform(0.8, 1.4)

                        logger.debug(
                            "{%s} [%s] Waiting %ss before re-sending...",
                            request.txn_id,
                            request.destination,
                            delay,
                        )

                        await self.clock.sleep(delay)
                        retries_left -= 1
                    else:
                        raise

                except Exception as e:
                    # Anything that is not a RequestSendFailed (e.g. the
                    # HttpResponseException raised above) is logged and
                    # propagated without retrying.
                    logger.warning(
                        "{%s} [%s] Request failed: %s %s: %s",
                        request.txn_id,
                        request.destination,
                        request.method,
                        url_str,
                        _flatten_response_never_received(e),
                    )
                    raise
        return response
  487. def build_auth_headers(
  488. self, destination, method, url_bytes, content=None, destination_is=None
  489. ):
  490. """
  491. Builds the Authorization headers for a federation request
  492. Args:
  493. destination (bytes|None): The desination homeserver of the request.
  494. May be None if the destination is an identity server, in which case
  495. destination_is must be non-None.
  496. method (bytes): The HTTP method of the request
  497. url_bytes (bytes): The URI path of the request
  498. content (object): The body of the request
  499. destination_is (bytes): As 'destination', but if the destination is an
  500. identity server
  501. Returns:
  502. list[bytes]: a list of headers to be added as "Authorization:" headers
  503. """
  504. request = {
  505. "method": method.decode("ascii"),
  506. "uri": url_bytes.decode("ascii"),
  507. "origin": self.server_name,
  508. }
  509. if destination is not None:
  510. request["destination"] = destination.decode("ascii")
  511. if destination_is is not None:
  512. request["destination_is"] = destination_is.decode("ascii")
  513. if content is not None:
  514. request["content"] = content
  515. request = sign_json(request, self.server_name, self.signing_key)
  516. auth_headers = []
  517. for key, sig in request["signatures"][self.server_name].items():
  518. auth_headers.append(
  519. (
  520. 'X-Matrix origin=%s,key="%s",sig="%s"'
  521. % (self.server_name, key, sig)
  522. ).encode("ascii")
  523. )
  524. return auth_headers
  525. async def put_json(
  526. self,
  527. destination,
  528. path,
  529. args={},
  530. data={},
  531. json_data_callback=None,
  532. long_retries=False,
  533. timeout=None,
  534. ignore_backoff=False,
  535. backoff_on_404=False,
  536. try_trailing_slash_on_400=False,
  537. ):
  538. """ Sends the specifed json data using PUT
  539. Args:
  540. destination (str): The remote server to send the HTTP request
  541. to.
  542. path (str): The HTTP path.
  543. args (dict): query params
  544. data (dict): A dict containing the data that will be used as
  545. the request body. This will be encoded as JSON.
  546. json_data_callback (callable): A callable returning the dict to
  547. use as the request body.
  548. long_retries (bool): whether to use the long retry algorithm. See
  549. docs on _send_request for details.
  550. timeout (int|None): number of milliseconds to wait for the response headers
  551. (including connecting to the server), *for each attempt*.
  552. self._default_timeout (60s) by default.
  553. ignore_backoff (bool): true to ignore the historical backoff data
  554. and try the request anyway.
  555. backoff_on_404 (bool): True if we should count a 404 response as
  556. a failure of the server (and should therefore back off future
  557. requests).
  558. try_trailing_slash_on_400 (bool): True if on a 400 M_UNRECOGNIZED
  559. response we should try appending a trailing slash to the end
  560. of the request. Workaround for #3622 in Synapse <= v0.99.3. This
  561. will be attempted before backing off if backing off has been
  562. enabled.
  563. Returns:
  564. dict|list: Succeeds when we get a 2xx HTTP response. The
  565. result will be the decoded JSON body.
  566. Raises:
  567. HttpResponseException: If we get an HTTP response code >= 300
  568. (except 429).
  569. NotRetryingDestination: If we are not yet ready to retry this
  570. server.
  571. FederationDeniedError: If this destination is not on our
  572. federation whitelist
  573. RequestSendFailed: If there were problems connecting to the
  574. remote, due to e.g. DNS failures, connection timeouts etc.
  575. """
  576. request = MatrixFederationRequest(
  577. method="PUT",
  578. destination=destination,
  579. path=path,
  580. query=args,
  581. json_callback=json_data_callback,
  582. json=data,
  583. )
  584. start_ms = self.clock.time_msec()
  585. response = await self._send_request_with_optional_trailing_slash(
  586. request,
  587. try_trailing_slash_on_400,
  588. backoff_on_404=backoff_on_404,
  589. ignore_backoff=ignore_backoff,
  590. long_retries=long_retries,
  591. timeout=timeout,
  592. )
  593. body = await _handle_json_response(
  594. self.reactor, self.default_timeout, request, response, start_ms
  595. )
  596. return body
  597. async def post_json(
  598. self,
  599. destination,
  600. path,
  601. data={},
  602. long_retries=False,
  603. timeout=None,
  604. ignore_backoff=False,
  605. args={},
  606. ):
  607. """ Sends the specifed json data using POST
  608. Args:
  609. destination (str): The remote server to send the HTTP request
  610. to.
  611. path (str): The HTTP path.
  612. data (dict): A dict containing the data that will be used as
  613. the request body. This will be encoded as JSON.
  614. long_retries (bool): whether to use the long retry algorithm. See
  615. docs on _send_request for details.
  616. timeout (int|None): number of milliseconds to wait for the response headers
  617. (including connecting to the server), *for each attempt*.
  618. self._default_timeout (60s) by default.
  619. ignore_backoff (bool): true to ignore the historical backoff data and
  620. try the request anyway.
  621. args (dict): query params
  622. Returns:
  623. dict|list: Succeeds when we get a 2xx HTTP response. The
  624. result will be the decoded JSON body.
  625. Raises:
  626. HttpResponseException: If we get an HTTP response code >= 300
  627. (except 429).
  628. NotRetryingDestination: If we are not yet ready to retry this
  629. server.
  630. FederationDeniedError: If this destination is not on our
  631. federation whitelist
  632. RequestSendFailed: If there were problems connecting to the
  633. remote, due to e.g. DNS failures, connection timeouts etc.
  634. """
  635. request = MatrixFederationRequest(
  636. method="POST", destination=destination, path=path, query=args, json=data
  637. )
  638. start_ms = self.clock.time_msec()
  639. response = await self._send_request(
  640. request,
  641. long_retries=long_retries,
  642. timeout=timeout,
  643. ignore_backoff=ignore_backoff,
  644. )
  645. if timeout:
  646. _sec_timeout = timeout / 1000
  647. else:
  648. _sec_timeout = self.default_timeout
  649. body = await _handle_json_response(
  650. self.reactor, _sec_timeout, request, response, start_ms,
  651. )
  652. return body
  653. async def get_json(
  654. self,
  655. destination,
  656. path,
  657. args=None,
  658. retry_on_dns_fail=True,
  659. timeout=None,
  660. ignore_backoff=False,
  661. try_trailing_slash_on_400=False,
  662. ):
  663. """ GETs some json from the given host homeserver and path
  664. Args:
  665. destination (str): The remote server to send the HTTP request
  666. to.
  667. path (str): The HTTP path.
  668. args (dict|None): A dictionary used to create query strings, defaults to
  669. None.
  670. timeout (int|None): number of milliseconds to wait for the response headers
  671. (including connecting to the server), *for each attempt*.
  672. self._default_timeout (60s) by default.
  673. ignore_backoff (bool): true to ignore the historical backoff data
  674. and try the request anyway.
  675. try_trailing_slash_on_400 (bool): True if on a 400 M_UNRECOGNIZED
  676. response we should try appending a trailing slash to the end of
  677. the request. Workaround for #3622 in Synapse <= v0.99.3.
  678. Returns:
  679. dict|list: Succeeds when we get a 2xx HTTP response. The
  680. result will be the decoded JSON body.
  681. Raises:
  682. HttpResponseException: If we get an HTTP response code >= 300
  683. (except 429).
  684. NotRetryingDestination: If we are not yet ready to retry this
  685. server.
  686. FederationDeniedError: If this destination is not on our
  687. federation whitelist
  688. RequestSendFailed: If there were problems connecting to the
  689. remote, due to e.g. DNS failures, connection timeouts etc.
  690. """
  691. request = MatrixFederationRequest(
  692. method="GET", destination=destination, path=path, query=args
  693. )
  694. start_ms = self.clock.time_msec()
  695. response = await self._send_request_with_optional_trailing_slash(
  696. request,
  697. try_trailing_slash_on_400,
  698. backoff_on_404=False,
  699. ignore_backoff=ignore_backoff,
  700. retry_on_dns_fail=retry_on_dns_fail,
  701. timeout=timeout,
  702. )
  703. body = await _handle_json_response(
  704. self.reactor, self.default_timeout, request, response, start_ms
  705. )
  706. return body
  707. async def delete_json(
  708. self,
  709. destination,
  710. path,
  711. long_retries=False,
  712. timeout=None,
  713. ignore_backoff=False,
  714. args={},
  715. ):
  716. """Send a DELETE request to the remote expecting some json response
  717. Args:
  718. destination (str): The remote server to send the HTTP request
  719. to.
  720. path (str): The HTTP path.
  721. long_retries (bool): whether to use the long retry algorithm. See
  722. docs on _send_request for details.
  723. timeout (int|None): number of milliseconds to wait for the response headers
  724. (including connecting to the server), *for each attempt*.
  725. self._default_timeout (60s) by default.
  726. ignore_backoff (bool): true to ignore the historical backoff data and
  727. try the request anyway.
  728. args (dict): query params
  729. Returns:
  730. dict|list: Succeeds when we get a 2xx HTTP response. The
  731. result will be the decoded JSON body.
  732. Raises:
  733. HttpResponseException: If we get an HTTP response code >= 300
  734. (except 429).
  735. NotRetryingDestination: If we are not yet ready to retry this
  736. server.
  737. FederationDeniedError: If this destination is not on our
  738. federation whitelist
  739. RequestSendFailed: If there were problems connecting to the
  740. remote, due to e.g. DNS failures, connection timeouts etc.
  741. """
  742. request = MatrixFederationRequest(
  743. method="DELETE", destination=destination, path=path, query=args
  744. )
  745. start_ms = self.clock.time_msec()
  746. response = await self._send_request(
  747. request,
  748. long_retries=long_retries,
  749. timeout=timeout,
  750. ignore_backoff=ignore_backoff,
  751. )
  752. body = await _handle_json_response(
  753. self.reactor, self.default_timeout, request, response, start_ms
  754. )
  755. return body
  756. async def get_file(
  757. self,
  758. destination,
  759. path,
  760. output_stream,
  761. args={},
  762. retry_on_dns_fail=True,
  763. max_size=None,
  764. ignore_backoff=False,
  765. ):
  766. """GETs a file from a given homeserver
  767. Args:
  768. destination (str): The remote server to send the HTTP request to.
  769. path (str): The HTTP path to GET.
  770. output_stream (file): File to write the response body to.
  771. args (dict): Optional dictionary used to create the query string.
  772. ignore_backoff (bool): true to ignore the historical backoff data
  773. and try the request anyway.
  774. Returns:
  775. tuple[int, dict]: Resolves with an (int,dict) tuple of
  776. the file length and a dict of the response headers.
  777. Raises:
  778. HttpResponseException: If we get an HTTP response code >= 300
  779. (except 429).
  780. NotRetryingDestination: If we are not yet ready to retry this
  781. server.
  782. FederationDeniedError: If this destination is not on our
  783. federation whitelist
  784. RequestSendFailed: If there were problems connecting to the
  785. remote, due to e.g. DNS failures, connection timeouts etc.
  786. """
  787. request = MatrixFederationRequest(
  788. method="GET", destination=destination, path=path, query=args
  789. )
  790. response = await self._send_request(
  791. request, retry_on_dns_fail=retry_on_dns_fail, ignore_backoff=ignore_backoff
  792. )
  793. headers = dict(response.headers.getAllRawHeaders())
  794. try:
  795. d = _readBodyToFile(response, output_stream, max_size)
  796. d.addTimeout(self.default_timeout, self.reactor)
  797. length = await make_deferred_yieldable(d)
  798. except Exception as e:
  799. logger.warning(
  800. "{%s} [%s] Error reading response: %s",
  801. request.txn_id,
  802. request.destination,
  803. e,
  804. )
  805. raise
  806. logger.info(
  807. "{%s} [%s] Completed: %d %s [%d bytes] %s %s",
  808. request.txn_id,
  809. request.destination,
  810. response.code,
  811. response.phrase.decode("ascii", errors="replace"),
  812. length,
  813. request.method,
  814. request.uri.decode("ascii"),
  815. )
  816. return (length, headers)
  817. class _ReadBodyToFileProtocol(protocol.Protocol):
  818. def __init__(self, stream, deferred, max_size):
  819. self.stream = stream
  820. self.deferred = deferred
  821. self.length = 0
  822. self.max_size = max_size
  823. def dataReceived(self, data):
  824. self.stream.write(data)
  825. self.length += len(data)
  826. if self.max_size is not None and self.length >= self.max_size:
  827. self.deferred.errback(
  828. SynapseError(
  829. 502,
  830. "Requested file is too large > %r bytes" % (self.max_size,),
  831. Codes.TOO_LARGE,
  832. )
  833. )
  834. self.deferred = defer.Deferred()
  835. self.transport.loseConnection()
  836. def connectionLost(self, reason):
  837. if reason.check(ResponseDone):
  838. self.deferred.callback(self.length)
  839. else:
  840. self.deferred.errback(reason)
  841. def _readBodyToFile(response, stream, max_size):
  842. d = defer.Deferred()
  843. response.deliverBody(_ReadBodyToFileProtocol(stream, d, max_size))
  844. return d
  845. def _flatten_response_never_received(e):
  846. if hasattr(e, "reasons"):
  847. reasons = ", ".join(
  848. _flatten_response_never_received(f.value) for f in e.reasons
  849. )
  850. return "%s:[%s]" % (type(e).__name__, reasons)
  851. else:
  852. return repr(e)
  853. def check_content_type_is_json(headers):
  854. """
  855. Check that a set of HTTP headers have a Content-Type header, and that it
  856. is application/json.
  857. Args:
  858. headers (twisted.web.http_headers.Headers): headers to check
  859. Raises:
  860. RequestSendFailed: if the Content-Type header is missing or isn't JSON
  861. """
  862. c_type = headers.getRawHeaders(b"Content-Type")
  863. if c_type is None:
  864. raise RequestSendFailed(RuntimeError("No Content-Type header"), can_retry=False)
  865. c_type = c_type[0].decode("ascii") # only the first header
  866. val, options = cgi.parse_header(c_type)
  867. if val != "application/json":
  868. raise RequestSendFailed(
  869. RuntimeError("Content-Type not application/json: was '%s'" % c_type),
  870. can_retry=False,
  871. )
  872. def encode_query_args(args):
  873. if args is None:
  874. return b""
  875. encoded_args = {}
  876. for k, vs in args.items():
  877. if isinstance(vs, str):
  878. vs = [vs]
  879. encoded_args[k] = [v.encode("UTF-8") for v in vs]
  880. query_bytes = urllib.parse.urlencode(encoded_args, True)
  881. return query_bytes.encode("utf8")