matrixfederationclient.py 45 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307
  1. # Copyright 2014-2021 The Matrix.org Foundation C.I.C.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import abc
  15. import cgi
  16. import codecs
  17. import logging
  18. import random
  19. import sys
  20. import typing
  21. import urllib.parse
  22. from http import HTTPStatus
  23. from io import BytesIO, StringIO
  24. from typing import (
  25. TYPE_CHECKING,
  26. Any,
  27. BinaryIO,
  28. Callable,
  29. Dict,
  30. Generic,
  31. List,
  32. Optional,
  33. Tuple,
  34. TypeVar,
  35. Union,
  36. overload,
  37. )
  38. import attr
  39. import treq
  40. from canonicaljson import encode_canonical_json
  41. from prometheus_client import Counter
  42. from signedjson.sign import sign_json
  43. from typing_extensions import Literal
  44. from twisted.internet import defer
  45. from twisted.internet.error import DNSLookupError
  46. from twisted.internet.interfaces import IReactorTime
  47. from twisted.internet.task import Cooperator
  48. from twisted.web.client import ResponseFailed
  49. from twisted.web.http_headers import Headers
  50. from twisted.web.iweb import IBodyProducer, IResponse
  51. import synapse.metrics
  52. import synapse.util.retryutils
  53. from synapse.api.errors import (
  54. Codes,
  55. FederationDeniedError,
  56. HttpResponseException,
  57. RequestSendFailed,
  58. SynapseError,
  59. )
  60. from synapse.crypto.context_factory import FederationPolicyForHTTPS
  61. from synapse.http import QuieterFileBodyProducer
  62. from synapse.http.client import (
  63. BlacklistingAgentWrapper,
  64. BodyExceededMaxSize,
  65. ByteWriteable,
  66. _make_scheduler,
  67. encode_query_args,
  68. read_body_with_max_size,
  69. )
  70. from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent
  71. from synapse.http.types import QueryParams
  72. from synapse.logging import opentracing
  73. from synapse.logging.context import make_deferred_yieldable, run_in_background
  74. from synapse.logging.opentracing import set_tag, start_active_span, tags
  75. from synapse.types import JsonDict
  76. from synapse.util import json_decoder
  77. from synapse.util.async_helpers import AwakenableSleeper, timeout_deferred
  78. from synapse.util.metrics import Measure
  79. from synapse.util.stringutils import parse_and_validate_server_name
  80. if TYPE_CHECKING:
  81. from synapse.server import HomeServer
  82. logger = logging.getLogger(__name__)
  83. outgoing_requests_counter = Counter(
  84. "synapse_http_matrixfederationclient_requests", "", ["method"]
  85. )
  86. incoming_responses_counter = Counter(
  87. "synapse_http_matrixfederationclient_responses", "", ["method", "code"]
  88. )
  89. MAX_LONG_RETRIES = 10
  90. MAX_SHORT_RETRIES = 3
  91. MAXINT = sys.maxsize
  92. _next_id = 1
  93. T = TypeVar("T")
  94. class ByteParser(ByteWriteable, Generic[T], abc.ABC):
  95. """A `ByteWriteable` that has an additional `finish` function that returns
  96. the parsed data.
  97. """
  98. CONTENT_TYPE: str = abc.abstractproperty() # type: ignore
  99. """The expected content type of the response, e.g. `application/json`. If
  100. the content type doesn't match we fail the request.
  101. """
  102. # a federation response can be rather large (eg a big state_ids is 50M or so), so we
  103. # need a generous limit here.
  104. MAX_RESPONSE_SIZE: int = 100 * 1024 * 1024
  105. """The largest response this parser will accept."""
  106. @abc.abstractmethod
  107. def finish(self) -> T:
  108. """Called when response has finished streaming and the parser should
  109. return the final result (or error).
  110. """
  111. @attr.s(slots=True, frozen=True, auto_attribs=True)
  112. class MatrixFederationRequest:
  113. method: str
  114. """HTTP method
  115. """
  116. path: str
  117. """HTTP path
  118. """
  119. destination: str
  120. """The remote server to send the HTTP request to.
  121. """
  122. json: Optional[JsonDict] = None
  123. """JSON to send in the body.
  124. """
  125. json_callback: Optional[Callable[[], JsonDict]] = None
  126. """A callback to generate the JSON.
  127. """
  128. query: Optional[QueryParams] = None
  129. """Query arguments.
  130. """
  131. txn_id: Optional[str] = None
  132. """Unique ID for this request (for logging)
  133. """
  134. uri: bytes = attr.ib(init=False)
  135. """The URI of this request
  136. """
  137. def __attrs_post_init__(self) -> None:
  138. global _next_id
  139. txn_id = "%s-O-%s" % (self.method, _next_id)
  140. _next_id = (_next_id + 1) % (MAXINT - 1)
  141. object.__setattr__(self, "txn_id", txn_id)
  142. destination_bytes = self.destination.encode("ascii")
  143. path_bytes = self.path.encode("ascii")
  144. query_bytes = encode_query_args(self.query)
  145. # The object is frozen so we can pre-compute this.
  146. uri = urllib.parse.urlunparse(
  147. (b"matrix", destination_bytes, path_bytes, None, query_bytes, b"")
  148. )
  149. object.__setattr__(self, "uri", uri)
  150. def get_json(self) -> Optional[JsonDict]:
  151. if self.json_callback:
  152. return self.json_callback()
  153. return self.json
  154. class JsonParser(ByteParser[Union[JsonDict, list]]):
  155. """A parser that buffers the response and tries to parse it as JSON."""
  156. CONTENT_TYPE = "application/json"
  157. def __init__(self) -> None:
  158. self._buffer = StringIO()
  159. self._binary_wrapper = BinaryIOWrapper(self._buffer)
  160. def write(self, data: bytes) -> int:
  161. return self._binary_wrapper.write(data)
  162. def finish(self) -> Union[JsonDict, list]:
  163. return json_decoder.decode(self._buffer.getvalue())
  164. async def _handle_response(
  165. reactor: IReactorTime,
  166. timeout_sec: float,
  167. request: MatrixFederationRequest,
  168. response: IResponse,
  169. start_ms: int,
  170. parser: ByteParser[T],
  171. ) -> T:
  172. """
  173. Reads the body of a response with a timeout and sends it to a parser
  174. Args:
  175. reactor: twisted reactor, for the timeout
  176. timeout_sec: number of seconds to wait for response to complete
  177. request: the request that triggered the response
  178. response: response to the request
  179. start_ms: Timestamp when request was made
  180. parser: The parser for the response
  181. Returns:
  182. The parsed response
  183. """
  184. max_response_size = parser.MAX_RESPONSE_SIZE
  185. finished = False
  186. try:
  187. check_content_type_is(response.headers, parser.CONTENT_TYPE)
  188. d = read_body_with_max_size(response, parser, max_response_size)
  189. d = timeout_deferred(d, timeout=timeout_sec, reactor=reactor)
  190. length = await make_deferred_yieldable(d)
  191. finished = True
  192. value = parser.finish()
  193. except BodyExceededMaxSize as e:
  194. # The response was too big.
  195. logger.warning(
  196. "{%s} [%s] JSON response exceeded max size %i - %s %s",
  197. request.txn_id,
  198. request.destination,
  199. max_response_size,
  200. request.method,
  201. request.uri.decode("ascii"),
  202. )
  203. raise RequestSendFailed(e, can_retry=False) from e
  204. except ValueError as e:
  205. # The content was invalid.
  206. logger.warning(
  207. "{%s} [%s] Failed to parse response - %s %s",
  208. request.txn_id,
  209. request.destination,
  210. request.method,
  211. request.uri.decode("ascii"),
  212. )
  213. raise RequestSendFailed(e, can_retry=False) from e
  214. except defer.TimeoutError as e:
  215. logger.warning(
  216. "{%s} [%s] Timed out reading response - %s %s",
  217. request.txn_id,
  218. request.destination,
  219. request.method,
  220. request.uri.decode("ascii"),
  221. )
  222. raise RequestSendFailed(e, can_retry=True) from e
  223. except ResponseFailed as e:
  224. logger.warning(
  225. "{%s} [%s] Failed to read response - %s %s",
  226. request.txn_id,
  227. request.destination,
  228. request.method,
  229. request.uri.decode("ascii"),
  230. )
  231. raise RequestSendFailed(e, can_retry=True) from e
  232. except Exception as e:
  233. logger.warning(
  234. "{%s} [%s] Error reading response %s %s: %s",
  235. request.txn_id,
  236. request.destination,
  237. request.method,
  238. request.uri.decode("ascii"),
  239. e,
  240. )
  241. raise
  242. finally:
  243. if not finished:
  244. # There was an exception and we didn't `finish()` the parse.
  245. # Let the parser know that it can free up any resources.
  246. try:
  247. parser.finish()
  248. except Exception:
  249. # Ignore any additional exceptions.
  250. pass
  251. time_taken_secs = reactor.seconds() - start_ms / 1000
  252. logger.info(
  253. "{%s} [%s] Completed request: %d %s in %.2f secs, got %d bytes - %s %s",
  254. request.txn_id,
  255. request.destination,
  256. response.code,
  257. response.phrase.decode("ascii", errors="replace"),
  258. time_taken_secs,
  259. length,
  260. request.method,
  261. request.uri.decode("ascii"),
  262. )
  263. return value
  264. class BinaryIOWrapper:
  265. """A wrapper for a TextIO which converts from bytes on the fly."""
  266. def __init__(
  267. self, file: typing.TextIO, encoding: str = "utf-8", errors: str = "strict"
  268. ):
  269. self.decoder = codecs.getincrementaldecoder(encoding)(errors)
  270. self.file = file
  271. def write(self, b: Union[bytes, bytearray]) -> int:
  272. self.file.write(self.decoder.decode(b))
  273. return len(b)
  274. class MatrixFederationHttpClient:
  275. """HTTP client used to talk to other homeservers over the federation
  276. protocol. Send client certificates and signs requests.
  277. Attributes:
  278. agent (twisted.web.client.Agent): The twisted Agent used to send the
  279. requests.
  280. """
  281. def __init__(
  282. self,
  283. hs: "HomeServer",
  284. tls_client_options_factory: Optional[FederationPolicyForHTTPS],
  285. ):
  286. self.hs = hs
  287. self.signing_key = hs.signing_key
  288. self.server_name = hs.hostname
  289. self.reactor = hs.get_reactor()
  290. user_agent = hs.version_string
  291. if hs.config.server.user_agent_suffix:
  292. user_agent = "%s %s" % (user_agent, hs.config.server.user_agent_suffix)
  293. federation_agent = MatrixFederationAgent(
  294. self.reactor,
  295. tls_client_options_factory,
  296. user_agent.encode("ascii"),
  297. hs.config.server.federation_ip_range_whitelist,
  298. hs.config.server.federation_ip_range_blacklist,
  299. )
  300. # Use a BlacklistingAgentWrapper to prevent circumventing the IP
  301. # blacklist via IP literals in server names
  302. self.agent = BlacklistingAgentWrapper(
  303. federation_agent,
  304. ip_blacklist=hs.config.server.federation_ip_range_blacklist,
  305. )
  306. self.clock = hs.get_clock()
  307. self._store = hs.get_datastores().main
  308. self.version_string_bytes = hs.version_string.encode("ascii")
  309. self.default_timeout = 60
  310. self._cooperator = Cooperator(scheduler=_make_scheduler(self.reactor))
  311. self._sleeper = AwakenableSleeper(self.reactor)
  312. def wake_destination(self, destination: str) -> None:
  313. """Called when the remote server may have come back online."""
  314. self._sleeper.wake(destination)
  315. async def _send_request_with_optional_trailing_slash(
  316. self,
  317. request: MatrixFederationRequest,
  318. try_trailing_slash_on_400: bool = False,
  319. **send_request_args: Any,
  320. ) -> IResponse:
  321. """Wrapper for _send_request which can optionally retry the request
  322. upon receiving a combination of a 400 HTTP response code and a
  323. 'M_UNRECOGNIZED' errcode. This is a workaround for Synapse <= v0.99.3
  324. due to #3622.
  325. Args:
  326. request: details of request to be sent
  327. try_trailing_slash_on_400: Whether on receiving a 400
  328. 'M_UNRECOGNIZED' from the server to retry the request with a
  329. trailing slash appended to the request path.
  330. send_request_args: A dictionary of arguments to pass to `_send_request()`.
  331. Raises:
  332. HttpResponseException: If we get an HTTP response code >= 300
  333. (except 429).
  334. Returns:
  335. Parsed JSON response body.
  336. """
  337. try:
  338. response = await self._send_request(request, **send_request_args)
  339. except HttpResponseException as e:
  340. # Received an HTTP error > 300. Check if it meets the requirements
  341. # to retry with a trailing slash
  342. if not try_trailing_slash_on_400:
  343. raise
  344. if e.code != 400 or e.to_synapse_error().errcode != "M_UNRECOGNIZED":
  345. raise
  346. # Retry with a trailing slash if we received a 400 with
  347. # 'M_UNRECOGNIZED' which some endpoints can return when omitting a
  348. # trailing slash on Synapse <= v0.99.3.
  349. logger.info("Retrying request with trailing slash")
  350. # Request is frozen so we create a new instance
  351. request = attr.evolve(request, path=request.path + "/")
  352. response = await self._send_request(request, **send_request_args)
  353. return response
    async def _send_request(
        self,
        request: MatrixFederationRequest,
        retry_on_dns_fail: bool = True,
        timeout: Optional[int] = None,
        long_retries: bool = False,
        ignore_backoff: bool = False,
        backoff_on_404: bool = False,
    ) -> IResponse:
        """
        Sends a request to the given server.

        Args:
            request: details of request to be sent

            retry_on_dns_fail: true if the request should be retried on DNS failures

            timeout: number of milliseconds to wait for the response headers
                (including connecting to the server), *for each attempt*.
                60s by default (a falsy value, i.e. None or 0, also selects
                the default). Note that supplying a non-zero timeout also
                disables retries entirely.

            long_retries: whether to use the long retry algorithm.

                The regular retry algorithm makes 4 attempts, with intervals
                [0.5s, 1s, 2s].

                The long retry algorithm makes 11 attempts, with intervals
                [4s, 16s, 60s, 60s, ...]

                Both algorithms add -20%/+40% jitter to the retry intervals.

                Note that the above intervals are *in addition* to the time spent
                waiting for the request to complete (up to `timeout` ms).

                NB: the long retry algorithm takes over 20 minutes to complete, with
                a default timeout of 60s!

            ignore_backoff: true to ignore the historical backoff data
                and try the request anyway.

            backoff_on_404: Back off if we get a 404

        Returns:
            Resolves with the HTTP response object on success (only 2xx
            responses are returned; the body has not been read).

        Raises:
            HttpResponseException: If we get an HTTP response code >= 300
                (except 429).
            NotRetryingDestination: If we are not yet ready to retry this
                server.
            FederationDeniedError: If this destination is not on our
                federation whitelist, or is not a valid server name.
            RequestSendFailed: If there were problems connecting to the
                remote, due to e.g. DNS failures, connection timeouts etc.
        """
        # Validate server name and log if it is an invalid destination, this is
        # partially to help track down code paths where we haven't validated before here
        try:
            parse_and_validate_server_name(request.destination)
        except ValueError:
            logger.exception(f"Invalid destination: {request.destination}.")
            raise FederationDeniedError(request.destination)

        # Per-attempt timeout in seconds; None or 0 falls back to the default.
        if timeout:
            _sec_timeout = timeout / 1000
        else:
            _sec_timeout = self.default_timeout

        if (
            self.hs.config.federation.federation_domain_whitelist is not None
            and request.destination
            not in self.hs.config.federation.federation_domain_whitelist
        ):
            raise FederationDeniedError(request.destination)

        limiter = await synapse.util.retryutils.get_retry_limiter(
            request.destination,
            self.clock,
            self._store,
            backoff_on_404=backoff_on_404,
            ignore_backoff=ignore_backoff,
            notifier=self.hs.get_notifier(),
            replication_client=self.hs.get_replication_command_handler(),
        )

        method_bytes = request.method.encode("ascii")
        destination_bytes = request.destination.encode("ascii")
        path_bytes = request.path.encode("ascii")
        query_bytes = encode_query_args(request.query)

        scope = start_active_span(
            "outgoing-federation-request",
            tags={
                tags.SPAN_KIND: tags.SPAN_KIND_RPC_CLIENT,
                tags.PEER_ADDRESS: request.destination,
                tags.HTTP_METHOD: request.method,
                tags.HTTP_URL: request.path,
            },
            finish_on_close=True,
        )

        # Inject the span into the headers
        headers_dict: Dict[bytes, List[bytes]] = {}
        opentracing.inject_header_dict(headers_dict, request.destination)

        headers_dict[b"User-Agent"] = [self.version_string_bytes]

        with limiter, scope:
            # XXX: Would be much nicer to retry only at the transaction-layer
            # (once we have reliable transactions in place)
            if long_retries:
                retries_left = MAX_LONG_RETRIES
            else:
                retries_left = MAX_SHORT_RETRIES

            url_bytes = request.uri
            url_str = url_bytes.decode("ascii")

            # The signature covers only the path and query string, not the
            # scheme or host.
            url_to_sign_bytes = urllib.parse.urlunparse(
                (b"", b"", path_bytes, None, query_bytes, b"")
            )

            while True:
                try:
                    # The body is re-fetched (and re-signed) on every attempt,
                    # so a json_callback is invoked once per attempt.
                    # NB: a falsy body (None or an empty dict) is sent with no
                    # body and is signed without a "content" field.
                    json = request.get_json()
                    if json:
                        # headers_dict lives outside the loop, so this header
                        # persists across any later attempts.
                        headers_dict[b"Content-Type"] = [b"application/json"]
                        auth_headers = self.build_auth_headers(
                            destination_bytes, method_bytes, url_to_sign_bytes, json
                        )
                        data = encode_canonical_json(json)
                        producer: Optional[IBodyProducer] = QuieterFileBodyProducer(
                            BytesIO(data), cooperator=self._cooperator
                        )
                    else:
                        producer = None
                        auth_headers = self.build_auth_headers(
                            destination_bytes, method_bytes, url_to_sign_bytes
                        )

                    headers_dict[b"Authorization"] = auth_headers

                    logger.debug(
                        "{%s} [%s] Sending request: %s %s; timeout %fs",
                        request.txn_id,
                        request.destination,
                        request.method,
                        url_str,
                        _sec_timeout,
                    )

                    outgoing_requests_counter.labels(request.method).inc()

                    try:
                        with Measure(self.clock, "outbound_request"):
                            # we don't want all the fancy cookie and redirect handling
                            # that treq.request gives: just use the raw Agent.

                            # To preserve the logging context, the timeout is treated
                            # in a similar way to `defer.gatherResults`:
                            # * Each logging context-preserving fork is wrapped in
                            #   `run_in_background`. In this case there is only one,
                            #   since the timeout fork is not logging-context aware.
                            # * The `Deferred` that joins the forks back together is
                            #   wrapped in `make_deferred_yieldable` to restore the
                            #   logging context regardless of the path taken.
                            request_deferred = run_in_background(
                                self.agent.request,
                                method_bytes,
                                url_bytes,
                                headers=Headers(headers_dict),
                                bodyProducer=producer,
                            )
                            request_deferred = timeout_deferred(
                                request_deferred,
                                timeout=_sec_timeout,
                                reactor=self.reactor,
                            )

                            response = await make_deferred_yieldable(request_deferred)
                    except DNSLookupError as e:
                        # DNS failures are only retried if the caller allows it.
                        raise RequestSendFailed(e, can_retry=retry_on_dns_fail) from e
                    except Exception as e:
                        # Any other failure to get response headers is retryable.
                        raise RequestSendFailed(e, can_retry=True) from e

                    incoming_responses_counter.labels(
                        request.method, response.code
                    ).inc()

                    set_tag(tags.HTTP_STATUS_CODE, response.code)
                    response_phrase = response.phrase.decode("ascii", errors="replace")

                    if 200 <= response.code < 300:
                        logger.debug(
                            "{%s} [%s] Got response headers: %d %s",
                            request.txn_id,
                            request.destination,
                            response.code,
                            response_phrase,
                        )
                    else:
                        logger.info(
                            "{%s} [%s] Got response headers: %d %s",
                            request.txn_id,
                            request.destination,
                            response.code,
                            response_phrase,
                        )
                        # :'(
                        # Update transactions table?
                        # Read the error body (under the same timeout) so it can
                        # be attached to the HttpResponseException.
                        d = treq.content(response)
                        d = timeout_deferred(
                            d, timeout=_sec_timeout, reactor=self.reactor
                        )

                        try:
                            body = await make_deferred_yieldable(d)
                        except Exception as e:
                            # Eh, we're already going to raise an exception so lets
                            # ignore if this fails.
                            logger.warning(
                                "{%s} [%s] Failed to get error response: %s %s: %s",
                                request.txn_id,
                                request.destination,
                                request.method,
                                url_str,
                                _flatten_response_never_received(e),
                            )
                            body = None

                        exc = HttpResponseException(
                            response.code, response_phrase, body
                        )

                        # Retry if the error is a 5xx or a 429 (Too Many
                        # Requests), otherwise just raise a standard
                        # `HttpResponseException`
                        if 500 <= response.code < 600 or response.code == 429:
                            raise RequestSendFailed(exc, can_retry=True) from exc
                        else:
                            raise exc

                    # Only reached for a 2xx response.
                    break
                except RequestSendFailed as e:
                    logger.info(
                        "{%s} [%s] Request failed: %s %s: %s",
                        request.txn_id,
                        request.destination,
                        request.method,
                        url_str,
                        _flatten_response_never_received(e.inner_exception),
                    )

                    if not e.can_retry:
                        raise

                    # Retries only happen when no explicit (non-zero) timeout
                    # was supplied.
                    if retries_left and not timeout:
                        if long_retries:
                            # 4s, 16s, then capped at 60s.
                            delay = 4 ** (MAX_LONG_RETRIES + 1 - retries_left)
                            delay = min(delay, 60)
                            delay *= random.uniform(0.8, 1.4)
                        else:
                            # 0.5s, 1s, then capped at 2s.
                            delay = 0.5 * 2 ** (MAX_SHORT_RETRIES - retries_left)
                            delay = min(delay, 2)
                            delay *= random.uniform(0.8, 1.4)

                        logger.debug(
                            "{%s} [%s] Waiting %ss before re-sending...",
                            request.txn_id,
                            request.destination,
                            delay,
                        )

                        # Sleep for the calculated delay, or wake up immediately
                        # if we get notified that the server is back up.
                        await self._sleeper.sleep(request.destination, delay * 1000)
                        retries_left -= 1
                    else:
                        raise

                except Exception as e:
                    # Non-retryable errors (including HttpResponseException for
                    # non-5xx/429 codes) are logged and propagated.
                    logger.warning(
                        "{%s} [%s] Request failed: %s %s: %s",
                        request.txn_id,
                        request.destination,
                        request.method,
                        url_str,
                        _flatten_response_never_received(e),
                    )
                    raise
        return response
  603. def build_auth_headers(
  604. self,
  605. destination: Optional[bytes],
  606. method: bytes,
  607. url_bytes: bytes,
  608. content: Optional[JsonDict] = None,
  609. destination_is: Optional[bytes] = None,
  610. ) -> List[bytes]:
  611. """
  612. Builds the Authorization headers for a federation request
  613. Args:
  614. destination: The destination homeserver of the request.
  615. May be None if the destination is an identity server, in which case
  616. destination_is must be non-None.
  617. method: The HTTP method of the request
  618. url_bytes: The URI path of the request
  619. content: The body of the request
  620. destination_is: As 'destination', but if the destination is an
  621. identity server
  622. Returns:
  623. A list of headers to be added as "Authorization:" headers
  624. """
  625. if not destination and not destination_is:
  626. raise ValueError(
  627. "At least one of the arguments destination and destination_is "
  628. "must be a nonempty bytestring."
  629. )
  630. request: JsonDict = {
  631. "method": method.decode("ascii"),
  632. "uri": url_bytes.decode("ascii"),
  633. "origin": self.server_name,
  634. }
  635. if destination is not None:
  636. request["destination"] = destination.decode("ascii")
  637. if destination_is is not None:
  638. request["destination_is"] = destination_is.decode("ascii")
  639. if content is not None:
  640. request["content"] = content
  641. request = sign_json(request, self.server_name, self.signing_key)
  642. auth_headers = []
  643. for key, sig in request["signatures"][self.server_name].items():
  644. auth_headers.append(
  645. (
  646. 'X-Matrix origin="%s",key="%s",sig="%s",destination="%s"'
  647. % (
  648. self.server_name,
  649. key,
  650. sig,
  651. request.get("destination") or request["destination_is"],
  652. )
  653. ).encode("ascii")
  654. )
  655. return auth_headers
  656. @overload
  657. async def put_json(
  658. self,
  659. destination: str,
  660. path: str,
  661. args: Optional[QueryParams] = None,
  662. data: Optional[JsonDict] = None,
  663. json_data_callback: Optional[Callable[[], JsonDict]] = None,
  664. long_retries: bool = False,
  665. timeout: Optional[int] = None,
  666. ignore_backoff: bool = False,
  667. backoff_on_404: bool = False,
  668. try_trailing_slash_on_400: bool = False,
  669. parser: Literal[None] = None,
  670. ) -> Union[JsonDict, list]:
  671. ...
  672. @overload
  673. async def put_json(
  674. self,
  675. destination: str,
  676. path: str,
  677. args: Optional[QueryParams] = None,
  678. data: Optional[JsonDict] = None,
  679. json_data_callback: Optional[Callable[[], JsonDict]] = None,
  680. long_retries: bool = False,
  681. timeout: Optional[int] = None,
  682. ignore_backoff: bool = False,
  683. backoff_on_404: bool = False,
  684. try_trailing_slash_on_400: bool = False,
  685. parser: Optional[ByteParser[T]] = None,
  686. ) -> T:
  687. ...
  688. async def put_json(
  689. self,
  690. destination: str,
  691. path: str,
  692. args: Optional[QueryParams] = None,
  693. data: Optional[JsonDict] = None,
  694. json_data_callback: Optional[Callable[[], JsonDict]] = None,
  695. long_retries: bool = False,
  696. timeout: Optional[int] = None,
  697. ignore_backoff: bool = False,
  698. backoff_on_404: bool = False,
  699. try_trailing_slash_on_400: bool = False,
  700. parser: Optional[ByteParser] = None,
  701. ):
  702. """Sends the specified json data using PUT
  703. Args:
  704. destination: The remote server to send the HTTP request to.
  705. path: The HTTP path.
  706. args: query params
  707. data: A dict containing the data that will be used as
  708. the request body. This will be encoded as JSON.
  709. json_data_callback: A callable returning the dict to
  710. use as the request body.
  711. long_retries: whether to use the long retry algorithm. See
  712. docs on _send_request for details.
  713. timeout: number of milliseconds to wait for the response.
  714. self._default_timeout (60s) by default.
  715. Note that we may make several attempts to send the request; this
  716. timeout applies to the time spent waiting for response headers for
  717. *each* attempt (including connection time) as well as the time spent
  718. reading the response body after a 200 response.
  719. ignore_backoff: true to ignore the historical backoff data
  720. and try the request anyway.
  721. backoff_on_404: True if we should count a 404 response as
  722. a failure of the server (and should therefore back off future
  723. requests).
  724. try_trailing_slash_on_400: True if on a 400 M_UNRECOGNIZED
  725. response we should try appending a trailing slash to the end
  726. of the request. Workaround for #3622 in Synapse <= v0.99.3. This
  727. will be attempted before backing off if backing off has been
  728. enabled.
  729. parser: The parser to use to decode the response. Defaults to
  730. parsing as JSON.
  731. Returns:
  732. Succeeds when we get a 2xx HTTP response. The
  733. result will be the decoded JSON body.
  734. Raises:
  735. HttpResponseException: If we get an HTTP response code >= 300
  736. (except 429).
  737. NotRetryingDestination: If we are not yet ready to retry this
  738. server.
  739. FederationDeniedError: If this destination is not on our
  740. federation whitelist
  741. RequestSendFailed: If there were problems connecting to the
  742. remote, due to e.g. DNS failures, connection timeouts etc.
  743. """
  744. request = MatrixFederationRequest(
  745. method="PUT",
  746. destination=destination,
  747. path=path,
  748. query=args,
  749. json_callback=json_data_callback,
  750. json=data,
  751. )
  752. start_ms = self.clock.time_msec()
  753. response = await self._send_request_with_optional_trailing_slash(
  754. request,
  755. try_trailing_slash_on_400,
  756. backoff_on_404=backoff_on_404,
  757. ignore_backoff=ignore_backoff,
  758. long_retries=long_retries,
  759. timeout=timeout,
  760. )
  761. if timeout is not None:
  762. _sec_timeout = timeout / 1000
  763. else:
  764. _sec_timeout = self.default_timeout
  765. if parser is None:
  766. parser = JsonParser()
  767. body = await _handle_response(
  768. self.reactor,
  769. _sec_timeout,
  770. request,
  771. response,
  772. start_ms,
  773. parser=parser,
  774. )
  775. return body
  776. async def post_json(
  777. self,
  778. destination: str,
  779. path: str,
  780. data: Optional[JsonDict] = None,
  781. long_retries: bool = False,
  782. timeout: Optional[int] = None,
  783. ignore_backoff: bool = False,
  784. args: Optional[QueryParams] = None,
  785. ) -> Union[JsonDict, list]:
  786. """Sends the specified json data using POST
  787. Args:
  788. destination: The remote server to send the HTTP request to.
  789. path: The HTTP path.
  790. data: A dict containing the data that will be used as
  791. the request body. This will be encoded as JSON.
  792. long_retries: whether to use the long retry algorithm. See
  793. docs on _send_request for details.
  794. timeout: number of milliseconds to wait for the response.
  795. self._default_timeout (60s) by default.
  796. Note that we may make several attempts to send the request; this
  797. timeout applies to the time spent waiting for response headers for
  798. *each* attempt (including connection time) as well as the time spent
  799. reading the response body after a 200 response.
  800. ignore_backoff: true to ignore the historical backoff data and
  801. try the request anyway.
  802. args: query params
  803. Returns:
  804. Succeeds when we get a 2xx HTTP response. The result will be the decoded JSON body.
  805. Raises:
  806. HttpResponseException: If we get an HTTP response code >= 300
  807. (except 429).
  808. NotRetryingDestination: If we are not yet ready to retry this
  809. server.
  810. FederationDeniedError: If this destination is not on our
  811. federation whitelist
  812. RequestSendFailed: If there were problems connecting to the
  813. remote, due to e.g. DNS failures, connection timeouts etc.
  814. """
  815. request = MatrixFederationRequest(
  816. method="POST", destination=destination, path=path, query=args, json=data
  817. )
  818. start_ms = self.clock.time_msec()
  819. response = await self._send_request(
  820. request,
  821. long_retries=long_retries,
  822. timeout=timeout,
  823. ignore_backoff=ignore_backoff,
  824. )
  825. if timeout:
  826. _sec_timeout = timeout / 1000
  827. else:
  828. _sec_timeout = self.default_timeout
  829. body = await _handle_response(
  830. self.reactor, _sec_timeout, request, response, start_ms, parser=JsonParser()
  831. )
  832. return body
  833. @overload
  834. async def get_json(
  835. self,
  836. destination: str,
  837. path: str,
  838. args: Optional[QueryParams] = None,
  839. retry_on_dns_fail: bool = True,
  840. timeout: Optional[int] = None,
  841. ignore_backoff: bool = False,
  842. try_trailing_slash_on_400: bool = False,
  843. parser: Literal[None] = None,
  844. ) -> Union[JsonDict, list]:
  845. ...
  846. @overload
  847. async def get_json(
  848. self,
  849. destination: str,
  850. path: str,
  851. args: Optional[QueryParams] = ...,
  852. retry_on_dns_fail: bool = ...,
  853. timeout: Optional[int] = ...,
  854. ignore_backoff: bool = ...,
  855. try_trailing_slash_on_400: bool = ...,
  856. parser: ByteParser[T] = ...,
  857. ) -> T:
  858. ...
  859. async def get_json(
  860. self,
  861. destination: str,
  862. path: str,
  863. args: Optional[QueryParams] = None,
  864. retry_on_dns_fail: bool = True,
  865. timeout: Optional[int] = None,
  866. ignore_backoff: bool = False,
  867. try_trailing_slash_on_400: bool = False,
  868. parser: Optional[ByteParser] = None,
  869. ):
  870. """GETs some json from the given host homeserver and path
  871. Args:
  872. destination: The remote server to send the HTTP request to.
  873. path: The HTTP path.
  874. args: A dictionary used to create query strings, defaults to
  875. None.
  876. retry_on_dns_fail: true if the request should be retried on DNS failures
  877. timeout: number of milliseconds to wait for the response.
  878. self._default_timeout (60s) by default.
  879. Note that we may make several attempts to send the request; this
  880. timeout applies to the time spent waiting for response headers for
  881. *each* attempt (including connection time) as well as the time spent
  882. reading the response body after a 200 response.
  883. ignore_backoff: true to ignore the historical backoff data
  884. and try the request anyway.
  885. try_trailing_slash_on_400: True if on a 400 M_UNRECOGNIZED
  886. response we should try appending a trailing slash to the end of
  887. the request. Workaround for #3622 in Synapse <= v0.99.3.
  888. parser: The parser to use to decode the response. Defaults to
  889. parsing as JSON.
  890. Returns:
  891. Succeeds when we get a 2xx HTTP response. The
  892. result will be the decoded JSON body.
  893. Raises:
  894. HttpResponseException: If we get an HTTP response code >= 300
  895. (except 429).
  896. NotRetryingDestination: If we are not yet ready to retry this
  897. server.
  898. FederationDeniedError: If this destination is not on our
  899. federation whitelist
  900. RequestSendFailed: If there were problems connecting to the
  901. remote, due to e.g. DNS failures, connection timeouts etc.
  902. """
  903. request = MatrixFederationRequest(
  904. method="GET", destination=destination, path=path, query=args
  905. )
  906. start_ms = self.clock.time_msec()
  907. response = await self._send_request_with_optional_trailing_slash(
  908. request,
  909. try_trailing_slash_on_400,
  910. backoff_on_404=False,
  911. ignore_backoff=ignore_backoff,
  912. retry_on_dns_fail=retry_on_dns_fail,
  913. timeout=timeout,
  914. )
  915. if timeout is not None:
  916. _sec_timeout = timeout / 1000
  917. else:
  918. _sec_timeout = self.default_timeout
  919. if parser is None:
  920. parser = JsonParser()
  921. body = await _handle_response(
  922. self.reactor,
  923. _sec_timeout,
  924. request,
  925. response,
  926. start_ms,
  927. parser=parser,
  928. )
  929. return body
  930. async def delete_json(
  931. self,
  932. destination: str,
  933. path: str,
  934. long_retries: bool = False,
  935. timeout: Optional[int] = None,
  936. ignore_backoff: bool = False,
  937. args: Optional[QueryParams] = None,
  938. ) -> Union[JsonDict, list]:
  939. """Send a DELETE request to the remote expecting some json response
  940. Args:
  941. destination: The remote server to send the HTTP request to.
  942. path: The HTTP path.
  943. long_retries: whether to use the long retry algorithm. See
  944. docs on _send_request for details.
  945. timeout: number of milliseconds to wait for the response.
  946. self._default_timeout (60s) by default.
  947. Note that we may make several attempts to send the request; this
  948. timeout applies to the time spent waiting for response headers for
  949. *each* attempt (including connection time) as well as the time spent
  950. reading the response body after a 200 response.
  951. ignore_backoff: true to ignore the historical backoff data and
  952. try the request anyway.
  953. args: query params
  954. Returns:
  955. Succeeds when we get a 2xx HTTP response. The
  956. result will be the decoded JSON body.
  957. Raises:
  958. HttpResponseException: If we get an HTTP response code >= 300
  959. (except 429).
  960. NotRetryingDestination: If we are not yet ready to retry this
  961. server.
  962. FederationDeniedError: If this destination is not on our
  963. federation whitelist
  964. RequestSendFailed: If there were problems connecting to the
  965. remote, due to e.g. DNS failures, connection timeouts etc.
  966. """
  967. request = MatrixFederationRequest(
  968. method="DELETE", destination=destination, path=path, query=args
  969. )
  970. start_ms = self.clock.time_msec()
  971. response = await self._send_request(
  972. request,
  973. long_retries=long_retries,
  974. timeout=timeout,
  975. ignore_backoff=ignore_backoff,
  976. )
  977. if timeout is not None:
  978. _sec_timeout = timeout / 1000
  979. else:
  980. _sec_timeout = self.default_timeout
  981. body = await _handle_response(
  982. self.reactor, _sec_timeout, request, response, start_ms, parser=JsonParser()
  983. )
  984. return body
  985. async def get_file(
  986. self,
  987. destination: str,
  988. path: str,
  989. output_stream: BinaryIO,
  990. args: Optional[QueryParams] = None,
  991. retry_on_dns_fail: bool = True,
  992. max_size: Optional[int] = None,
  993. ignore_backoff: bool = False,
  994. ) -> Tuple[int, Dict[bytes, List[bytes]]]:
  995. """GETs a file from a given homeserver
  996. Args:
  997. destination: The remote server to send the HTTP request to.
  998. path: The HTTP path to GET.
  999. output_stream: File to write the response body to.
  1000. args: Optional dictionary used to create the query string.
  1001. ignore_backoff: true to ignore the historical backoff data
  1002. and try the request anyway.
  1003. Returns:
  1004. Resolves with an (int,dict) tuple of
  1005. the file length and a dict of the response headers.
  1006. Raises:
  1007. HttpResponseException: If we get an HTTP response code >= 300
  1008. (except 429).
  1009. NotRetryingDestination: If we are not yet ready to retry this
  1010. server.
  1011. FederationDeniedError: If this destination is not on our
  1012. federation whitelist
  1013. RequestSendFailed: If there were problems connecting to the
  1014. remote, due to e.g. DNS failures, connection timeouts etc.
  1015. """
  1016. request = MatrixFederationRequest(
  1017. method="GET", destination=destination, path=path, query=args
  1018. )
  1019. response = await self._send_request(
  1020. request, retry_on_dns_fail=retry_on_dns_fail, ignore_backoff=ignore_backoff
  1021. )
  1022. headers = dict(response.headers.getAllRawHeaders())
  1023. try:
  1024. d = read_body_with_max_size(response, output_stream, max_size)
  1025. d.addTimeout(self.default_timeout, self.reactor)
  1026. length = await make_deferred_yieldable(d)
  1027. except BodyExceededMaxSize:
  1028. msg = "Requested file is too large > %r bytes" % (max_size,)
  1029. logger.warning(
  1030. "{%s} [%s] %s",
  1031. request.txn_id,
  1032. request.destination,
  1033. msg,
  1034. )
  1035. raise SynapseError(HTTPStatus.BAD_GATEWAY, msg, Codes.TOO_LARGE)
  1036. except defer.TimeoutError as e:
  1037. logger.warning(
  1038. "{%s} [%s] Timed out reading response - %s %s",
  1039. request.txn_id,
  1040. request.destination,
  1041. request.method,
  1042. request.uri.decode("ascii"),
  1043. )
  1044. raise RequestSendFailed(e, can_retry=True) from e
  1045. except ResponseFailed as e:
  1046. logger.warning(
  1047. "{%s} [%s] Failed to read response - %s %s",
  1048. request.txn_id,
  1049. request.destination,
  1050. request.method,
  1051. request.uri.decode("ascii"),
  1052. )
  1053. raise RequestSendFailed(e, can_retry=True) from e
  1054. except Exception as e:
  1055. logger.warning(
  1056. "{%s} [%s] Error reading response: %s",
  1057. request.txn_id,
  1058. request.destination,
  1059. e,
  1060. )
  1061. raise
  1062. logger.info(
  1063. "{%s} [%s] Completed: %d %s [%d bytes] %s %s",
  1064. request.txn_id,
  1065. request.destination,
  1066. response.code,
  1067. response.phrase.decode("ascii", errors="replace"),
  1068. length,
  1069. request.method,
  1070. request.uri.decode("ascii"),
  1071. )
  1072. return length, headers
  1073. def _flatten_response_never_received(e: BaseException) -> str:
  1074. if hasattr(e, "reasons"):
  1075. reasons = ", ".join(
  1076. _flatten_response_never_received(f.value) for f in e.reasons
  1077. )
  1078. return "%s:[%s]" % (type(e).__name__, reasons)
  1079. else:
  1080. return repr(e)
  1081. def check_content_type_is(headers: Headers, expected_content_type: str) -> None:
  1082. """
  1083. Check that a set of HTTP headers have a Content-Type header, and that it
  1084. is the expected value..
  1085. Args:
  1086. headers: headers to check
  1087. Raises:
  1088. RequestSendFailed: if the Content-Type header is missing or doesn't match
  1089. """
  1090. content_type_headers = headers.getRawHeaders(b"Content-Type")
  1091. if content_type_headers is None:
  1092. raise RequestSendFailed(
  1093. RuntimeError("No Content-Type header received from remote server"),
  1094. can_retry=False,
  1095. )
  1096. c_type = content_type_headers[0].decode("ascii") # only the first header
  1097. val, options = cgi.parse_header(c_type)
  1098. if val != expected_content_type:
  1099. raise RequestSendFailed(
  1100. RuntimeError(
  1101. f"Remote server sent Content-Type header of '{c_type}', not '{expected_content_type}'",
  1102. ),
  1103. can_retry=False,
  1104. )