matrixfederationclient.py 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2014-2016 OpenMarket Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. import synapse.util.retryutils
  16. from twisted.internet import defer, reactor, protocol
  17. from twisted.internet.error import DNSLookupError
  18. from twisted.web.client import readBody, HTTPConnectionPool, Agent
  19. from twisted.web.http_headers import Headers
  20. from twisted.web._newclient import ResponseDone
  21. from synapse.http.endpoint import matrix_federation_endpoint
  22. from synapse.util.async import sleep
  23. from synapse.util import logcontext
  24. import synapse.metrics
  25. from canonicaljson import encode_canonical_json
  26. from synapse.api.errors import (
  27. SynapseError, Codes, HttpResponseException, FederationDeniedError,
  28. )
  29. from signedjson.sign import sign_json
  30. import cgi
  31. import simplejson as json
  32. import logging
  33. import random
  34. import sys
  35. import urllib
  36. import urlparse
  37. logger = logging.getLogger(__name__)
  38. outbound_logger = logging.getLogger("synapse.http.outbound")
  39. metrics = synapse.metrics.get_metrics_for(__name__)
  40. outgoing_requests_counter = metrics.register_counter(
  41. "requests",
  42. labels=["method"],
  43. )
  44. incoming_responses_counter = metrics.register_counter(
  45. "responses",
  46. labels=["method", "code"],
  47. )
  48. MAX_LONG_RETRIES = 10
  49. MAX_SHORT_RETRIES = 3
  50. class MatrixFederationEndpointFactory(object):
  51. def __init__(self, hs):
  52. self.tls_server_context_factory = hs.tls_server_context_factory
  53. def endpointForURI(self, uri):
  54. destination = uri.netloc
  55. return matrix_federation_endpoint(
  56. reactor, destination, timeout=10,
  57. ssl_context_factory=self.tls_server_context_factory
  58. )
  59. class MatrixFederationHttpClient(object):
  60. """HTTP client used to talk to other homeservers over the federation
  61. protocol. Send client certificates and signs requests.
  62. Attributes:
  63. agent (twisted.web.client.Agent): The twisted Agent used to send the
  64. requests.
  65. """
  66. def __init__(self, hs):
  67. self.hs = hs
  68. self.signing_key = hs.config.signing_key[0]
  69. self.server_name = hs.hostname
  70. pool = HTTPConnectionPool(reactor)
  71. pool.maxPersistentPerHost = 5
  72. pool.cachedConnectionTimeout = 2 * 60
  73. self.agent = Agent.usingEndpointFactory(
  74. reactor, MatrixFederationEndpointFactory(hs), pool=pool
  75. )
  76. self.clock = hs.get_clock()
  77. self._store = hs.get_datastore()
  78. self.version_string = hs.version_string
  79. self._next_id = 1
  80. def _create_url(self, destination, path_bytes, param_bytes, query_bytes):
  81. return urlparse.urlunparse(
  82. ("matrix", destination, path_bytes, param_bytes, query_bytes, "")
  83. )
  84. @defer.inlineCallbacks
  85. def _request(self, destination, method, path,
  86. body_callback, headers_dict={}, param_bytes=b"",
  87. query_bytes=b"", retry_on_dns_fail=True,
  88. timeout=None, long_retries=False,
  89. ignore_backoff=False,
  90. backoff_on_404=False):
  91. """ Creates and sends a request to the given server
  92. Args:
  93. destination (str): The remote server to send the HTTP request to.
  94. method (str): HTTP method
  95. path (str): The HTTP path
  96. ignore_backoff (bool): true to ignore the historical backoff data
  97. and try the request anyway.
  98. backoff_on_404 (bool): Back off if we get a 404
  99. Returns:
  100. Deferred: resolves with the http response object on success.
  101. Fails with ``HTTPRequestException``: if we get an HTTP response
  102. code >= 300.
  103. Fails with ``NotRetryingDestination`` if we are not yet ready
  104. to retry this server.
  105. Fails with ``FederationDeniedError`` if this destination
  106. is not on our federation whitelist
  107. (May also fail with plenty of other Exceptions for things like DNS
  108. failures, connection failures, SSL failures.)
  109. """
  110. if (
  111. self.hs.config.federation_domain_whitelist and
  112. destination not in self.hs.config.federation_domain_whitelist
  113. ):
  114. raise FederationDeniedError(destination)
  115. limiter = yield synapse.util.retryutils.get_retry_limiter(
  116. destination,
  117. self.clock,
  118. self._store,
  119. backoff_on_404=backoff_on_404,
  120. ignore_backoff=ignore_backoff,
  121. )
  122. destination = destination.encode("ascii")
  123. path_bytes = path.encode("ascii")
  124. with limiter:
  125. headers_dict[b"User-Agent"] = [self.version_string]
  126. headers_dict[b"Host"] = [destination]
  127. url_bytes = self._create_url(
  128. destination, path_bytes, param_bytes, query_bytes
  129. )
  130. txn_id = "%s-O-%s" % (method, self._next_id)
  131. self._next_id = (self._next_id + 1) % (sys.maxint - 1)
  132. outbound_logger.info(
  133. "{%s} [%s] Sending request: %s %s",
  134. txn_id, destination, method, url_bytes
  135. )
  136. # XXX: Would be much nicer to retry only at the transaction-layer
  137. # (once we have reliable transactions in place)
  138. if long_retries:
  139. retries_left = MAX_LONG_RETRIES
  140. else:
  141. retries_left = MAX_SHORT_RETRIES
  142. http_url_bytes = urlparse.urlunparse(
  143. ("", "", path_bytes, param_bytes, query_bytes, "")
  144. )
  145. log_result = None
  146. try:
  147. while True:
  148. producer = None
  149. if body_callback:
  150. producer = body_callback(method, http_url_bytes, headers_dict)
  151. try:
  152. def send_request():
  153. request_deferred = self.agent.request(
  154. method,
  155. url_bytes,
  156. Headers(headers_dict),
  157. producer
  158. )
  159. return self.clock.time_bound_deferred(
  160. request_deferred,
  161. time_out=timeout / 1000. if timeout else 60,
  162. )
  163. with logcontext.PreserveLoggingContext():
  164. response = yield send_request()
  165. log_result = "%d %s" % (response.code, response.phrase,)
  166. break
  167. except Exception as e:
  168. if not retry_on_dns_fail and isinstance(e, DNSLookupError):
  169. logger.warn(
  170. "DNS Lookup failed to %s with %s",
  171. destination,
  172. e
  173. )
  174. log_result = "DNS Lookup failed to %s with %s" % (
  175. destination, e
  176. )
  177. raise
  178. logger.warn(
  179. "{%s} Sending request failed to %s: %s %s: %s",
  180. txn_id,
  181. destination,
  182. method,
  183. url_bytes,
  184. _flatten_response_never_received(e),
  185. )
  186. log_result = _flatten_response_never_received(e)
  187. if retries_left and not timeout:
  188. if long_retries:
  189. delay = 4 ** (MAX_LONG_RETRIES + 1 - retries_left)
  190. delay = min(delay, 60)
  191. delay *= random.uniform(0.8, 1.4)
  192. else:
  193. delay = 0.5 * 2 ** (MAX_SHORT_RETRIES - retries_left)
  194. delay = min(delay, 2)
  195. delay *= random.uniform(0.8, 1.4)
  196. yield sleep(delay)
  197. retries_left -= 1
  198. else:
  199. raise
  200. finally:
  201. outbound_logger.info(
  202. "{%s} [%s] Result: %s",
  203. txn_id,
  204. destination,
  205. log_result,
  206. )
  207. if 200 <= response.code < 300:
  208. pass
  209. else:
  210. # :'(
  211. # Update transactions table?
  212. with logcontext.PreserveLoggingContext():
  213. body = yield readBody(response)
  214. raise HttpResponseException(
  215. response.code, response.phrase, body
  216. )
  217. defer.returnValue(response)
  218. def sign_request(self, destination, method, url_bytes, headers_dict,
  219. content=None):
  220. request = {
  221. "method": method,
  222. "uri": url_bytes,
  223. "origin": self.server_name,
  224. "destination": destination,
  225. }
  226. if content is not None:
  227. request["content"] = content
  228. request = sign_json(request, self.server_name, self.signing_key)
  229. auth_headers = []
  230. for key, sig in request["signatures"][self.server_name].items():
  231. auth_headers.append(bytes(
  232. "X-Matrix origin=%s,key=\"%s\",sig=\"%s\"" % (
  233. self.server_name, key, sig,
  234. )
  235. ))
  236. headers_dict[b"Authorization"] = auth_headers
  237. @defer.inlineCallbacks
  238. def put_json(self, destination, path, data={}, json_data_callback=None,
  239. long_retries=False, timeout=None,
  240. ignore_backoff=False,
  241. backoff_on_404=False):
  242. """ Sends the specifed json data using PUT
  243. Args:
  244. destination (str): The remote server to send the HTTP request
  245. to.
  246. path (str): The HTTP path.
  247. data (dict): A dict containing the data that will be used as
  248. the request body. This will be encoded as JSON.
  249. json_data_callback (callable): A callable returning the dict to
  250. use as the request body.
  251. long_retries (bool): A boolean that indicates whether we should
  252. retry for a short or long time.
  253. timeout(int): How long to try (in ms) the destination for before
  254. giving up. None indicates no timeout.
  255. ignore_backoff (bool): true to ignore the historical backoff data
  256. and try the request anyway.
  257. backoff_on_404 (bool): True if we should count a 404 response as
  258. a failure of the server (and should therefore back off future
  259. requests)
  260. Returns:
  261. Deferred: Succeeds when we get a 2xx HTTP response. The result
  262. will be the decoded JSON body.
  263. Fails with ``HTTPRequestException`` if we get an HTTP response
  264. code >= 300.
  265. Fails with ``NotRetryingDestination`` if we are not yet ready
  266. to retry this server.
  267. Fails with ``FederationDeniedError`` if this destination
  268. is not on our federation whitelist
  269. """
  270. if not json_data_callback:
  271. def json_data_callback():
  272. return data
  273. def body_callback(method, url_bytes, headers_dict):
  274. json_data = json_data_callback()
  275. self.sign_request(
  276. destination, method, url_bytes, headers_dict, json_data
  277. )
  278. producer = _JsonProducer(json_data)
  279. return producer
  280. response = yield self._request(
  281. destination,
  282. "PUT",
  283. path,
  284. body_callback=body_callback,
  285. headers_dict={"Content-Type": ["application/json"]},
  286. long_retries=long_retries,
  287. timeout=timeout,
  288. ignore_backoff=ignore_backoff,
  289. backoff_on_404=backoff_on_404,
  290. )
  291. if 200 <= response.code < 300:
  292. # We need to update the transactions table to say it was sent?
  293. check_content_type_is_json(response.headers)
  294. with logcontext.PreserveLoggingContext():
  295. body = yield readBody(response)
  296. defer.returnValue(json.loads(body))
  297. @defer.inlineCallbacks
  298. def post_json(self, destination, path, data={}, long_retries=False,
  299. timeout=None, ignore_backoff=False, args={}):
  300. """ Sends the specifed json data using POST
  301. Args:
  302. destination (str): The remote server to send the HTTP request
  303. to.
  304. path (str): The HTTP path.
  305. data (dict): A dict containing the data that will be used as
  306. the request body. This will be encoded as JSON.
  307. long_retries (bool): A boolean that indicates whether we should
  308. retry for a short or long time.
  309. timeout(int): How long to try (in ms) the destination for before
  310. giving up. None indicates no timeout.
  311. ignore_backoff (bool): true to ignore the historical backoff data and
  312. try the request anyway.
  313. Returns:
  314. Deferred: Succeeds when we get a 2xx HTTP response. The result
  315. will be the decoded JSON body.
  316. Fails with ``HTTPRequestException`` if we get an HTTP response
  317. code >= 300.
  318. Fails with ``NotRetryingDestination`` if we are not yet ready
  319. to retry this server.
  320. Fails with ``FederationDeniedError`` if this destination
  321. is not on our federation whitelist
  322. """
  323. def body_callback(method, url_bytes, headers_dict):
  324. self.sign_request(
  325. destination, method, url_bytes, headers_dict, data
  326. )
  327. return _JsonProducer(data)
  328. response = yield self._request(
  329. destination,
  330. "POST",
  331. path,
  332. query_bytes=encode_query_args(args),
  333. body_callback=body_callback,
  334. headers_dict={"Content-Type": ["application/json"]},
  335. long_retries=long_retries,
  336. timeout=timeout,
  337. ignore_backoff=ignore_backoff,
  338. )
  339. if 200 <= response.code < 300:
  340. # We need to update the transactions table to say it was sent?
  341. check_content_type_is_json(response.headers)
  342. with logcontext.PreserveLoggingContext():
  343. body = yield readBody(response)
  344. defer.returnValue(json.loads(body))
  345. @defer.inlineCallbacks
  346. def get_json(self, destination, path, args={}, retry_on_dns_fail=True,
  347. timeout=None, ignore_backoff=False):
  348. """ GETs some json from the given host homeserver and path
  349. Args:
  350. destination (str): The remote server to send the HTTP request
  351. to.
  352. path (str): The HTTP path.
  353. args (dict): A dictionary used to create query strings, defaults to
  354. None.
  355. timeout (int): How long to try (in ms) the destination for before
  356. giving up. None indicates no timeout and that the request will
  357. be retried.
  358. ignore_backoff (bool): true to ignore the historical backoff data
  359. and try the request anyway.
  360. Returns:
  361. Deferred: Succeeds when we get a 2xx HTTP response. The result
  362. will be the decoded JSON body.
  363. Fails with ``HTTPRequestException`` if we get an HTTP response
  364. code >= 300.
  365. Fails with ``NotRetryingDestination`` if we are not yet ready
  366. to retry this server.
  367. Fails with ``FederationDeniedError`` if this destination
  368. is not on our federation whitelist
  369. """
  370. logger.debug("get_json args: %s", args)
  371. logger.debug("Query bytes: %s Retry DNS: %s", args, retry_on_dns_fail)
  372. def body_callback(method, url_bytes, headers_dict):
  373. self.sign_request(destination, method, url_bytes, headers_dict)
  374. return None
  375. response = yield self._request(
  376. destination,
  377. "GET",
  378. path,
  379. query_bytes=encode_query_args(args),
  380. body_callback=body_callback,
  381. retry_on_dns_fail=retry_on_dns_fail,
  382. timeout=timeout,
  383. ignore_backoff=ignore_backoff,
  384. )
  385. if 200 <= response.code < 300:
  386. # We need to update the transactions table to say it was sent?
  387. check_content_type_is_json(response.headers)
  388. with logcontext.PreserveLoggingContext():
  389. body = yield readBody(response)
  390. defer.returnValue(json.loads(body))
  391. @defer.inlineCallbacks
  392. def delete_json(self, destination, path, long_retries=False,
  393. timeout=None, ignore_backoff=False, args={}):
  394. """Send a DELETE request to the remote expecting some json response
  395. Args:
  396. destination (str): The remote server to send the HTTP request
  397. to.
  398. path (str): The HTTP path.
  399. long_retries (bool): A boolean that indicates whether we should
  400. retry for a short or long time.
  401. timeout(int): How long to try (in ms) the destination for before
  402. giving up. None indicates no timeout.
  403. ignore_backoff (bool): true to ignore the historical backoff data and
  404. try the request anyway.
  405. Returns:
  406. Deferred: Succeeds when we get a 2xx HTTP response. The result
  407. will be the decoded JSON body.
  408. Fails with ``HTTPRequestException`` if we get an HTTP response
  409. code >= 300.
  410. Fails with ``NotRetryingDestination`` if we are not yet ready
  411. to retry this server.
  412. Fails with ``FederationDeniedError`` if this destination
  413. is not on our federation whitelist
  414. """
  415. response = yield self._request(
  416. destination,
  417. "DELETE",
  418. path,
  419. query_bytes=encode_query_args(args),
  420. headers_dict={"Content-Type": ["application/json"]},
  421. long_retries=long_retries,
  422. timeout=timeout,
  423. ignore_backoff=ignore_backoff,
  424. )
  425. if 200 <= response.code < 300:
  426. # We need to update the transactions table to say it was sent?
  427. check_content_type_is_json(response.headers)
  428. with logcontext.PreserveLoggingContext():
  429. body = yield readBody(response)
  430. defer.returnValue(json.loads(body))
  431. @defer.inlineCallbacks
  432. def get_file(self, destination, path, output_stream, args={},
  433. retry_on_dns_fail=True, max_size=None,
  434. ignore_backoff=False):
  435. """GETs a file from a given homeserver
  436. Args:
  437. destination (str): The remote server to send the HTTP request to.
  438. path (str): The HTTP path to GET.
  439. output_stream (file): File to write the response body to.
  440. args (dict): Optional dictionary used to create the query string.
  441. ignore_backoff (bool): true to ignore the historical backoff data
  442. and try the request anyway.
  443. Returns:
  444. Deferred: resolves with an (int,dict) tuple of the file length and
  445. a dict of the response headers.
  446. Fails with ``HTTPRequestException`` if we get an HTTP response code
  447. >= 300
  448. Fails with ``NotRetryingDestination`` if we are not yet ready
  449. to retry this server.
  450. Fails with ``FederationDeniedError`` if this destination
  451. is not on our federation whitelist
  452. """
  453. encoded_args = {}
  454. for k, vs in args.items():
  455. if isinstance(vs, basestring):
  456. vs = [vs]
  457. encoded_args[k] = [v.encode("UTF-8") for v in vs]
  458. query_bytes = urllib.urlencode(encoded_args, True)
  459. logger.debug("Query bytes: %s Retry DNS: %s", query_bytes, retry_on_dns_fail)
  460. def body_callback(method, url_bytes, headers_dict):
  461. self.sign_request(destination, method, url_bytes, headers_dict)
  462. return None
  463. response = yield self._request(
  464. destination,
  465. "GET",
  466. path,
  467. query_bytes=query_bytes,
  468. body_callback=body_callback,
  469. retry_on_dns_fail=retry_on_dns_fail,
  470. ignore_backoff=ignore_backoff,
  471. )
  472. headers = dict(response.headers.getAllRawHeaders())
  473. try:
  474. with logcontext.PreserveLoggingContext():
  475. length = yield _readBodyToFile(
  476. response, output_stream, max_size
  477. )
  478. except Exception:
  479. logger.exception("Failed to download body")
  480. raise
  481. defer.returnValue((length, headers))
  482. class _ReadBodyToFileProtocol(protocol.Protocol):
  483. def __init__(self, stream, deferred, max_size):
  484. self.stream = stream
  485. self.deferred = deferred
  486. self.length = 0
  487. self.max_size = max_size
  488. def dataReceived(self, data):
  489. self.stream.write(data)
  490. self.length += len(data)
  491. if self.max_size is not None and self.length >= self.max_size:
  492. self.deferred.errback(SynapseError(
  493. 502,
  494. "Requested file is too large > %r bytes" % (self.max_size,),
  495. Codes.TOO_LARGE,
  496. ))
  497. self.deferred = defer.Deferred()
  498. self.transport.loseConnection()
  499. def connectionLost(self, reason):
  500. if reason.check(ResponseDone):
  501. self.deferred.callback(self.length)
  502. else:
  503. self.deferred.errback(reason)
  504. def _readBodyToFile(response, stream, max_size):
  505. d = defer.Deferred()
  506. response.deliverBody(_ReadBodyToFileProtocol(stream, d, max_size))
  507. return d
  508. class _JsonProducer(object):
  509. """ Used by the twisted http client to create the HTTP body from json
  510. """
  511. def __init__(self, jsn):
  512. self.reset(jsn)
  513. def reset(self, jsn):
  514. self.body = encode_canonical_json(jsn)
  515. self.length = len(self.body)
  516. def startProducing(self, consumer):
  517. consumer.write(self.body)
  518. return defer.succeed(None)
  519. def pauseProducing(self):
  520. pass
  521. def stopProducing(self):
  522. pass
  523. def resumeProducing(self):
  524. pass
  525. def _flatten_response_never_received(e):
  526. if hasattr(e, "reasons"):
  527. reasons = ", ".join(
  528. _flatten_response_never_received(f.value)
  529. for f in e.reasons
  530. )
  531. return "%s:[%s]" % (type(e).__name__, reasons)
  532. else:
  533. return repr(e)
  534. def check_content_type_is_json(headers):
  535. """
  536. Check that a set of HTTP headers have a Content-Type header, and that it
  537. is application/json.
  538. Args:
  539. headers (twisted.web.http_headers.Headers): headers to check
  540. Raises:
  541. RuntimeError if the
  542. """
  543. c_type = headers.getRawHeaders("Content-Type")
  544. if c_type is None:
  545. raise RuntimeError(
  546. "No Content-Type header"
  547. )
  548. c_type = c_type[0] # only the first header
  549. val, options = cgi.parse_header(c_type)
  550. if val != "application/json":
  551. raise RuntimeError(
  552. "Content-Type not application/json: was '%s'" % c_type
  553. )
  554. def encode_query_args(args):
  555. encoded_args = {}
  556. for k, vs in args.items():
  557. if isinstance(vs, basestring):
  558. vs = [vs]
  559. encoded_args[k] = [v.encode("UTF-8") for v in vs]
  560. query_bytes = urllib.urlencode(encoded_args, True)
  561. return query_bytes