123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522 |
- # -*- coding: utf-8 -*-
- # Copyright 2014-2016 OpenMarket Ltd
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- from twisted.internet import defer, reactor, protocol
- from twisted.internet.error import DNSLookupError
- from twisted.web.client import readBody, HTTPConnectionPool, Agent
- from twisted.web.http_headers import Headers
- from twisted.web._newclient import ResponseDone
- from synapse.http.endpoint import matrix_federation_endpoint
- from synapse.util.async import sleep
- from synapse.util.logcontext import preserve_context_over_fn
- import synapse.metrics
- from canonicaljson import encode_canonical_json
- from synapse.api.errors import (
- SynapseError, Codes, HttpResponseException,
- )
- from signedjson.sign import sign_json
- import simplejson as json
- import logging
- import random
- import sys
- import urllib
- import urlparse
- logger = logging.getLogger(__name__)
- outbound_logger = logging.getLogger("synapse.http.outbound")
- metrics = synapse.metrics.get_metrics_for(__name__)
- outgoing_requests_counter = metrics.register_counter(
- "requests",
- labels=["method"],
- )
- incoming_responses_counter = metrics.register_counter(
- "responses",
- labels=["method", "code"],
- )
- MAX_LONG_RETRIES = 10
- MAX_SHORT_RETRIES = 3
- class MatrixFederationEndpointFactory(object):
- def __init__(self, hs):
- self.tls_server_context_factory = hs.tls_server_context_factory
- def endpointForURI(self, uri):
- destination = uri.netloc
- return matrix_federation_endpoint(
- reactor, destination, timeout=10,
- ssl_context_factory=self.tls_server_context_factory
- )
- class MatrixFederationHttpClient(object):
- """HTTP client used to talk to other homeservers over the federation
- protocol. Send client certificates and signs requests.
- Attributes:
- agent (twisted.web.client.Agent): The twisted Agent used to send the
- requests.
- """
- def __init__(self, hs):
- self.hs = hs
- self.signing_key = hs.config.signing_key[0]
- self.server_name = hs.hostname
- pool = HTTPConnectionPool(reactor)
- pool.maxPersistentPerHost = 10
- self.agent = Agent.usingEndpointFactory(
- reactor, MatrixFederationEndpointFactory(hs), pool=pool
- )
- self.clock = hs.get_clock()
- self.version_string = hs.version_string
- self._next_id = 1
- def _create_url(self, destination, path_bytes, param_bytes, query_bytes):
- return urlparse.urlunparse(
- ("matrix", destination, path_bytes, param_bytes, query_bytes, "")
- )
- @defer.inlineCallbacks
- def _create_request(self, destination, method, path_bytes,
- body_callback, headers_dict={}, param_bytes=b"",
- query_bytes=b"", retry_on_dns_fail=True,
- timeout=None, long_retries=False):
- """ Creates and sends a request to the given url
- """
- headers_dict[b"User-Agent"] = [self.version_string]
- headers_dict[b"Host"] = [destination]
- url_bytes = self._create_url(
- destination, path_bytes, param_bytes, query_bytes
- )
- txn_id = "%s-O-%s" % (method, self._next_id)
- self._next_id = (self._next_id + 1) % (sys.maxint - 1)
- outbound_logger.info(
- "{%s} [%s] Sending request: %s %s",
- txn_id, destination, method, url_bytes
- )
- # XXX: Would be much nicer to retry only at the transaction-layer
- # (once we have reliable transactions in place)
- if long_retries:
- retries_left = MAX_LONG_RETRIES
- else:
- retries_left = MAX_SHORT_RETRIES
- http_url_bytes = urlparse.urlunparse(
- ("", "", path_bytes, param_bytes, query_bytes, "")
- )
- log_result = None
- try:
- while True:
- producer = None
- if body_callback:
- producer = body_callback(method, http_url_bytes, headers_dict)
- try:
- def send_request():
- request_deferred = preserve_context_over_fn(
- self.agent.request,
- method,
- url_bytes,
- Headers(headers_dict),
- producer
- )
- return self.clock.time_bound_deferred(
- request_deferred,
- time_out=timeout / 1000. if timeout else 60,
- )
- response = yield preserve_context_over_fn(
- send_request,
- )
- log_result = "%d %s" % (response.code, response.phrase,)
- break
- except Exception as e:
- if not retry_on_dns_fail and isinstance(e, DNSLookupError):
- logger.warn(
- "DNS Lookup failed to %s with %s",
- destination,
- e
- )
- log_result = "DNS Lookup failed to %s with %s" % (
- destination, e
- )
- raise
- logger.warn(
- "{%s} Sending request failed to %s: %s %s: %s - %s",
- txn_id,
- destination,
- method,
- url_bytes,
- type(e).__name__,
- _flatten_response_never_received(e),
- )
- log_result = "%s - %s" % (
- type(e).__name__, _flatten_response_never_received(e),
- )
- if retries_left and not timeout:
- if long_retries:
- delay = 4 ** (MAX_LONG_RETRIES + 1 - retries_left)
- delay = min(delay, 60)
- delay *= random.uniform(0.8, 1.4)
- else:
- delay = 0.5 * 2 ** (MAX_SHORT_RETRIES - retries_left)
- delay = min(delay, 2)
- delay *= random.uniform(0.8, 1.4)
- yield sleep(delay)
- retries_left -= 1
- else:
- raise
- finally:
- outbound_logger.info(
- "{%s} [%s] Result: %s",
- txn_id,
- destination,
- log_result,
- )
- if 200 <= response.code < 300:
- pass
- else:
- # :'(
- # Update transactions table?
- body = yield preserve_context_over_fn(readBody, response)
- raise HttpResponseException(
- response.code, response.phrase, body
- )
- defer.returnValue(response)
- def sign_request(self, destination, method, url_bytes, headers_dict,
- content=None):
- request = {
- "method": method,
- "uri": url_bytes,
- "origin": self.server_name,
- "destination": destination,
- }
- if content is not None:
- request["content"] = content
- request = sign_json(request, self.server_name, self.signing_key)
- auth_headers = []
- for key, sig in request["signatures"][self.server_name].items():
- auth_headers.append(bytes(
- "X-Matrix origin=%s,key=\"%s\",sig=\"%s\"" % (
- self.server_name, key, sig,
- )
- ))
- headers_dict[b"Authorization"] = auth_headers
- @defer.inlineCallbacks
- def put_json(self, destination, path, data={}, json_data_callback=None,
- long_retries=False):
- """ Sends the specifed json data using PUT
- Args:
- destination (str): The remote server to send the HTTP request
- to.
- path (str): The HTTP path.
- data (dict): A dict containing the data that will be used as
- the request body. This will be encoded as JSON.
- json_data_callback (callable): A callable returning the dict to
- use as the request body.
- long_retries (bool): A boolean that indicates whether we should
- retry for a short or long time.
- Returns:
- Deferred: Succeeds when we get a 2xx HTTP response. The result
- will be the decoded JSON body. On a 4xx or 5xx error response a
- CodeMessageException is raised.
- """
- if not json_data_callback:
- def json_data_callback():
- return data
- def body_callback(method, url_bytes, headers_dict):
- json_data = json_data_callback()
- self.sign_request(
- destination, method, url_bytes, headers_dict, json_data
- )
- producer = _JsonProducer(json_data)
- return producer
- response = yield self._create_request(
- destination.encode("ascii"),
- "PUT",
- path.encode("ascii"),
- body_callback=body_callback,
- headers_dict={"Content-Type": ["application/json"]},
- long_retries=long_retries,
- )
- if 200 <= response.code < 300:
- # We need to update the transactions table to say it was sent?
- c_type = response.headers.getRawHeaders("Content-Type")
- if "application/json" not in c_type:
- raise RuntimeError(
- "Content-Type not application/json"
- )
- body = yield preserve_context_over_fn(readBody, response)
- defer.returnValue(json.loads(body))
- @defer.inlineCallbacks
- def post_json(self, destination, path, data={}, long_retries=True):
- """ Sends the specifed json data using POST
- Args:
- destination (str): The remote server to send the HTTP request
- to.
- path (str): The HTTP path.
- data (dict): A dict containing the data that will be used as
- the request body. This will be encoded as JSON.
- long_retries (bool): A boolean that indicates whether we should
- retry for a short or long time.
- Returns:
- Deferred: Succeeds when we get a 2xx HTTP response. The result
- will be the decoded JSON body. On a 4xx or 5xx error response a
- CodeMessageException is raised.
- """
- def body_callback(method, url_bytes, headers_dict):
- self.sign_request(
- destination, method, url_bytes, headers_dict, data
- )
- return _JsonProducer(data)
- response = yield self._create_request(
- destination.encode("ascii"),
- "POST",
- path.encode("ascii"),
- body_callback=body_callback,
- headers_dict={"Content-Type": ["application/json"]},
- long_retries=True,
- )
- if 200 <= response.code < 300:
- # We need to update the transactions table to say it was sent?
- c_type = response.headers.getRawHeaders("Content-Type")
- if "application/json" not in c_type:
- raise RuntimeError(
- "Content-Type not application/json"
- )
- body = yield preserve_context_over_fn(readBody, response)
- defer.returnValue(json.loads(body))
- @defer.inlineCallbacks
- def get_json(self, destination, path, args={}, retry_on_dns_fail=True,
- timeout=None):
- """ GETs some json from the given host homeserver and path
- Args:
- destination (str): The remote server to send the HTTP request
- to.
- path (str): The HTTP path.
- args (dict): A dictionary used to create query strings, defaults to
- None.
- timeout (int): How long to try (in ms) the destination for before
- giving up. None indicates no timeout and that the request will
- be retried.
- Returns:
- Deferred: Succeeds when we get *any* HTTP response.
- The result of the deferred is a tuple of `(code, response)`,
- where `response` is a dict representing the decoded JSON body.
- """
- logger.debug("get_json args: %s", args)
- encoded_args = {}
- for k, vs in args.items():
- if isinstance(vs, basestring):
- vs = [vs]
- encoded_args[k] = [v.encode("UTF-8") for v in vs]
- query_bytes = urllib.urlencode(encoded_args, True)
- logger.debug("Query bytes: %s Retry DNS: %s", args, retry_on_dns_fail)
- def body_callback(method, url_bytes, headers_dict):
- self.sign_request(destination, method, url_bytes, headers_dict)
- return None
- response = yield self._create_request(
- destination.encode("ascii"),
- "GET",
- path.encode("ascii"),
- query_bytes=query_bytes,
- body_callback=body_callback,
- retry_on_dns_fail=retry_on_dns_fail,
- timeout=timeout,
- )
- if 200 <= response.code < 300:
- # We need to update the transactions table to say it was sent?
- c_type = response.headers.getRawHeaders("Content-Type")
- if "application/json" not in c_type:
- raise RuntimeError(
- "Content-Type not application/json"
- )
- body = yield preserve_context_over_fn(readBody, response)
- defer.returnValue(json.loads(body))
- @defer.inlineCallbacks
- def get_file(self, destination, path, output_stream, args={},
- retry_on_dns_fail=True, max_size=None):
- """GETs a file from a given homeserver
- Args:
- destination (str): The remote server to send the HTTP request to.
- path (str): The HTTP path to GET.
- output_stream (file): File to write the response body to.
- args (dict): Optional dictionary used to create the query string.
- Returns:
- A (int,dict) tuple of the file length and a dict of the response
- headers.
- """
- encoded_args = {}
- for k, vs in args.items():
- if isinstance(vs, basestring):
- vs = [vs]
- encoded_args[k] = [v.encode("UTF-8") for v in vs]
- query_bytes = urllib.urlencode(encoded_args, True)
- logger.debug("Query bytes: %s Retry DNS: %s", args, retry_on_dns_fail)
- def body_callback(method, url_bytes, headers_dict):
- self.sign_request(destination, method, url_bytes, headers_dict)
- return None
- response = yield self._create_request(
- destination.encode("ascii"),
- "GET",
- path.encode("ascii"),
- query_bytes=query_bytes,
- body_callback=body_callback,
- retry_on_dns_fail=retry_on_dns_fail
- )
- headers = dict(response.headers.getAllRawHeaders())
- try:
- length = yield preserve_context_over_fn(
- _readBodyToFile,
- response, output_stream, max_size
- )
- except:
- logger.exception("Failed to download body")
- raise
- defer.returnValue((length, headers))
- class _ReadBodyToFileProtocol(protocol.Protocol):
- def __init__(self, stream, deferred, max_size):
- self.stream = stream
- self.deferred = deferred
- self.length = 0
- self.max_size = max_size
- def dataReceived(self, data):
- self.stream.write(data)
- self.length += len(data)
- if self.max_size is not None and self.length >= self.max_size:
- self.deferred.errback(SynapseError(
- 502,
- "Requested file is too large > %r bytes" % (self.max_size,),
- Codes.TOO_LARGE,
- ))
- self.deferred = defer.Deferred()
- self.transport.loseConnection()
- def connectionLost(self, reason):
- if reason.check(ResponseDone):
- self.deferred.callback(self.length)
- else:
- self.deferred.errback(reason)
- def _readBodyToFile(response, stream, max_size):
- d = defer.Deferred()
- response.deliverBody(_ReadBodyToFileProtocol(stream, d, max_size))
- return d
- class _JsonProducer(object):
- """ Used by the twisted http client to create the HTTP body from json
- """
- def __init__(self, jsn):
- self.reset(jsn)
- def reset(self, jsn):
- self.body = encode_canonical_json(jsn)
- self.length = len(self.body)
- def startProducing(self, consumer):
- consumer.write(self.body)
- return defer.succeed(None)
- def pauseProducing(self):
- pass
- def stopProducing(self):
- pass
- def resumeProducing(self):
- pass
- def _flatten_response_never_received(e):
- if hasattr(e, "reasons"):
- return ", ".join(
- _flatten_response_never_received(f.value)
- for f in e.reasons
- )
- else:
- return "%s: %s" % (type(e).__name__, e.message,)
|