matrixfederationclient.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2014-2016 OpenMarket Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. from twisted.internet import defer, reactor, protocol
  16. from twisted.internet.error import DNSLookupError
  17. from twisted.web.client import readBody, HTTPConnectionPool, Agent
  18. from twisted.web.http_headers import Headers
  19. from twisted.web._newclient import ResponseDone
  20. from synapse.http.endpoint import matrix_federation_endpoint
  21. from synapse.util.async import sleep
  22. from synapse.util.logcontext import preserve_context_over_fn
  23. import synapse.metrics
  24. from canonicaljson import encode_canonical_json
  25. from synapse.api.errors import (
  26. SynapseError, Codes, HttpResponseException,
  27. )
  28. from signedjson.sign import sign_json
  29. import simplejson as json
  30. import logging
  31. import random
  32. import sys
  33. import urllib
  34. import urlparse
  35. logger = logging.getLogger(__name__)
  36. outbound_logger = logging.getLogger("synapse.http.outbound")
  37. metrics = synapse.metrics.get_metrics_for(__name__)
  38. outgoing_requests_counter = metrics.register_counter(
  39. "requests",
  40. labels=["method"],
  41. )
  42. incoming_responses_counter = metrics.register_counter(
  43. "responses",
  44. labels=["method", "code"],
  45. )
  46. MAX_LONG_RETRIES = 10
  47. MAX_SHORT_RETRIES = 3
  48. class MatrixFederationEndpointFactory(object):
  49. def __init__(self, hs):
  50. self.tls_server_context_factory = hs.tls_server_context_factory
  51. def endpointForURI(self, uri):
  52. destination = uri.netloc
  53. return matrix_federation_endpoint(
  54. reactor, destination, timeout=10,
  55. ssl_context_factory=self.tls_server_context_factory
  56. )
  57. class MatrixFederationHttpClient(object):
  58. """HTTP client used to talk to other homeservers over the federation
  59. protocol. Send client certificates and signs requests.
  60. Attributes:
  61. agent (twisted.web.client.Agent): The twisted Agent used to send the
  62. requests.
  63. """
  64. def __init__(self, hs):
  65. self.hs = hs
  66. self.signing_key = hs.config.signing_key[0]
  67. self.server_name = hs.hostname
  68. pool = HTTPConnectionPool(reactor)
  69. pool.maxPersistentPerHost = 10
  70. self.agent = Agent.usingEndpointFactory(
  71. reactor, MatrixFederationEndpointFactory(hs), pool=pool
  72. )
  73. self.clock = hs.get_clock()
  74. self.version_string = hs.version_string
  75. self._next_id = 1
  76. def _create_url(self, destination, path_bytes, param_bytes, query_bytes):
  77. return urlparse.urlunparse(
  78. ("matrix", destination, path_bytes, param_bytes, query_bytes, "")
  79. )
  80. @defer.inlineCallbacks
  81. def _create_request(self, destination, method, path_bytes,
  82. body_callback, headers_dict={}, param_bytes=b"",
  83. query_bytes=b"", retry_on_dns_fail=True,
  84. timeout=None, long_retries=False):
  85. """ Creates and sends a request to the given url
  86. """
  87. headers_dict[b"User-Agent"] = [self.version_string]
  88. headers_dict[b"Host"] = [destination]
  89. url_bytes = self._create_url(
  90. destination, path_bytes, param_bytes, query_bytes
  91. )
  92. txn_id = "%s-O-%s" % (method, self._next_id)
  93. self._next_id = (self._next_id + 1) % (sys.maxint - 1)
  94. outbound_logger.info(
  95. "{%s} [%s] Sending request: %s %s",
  96. txn_id, destination, method, url_bytes
  97. )
  98. # XXX: Would be much nicer to retry only at the transaction-layer
  99. # (once we have reliable transactions in place)
  100. if long_retries:
  101. retries_left = MAX_LONG_RETRIES
  102. else:
  103. retries_left = MAX_SHORT_RETRIES
  104. http_url_bytes = urlparse.urlunparse(
  105. ("", "", path_bytes, param_bytes, query_bytes, "")
  106. )
  107. log_result = None
  108. try:
  109. while True:
  110. producer = None
  111. if body_callback:
  112. producer = body_callback(method, http_url_bytes, headers_dict)
  113. try:
  114. def send_request():
  115. request_deferred = preserve_context_over_fn(
  116. self.agent.request,
  117. method,
  118. url_bytes,
  119. Headers(headers_dict),
  120. producer
  121. )
  122. return self.clock.time_bound_deferred(
  123. request_deferred,
  124. time_out=timeout / 1000. if timeout else 60,
  125. )
  126. response = yield preserve_context_over_fn(
  127. send_request,
  128. )
  129. log_result = "%d %s" % (response.code, response.phrase,)
  130. break
  131. except Exception as e:
  132. if not retry_on_dns_fail and isinstance(e, DNSLookupError):
  133. logger.warn(
  134. "DNS Lookup failed to %s with %s",
  135. destination,
  136. e
  137. )
  138. log_result = "DNS Lookup failed to %s with %s" % (
  139. destination, e
  140. )
  141. raise
  142. logger.warn(
  143. "{%s} Sending request failed to %s: %s %s: %s - %s",
  144. txn_id,
  145. destination,
  146. method,
  147. url_bytes,
  148. type(e).__name__,
  149. _flatten_response_never_received(e),
  150. )
  151. log_result = "%s - %s" % (
  152. type(e).__name__, _flatten_response_never_received(e),
  153. )
  154. if retries_left and not timeout:
  155. if long_retries:
  156. delay = 4 ** (MAX_LONG_RETRIES + 1 - retries_left)
  157. delay = min(delay, 60)
  158. delay *= random.uniform(0.8, 1.4)
  159. else:
  160. delay = 0.5 * 2 ** (MAX_SHORT_RETRIES - retries_left)
  161. delay = min(delay, 2)
  162. delay *= random.uniform(0.8, 1.4)
  163. yield sleep(delay)
  164. retries_left -= 1
  165. else:
  166. raise
  167. finally:
  168. outbound_logger.info(
  169. "{%s} [%s] Result: %s",
  170. txn_id,
  171. destination,
  172. log_result,
  173. )
  174. if 200 <= response.code < 300:
  175. pass
  176. else:
  177. # :'(
  178. # Update transactions table?
  179. body = yield preserve_context_over_fn(readBody, response)
  180. raise HttpResponseException(
  181. response.code, response.phrase, body
  182. )
  183. defer.returnValue(response)
  184. def sign_request(self, destination, method, url_bytes, headers_dict,
  185. content=None):
  186. request = {
  187. "method": method,
  188. "uri": url_bytes,
  189. "origin": self.server_name,
  190. "destination": destination,
  191. }
  192. if content is not None:
  193. request["content"] = content
  194. request = sign_json(request, self.server_name, self.signing_key)
  195. auth_headers = []
  196. for key, sig in request["signatures"][self.server_name].items():
  197. auth_headers.append(bytes(
  198. "X-Matrix origin=%s,key=\"%s\",sig=\"%s\"" % (
  199. self.server_name, key, sig,
  200. )
  201. ))
  202. headers_dict[b"Authorization"] = auth_headers
  203. @defer.inlineCallbacks
  204. def put_json(self, destination, path, data={}, json_data_callback=None,
  205. long_retries=False):
  206. """ Sends the specifed json data using PUT
  207. Args:
  208. destination (str): The remote server to send the HTTP request
  209. to.
  210. path (str): The HTTP path.
  211. data (dict): A dict containing the data that will be used as
  212. the request body. This will be encoded as JSON.
  213. json_data_callback (callable): A callable returning the dict to
  214. use as the request body.
  215. long_retries (bool): A boolean that indicates whether we should
  216. retry for a short or long time.
  217. Returns:
  218. Deferred: Succeeds when we get a 2xx HTTP response. The result
  219. will be the decoded JSON body. On a 4xx or 5xx error response a
  220. CodeMessageException is raised.
  221. """
  222. if not json_data_callback:
  223. def json_data_callback():
  224. return data
  225. def body_callback(method, url_bytes, headers_dict):
  226. json_data = json_data_callback()
  227. self.sign_request(
  228. destination, method, url_bytes, headers_dict, json_data
  229. )
  230. producer = _JsonProducer(json_data)
  231. return producer
  232. response = yield self._create_request(
  233. destination.encode("ascii"),
  234. "PUT",
  235. path.encode("ascii"),
  236. body_callback=body_callback,
  237. headers_dict={"Content-Type": ["application/json"]},
  238. long_retries=long_retries,
  239. )
  240. if 200 <= response.code < 300:
  241. # We need to update the transactions table to say it was sent?
  242. c_type = response.headers.getRawHeaders("Content-Type")
  243. if "application/json" not in c_type:
  244. raise RuntimeError(
  245. "Content-Type not application/json"
  246. )
  247. body = yield preserve_context_over_fn(readBody, response)
  248. defer.returnValue(json.loads(body))
  249. @defer.inlineCallbacks
  250. def post_json(self, destination, path, data={}, long_retries=True):
  251. """ Sends the specifed json data using POST
  252. Args:
  253. destination (str): The remote server to send the HTTP request
  254. to.
  255. path (str): The HTTP path.
  256. data (dict): A dict containing the data that will be used as
  257. the request body. This will be encoded as JSON.
  258. long_retries (bool): A boolean that indicates whether we should
  259. retry for a short or long time.
  260. Returns:
  261. Deferred: Succeeds when we get a 2xx HTTP response. The result
  262. will be the decoded JSON body. On a 4xx or 5xx error response a
  263. CodeMessageException is raised.
  264. """
  265. def body_callback(method, url_bytes, headers_dict):
  266. self.sign_request(
  267. destination, method, url_bytes, headers_dict, data
  268. )
  269. return _JsonProducer(data)
  270. response = yield self._create_request(
  271. destination.encode("ascii"),
  272. "POST",
  273. path.encode("ascii"),
  274. body_callback=body_callback,
  275. headers_dict={"Content-Type": ["application/json"]},
  276. long_retries=True,
  277. )
  278. if 200 <= response.code < 300:
  279. # We need to update the transactions table to say it was sent?
  280. c_type = response.headers.getRawHeaders("Content-Type")
  281. if "application/json" not in c_type:
  282. raise RuntimeError(
  283. "Content-Type not application/json"
  284. )
  285. body = yield preserve_context_over_fn(readBody, response)
  286. defer.returnValue(json.loads(body))
  287. @defer.inlineCallbacks
  288. def get_json(self, destination, path, args={}, retry_on_dns_fail=True,
  289. timeout=None):
  290. """ GETs some json from the given host homeserver and path
  291. Args:
  292. destination (str): The remote server to send the HTTP request
  293. to.
  294. path (str): The HTTP path.
  295. args (dict): A dictionary used to create query strings, defaults to
  296. None.
  297. timeout (int): How long to try (in ms) the destination for before
  298. giving up. None indicates no timeout and that the request will
  299. be retried.
  300. Returns:
  301. Deferred: Succeeds when we get *any* HTTP response.
  302. The result of the deferred is a tuple of `(code, response)`,
  303. where `response` is a dict representing the decoded JSON body.
  304. """
  305. logger.debug("get_json args: %s", args)
  306. encoded_args = {}
  307. for k, vs in args.items():
  308. if isinstance(vs, basestring):
  309. vs = [vs]
  310. encoded_args[k] = [v.encode("UTF-8") for v in vs]
  311. query_bytes = urllib.urlencode(encoded_args, True)
  312. logger.debug("Query bytes: %s Retry DNS: %s", args, retry_on_dns_fail)
  313. def body_callback(method, url_bytes, headers_dict):
  314. self.sign_request(destination, method, url_bytes, headers_dict)
  315. return None
  316. response = yield self._create_request(
  317. destination.encode("ascii"),
  318. "GET",
  319. path.encode("ascii"),
  320. query_bytes=query_bytes,
  321. body_callback=body_callback,
  322. retry_on_dns_fail=retry_on_dns_fail,
  323. timeout=timeout,
  324. )
  325. if 200 <= response.code < 300:
  326. # We need to update the transactions table to say it was sent?
  327. c_type = response.headers.getRawHeaders("Content-Type")
  328. if "application/json" not in c_type:
  329. raise RuntimeError(
  330. "Content-Type not application/json"
  331. )
  332. body = yield preserve_context_over_fn(readBody, response)
  333. defer.returnValue(json.loads(body))
  334. @defer.inlineCallbacks
  335. def get_file(self, destination, path, output_stream, args={},
  336. retry_on_dns_fail=True, max_size=None):
  337. """GETs a file from a given homeserver
  338. Args:
  339. destination (str): The remote server to send the HTTP request to.
  340. path (str): The HTTP path to GET.
  341. output_stream (file): File to write the response body to.
  342. args (dict): Optional dictionary used to create the query string.
  343. Returns:
  344. A (int,dict) tuple of the file length and a dict of the response
  345. headers.
  346. """
  347. encoded_args = {}
  348. for k, vs in args.items():
  349. if isinstance(vs, basestring):
  350. vs = [vs]
  351. encoded_args[k] = [v.encode("UTF-8") for v in vs]
  352. query_bytes = urllib.urlencode(encoded_args, True)
  353. logger.debug("Query bytes: %s Retry DNS: %s", args, retry_on_dns_fail)
  354. def body_callback(method, url_bytes, headers_dict):
  355. self.sign_request(destination, method, url_bytes, headers_dict)
  356. return None
  357. response = yield self._create_request(
  358. destination.encode("ascii"),
  359. "GET",
  360. path.encode("ascii"),
  361. query_bytes=query_bytes,
  362. body_callback=body_callback,
  363. retry_on_dns_fail=retry_on_dns_fail
  364. )
  365. headers = dict(response.headers.getAllRawHeaders())
  366. try:
  367. length = yield preserve_context_over_fn(
  368. _readBodyToFile,
  369. response, output_stream, max_size
  370. )
  371. except:
  372. logger.exception("Failed to download body")
  373. raise
  374. defer.returnValue((length, headers))
  375. class _ReadBodyToFileProtocol(protocol.Protocol):
  376. def __init__(self, stream, deferred, max_size):
  377. self.stream = stream
  378. self.deferred = deferred
  379. self.length = 0
  380. self.max_size = max_size
  381. def dataReceived(self, data):
  382. self.stream.write(data)
  383. self.length += len(data)
  384. if self.max_size is not None and self.length >= self.max_size:
  385. self.deferred.errback(SynapseError(
  386. 502,
  387. "Requested file is too large > %r bytes" % (self.max_size,),
  388. Codes.TOO_LARGE,
  389. ))
  390. self.deferred = defer.Deferred()
  391. self.transport.loseConnection()
  392. def connectionLost(self, reason):
  393. if reason.check(ResponseDone):
  394. self.deferred.callback(self.length)
  395. else:
  396. self.deferred.errback(reason)
  397. def _readBodyToFile(response, stream, max_size):
  398. d = defer.Deferred()
  399. response.deliverBody(_ReadBodyToFileProtocol(stream, d, max_size))
  400. return d
  401. class _JsonProducer(object):
  402. """ Used by the twisted http client to create the HTTP body from json
  403. """
  404. def __init__(self, jsn):
  405. self.reset(jsn)
  406. def reset(self, jsn):
  407. self.body = encode_canonical_json(jsn)
  408. self.length = len(self.body)
  409. def startProducing(self, consumer):
  410. consumer.write(self.body)
  411. return defer.succeed(None)
  412. def pauseProducing(self):
  413. pass
  414. def stopProducing(self):
  415. pass
  416. def resumeProducing(self):
  417. pass
  418. def _flatten_response_never_received(e):
  419. if hasattr(e, "reasons"):
  420. return ", ".join(
  421. _flatten_response_never_received(f.value)
  422. for f in e.reasons
  423. )
  424. else:
  425. return "%s: %s" % (type(e).__name__, e.message,)