matrixfederationclient.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2014-2016 OpenMarket Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. from twisted.internet import defer, reactor, protocol
  16. from twisted.internet.error import DNSLookupError
  17. from twisted.web.client import readBody, HTTPConnectionPool, Agent
  18. from twisted.web.http_headers import Headers
  19. from twisted.web._newclient import ResponseDone
  20. from synapse.http.endpoint import matrix_federation_endpoint
  21. from synapse.util.async import sleep
  22. from synapse.util.logcontext import preserve_context_over_fn
  23. import synapse.metrics
  24. from canonicaljson import encode_canonical_json
  25. from synapse.api.errors import (
  26. SynapseError, Codes, HttpResponseException,
  27. )
  28. from signedjson.sign import sign_json
  29. import simplejson as json
  30. import logging
  31. import random
  32. import sys
  33. import urllib
  34. import urlparse
# General-purpose logger for this module (warnings, debug output).
logger = logging.getLogger(__name__)
# Separate logger used for the per-request "Sending request" / "Result"
# lines emitted by MatrixFederationHttpClient._create_request.
outbound_logger = logging.getLogger("synapse.http.outbound")
metrics = synapse.metrics.get_metrics_for(__name__)
# Counter registered under the name "requests", labelled by HTTP method.
outgoing_requests_counter = metrics.register_counter(
    "requests",
    labels=["method"],
)
# Counter registered under the name "responses", labelled by HTTP method
# and response code.
incoming_responses_counter = metrics.register_counter(
    "responses",
    labels=["method", "code"],
)
# Initial retry budget for _create_request when long_retries is true.
MAX_LONG_RETRIES = 10
# Initial retry budget for _create_request when long_retries is false.
MAX_SHORT_RETRIES = 3
  48. class MatrixFederationEndpointFactory(object):
  49. def __init__(self, hs):
  50. self.tls_server_context_factory = hs.tls_server_context_factory
  51. def endpointForURI(self, uri):
  52. destination = uri.netloc
  53. return matrix_federation_endpoint(
  54. reactor, destination, timeout=10,
  55. ssl_context_factory=self.tls_server_context_factory
  56. )
  57. class MatrixFederationHttpClient(object):
  58. """HTTP client used to talk to other homeservers over the federation
  59. protocol. Send client certificates and signs requests.
  60. Attributes:
  61. agent (twisted.web.client.Agent): The twisted Agent used to send the
  62. requests.
  63. """
  64. def __init__(self, hs):
  65. self.hs = hs
  66. self.signing_key = hs.config.signing_key[0]
  67. self.server_name = hs.hostname
  68. pool = HTTPConnectionPool(reactor)
  69. pool.maxPersistentPerHost = 10
  70. self.agent = Agent.usingEndpointFactory(
  71. reactor, MatrixFederationEndpointFactory(hs), pool=pool
  72. )
  73. self.clock = hs.get_clock()
  74. self.version_string = hs.version_string
  75. self._next_id = 1
  76. def _create_url(self, destination, path_bytes, param_bytes, query_bytes):
  77. return urlparse.urlunparse(
  78. ("matrix", destination, path_bytes, param_bytes, query_bytes, "")
  79. )
  80. @defer.inlineCallbacks
  81. def _create_request(self, destination, method, path_bytes,
  82. body_callback, headers_dict={}, param_bytes=b"",
  83. query_bytes=b"", retry_on_dns_fail=True,
  84. timeout=None, long_retries=False):
  85. """ Creates and sends a request to the given url
  86. """
  87. headers_dict[b"User-Agent"] = [self.version_string]
  88. headers_dict[b"Host"] = [destination]
  89. url_bytes = self._create_url(
  90. destination, path_bytes, param_bytes, query_bytes
  91. )
  92. txn_id = "%s-O-%s" % (method, self._next_id)
  93. self._next_id = (self._next_id + 1) % (sys.maxint - 1)
  94. outbound_logger.info(
  95. "{%s} [%s] Sending request: %s %s",
  96. txn_id, destination, method, url_bytes
  97. )
  98. # XXX: Would be much nicer to retry only at the transaction-layer
  99. # (once we have reliable transactions in place)
  100. if long_retries:
  101. retries_left = MAX_LONG_RETRIES
  102. else:
  103. retries_left = MAX_SHORT_RETRIES
  104. http_url_bytes = urlparse.urlunparse(
  105. ("", "", path_bytes, param_bytes, query_bytes, "")
  106. )
  107. log_result = None
  108. try:
  109. while True:
  110. producer = None
  111. if body_callback:
  112. producer = body_callback(method, http_url_bytes, headers_dict)
  113. try:
  114. def send_request():
  115. request_deferred = preserve_context_over_fn(
  116. self.agent.request,
  117. method,
  118. url_bytes,
  119. Headers(headers_dict),
  120. producer
  121. )
  122. return self.clock.time_bound_deferred(
  123. request_deferred,
  124. time_out=timeout / 1000. if timeout else 60,
  125. )
  126. response = yield preserve_context_over_fn(send_request)
  127. log_result = "%d %s" % (response.code, response.phrase,)
  128. break
  129. except Exception as e:
  130. if not retry_on_dns_fail and isinstance(e, DNSLookupError):
  131. logger.warn(
  132. "DNS Lookup failed to %s with %s",
  133. destination,
  134. e
  135. )
  136. log_result = "DNS Lookup failed to %s with %s" % (
  137. destination, e
  138. )
  139. raise
  140. logger.warn(
  141. "{%s} Sending request failed to %s: %s %s: %s - %s",
  142. txn_id,
  143. destination,
  144. method,
  145. url_bytes,
  146. type(e).__name__,
  147. _flatten_response_never_received(e),
  148. )
  149. log_result = "%s - %s" % (
  150. type(e).__name__, _flatten_response_never_received(e),
  151. )
  152. if retries_left and not timeout:
  153. if long_retries:
  154. delay = 4 ** (MAX_LONG_RETRIES + 1 - retries_left)
  155. delay = min(delay, 60)
  156. delay *= random.uniform(0.8, 1.4)
  157. else:
  158. delay = 0.5 * 2 ** (MAX_SHORT_RETRIES - retries_left)
  159. delay = min(delay, 2)
  160. delay *= random.uniform(0.8, 1.4)
  161. yield sleep(delay)
  162. retries_left -= 1
  163. else:
  164. raise
  165. finally:
  166. outbound_logger.info(
  167. "{%s} [%s] Result: %s",
  168. txn_id,
  169. destination,
  170. log_result,
  171. )
  172. if 200 <= response.code < 300:
  173. pass
  174. else:
  175. # :'(
  176. # Update transactions table?
  177. body = yield preserve_context_over_fn(readBody, response)
  178. raise HttpResponseException(
  179. response.code, response.phrase, body
  180. )
  181. defer.returnValue(response)
  182. def sign_request(self, destination, method, url_bytes, headers_dict,
  183. content=None):
  184. request = {
  185. "method": method,
  186. "uri": url_bytes,
  187. "origin": self.server_name,
  188. "destination": destination,
  189. }
  190. if content is not None:
  191. request["content"] = content
  192. request = sign_json(request, self.server_name, self.signing_key)
  193. auth_headers = []
  194. for key, sig in request["signatures"][self.server_name].items():
  195. auth_headers.append(bytes(
  196. "X-Matrix origin=%s,key=\"%s\",sig=\"%s\"" % (
  197. self.server_name, key, sig,
  198. )
  199. ))
  200. headers_dict[b"Authorization"] = auth_headers
  201. @defer.inlineCallbacks
  202. def put_json(self, destination, path, data={}, json_data_callback=None,
  203. long_retries=False, timeout=None):
  204. """ Sends the specifed json data using PUT
  205. Args:
  206. destination (str): The remote server to send the HTTP request
  207. to.
  208. path (str): The HTTP path.
  209. data (dict): A dict containing the data that will be used as
  210. the request body. This will be encoded as JSON.
  211. json_data_callback (callable): A callable returning the dict to
  212. use as the request body.
  213. long_retries (bool): A boolean that indicates whether we should
  214. retry for a short or long time.
  215. timeout(int): How long to try (in ms) the destination for before
  216. giving up. None indicates no timeout.
  217. Returns:
  218. Deferred: Succeeds when we get a 2xx HTTP response. The result
  219. will be the decoded JSON body. On a 4xx or 5xx error response a
  220. CodeMessageException is raised.
  221. """
  222. if not json_data_callback:
  223. def json_data_callback():
  224. return data
  225. def body_callback(method, url_bytes, headers_dict):
  226. json_data = json_data_callback()
  227. self.sign_request(
  228. destination, method, url_bytes, headers_dict, json_data
  229. )
  230. producer = _JsonProducer(json_data)
  231. return producer
  232. response = yield self._create_request(
  233. destination.encode("ascii"),
  234. "PUT",
  235. path.encode("ascii"),
  236. body_callback=body_callback,
  237. headers_dict={"Content-Type": ["application/json"]},
  238. long_retries=long_retries,
  239. timeout=timeout,
  240. )
  241. if 200 <= response.code < 300:
  242. # We need to update the transactions table to say it was sent?
  243. c_type = response.headers.getRawHeaders("Content-Type")
  244. if "application/json" not in c_type:
  245. raise RuntimeError(
  246. "Content-Type not application/json"
  247. )
  248. body = yield preserve_context_over_fn(readBody, response)
  249. defer.returnValue(json.loads(body))
  250. @defer.inlineCallbacks
  251. def post_json(self, destination, path, data={}, long_retries=True,
  252. timeout=None):
  253. """ Sends the specifed json data using POST
  254. Args:
  255. destination (str): The remote server to send the HTTP request
  256. to.
  257. path (str): The HTTP path.
  258. data (dict): A dict containing the data that will be used as
  259. the request body. This will be encoded as JSON.
  260. long_retries (bool): A boolean that indicates whether we should
  261. retry for a short or long time.
  262. timeout(int): How long to try (in ms) the destination for before
  263. giving up. None indicates no timeout.
  264. Returns:
  265. Deferred: Succeeds when we get a 2xx HTTP response. The result
  266. will be the decoded JSON body. On a 4xx or 5xx error response a
  267. CodeMessageException is raised.
  268. """
  269. def body_callback(method, url_bytes, headers_dict):
  270. self.sign_request(
  271. destination, method, url_bytes, headers_dict, data
  272. )
  273. return _JsonProducer(data)
  274. response = yield self._create_request(
  275. destination.encode("ascii"),
  276. "POST",
  277. path.encode("ascii"),
  278. body_callback=body_callback,
  279. headers_dict={"Content-Type": ["application/json"]},
  280. long_retries=True,
  281. timeout=timeout,
  282. )
  283. if 200 <= response.code < 300:
  284. # We need to update the transactions table to say it was sent?
  285. c_type = response.headers.getRawHeaders("Content-Type")
  286. if "application/json" not in c_type:
  287. raise RuntimeError(
  288. "Content-Type not application/json"
  289. )
  290. body = yield preserve_context_over_fn(readBody, response)
  291. defer.returnValue(json.loads(body))
  292. @defer.inlineCallbacks
  293. def get_json(self, destination, path, args={}, retry_on_dns_fail=True,
  294. timeout=None):
  295. """ GETs some json from the given host homeserver and path
  296. Args:
  297. destination (str): The remote server to send the HTTP request
  298. to.
  299. path (str): The HTTP path.
  300. args (dict): A dictionary used to create query strings, defaults to
  301. None.
  302. timeout (int): How long to try (in ms) the destination for before
  303. giving up. None indicates no timeout and that the request will
  304. be retried.
  305. Returns:
  306. Deferred: Succeeds when we get *any* HTTP response.
  307. The result of the deferred is a tuple of `(code, response)`,
  308. where `response` is a dict representing the decoded JSON body.
  309. """
  310. logger.debug("get_json args: %s", args)
  311. encoded_args = {}
  312. for k, vs in args.items():
  313. if isinstance(vs, basestring):
  314. vs = [vs]
  315. encoded_args[k] = [v.encode("UTF-8") for v in vs]
  316. query_bytes = urllib.urlencode(encoded_args, True)
  317. logger.debug("Query bytes: %s Retry DNS: %s", args, retry_on_dns_fail)
  318. def body_callback(method, url_bytes, headers_dict):
  319. self.sign_request(destination, method, url_bytes, headers_dict)
  320. return None
  321. response = yield self._create_request(
  322. destination.encode("ascii"),
  323. "GET",
  324. path.encode("ascii"),
  325. query_bytes=query_bytes,
  326. body_callback=body_callback,
  327. retry_on_dns_fail=retry_on_dns_fail,
  328. timeout=timeout,
  329. )
  330. if 200 <= response.code < 300:
  331. # We need to update the transactions table to say it was sent?
  332. c_type = response.headers.getRawHeaders("Content-Type")
  333. if "application/json" not in c_type:
  334. raise RuntimeError(
  335. "Content-Type not application/json"
  336. )
  337. body = yield preserve_context_over_fn(readBody, response)
  338. defer.returnValue(json.loads(body))
  339. @defer.inlineCallbacks
  340. def get_file(self, destination, path, output_stream, args={},
  341. retry_on_dns_fail=True, max_size=None):
  342. """GETs a file from a given homeserver
  343. Args:
  344. destination (str): The remote server to send the HTTP request to.
  345. path (str): The HTTP path to GET.
  346. output_stream (file): File to write the response body to.
  347. args (dict): Optional dictionary used to create the query string.
  348. Returns:
  349. A (int,dict) tuple of the file length and a dict of the response
  350. headers.
  351. """
  352. encoded_args = {}
  353. for k, vs in args.items():
  354. if isinstance(vs, basestring):
  355. vs = [vs]
  356. encoded_args[k] = [v.encode("UTF-8") for v in vs]
  357. query_bytes = urllib.urlencode(encoded_args, True)
  358. logger.debug("Query bytes: %s Retry DNS: %s", args, retry_on_dns_fail)
  359. def body_callback(method, url_bytes, headers_dict):
  360. self.sign_request(destination, method, url_bytes, headers_dict)
  361. return None
  362. response = yield self._create_request(
  363. destination.encode("ascii"),
  364. "GET",
  365. path.encode("ascii"),
  366. query_bytes=query_bytes,
  367. body_callback=body_callback,
  368. retry_on_dns_fail=retry_on_dns_fail
  369. )
  370. headers = dict(response.headers.getAllRawHeaders())
  371. try:
  372. length = yield preserve_context_over_fn(
  373. _readBodyToFile,
  374. response, output_stream, max_size
  375. )
  376. except:
  377. logger.exception("Failed to download body")
  378. raise
  379. defer.returnValue((length, headers))
  380. class _ReadBodyToFileProtocol(protocol.Protocol):
  381. def __init__(self, stream, deferred, max_size):
  382. self.stream = stream
  383. self.deferred = deferred
  384. self.length = 0
  385. self.max_size = max_size
  386. def dataReceived(self, data):
  387. self.stream.write(data)
  388. self.length += len(data)
  389. if self.max_size is not None and self.length >= self.max_size:
  390. self.deferred.errback(SynapseError(
  391. 502,
  392. "Requested file is too large > %r bytes" % (self.max_size,),
  393. Codes.TOO_LARGE,
  394. ))
  395. self.deferred = defer.Deferred()
  396. self.transport.loseConnection()
  397. def connectionLost(self, reason):
  398. if reason.check(ResponseDone):
  399. self.deferred.callback(self.length)
  400. else:
  401. self.deferred.errback(reason)
  402. def _readBodyToFile(response, stream, max_size):
  403. d = defer.Deferred()
  404. response.deliverBody(_ReadBodyToFileProtocol(stream, d, max_size))
  405. return d
  406. class _JsonProducer(object):
  407. """ Used by the twisted http client to create the HTTP body from json
  408. """
  409. def __init__(self, jsn):
  410. self.reset(jsn)
  411. def reset(self, jsn):
  412. self.body = encode_canonical_json(jsn)
  413. self.length = len(self.body)
  414. def startProducing(self, consumer):
  415. consumer.write(self.body)
  416. return defer.succeed(None)
  417. def pauseProducing(self):
  418. pass
  419. def stopProducing(self):
  420. pass
  421. def resumeProducing(self):
  422. pass
  423. def _flatten_response_never_received(e):
  424. if hasattr(e, "reasons"):
  425. return ", ".join(
  426. _flatten_response_never_received(f.value)
  427. for f in e.reasons
  428. )
  429. else:
  430. return "%s: %s" % (type(e).__name__, e.message,)