|
@@ -19,7 +19,7 @@ import random
|
|
|
import sys
|
|
|
from io import BytesIO
|
|
|
|
|
|
-from six import PY3, string_types
|
|
|
+from six import PY3, raise_from, string_types
|
|
|
from six.moves import urllib
|
|
|
|
|
|
import attr
|
|
@@ -41,6 +41,7 @@ from synapse.api.errors import (
|
|
|
Codes,
|
|
|
FederationDeniedError,
|
|
|
HttpResponseException,
|
|
|
+ RequestSendFailed,
|
|
|
SynapseError,
|
|
|
)
|
|
|
from synapse.http.endpoint import matrix_federation_endpoint
|
|
@@ -228,19 +229,18 @@ class MatrixFederationHttpClient(object):
|
|
|
backoff_on_404 (bool): Back off if we get a 404
|
|
|
|
|
|
Returns:
|
|
|
- Deferred: resolves with the http response object on success.
|
|
|
-
|
|
|
- Fails with ``HttpResponseException``: if we get an HTTP response
|
|
|
- code >= 300.
|
|
|
-
|
|
|
- Fails with ``NotRetryingDestination`` if we are not yet ready
|
|
|
- to retry this server.
|
|
|
-
|
|
|
- Fails with ``FederationDeniedError`` if this destination
|
|
|
- is not on our federation whitelist
|
|
|
-
|
|
|
- (May also fail with plenty of other Exceptions for things like DNS
|
|
|
- failures, connection failures, SSL failures.)
|
|
|
+ Deferred[twisted.web.client.Response]: resolves with the HTTP
|
|
|
+ response object on success.
|
|
|
+
|
|
|
+ Raises:
|
|
|
+ HttpResponseException: If we get an HTTP response code >= 300
|
|
|
+ (except 429).
|
|
|
+ NotRetryingDestination: If we are not yet ready to retry this
|
|
|
+ server.
|
|
|
+ FederationDeniedError: If this destination is not on our
|
|
|
+ federation whitelist
|
|
|
+ RequestSendFailed: If there were problems connecting to the
|
|
|
+ remote, due to e.g. DNS failures, connection timeouts etc.
|
|
|
"""
|
|
|
if timeout:
|
|
|
_sec_timeout = timeout / 1000
|
|
@@ -298,9 +298,9 @@ class MatrixFederationHttpClient(object):
|
|
|
json = request.get_json()
|
|
|
if json:
|
|
|
headers_dict[b"Content-Type"] = [b"application/json"]
|
|
|
- self.sign_request(
|
|
|
+ auth_headers = self.build_auth_headers(
|
|
|
destination_bytes, method_bytes, url_to_sign_bytes,
|
|
|
- headers_dict, json,
|
|
|
+ json,
|
|
|
)
|
|
|
data = encode_canonical_json(json)
|
|
|
producer = FileBodyProducer(
|
|
@@ -309,11 +309,12 @@ class MatrixFederationHttpClient(object):
|
|
|
)
|
|
|
else:
|
|
|
producer = None
|
|
|
- self.sign_request(
|
|
|
+ auth_headers = self.build_auth_headers(
|
|
|
destination_bytes, method_bytes, url_to_sign_bytes,
|
|
|
- headers_dict,
|
|
|
)
|
|
|
|
|
|
+ headers_dict[b"Authorization"] = auth_headers
|
|
|
+
|
|
|
logger.info(
|
|
|
"{%s} [%s] Sending request: %s %s",
|
|
|
request.txn_id, request.destination, request.method,
|
|
@@ -335,23 +336,74 @@ class MatrixFederationHttpClient(object):
|
|
|
reactor=self.hs.get_reactor(),
|
|
|
)
|
|
|
|
|
|
- with Measure(self.clock, "outbound_request"):
|
|
|
- response = yield make_deferred_yieldable(
|
|
|
- request_deferred,
|
|
|
+ try:
|
|
|
+ with Measure(self.clock, "outbound_request"):
|
|
|
+ response = yield make_deferred_yieldable(
|
|
|
+ request_deferred,
|
|
|
+ )
|
|
|
+ except DNSLookupError as e:
|
|
|
+ raise_from(RequestSendFailed(e, can_retry=retry_on_dns_fail), e)
|
|
|
+ except Exception as e:
|
|
|
+ raise_from(RequestSendFailed(e, can_retry=True), e)
|
|
|
+
|
|
|
+ logger.info(
|
|
|
+ "{%s} [%s] Got response headers: %d %s",
|
|
|
+ request.txn_id,
|
|
|
+ request.destination,
|
|
|
+ response.code,
|
|
|
+ response.phrase.decode('ascii', errors='replace'),
|
|
|
+ )
|
|
|
+
|
|
|
+ if 200 <= response.code < 300:
|
|
|
+ pass
|
|
|
+ else:
|
|
|
+ # :'(
|
|
|
+ # Update transactions table?
|
|
|
+ d = treq.content(response)
|
|
|
+ d = timeout_deferred(
|
|
|
+ d,
|
|
|
+ timeout=_sec_timeout,
|
|
|
+ reactor=self.hs.get_reactor(),
|
|
|
+ )
|
|
|
+
|
|
|
+ try:
|
|
|
+ body = yield make_deferred_yieldable(d)
|
|
|
+ except Exception as e:
|
|
|
+ # Eh, we're already going to raise an exception so lets
|
|
|
+ # ignore if this fails.
|
|
|
+ logger.warn(
|
|
|
+ "{%s} [%s] Failed to get error response: %s %s: %s",
|
|
|
+ request.txn_id,
|
|
|
+ request.destination,
|
|
|
+ request.method,
|
|
|
+ url_str,
|
|
|
+ _flatten_response_never_received(e),
|
|
|
+ )
|
|
|
+ body = None
|
|
|
+
|
|
|
+ e = HttpResponseException(
|
|
|
+ response.code, response.phrase, body
|
|
|
)
|
|
|
|
|
|
+ # Retry if the error is a 429 (Too Many Requests),
|
|
|
+ # otherwise just raise a standard HttpResponseException
|
|
|
+ if response.code == 429:
|
|
|
+ raise_from(RequestSendFailed(e, can_retry=True), e)
|
|
|
+ else:
|
|
|
+ raise e
|
|
|
+
|
|
|
break
|
|
|
- except Exception as e:
|
|
|
+ except RequestSendFailed as e:
|
|
|
logger.warn(
|
|
|
"{%s} [%s] Request failed: %s %s: %s",
|
|
|
request.txn_id,
|
|
|
request.destination,
|
|
|
request.method,
|
|
|
url_str,
|
|
|
- _flatten_response_never_received(e),
|
|
|
+ _flatten_response_never_received(e.inner_exception),
|
|
|
)
|
|
|
|
|
|
- if not retry_on_dns_fail and isinstance(e, DNSLookupError):
|
|
|
+ if not e.can_retry:
|
|
|
raise
|
|
|
|
|
|
if retries_left and not timeout:
|
|
@@ -376,50 +428,36 @@ class MatrixFederationHttpClient(object):
|
|
|
else:
|
|
|
raise
|
|
|
|
|
|
- logger.info(
|
|
|
- "{%s} [%s] Got response headers: %d %s",
|
|
|
- request.txn_id,
|
|
|
- request.destination,
|
|
|
- response.code,
|
|
|
- response.phrase.decode('ascii', errors='replace'),
|
|
|
- )
|
|
|
-
|
|
|
- if 200 <= response.code < 300:
|
|
|
- pass
|
|
|
- else:
|
|
|
- # :'(
|
|
|
- # Update transactions table?
|
|
|
- d = treq.content(response)
|
|
|
- d = timeout_deferred(
|
|
|
- d,
|
|
|
- timeout=_sec_timeout,
|
|
|
- reactor=self.hs.get_reactor(),
|
|
|
- )
|
|
|
- body = yield make_deferred_yieldable(d)
|
|
|
- raise HttpResponseException(
|
|
|
- response.code, response.phrase, body
|
|
|
- )
|
|
|
+ except Exception as e:
|
|
|
+ logger.warn(
|
|
|
+ "{%s} [%s] Request failed: %s %s: %s",
|
|
|
+ request.txn_id,
|
|
|
+ request.destination,
|
|
|
+ request.method,
|
|
|
+ url_str,
|
|
|
+ _flatten_response_never_received(e),
|
|
|
+ )
|
|
|
+ raise
|
|
|
|
|
|
defer.returnValue(response)
|
|
|
|
|
|
- def sign_request(self, destination, method, url_bytes, headers_dict,
|
|
|
- content=None, destination_is=None):
|
|
|
+ def build_auth_headers(
|
|
|
+ self, destination, method, url_bytes, content=None, destination_is=None,
|
|
|
+ ):
|
|
|
"""
|
|
|
- Signs a request by adding an Authorization header to headers_dict
|
|
|
+ Builds the Authorization headers for a federation request
|
|
|
Args:
|
|
|
destination (bytes|None): The desination home server of the request.
|
|
|
May be None if the destination is an identity server, in which case
|
|
|
destination_is must be non-None.
|
|
|
method (bytes): The HTTP method of the request
|
|
|
url_bytes (bytes): The URI path of the request
|
|
|
- headers_dict (dict[bytes, list[bytes]]): Dictionary of request headers to
|
|
|
- append to
|
|
|
content (object): The body of the request
|
|
|
destination_is (bytes): As 'destination', but if the destination is an
|
|
|
identity server
|
|
|
|
|
|
Returns:
|
|
|
- None
|
|
|
+ list[bytes]: a list of headers to be added as "Authorization:" headers
|
|
|
"""
|
|
|
request = {
|
|
|
"method": method,
|
|
@@ -446,8 +484,7 @@ class MatrixFederationHttpClient(object):
|
|
|
self.server_name, key, sig,
|
|
|
)).encode('ascii')
|
|
|
)
|
|
|
-
|
|
|
- headers_dict[b"Authorization"] = auth_headers
|
|
|
+ return auth_headers
|
|
|
|
|
|
@defer.inlineCallbacks
|
|
|
def put_json(self, destination, path, args={}, data={},
|
|
@@ -477,17 +514,18 @@ class MatrixFederationHttpClient(object):
|
|
|
requests)
|
|
|
|
|
|
Returns:
|
|
|
- Deferred: Succeeds when we get a 2xx HTTP response. The result
|
|
|
- will be the decoded JSON body.
|
|
|
-
|
|
|
- Fails with ``HttpResponseException`` if we get an HTTP response
|
|
|
- code >= 300.
|
|
|
-
|
|
|
- Fails with ``NotRetryingDestination`` if we are not yet ready
|
|
|
- to retry this server.
|
|
|
-
|
|
|
- Fails with ``FederationDeniedError`` if this destination
|
|
|
- is not on our federation whitelist
|
|
|
+ Deferred[dict|list]: Succeeds when we get a 2xx HTTP response. The
|
|
|
+ result will be the decoded JSON body.
|
|
|
+
|
|
|
+ Raises:
|
|
|
+ HttpResponseException: If we get an HTTP response code >= 300
|
|
|
+ (except 429).
|
|
|
+ NotRetryingDestination: If we are not yet ready to retry this
|
|
|
+ server.
|
|
|
+ FederationDeniedError: If this destination is not on our
|
|
|
+ federation whitelist
|
|
|
+ RequestSendFailed: If there were problems connecting to the
|
|
|
+ remote, due to e.g. DNS failures, connection timeouts etc.
|
|
|
"""
|
|
|
|
|
|
request = MatrixFederationRequest(
|
|
@@ -531,17 +569,18 @@ class MatrixFederationHttpClient(object):
|
|
|
try the request anyway.
|
|
|
args (dict): query params
|
|
|
Returns:
|
|
|
- Deferred: Succeeds when we get a 2xx HTTP response. The result
|
|
|
- will be the decoded JSON body.
|
|
|
-
|
|
|
- Fails with ``HttpResponseException`` if we get an HTTP response
|
|
|
- code >= 300.
|
|
|
-
|
|
|
- Fails with ``NotRetryingDestination`` if we are not yet ready
|
|
|
- to retry this server.
|
|
|
-
|
|
|
- Fails with ``FederationDeniedError`` if this destination
|
|
|
- is not on our federation whitelist
|
|
|
+ Deferred[dict|list]: Succeeds when we get a 2xx HTTP response. The
|
|
|
+ result will be the decoded JSON body.
|
|
|
+
|
|
|
+ Raises:
|
|
|
+ HttpResponseException: If we get an HTTP response code >= 300
|
|
|
+ (except 429).
|
|
|
+ NotRetryingDestination: If we are not yet ready to retry this
|
|
|
+ server.
|
|
|
+ FederationDeniedError: If this destination is not on our
|
|
|
+ federation whitelist
|
|
|
+ RequestSendFailed: If there were problems connecting to the
|
|
|
+ remote, due to e.g. DNS failures, connection timeouts etc.
|
|
|
"""
|
|
|
|
|
|
request = MatrixFederationRequest(
|
|
@@ -586,17 +625,18 @@ class MatrixFederationHttpClient(object):
|
|
|
ignore_backoff (bool): true to ignore the historical backoff data
|
|
|
and try the request anyway.
|
|
|
Returns:
|
|
|
- Deferred: Succeeds when we get a 2xx HTTP response. The result
|
|
|
- will be the decoded JSON body.
|
|
|
-
|
|
|
- Fails with ``HttpResponseException`` if we get an HTTP response
|
|
|
- code >= 300.
|
|
|
-
|
|
|
- Fails with ``NotRetryingDestination`` if we are not yet ready
|
|
|
- to retry this server.
|
|
|
-
|
|
|
- Fails with ``FederationDeniedError`` if this destination
|
|
|
- is not on our federation whitelist
|
|
|
+ Deferred[dict|list]: Succeeds when we get a 2xx HTTP response. The
|
|
|
+ result will be the decoded JSON body.
|
|
|
+
|
|
|
+ Raises:
|
|
|
+ HttpResponseException: If we get an HTTP response code >= 300
|
|
|
+ (except 429).
|
|
|
+ NotRetryingDestination: If we are not yet ready to retry this
|
|
|
+ server.
|
|
|
+ FederationDeniedError: If this destination is not on our
|
|
|
+ federation whitelist
|
|
|
+ RequestSendFailed: If there were problems connecting to the
|
|
|
+ remote, due to e.g. DNS failures, connection timeouts etc.
|
|
|
"""
|
|
|
logger.debug("get_json args: %s", args)
|
|
|
|
|
@@ -637,17 +677,18 @@ class MatrixFederationHttpClient(object):
|
|
|
ignore_backoff (bool): true to ignore the historical backoff data and
|
|
|
try the request anyway.
|
|
|
Returns:
|
|
|
- Deferred: Succeeds when we get a 2xx HTTP response. The result
|
|
|
- will be the decoded JSON body.
|
|
|
-
|
|
|
- Fails with ``HttpResponseException`` if we get an HTTP response
|
|
|
- code >= 300.
|
|
|
-
|
|
|
- Fails with ``NotRetryingDestination`` if we are not yet ready
|
|
|
- to retry this server.
|
|
|
-
|
|
|
- Fails with ``FederationDeniedError`` if this destination
|
|
|
- is not on our federation whitelist
|
|
|
+ Deferred[dict|list]: Succeeds when we get a 2xx HTTP response. The
|
|
|
+ result will be the decoded JSON body.
|
|
|
+
|
|
|
+ Raises:
|
|
|
+ HttpResponseException: If we get an HTTP response code >= 300
|
|
|
+ (except 429).
|
|
|
+ NotRetryingDestination: If we are not yet ready to retry this
|
|
|
+ server.
|
|
|
+ FederationDeniedError: If this destination is not on our
|
|
|
+ federation whitelist
|
|
|
+ RequestSendFailed: If there were problems connecting to the
|
|
|
+ remote, due to e.g. DNS failures, connection timeouts etc.
|
|
|
"""
|
|
|
request = MatrixFederationRequest(
|
|
|
method="DELETE",
|
|
@@ -680,18 +721,20 @@ class MatrixFederationHttpClient(object):
|
|
|
args (dict): Optional dictionary used to create the query string.
|
|
|
ignore_backoff (bool): true to ignore the historical backoff data
|
|
|
and try the request anyway.
|
|
|
- Returns:
|
|
|
- Deferred: resolves with an (int,dict) tuple of the file length and
|
|
|
- a dict of the response headers.
|
|
|
-
|
|
|
- Fails with ``HttpResponseException`` if we get an HTTP response code
|
|
|
- >= 300
|
|
|
-
|
|
|
- Fails with ``NotRetryingDestination`` if we are not yet ready
|
|
|
- to retry this server.
|
|
|
|
|
|
- Fails with ``FederationDeniedError`` if this destination
|
|
|
- is not on our federation whitelist
|
|
|
+ Returns:
|
|
|
+ Deferred[tuple[int, dict]]: Resolves with an (int,dict) tuple of
|
|
|
+ the file length and a dict of the response headers.
|
|
|
+
|
|
|
+ Raises:
|
|
|
+ HttpResponseException: If we get an HTTP response code >= 300
|
|
|
+ (except 429).
|
|
|
+ NotRetryingDestination: If we are not yet ready to retry this
|
|
|
+ server.
|
|
|
+ FederationDeniedError: If this destination is not on our
|
|
|
+ federation whitelist
|
|
|
+ RequestSendFailed: If there were problems connecting to the
|
|
|
+ remote, due to e.g. DNS failures, connection timeouts etc.
|
|
|
"""
|
|
|
request = MatrixFederationRequest(
|
|
|
method="GET",
|
|
@@ -784,21 +827,21 @@ def check_content_type_is_json(headers):
|
|
|
headers (twisted.web.http_headers.Headers): headers to check
|
|
|
|
|
|
Raises:
|
|
|
- RuntimeError if the
|
|
|
+ RequestSendFailed: if the Content-Type header is missing or isn't JSON
|
|
|
|
|
|
"""
|
|
|
c_type = headers.getRawHeaders(b"Content-Type")
|
|
|
if c_type is None:
|
|
|
- raise RuntimeError(
|
|
|
+ raise RequestSendFailed(RuntimeError(
|
|
|
"No Content-Type header"
|
|
|
- )
|
|
|
+ ), can_retry=False)
|
|
|
|
|
|
c_type = c_type[0].decode('ascii') # only the first header
|
|
|
val, options = cgi.parse_header(c_type)
|
|
|
if val != "application/json":
|
|
|
- raise RuntimeError(
|
|
|
+ raise RequestSendFailed(RuntimeError(
|
|
|
"Content-Type not application/json: was '%s'" % c_type
|
|
|
- )
|
|
|
+ ), can_retry=False)
|
|
|
|
|
|
|
|
|
def encode_query_args(args):
|