1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057 |
- # -*- coding: utf-8 -*-
- # Copyright 2014-2016 OpenMarket Ltd
- # Copyright 2018 New Vector Ltd
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- import cgi
- import logging
- import random
- import sys
- import urllib
- from io import BytesIO
- import attr
- import treq
- from canonicaljson import encode_canonical_json
- from prometheus_client import Counter
- from signedjson.sign import sign_json
- from zope.interface import implementer
- from twisted.internet import defer, protocol
- from twisted.internet.error import DNSLookupError
- from twisted.internet.interfaces import IReactorPluggableNameResolver, IReactorTime
- from twisted.internet.task import _EPSILON, Cooperator
- from twisted.web._newclient import ResponseDone
- from twisted.web.http_headers import Headers
- from twisted.web.iweb import IResponse
- import synapse.metrics
- import synapse.util.retryutils
- from synapse.api.errors import (
- Codes,
- FederationDeniedError,
- HttpResponseException,
- RequestSendFailed,
- SynapseError,
- )
- from synapse.http import QuieterFileBodyProducer
- from synapse.http.client import BlacklistingAgentWrapper, IPBlacklistingResolver
- from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent
- from synapse.logging.context import make_deferred_yieldable
- from synapse.logging.opentracing import (
- inject_active_span_byte_dict,
- set_tag,
- start_active_span,
- tags,
- )
- from synapse.util.async_helpers import timeout_deferred
- from synapse.util.metrics import Measure
- logger = logging.getLogger(__name__)
- outgoing_requests_counter = Counter(
- "synapse_http_matrixfederationclient_requests", "", ["method"]
- )
- incoming_responses_counter = Counter(
- "synapse_http_matrixfederationclient_responses", "", ["method", "code"]
- )
- MAX_LONG_RETRIES = 10
- MAX_SHORT_RETRIES = 3
- MAXINT = sys.maxsize
- _next_id = 1
- @attr.s(frozen=True)
- class MatrixFederationRequest:
- method = attr.ib()
- """HTTP method
- :type: str
- """
- path = attr.ib()
- """HTTP path
- :type: str
- """
- destination = attr.ib()
- """The remote server to send the HTTP request to.
- :type: str"""
- json = attr.ib(default=None)
- """JSON to send in the body.
- :type: dict|None
- """
- json_callback = attr.ib(default=None)
- """A callback to generate the JSON.
- :type: func|None
- """
- query = attr.ib(default=None)
- """Query arguments.
- :type: dict|None
- """
- txn_id = attr.ib(default=None)
- """Unique ID for this request (for logging)
- :type: str|None
- """
- uri = attr.ib(init=False, type=bytes)
- """The URI of this request
- """
- def __attrs_post_init__(self):
- global _next_id
- txn_id = "%s-O-%s" % (self.method, _next_id)
- _next_id = (_next_id + 1) % (MAXINT - 1)
- object.__setattr__(self, "txn_id", txn_id)
- destination_bytes = self.destination.encode("ascii")
- path_bytes = self.path.encode("ascii")
- if self.query:
- query_bytes = encode_query_args(self.query)
- else:
- query_bytes = b""
- # The object is frozen so we can pre-compute this.
- uri = urllib.parse.urlunparse(
- (b"matrix", destination_bytes, path_bytes, None, query_bytes, b"")
- )
- object.__setattr__(self, "uri", uri)
- def get_json(self):
- if self.json_callback:
- return self.json_callback()
- return self.json
- async def _handle_json_response(
- reactor: IReactorTime,
- timeout_sec: float,
- request: MatrixFederationRequest,
- response: IResponse,
- start_ms: int,
- ):
- """
- Reads the JSON body of a response, with a timeout
- Args:
- reactor: twisted reactor, for the timeout
- timeout_sec: number of seconds to wait for response to complete
- request: the request that triggered the response
- response: response to the request
- start_ms: Timestamp when request was made
- Returns:
- dict: parsed JSON response
- """
- try:
- check_content_type_is_json(response.headers)
- d = treq.json_content(response)
- d = timeout_deferred(d, timeout=timeout_sec, reactor=reactor)
- body = await make_deferred_yieldable(d)
- except TimeoutError as e:
- logger.warning(
- "{%s} [%s] Timed out reading response - %s %s",
- request.txn_id,
- request.destination,
- request.method,
- request.uri.decode("ascii"),
- )
- raise RequestSendFailed(e, can_retry=True) from e
- except Exception as e:
- logger.warning(
- "{%s} [%s] Error reading response %s %s: %s",
- request.txn_id,
- request.destination,
- request.method,
- request.uri.decode("ascii"),
- e,
- )
- raise
- time_taken_secs = reactor.seconds() - start_ms / 1000
- logger.info(
- "{%s} [%s] Completed request: %d %s in %.2f secs - %s %s",
- request.txn_id,
- request.destination,
- response.code,
- response.phrase.decode("ascii", errors="replace"),
- time_taken_secs,
- request.method,
- request.uri.decode("ascii"),
- )
- return body
- class MatrixFederationHttpClient:
- """HTTP client used to talk to other homeservers over the federation
- protocol. Send client certificates and signs requests.
- Attributes:
- agent (twisted.web.client.Agent): The twisted Agent used to send the
- requests.
- """
- def __init__(self, hs, tls_client_options_factory):
- self.hs = hs
- self.signing_key = hs.signing_key
- self.server_name = hs.hostname
- real_reactor = hs.get_reactor()
- # We need to use a DNS resolver which filters out blacklisted IP
- # addresses, to prevent DNS rebinding.
- nameResolver = IPBlacklistingResolver(
- real_reactor, None, hs.config.federation_ip_range_blacklist
- )
- @implementer(IReactorPluggableNameResolver)
- class Reactor:
- def __getattr__(_self, attr):
- if attr == "nameResolver":
- return nameResolver
- else:
- return getattr(real_reactor, attr)
- self.reactor = Reactor()
- user_agent = hs.version_string
- if hs.config.user_agent_suffix:
- user_agent = "%s %s" % (user_agent, hs.config.user_agent_suffix)
- user_agent = user_agent.encode("ascii")
- self.agent = MatrixFederationAgent(
- self.reactor, tls_client_options_factory, user_agent
- )
- # Use a BlacklistingAgentWrapper to prevent circumventing the IP
- # blacklist via IP literals in server names
- self.agent = BlacklistingAgentWrapper(
- self.agent,
- self.reactor,
- ip_blacklist=hs.config.federation_ip_range_blacklist,
- )
- self.clock = hs.get_clock()
- self._store = hs.get_datastore()
- self.version_string_bytes = hs.version_string.encode("ascii")
- self.default_timeout = 60
- def schedule(x):
- self.reactor.callLater(_EPSILON, x)
- self._cooperator = Cooperator(scheduler=schedule)
- async def _send_request_with_optional_trailing_slash(
- self, request, try_trailing_slash_on_400=False, **send_request_args
- ):
- """Wrapper for _send_request which can optionally retry the request
- upon receiving a combination of a 400 HTTP response code and a
- 'M_UNRECOGNIZED' errcode. This is a workaround for Synapse <= v0.99.3
- due to #3622.
- Args:
- request (MatrixFederationRequest): details of request to be sent
- try_trailing_slash_on_400 (bool): Whether on receiving a 400
- 'M_UNRECOGNIZED' from the server to retry the request with a
- trailing slash appended to the request path.
- send_request_args (Dict): A dictionary of arguments to pass to
- `_send_request()`.
- Raises:
- HttpResponseException: If we get an HTTP response code >= 300
- (except 429).
- Returns:
- Dict: Parsed JSON response body.
- """
- try:
- response = await self._send_request(request, **send_request_args)
- except HttpResponseException as e:
- # Received an HTTP error > 300. Check if it meets the requirements
- # to retry with a trailing slash
- if not try_trailing_slash_on_400:
- raise
- if e.code != 400 or e.to_synapse_error().errcode != "M_UNRECOGNIZED":
- raise
- # Retry with a trailing slash if we received a 400 with
- # 'M_UNRECOGNIZED' which some endpoints can return when omitting a
- # trailing slash on Synapse <= v0.99.3.
- logger.info("Retrying request with trailing slash")
- # Request is frozen so we create a new instance
- request = attr.evolve(request, path=request.path + "/")
- response = await self._send_request(request, **send_request_args)
- return response
- async def _send_request(
- self,
- request,
- retry_on_dns_fail=True,
- timeout=None,
- long_retries=False,
- ignore_backoff=False,
- backoff_on_404=False,
- ):
- """
- Sends a request to the given server.
- Args:
- request (MatrixFederationRequest): details of request to be sent
- timeout (int|None): number of milliseconds to wait for the response headers
- (including connecting to the server), *for each attempt*.
- 60s by default.
- long_retries (bool): whether to use the long retry algorithm.
- The regular retry algorithm makes 4 attempts, with intervals
- [0.5s, 1s, 2s].
- The long retry algorithm makes 11 attempts, with intervals
- [4s, 16s, 60s, 60s, ...]
- Both algorithms add -20%/+40% jitter to the retry intervals.
- Note that the above intervals are *in addition* to the time spent
- waiting for the request to complete (up to `timeout` ms).
- NB: the long retry algorithm takes over 20 minutes to complete, with
- a default timeout of 60s!
- ignore_backoff (bool): true to ignore the historical backoff data
- and try the request anyway.
- backoff_on_404 (bool): Back off if we get a 404
- Returns:
- twisted.web.client.Response: resolves with the HTTP
- response object on success.
- Raises:
- HttpResponseException: If we get an HTTP response code >= 300
- (except 429).
- NotRetryingDestination: If we are not yet ready to retry this
- server.
- FederationDeniedError: If this destination is not on our
- federation whitelist
- RequestSendFailed: If there were problems connecting to the
- remote, due to e.g. DNS failures, connection timeouts etc.
- """
- if timeout:
- _sec_timeout = timeout / 1000
- else:
- _sec_timeout = self.default_timeout
- if (
- self.hs.config.federation_domain_whitelist is not None
- and request.destination not in self.hs.config.federation_domain_whitelist
- ):
- raise FederationDeniedError(request.destination)
- limiter = await synapse.util.retryutils.get_retry_limiter(
- request.destination,
- self.clock,
- self._store,
- backoff_on_404=backoff_on_404,
- ignore_backoff=ignore_backoff,
- )
- method_bytes = request.method.encode("ascii")
- destination_bytes = request.destination.encode("ascii")
- path_bytes = request.path.encode("ascii")
- if request.query:
- query_bytes = encode_query_args(request.query)
- else:
- query_bytes = b""
- scope = start_active_span(
- "outgoing-federation-request",
- tags={
- tags.SPAN_KIND: tags.SPAN_KIND_RPC_CLIENT,
- tags.PEER_ADDRESS: request.destination,
- tags.HTTP_METHOD: request.method,
- tags.HTTP_URL: request.path,
- },
- finish_on_close=True,
- )
- # Inject the span into the headers
- headers_dict = {}
- inject_active_span_byte_dict(headers_dict, request.destination)
- headers_dict[b"User-Agent"] = [self.version_string_bytes]
- with limiter, scope:
- # XXX: Would be much nicer to retry only at the transaction-layer
- # (once we have reliable transactions in place)
- if long_retries:
- retries_left = MAX_LONG_RETRIES
- else:
- retries_left = MAX_SHORT_RETRIES
- url_bytes = request.uri
- url_str = url_bytes.decode("ascii")
- url_to_sign_bytes = urllib.parse.urlunparse(
- (b"", b"", path_bytes, None, query_bytes, b"")
- )
- while True:
- try:
- json = request.get_json()
- if json:
- headers_dict[b"Content-Type"] = [b"application/json"]
- auth_headers = self.build_auth_headers(
- destination_bytes, method_bytes, url_to_sign_bytes, json
- )
- data = encode_canonical_json(json)
- producer = QuieterFileBodyProducer(
- BytesIO(data), cooperator=self._cooperator
- )
- else:
- producer = None
- auth_headers = self.build_auth_headers(
- destination_bytes, method_bytes, url_to_sign_bytes
- )
- headers_dict[b"Authorization"] = auth_headers
- logger.debug(
- "{%s} [%s] Sending request: %s %s; timeout %fs",
- request.txn_id,
- request.destination,
- request.method,
- url_str,
- _sec_timeout,
- )
- outgoing_requests_counter.labels(request.method).inc()
- try:
- with Measure(self.clock, "outbound_request"):
- # we don't want all the fancy cookie and redirect handling
- # that treq.request gives: just use the raw Agent.
- request_deferred = self.agent.request(
- method_bytes,
- url_bytes,
- headers=Headers(headers_dict),
- bodyProducer=producer,
- )
- request_deferred = timeout_deferred(
- request_deferred,
- timeout=_sec_timeout,
- reactor=self.reactor,
- )
- response = await request_deferred
- except TimeoutError as e:
- raise RequestSendFailed(e, can_retry=True) from e
- except DNSLookupError as e:
- raise RequestSendFailed(e, can_retry=retry_on_dns_fail) from e
- except Exception as e:
- raise RequestSendFailed(e, can_retry=True) from e
- incoming_responses_counter.labels(
- request.method, response.code
- ).inc()
- set_tag(tags.HTTP_STATUS_CODE, response.code)
- response_phrase = response.phrase.decode("ascii", errors="replace")
- if 200 <= response.code < 300:
- logger.debug(
- "{%s} [%s] Got response headers: %d %s",
- request.txn_id,
- request.destination,
- response.code,
- response_phrase,
- )
- pass
- else:
- logger.info(
- "{%s} [%s] Got response headers: %d %s",
- request.txn_id,
- request.destination,
- response.code,
- response_phrase,
- )
- # :'(
- # Update transactions table?
- d = treq.content(response)
- d = timeout_deferred(
- d, timeout=_sec_timeout, reactor=self.reactor
- )
- try:
- body = await make_deferred_yieldable(d)
- except Exception as e:
- # Eh, we're already going to raise an exception so lets
- # ignore if this fails.
- logger.warning(
- "{%s} [%s] Failed to get error response: %s %s: %s",
- request.txn_id,
- request.destination,
- request.method,
- url_str,
- _flatten_response_never_received(e),
- )
- body = None
- e = HttpResponseException(response.code, response_phrase, body)
- # Retry if the error is a 429 (Too Many Requests),
- # otherwise just raise a standard HttpResponseException
- if response.code == 429:
- raise RequestSendFailed(e, can_retry=True) from e
- else:
- raise e
- break
- except RequestSendFailed as e:
- logger.info(
- "{%s} [%s] Request failed: %s %s: %s",
- request.txn_id,
- request.destination,
- request.method,
- url_str,
- _flatten_response_never_received(e.inner_exception),
- )
- if not e.can_retry:
- raise
- if retries_left and not timeout:
- if long_retries:
- delay = 4 ** (MAX_LONG_RETRIES + 1 - retries_left)
- delay = min(delay, 60)
- delay *= random.uniform(0.8, 1.4)
- else:
- delay = 0.5 * 2 ** (MAX_SHORT_RETRIES - retries_left)
- delay = min(delay, 2)
- delay *= random.uniform(0.8, 1.4)
- logger.debug(
- "{%s} [%s] Waiting %ss before re-sending...",
- request.txn_id,
- request.destination,
- delay,
- )
- await self.clock.sleep(delay)
- retries_left -= 1
- else:
- raise
- except Exception as e:
- logger.warning(
- "{%s} [%s] Request failed: %s %s: %s",
- request.txn_id,
- request.destination,
- request.method,
- url_str,
- _flatten_response_never_received(e),
- )
- raise
- return response
- def build_auth_headers(
- self, destination, method, url_bytes, content=None, destination_is=None
- ):
- """
- Builds the Authorization headers for a federation request
- Args:
- destination (bytes|None): The desination homeserver of the request.
- May be None if the destination is an identity server, in which case
- destination_is must be non-None.
- method (bytes): The HTTP method of the request
- url_bytes (bytes): The URI path of the request
- content (object): The body of the request
- destination_is (bytes): As 'destination', but if the destination is an
- identity server
- Returns:
- list[bytes]: a list of headers to be added as "Authorization:" headers
- """
- request = {
- "method": method.decode("ascii"),
- "uri": url_bytes.decode("ascii"),
- "origin": self.server_name,
- }
- if destination is not None:
- request["destination"] = destination.decode("ascii")
- if destination_is is not None:
- request["destination_is"] = destination_is.decode("ascii")
- if content is not None:
- request["content"] = content
- request = sign_json(request, self.server_name, self.signing_key)
- auth_headers = []
- for key, sig in request["signatures"][self.server_name].items():
- auth_headers.append(
- (
- 'X-Matrix origin=%s,key="%s",sig="%s"'
- % (self.server_name, key, sig)
- ).encode("ascii")
- )
- return auth_headers
- async def put_json(
- self,
- destination,
- path,
- args={},
- data={},
- json_data_callback=None,
- long_retries=False,
- timeout=None,
- ignore_backoff=False,
- backoff_on_404=False,
- try_trailing_slash_on_400=False,
- ):
- """ Sends the specifed json data using PUT
- Args:
- destination (str): The remote server to send the HTTP request
- to.
- path (str): The HTTP path.
- args (dict): query params
- data (dict): A dict containing the data that will be used as
- the request body. This will be encoded as JSON.
- json_data_callback (callable): A callable returning the dict to
- use as the request body.
- long_retries (bool): whether to use the long retry algorithm. See
- docs on _send_request for details.
- timeout (int|None): number of milliseconds to wait for the response headers
- (including connecting to the server), *for each attempt*.
- self._default_timeout (60s) by default.
- ignore_backoff (bool): true to ignore the historical backoff data
- and try the request anyway.
- backoff_on_404 (bool): True if we should count a 404 response as
- a failure of the server (and should therefore back off future
- requests).
- try_trailing_slash_on_400 (bool): True if on a 400 M_UNRECOGNIZED
- response we should try appending a trailing slash to the end
- of the request. Workaround for #3622 in Synapse <= v0.99.3. This
- will be attempted before backing off if backing off has been
- enabled.
- Returns:
- dict|list: Succeeds when we get a 2xx HTTP response. The
- result will be the decoded JSON body.
- Raises:
- HttpResponseException: If we get an HTTP response code >= 300
- (except 429).
- NotRetryingDestination: If we are not yet ready to retry this
- server.
- FederationDeniedError: If this destination is not on our
- federation whitelist
- RequestSendFailed: If there were problems connecting to the
- remote, due to e.g. DNS failures, connection timeouts etc.
- """
- request = MatrixFederationRequest(
- method="PUT",
- destination=destination,
- path=path,
- query=args,
- json_callback=json_data_callback,
- json=data,
- )
- start_ms = self.clock.time_msec()
- response = await self._send_request_with_optional_trailing_slash(
- request,
- try_trailing_slash_on_400,
- backoff_on_404=backoff_on_404,
- ignore_backoff=ignore_backoff,
- long_retries=long_retries,
- timeout=timeout,
- )
- body = await _handle_json_response(
- self.reactor, self.default_timeout, request, response, start_ms
- )
- return body
- async def post_json(
- self,
- destination,
- path,
- data={},
- long_retries=False,
- timeout=None,
- ignore_backoff=False,
- args={},
- ):
- """ Sends the specifed json data using POST
- Args:
- destination (str): The remote server to send the HTTP request
- to.
- path (str): The HTTP path.
- data (dict): A dict containing the data that will be used as
- the request body. This will be encoded as JSON.
- long_retries (bool): whether to use the long retry algorithm. See
- docs on _send_request for details.
- timeout (int|None): number of milliseconds to wait for the response headers
- (including connecting to the server), *for each attempt*.
- self._default_timeout (60s) by default.
- ignore_backoff (bool): true to ignore the historical backoff data and
- try the request anyway.
- args (dict): query params
- Returns:
- dict|list: Succeeds when we get a 2xx HTTP response. The
- result will be the decoded JSON body.
- Raises:
- HttpResponseException: If we get an HTTP response code >= 300
- (except 429).
- NotRetryingDestination: If we are not yet ready to retry this
- server.
- FederationDeniedError: If this destination is not on our
- federation whitelist
- RequestSendFailed: If there were problems connecting to the
- remote, due to e.g. DNS failures, connection timeouts etc.
- """
- request = MatrixFederationRequest(
- method="POST", destination=destination, path=path, query=args, json=data
- )
- start_ms = self.clock.time_msec()
- response = await self._send_request(
- request,
- long_retries=long_retries,
- timeout=timeout,
- ignore_backoff=ignore_backoff,
- )
- if timeout:
- _sec_timeout = timeout / 1000
- else:
- _sec_timeout = self.default_timeout
- body = await _handle_json_response(
- self.reactor, _sec_timeout, request, response, start_ms,
- )
- return body
- async def get_json(
- self,
- destination,
- path,
- args=None,
- retry_on_dns_fail=True,
- timeout=None,
- ignore_backoff=False,
- try_trailing_slash_on_400=False,
- ):
- """ GETs some json from the given host homeserver and path
- Args:
- destination (str): The remote server to send the HTTP request
- to.
- path (str): The HTTP path.
- args (dict|None): A dictionary used to create query strings, defaults to
- None.
- timeout (int|None): number of milliseconds to wait for the response headers
- (including connecting to the server), *for each attempt*.
- self._default_timeout (60s) by default.
- ignore_backoff (bool): true to ignore the historical backoff data
- and try the request anyway.
- try_trailing_slash_on_400 (bool): True if on a 400 M_UNRECOGNIZED
- response we should try appending a trailing slash to the end of
- the request. Workaround for #3622 in Synapse <= v0.99.3.
- Returns:
- dict|list: Succeeds when we get a 2xx HTTP response. The
- result will be the decoded JSON body.
- Raises:
- HttpResponseException: If we get an HTTP response code >= 300
- (except 429).
- NotRetryingDestination: If we are not yet ready to retry this
- server.
- FederationDeniedError: If this destination is not on our
- federation whitelist
- RequestSendFailed: If there were problems connecting to the
- remote, due to e.g. DNS failures, connection timeouts etc.
- """
- request = MatrixFederationRequest(
- method="GET", destination=destination, path=path, query=args
- )
- start_ms = self.clock.time_msec()
- response = await self._send_request_with_optional_trailing_slash(
- request,
- try_trailing_slash_on_400,
- backoff_on_404=False,
- ignore_backoff=ignore_backoff,
- retry_on_dns_fail=retry_on_dns_fail,
- timeout=timeout,
- )
- body = await _handle_json_response(
- self.reactor, self.default_timeout, request, response, start_ms
- )
- return body
- async def delete_json(
- self,
- destination,
- path,
- long_retries=False,
- timeout=None,
- ignore_backoff=False,
- args={},
- ):
- """Send a DELETE request to the remote expecting some json response
- Args:
- destination (str): The remote server to send the HTTP request
- to.
- path (str): The HTTP path.
- long_retries (bool): whether to use the long retry algorithm. See
- docs on _send_request for details.
- timeout (int|None): number of milliseconds to wait for the response headers
- (including connecting to the server), *for each attempt*.
- self._default_timeout (60s) by default.
- ignore_backoff (bool): true to ignore the historical backoff data and
- try the request anyway.
- args (dict): query params
- Returns:
- dict|list: Succeeds when we get a 2xx HTTP response. The
- result will be the decoded JSON body.
- Raises:
- HttpResponseException: If we get an HTTP response code >= 300
- (except 429).
- NotRetryingDestination: If we are not yet ready to retry this
- server.
- FederationDeniedError: If this destination is not on our
- federation whitelist
- RequestSendFailed: If there were problems connecting to the
- remote, due to e.g. DNS failures, connection timeouts etc.
- """
- request = MatrixFederationRequest(
- method="DELETE", destination=destination, path=path, query=args
- )
- start_ms = self.clock.time_msec()
- response = await self._send_request(
- request,
- long_retries=long_retries,
- timeout=timeout,
- ignore_backoff=ignore_backoff,
- )
- body = await _handle_json_response(
- self.reactor, self.default_timeout, request, response, start_ms
- )
- return body
- async def get_file(
- self,
- destination,
- path,
- output_stream,
- args={},
- retry_on_dns_fail=True,
- max_size=None,
- ignore_backoff=False,
- ):
- """GETs a file from a given homeserver
- Args:
- destination (str): The remote server to send the HTTP request to.
- path (str): The HTTP path to GET.
- output_stream (file): File to write the response body to.
- args (dict): Optional dictionary used to create the query string.
- ignore_backoff (bool): true to ignore the historical backoff data
- and try the request anyway.
- Returns:
- tuple[int, dict]: Resolves with an (int,dict) tuple of
- the file length and a dict of the response headers.
- Raises:
- HttpResponseException: If we get an HTTP response code >= 300
- (except 429).
- NotRetryingDestination: If we are not yet ready to retry this
- server.
- FederationDeniedError: If this destination is not on our
- federation whitelist
- RequestSendFailed: If there were problems connecting to the
- remote, due to e.g. DNS failures, connection timeouts etc.
- """
- request = MatrixFederationRequest(
- method="GET", destination=destination, path=path, query=args
- )
- response = await self._send_request(
- request, retry_on_dns_fail=retry_on_dns_fail, ignore_backoff=ignore_backoff
- )
- headers = dict(response.headers.getAllRawHeaders())
- try:
- d = _readBodyToFile(response, output_stream, max_size)
- d.addTimeout(self.default_timeout, self.reactor)
- length = await make_deferred_yieldable(d)
- except Exception as e:
- logger.warning(
- "{%s} [%s] Error reading response: %s",
- request.txn_id,
- request.destination,
- e,
- )
- raise
- logger.info(
- "{%s} [%s] Completed: %d %s [%d bytes] %s %s",
- request.txn_id,
- request.destination,
- response.code,
- response.phrase.decode("ascii", errors="replace"),
- length,
- request.method,
- request.uri.decode("ascii"),
- )
- return (length, headers)
- class _ReadBodyToFileProtocol(protocol.Protocol):
- def __init__(self, stream, deferred, max_size):
- self.stream = stream
- self.deferred = deferred
- self.length = 0
- self.max_size = max_size
- def dataReceived(self, data):
- self.stream.write(data)
- self.length += len(data)
- if self.max_size is not None and self.length >= self.max_size:
- self.deferred.errback(
- SynapseError(
- 502,
- "Requested file is too large > %r bytes" % (self.max_size,),
- Codes.TOO_LARGE,
- )
- )
- self.deferred = defer.Deferred()
- self.transport.loseConnection()
- def connectionLost(self, reason):
- if reason.check(ResponseDone):
- self.deferred.callback(self.length)
- else:
- self.deferred.errback(reason)
- def _readBodyToFile(response, stream, max_size):
- d = defer.Deferred()
- response.deliverBody(_ReadBodyToFileProtocol(stream, d, max_size))
- return d
- def _flatten_response_never_received(e):
- if hasattr(e, "reasons"):
- reasons = ", ".join(
- _flatten_response_never_received(f.value) for f in e.reasons
- )
- return "%s:[%s]" % (type(e).__name__, reasons)
- else:
- return repr(e)
- def check_content_type_is_json(headers):
- """
- Check that a set of HTTP headers have a Content-Type header, and that it
- is application/json.
- Args:
- headers (twisted.web.http_headers.Headers): headers to check
- Raises:
- RequestSendFailed: if the Content-Type header is missing or isn't JSON
- """
- c_type = headers.getRawHeaders(b"Content-Type")
- if c_type is None:
- raise RequestSendFailed(RuntimeError("No Content-Type header"), can_retry=False)
- c_type = c_type[0].decode("ascii") # only the first header
- val, options = cgi.parse_header(c_type)
- if val != "application/json":
- raise RequestSendFailed(
- RuntimeError("Content-Type not application/json: was '%s'" % c_type),
- can_retry=False,
- )
- def encode_query_args(args):
- if args is None:
- return b""
- encoded_args = {}
- for k, vs in args.items():
- if isinstance(vs, str):
- vs = [vs]
- encoded_args[k] = [v.encode("UTF-8") for v in vs]
- query_bytes = urllib.parse.urlencode(encoded_args, True)
- return query_bytes.encode("utf8")
|