matrixfederationclient.py 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2014-2016 OpenMarket Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. import synapse.util.retryutils
  16. from twisted.internet import defer, reactor, protocol
  17. from twisted.internet.error import DNSLookupError
  18. from twisted.web.client import readBody, HTTPConnectionPool, Agent
  19. from twisted.web.http_headers import Headers
  20. from twisted.web._newclient import ResponseDone
  21. from synapse.http.endpoint import matrix_federation_endpoint
  22. from synapse.util.async import sleep
  23. from synapse.util import logcontext
  24. import synapse.metrics
  25. from canonicaljson import encode_canonical_json
  26. from synapse.api.errors import (
  27. SynapseError, Codes, HttpResponseException,
  28. )
  29. from signedjson.sign import sign_json
  30. import cgi
  31. import simplejson as json
  32. import logging
  33. import random
  34. import sys
  35. import urllib
  36. import urlparse
  37. logger = logging.getLogger(__name__)
  38. outbound_logger = logging.getLogger("synapse.http.outbound")
  39. metrics = synapse.metrics.get_metrics_for(__name__)
  40. outgoing_requests_counter = metrics.register_counter(
  41. "requests",
  42. labels=["method"],
  43. )
  44. incoming_responses_counter = metrics.register_counter(
  45. "responses",
  46. labels=["method", "code"],
  47. )
  48. MAX_LONG_RETRIES = 10
  49. MAX_SHORT_RETRIES = 3
  50. class MatrixFederationEndpointFactory(object):
  51. def __init__(self, hs):
  52. self.tls_server_context_factory = hs.tls_server_context_factory
  53. def endpointForURI(self, uri):
  54. destination = uri.netloc
  55. return matrix_federation_endpoint(
  56. reactor, destination, timeout=10,
  57. ssl_context_factory=self.tls_server_context_factory
  58. )
  59. class MatrixFederationHttpClient(object):
  60. """HTTP client used to talk to other homeservers over the federation
  61. protocol. Send client certificates and signs requests.
  62. Attributes:
  63. agent (twisted.web.client.Agent): The twisted Agent used to send the
  64. requests.
  65. """
  66. def __init__(self, hs):
  67. self.hs = hs
  68. self.signing_key = hs.config.signing_key[0]
  69. self.server_name = hs.hostname
  70. pool = HTTPConnectionPool(reactor)
  71. pool.maxPersistentPerHost = 5
  72. pool.cachedConnectionTimeout = 2 * 60
  73. self.agent = Agent.usingEndpointFactory(
  74. reactor, MatrixFederationEndpointFactory(hs), pool=pool
  75. )
  76. self.clock = hs.get_clock()
  77. self._store = hs.get_datastore()
  78. self.version_string = hs.version_string
  79. self._next_id = 1
  80. def _create_url(self, destination, path_bytes, param_bytes, query_bytes):
  81. return urlparse.urlunparse(
  82. ("matrix", destination, path_bytes, param_bytes, query_bytes, "")
  83. )
  84. @defer.inlineCallbacks
  85. def _request(self, destination, method, path,
  86. body_callback, headers_dict={}, param_bytes=b"",
  87. query_bytes=b"", retry_on_dns_fail=True,
  88. timeout=None, long_retries=False,
  89. ignore_backoff=False,
  90. backoff_on_404=False):
  91. """ Creates and sends a request to the given server
  92. Args:
  93. destination (str): The remote server to send the HTTP request to.
  94. method (str): HTTP method
  95. path (str): The HTTP path
  96. ignore_backoff (bool): true to ignore the historical backoff data
  97. and try the request anyway.
  98. backoff_on_404 (bool): Back off if we get a 404
  99. Returns:
  100. Deferred: resolves with the http response object on success.
  101. Fails with ``HTTPRequestException``: if we get an HTTP response
  102. code >= 300.
  103. Fails with ``NotRetryingDestination`` if we are not yet ready
  104. to retry this server.
  105. (May also fail with plenty of other Exceptions for things like DNS
  106. failures, connection failures, SSL failures.)
  107. """
  108. limiter = yield synapse.util.retryutils.get_retry_limiter(
  109. destination,
  110. self.clock,
  111. self._store,
  112. backoff_on_404=backoff_on_404,
  113. ignore_backoff=ignore_backoff,
  114. )
  115. destination = destination.encode("ascii")
  116. path_bytes = path.encode("ascii")
  117. with limiter:
  118. headers_dict[b"User-Agent"] = [self.version_string]
  119. headers_dict[b"Host"] = [destination]
  120. url_bytes = self._create_url(
  121. destination, path_bytes, param_bytes, query_bytes
  122. )
  123. txn_id = "%s-O-%s" % (method, self._next_id)
  124. self._next_id = (self._next_id + 1) % (sys.maxint - 1)
  125. outbound_logger.info(
  126. "{%s} [%s] Sending request: %s %s",
  127. txn_id, destination, method, url_bytes
  128. )
  129. # XXX: Would be much nicer to retry only at the transaction-layer
  130. # (once we have reliable transactions in place)
  131. if long_retries:
  132. retries_left = MAX_LONG_RETRIES
  133. else:
  134. retries_left = MAX_SHORT_RETRIES
  135. http_url_bytes = urlparse.urlunparse(
  136. ("", "", path_bytes, param_bytes, query_bytes, "")
  137. )
  138. log_result = None
  139. try:
  140. while True:
  141. producer = None
  142. if body_callback:
  143. producer = body_callback(method, http_url_bytes, headers_dict)
  144. try:
  145. def send_request():
  146. request_deferred = self.agent.request(
  147. method,
  148. url_bytes,
  149. Headers(headers_dict),
  150. producer
  151. )
  152. return self.clock.time_bound_deferred(
  153. request_deferred,
  154. time_out=timeout / 1000. if timeout else 60,
  155. )
  156. with logcontext.PreserveLoggingContext():
  157. response = yield send_request()
  158. log_result = "%d %s" % (response.code, response.phrase,)
  159. break
  160. except Exception as e:
  161. if not retry_on_dns_fail and isinstance(e, DNSLookupError):
  162. logger.warn(
  163. "DNS Lookup failed to %s with %s",
  164. destination,
  165. e
  166. )
  167. log_result = "DNS Lookup failed to %s with %s" % (
  168. destination, e
  169. )
  170. raise
  171. logger.warn(
  172. "{%s} Sending request failed to %s: %s %s: %s",
  173. txn_id,
  174. destination,
  175. method,
  176. url_bytes,
  177. _flatten_response_never_received(e),
  178. )
  179. log_result = _flatten_response_never_received(e)
  180. if retries_left and not timeout:
  181. if long_retries:
  182. delay = 4 ** (MAX_LONG_RETRIES + 1 - retries_left)
  183. delay = min(delay, 60)
  184. delay *= random.uniform(0.8, 1.4)
  185. else:
  186. delay = 0.5 * 2 ** (MAX_SHORT_RETRIES - retries_left)
  187. delay = min(delay, 2)
  188. delay *= random.uniform(0.8, 1.4)
  189. yield sleep(delay)
  190. retries_left -= 1
  191. else:
  192. raise
  193. finally:
  194. outbound_logger.info(
  195. "{%s} [%s] Result: %s",
  196. txn_id,
  197. destination,
  198. log_result,
  199. )
  200. if 200 <= response.code < 300:
  201. pass
  202. else:
  203. # :'(
  204. # Update transactions table?
  205. with logcontext.PreserveLoggingContext():
  206. body = yield readBody(response)
  207. raise HttpResponseException(
  208. response.code, response.phrase, body
  209. )
  210. defer.returnValue(response)
  211. def sign_request(self, destination, method, url_bytes, headers_dict,
  212. content=None):
  213. request = {
  214. "method": method,
  215. "uri": url_bytes,
  216. "origin": self.server_name,
  217. "destination": destination,
  218. }
  219. if content is not None:
  220. request["content"] = content
  221. request = sign_json(request, self.server_name, self.signing_key)
  222. auth_headers = []
  223. for key, sig in request["signatures"][self.server_name].items():
  224. auth_headers.append(bytes(
  225. "X-Matrix origin=%s,key=\"%s\",sig=\"%s\"" % (
  226. self.server_name, key, sig,
  227. )
  228. ))
  229. headers_dict[b"Authorization"] = auth_headers
  230. @defer.inlineCallbacks
  231. def put_json(self, destination, path, data={}, json_data_callback=None,
  232. long_retries=False, timeout=None,
  233. ignore_backoff=False,
  234. backoff_on_404=False):
  235. """ Sends the specifed json data using PUT
  236. Args:
  237. destination (str): The remote server to send the HTTP request
  238. to.
  239. path (str): The HTTP path.
  240. data (dict): A dict containing the data that will be used as
  241. the request body. This will be encoded as JSON.
  242. json_data_callback (callable): A callable returning the dict to
  243. use as the request body.
  244. long_retries (bool): A boolean that indicates whether we should
  245. retry for a short or long time.
  246. timeout(int): How long to try (in ms) the destination for before
  247. giving up. None indicates no timeout.
  248. ignore_backoff (bool): true to ignore the historical backoff data
  249. and try the request anyway.
  250. backoff_on_404 (bool): True if we should count a 404 response as
  251. a failure of the server (and should therefore back off future
  252. requests)
  253. Returns:
  254. Deferred: Succeeds when we get a 2xx HTTP response. The result
  255. will be the decoded JSON body.
  256. Fails with ``HTTPRequestException`` if we get an HTTP response
  257. code >= 300.
  258. Fails with ``NotRetryingDestination`` if we are not yet ready
  259. to retry this server.
  260. """
  261. if not json_data_callback:
  262. def json_data_callback():
  263. return data
  264. def body_callback(method, url_bytes, headers_dict):
  265. json_data = json_data_callback()
  266. self.sign_request(
  267. destination, method, url_bytes, headers_dict, json_data
  268. )
  269. producer = _JsonProducer(json_data)
  270. return producer
  271. response = yield self._request(
  272. destination,
  273. "PUT",
  274. path,
  275. body_callback=body_callback,
  276. headers_dict={"Content-Type": ["application/json"]},
  277. long_retries=long_retries,
  278. timeout=timeout,
  279. ignore_backoff=ignore_backoff,
  280. backoff_on_404=backoff_on_404,
  281. )
  282. if 200 <= response.code < 300:
  283. # We need to update the transactions table to say it was sent?
  284. check_content_type_is_json(response.headers)
  285. with logcontext.PreserveLoggingContext():
  286. body = yield readBody(response)
  287. defer.returnValue(json.loads(body))
  288. @defer.inlineCallbacks
  289. def post_json(self, destination, path, data={}, long_retries=False,
  290. timeout=None, ignore_backoff=False, args={}):
  291. """ Sends the specifed json data using POST
  292. Args:
  293. destination (str): The remote server to send the HTTP request
  294. to.
  295. path (str): The HTTP path.
  296. data (dict): A dict containing the data that will be used as
  297. the request body. This will be encoded as JSON.
  298. long_retries (bool): A boolean that indicates whether we should
  299. retry for a short or long time.
  300. timeout(int): How long to try (in ms) the destination for before
  301. giving up. None indicates no timeout.
  302. ignore_backoff (bool): true to ignore the historical backoff data and
  303. try the request anyway.
  304. Returns:
  305. Deferred: Succeeds when we get a 2xx HTTP response. The result
  306. will be the decoded JSON body.
  307. Fails with ``HTTPRequestException`` if we get an HTTP response
  308. code >= 300.
  309. Fails with ``NotRetryingDestination`` if we are not yet ready
  310. to retry this server.
  311. """
  312. def body_callback(method, url_bytes, headers_dict):
  313. self.sign_request(
  314. destination, method, url_bytes, headers_dict, data
  315. )
  316. return _JsonProducer(data)
  317. response = yield self._request(
  318. destination,
  319. "POST",
  320. path,
  321. query_bytes=encode_query_args(args),
  322. body_callback=body_callback,
  323. headers_dict={"Content-Type": ["application/json"]},
  324. long_retries=long_retries,
  325. timeout=timeout,
  326. ignore_backoff=ignore_backoff,
  327. )
  328. if 200 <= response.code < 300:
  329. # We need to update the transactions table to say it was sent?
  330. check_content_type_is_json(response.headers)
  331. with logcontext.PreserveLoggingContext():
  332. body = yield readBody(response)
  333. defer.returnValue(json.loads(body))
  334. @defer.inlineCallbacks
  335. def get_json(self, destination, path, args={}, retry_on_dns_fail=True,
  336. timeout=None, ignore_backoff=False):
  337. """ GETs some json from the given host homeserver and path
  338. Args:
  339. destination (str): The remote server to send the HTTP request
  340. to.
  341. path (str): The HTTP path.
  342. args (dict): A dictionary used to create query strings, defaults to
  343. None.
  344. timeout (int): How long to try (in ms) the destination for before
  345. giving up. None indicates no timeout and that the request will
  346. be retried.
  347. ignore_backoff (bool): true to ignore the historical backoff data
  348. and try the request anyway.
  349. Returns:
  350. Deferred: Succeeds when we get a 2xx HTTP response. The result
  351. will be the decoded JSON body.
  352. Fails with ``HTTPRequestException`` if we get an HTTP response
  353. code >= 300.
  354. Fails with ``NotRetryingDestination`` if we are not yet ready
  355. to retry this server.
  356. """
  357. logger.debug("get_json args: %s", args)
  358. logger.debug("Query bytes: %s Retry DNS: %s", args, retry_on_dns_fail)
  359. def body_callback(method, url_bytes, headers_dict):
  360. self.sign_request(destination, method, url_bytes, headers_dict)
  361. return None
  362. response = yield self._request(
  363. destination,
  364. "GET",
  365. path,
  366. query_bytes=encode_query_args(args),
  367. body_callback=body_callback,
  368. retry_on_dns_fail=retry_on_dns_fail,
  369. timeout=timeout,
  370. ignore_backoff=ignore_backoff,
  371. )
  372. if 200 <= response.code < 300:
  373. # We need to update the transactions table to say it was sent?
  374. check_content_type_is_json(response.headers)
  375. with logcontext.PreserveLoggingContext():
  376. body = yield readBody(response)
  377. defer.returnValue(json.loads(body))
  378. @defer.inlineCallbacks
  379. def delete_json(self, destination, path, long_retries=False,
  380. timeout=None, ignore_backoff=False, args={}):
  381. """Send a DELETE request to the remote expecting some json response
  382. Args:
  383. destination (str): The remote server to send the HTTP request
  384. to.
  385. path (str): The HTTP path.
  386. long_retries (bool): A boolean that indicates whether we should
  387. retry for a short or long time.
  388. timeout(int): How long to try (in ms) the destination for before
  389. giving up. None indicates no timeout.
  390. ignore_backoff (bool): true to ignore the historical backoff data and
  391. try the request anyway.
  392. Returns:
  393. Deferred: Succeeds when we get a 2xx HTTP response. The result
  394. will be the decoded JSON body.
  395. Fails with ``HTTPRequestException`` if we get an HTTP response
  396. code >= 300.
  397. Fails with ``NotRetryingDestination`` if we are not yet ready
  398. to retry this server.
  399. """
  400. response = yield self._request(
  401. destination,
  402. "DELETE",
  403. path,
  404. query_bytes=encode_query_args(args),
  405. headers_dict={"Content-Type": ["application/json"]},
  406. long_retries=long_retries,
  407. timeout=timeout,
  408. ignore_backoff=ignore_backoff,
  409. )
  410. if 200 <= response.code < 300:
  411. # We need to update the transactions table to say it was sent?
  412. check_content_type_is_json(response.headers)
  413. with logcontext.PreserveLoggingContext():
  414. body = yield readBody(response)
  415. defer.returnValue(json.loads(body))
  416. @defer.inlineCallbacks
  417. def get_file(self, destination, path, output_stream, args={},
  418. retry_on_dns_fail=True, max_size=None,
  419. ignore_backoff=False):
  420. """GETs a file from a given homeserver
  421. Args:
  422. destination (str): The remote server to send the HTTP request to.
  423. path (str): The HTTP path to GET.
  424. output_stream (file): File to write the response body to.
  425. args (dict): Optional dictionary used to create the query string.
  426. ignore_backoff (bool): true to ignore the historical backoff data
  427. and try the request anyway.
  428. Returns:
  429. Deferred: resolves with an (int,dict) tuple of the file length and
  430. a dict of the response headers.
  431. Fails with ``HTTPRequestException`` if we get an HTTP response code
  432. >= 300
  433. Fails with ``NotRetryingDestination`` if we are not yet ready
  434. to retry this server.
  435. """
  436. encoded_args = {}
  437. for k, vs in args.items():
  438. if isinstance(vs, basestring):
  439. vs = [vs]
  440. encoded_args[k] = [v.encode("UTF-8") for v in vs]
  441. query_bytes = urllib.urlencode(encoded_args, True)
  442. logger.debug("Query bytes: %s Retry DNS: %s", query_bytes, retry_on_dns_fail)
  443. def body_callback(method, url_bytes, headers_dict):
  444. self.sign_request(destination, method, url_bytes, headers_dict)
  445. return None
  446. response = yield self._request(
  447. destination,
  448. "GET",
  449. path,
  450. query_bytes=query_bytes,
  451. body_callback=body_callback,
  452. retry_on_dns_fail=retry_on_dns_fail,
  453. ignore_backoff=ignore_backoff,
  454. )
  455. headers = dict(response.headers.getAllRawHeaders())
  456. try:
  457. with logcontext.PreserveLoggingContext():
  458. length = yield _readBodyToFile(
  459. response, output_stream, max_size
  460. )
  461. except:
  462. logger.exception("Failed to download body")
  463. raise
  464. defer.returnValue((length, headers))
  465. class _ReadBodyToFileProtocol(protocol.Protocol):
  466. def __init__(self, stream, deferred, max_size):
  467. self.stream = stream
  468. self.deferred = deferred
  469. self.length = 0
  470. self.max_size = max_size
  471. def dataReceived(self, data):
  472. self.stream.write(data)
  473. self.length += len(data)
  474. if self.max_size is not None and self.length >= self.max_size:
  475. self.deferred.errback(SynapseError(
  476. 502,
  477. "Requested file is too large > %r bytes" % (self.max_size,),
  478. Codes.TOO_LARGE,
  479. ))
  480. self.deferred = defer.Deferred()
  481. self.transport.loseConnection()
  482. def connectionLost(self, reason):
  483. if reason.check(ResponseDone):
  484. self.deferred.callback(self.length)
  485. else:
  486. self.deferred.errback(reason)
  487. def _readBodyToFile(response, stream, max_size):
  488. d = defer.Deferred()
  489. response.deliverBody(_ReadBodyToFileProtocol(stream, d, max_size))
  490. return d
  491. class _JsonProducer(object):
  492. """ Used by the twisted http client to create the HTTP body from json
  493. """
  494. def __init__(self, jsn):
  495. self.reset(jsn)
  496. def reset(self, jsn):
  497. self.body = encode_canonical_json(jsn)
  498. self.length = len(self.body)
  499. def startProducing(self, consumer):
  500. consumer.write(self.body)
  501. return defer.succeed(None)
  502. def pauseProducing(self):
  503. pass
  504. def stopProducing(self):
  505. pass
  506. def resumeProducing(self):
  507. pass
  508. def _flatten_response_never_received(e):
  509. if hasattr(e, "reasons"):
  510. reasons = ", ".join(
  511. _flatten_response_never_received(f.value)
  512. for f in e.reasons
  513. )
  514. return "%s:[%s]" % (type(e).__name__, reasons)
  515. else:
  516. return repr(e)
  517. def check_content_type_is_json(headers):
  518. """
  519. Check that a set of HTTP headers have a Content-Type header, and that it
  520. is application/json.
  521. Args:
  522. headers (twisted.web.http_headers.Headers): headers to check
  523. Raises:
  524. RuntimeError if the
  525. """
  526. c_type = headers.getRawHeaders("Content-Type")
  527. if c_type is None:
  528. raise RuntimeError(
  529. "No Content-Type header"
  530. )
  531. c_type = c_type[0] # only the first header
  532. val, options = cgi.parse_header(c_type)
  533. if val != "application/json":
  534. raise RuntimeError(
  535. "Content-Type not application/json: was '%s'" % c_type
  536. )
  537. def encode_query_args(args):
  538. encoded_args = {}
  539. for k, vs in args.items():
  540. if isinstance(vs, basestring):
  541. vs = [vs]
  542. encoded_args[k] = [v.encode("UTF-8") for v in vs]
  543. query_bytes = urllib.urlencode(encoded_args, True)
  544. return query_bytes