matrixfederationclient.py 25 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2014-2016 OpenMarket Ltd
  3. # Copyright 2018 New Vector Ltd
  4. #
  5. # Licensed under the Apache License, Version 2.0 (the "License");
  6. # you may not use this file except in compliance with the License.
  7. # You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing, software
  12. # distributed under the License is distributed on an "AS IS" BASIS,
  13. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. # See the License for the specific language governing permissions and
  15. # limitations under the License.
  16. import cgi
  17. import logging
  18. import random
  19. import sys
  20. import urllib
  21. from six import string_types
  22. from six.moves.urllib import parse as urlparse
  23. from canonicaljson import encode_canonical_json, json
  24. from prometheus_client import Counter
  25. from signedjson.sign import sign_json
  26. from twisted.internet import defer, protocol, reactor
  27. from twisted.internet.error import DNSLookupError
  28. from twisted.web._newclient import ResponseDone
  29. from twisted.web.client import Agent, HTTPConnectionPool, readBody
  30. from twisted.web.http_headers import Headers
  31. import synapse.metrics
  32. import synapse.util.retryutils
  33. from synapse.api.errors import (
  34. Codes,
  35. FederationDeniedError,
  36. HttpResponseException,
  37. SynapseError,
  38. )
  39. from synapse.http import cancelled_to_request_timed_out_error
  40. from synapse.http.endpoint import matrix_federation_endpoint
  41. from synapse.util import logcontext
  42. from synapse.util.async import add_timeout_to_deferred
  43. from synapse.util.logcontext import make_deferred_yieldable
  44. logger = logging.getLogger(__name__)
  45. outbound_logger = logging.getLogger("synapse.http.outbound")
  46. outgoing_requests_counter = Counter("synapse_http_matrixfederationclient_requests",
  47. "", ["method"])
  48. incoming_responses_counter = Counter("synapse_http_matrixfederationclient_responses",
  49. "", ["method", "code"])
  50. MAX_LONG_RETRIES = 10
  51. MAX_SHORT_RETRIES = 3
  52. class MatrixFederationEndpointFactory(object):
  53. def __init__(self, hs):
  54. self.tls_client_options_factory = hs.tls_client_options_factory
  55. def endpointForURI(self, uri):
  56. destination = uri.netloc
  57. return matrix_federation_endpoint(
  58. reactor, destination, timeout=10,
  59. tls_client_options_factory=self.tls_client_options_factory
  60. )
  61. class MatrixFederationHttpClient(object):
  62. """HTTP client used to talk to other homeservers over the federation
  63. protocol. Sends client certificates and signs requests.
  64. Attributes:
  65. agent (twisted.web.client.Agent): The twisted Agent used to send the
  66. requests.
  67. """
  68. def __init__(self, hs):
  69. self.hs = hs
  70. self.signing_key = hs.config.signing_key[0]
  71. self.server_name = hs.hostname
  72. pool = HTTPConnectionPool(reactor)
  73. pool.maxPersistentPerHost = 5
  74. pool.cachedConnectionTimeout = 2 * 60
  75. self.agent = Agent.usingEndpointFactory(
  76. reactor, MatrixFederationEndpointFactory(hs), pool=pool
  77. )
  78. self.clock = hs.get_clock()
  79. self._store = hs.get_datastore()
  80. self.version_string = hs.version_string
  81. self._next_id = 1
  82. def _create_url(self, destination, path_bytes, param_bytes, query_bytes):
  83. return urlparse.urlunparse(
  84. ("matrix", destination, path_bytes, param_bytes, query_bytes, "")
  85. )
  86. @defer.inlineCallbacks
  87. def _request(self, destination, method, path,
  88. body_callback, headers_dict={}, param_bytes=b"",
  89. query_bytes=b"", retry_on_dns_fail=True,
  90. timeout=None, long_retries=False,
  91. ignore_backoff=False,
  92. backoff_on_404=False):
  93. """ Creates and sends a request to the given server
  94. Args:
  95. destination (str): The remote server to send the HTTP request to.
  96. method (str): HTTP method
  97. path (str): The HTTP path
  98. ignore_backoff (bool): true to ignore the historical backoff data
  99. and try the request anyway.
  100. backoff_on_404 (bool): Back off if we get a 404
  101. Returns:
  102. Deferred: resolves with the http response object on success.
  103. Fails with ``HTTPRequestException``: if we get an HTTP response
  104. code >= 300.
  105. Fails with ``NotRetryingDestination`` if we are not yet ready
  106. to retry this server.
  107. Fails with ``FederationDeniedError`` if this destination
  108. is not on our federation whitelist
  109. (May also fail with plenty of other Exceptions for things like DNS
  110. failures, connection failures, SSL failures.)
  111. """
  112. if (
  113. self.hs.config.federation_domain_whitelist and
  114. destination not in self.hs.config.federation_domain_whitelist
  115. ):
  116. raise FederationDeniedError(destination)
  117. limiter = yield synapse.util.retryutils.get_retry_limiter(
  118. destination,
  119. self.clock,
  120. self._store,
  121. backoff_on_404=backoff_on_404,
  122. ignore_backoff=ignore_backoff,
  123. )
  124. destination = destination.encode("ascii")
  125. path_bytes = path.encode("ascii")
  126. with limiter:
  127. headers_dict[b"User-Agent"] = [self.version_string]
  128. headers_dict[b"Host"] = [destination]
  129. url_bytes = self._create_url(
  130. destination, path_bytes, param_bytes, query_bytes
  131. )
  132. txn_id = "%s-O-%s" % (method, self._next_id)
  133. self._next_id = (self._next_id + 1) % (sys.maxint - 1)
  134. outbound_logger.info(
  135. "{%s} [%s] Sending request: %s %s",
  136. txn_id, destination, method, url_bytes
  137. )
  138. # XXX: Would be much nicer to retry only at the transaction-layer
  139. # (once we have reliable transactions in place)
  140. if long_retries:
  141. retries_left = MAX_LONG_RETRIES
  142. else:
  143. retries_left = MAX_SHORT_RETRIES
  144. http_url_bytes = urlparse.urlunparse(
  145. ("", "", path_bytes, param_bytes, query_bytes, "")
  146. )
  147. log_result = None
  148. try:
  149. while True:
  150. producer = None
  151. if body_callback:
  152. producer = body_callback(method, http_url_bytes, headers_dict)
  153. try:
  154. request_deferred = self.agent.request(
  155. method,
  156. url_bytes,
  157. Headers(headers_dict),
  158. producer
  159. )
  160. add_timeout_to_deferred(
  161. request_deferred,
  162. timeout / 1000. if timeout else 60,
  163. self.hs.get_reactor(),
  164. cancelled_to_request_timed_out_error,
  165. )
  166. response = yield make_deferred_yieldable(
  167. request_deferred,
  168. )
  169. log_result = "%d %s" % (response.code, response.phrase,)
  170. break
  171. except Exception as e:
  172. if not retry_on_dns_fail and isinstance(e, DNSLookupError):
  173. logger.warn(
  174. "DNS Lookup failed to %s with %s",
  175. destination,
  176. e
  177. )
  178. log_result = "DNS Lookup failed to %s with %s" % (
  179. destination, e
  180. )
  181. raise
  182. logger.warn(
  183. "{%s} Sending request failed to %s: %s %s: %s",
  184. txn_id,
  185. destination,
  186. method,
  187. url_bytes,
  188. _flatten_response_never_received(e),
  189. )
  190. log_result = _flatten_response_never_received(e)
  191. if retries_left and not timeout:
  192. if long_retries:
  193. delay = 4 ** (MAX_LONG_RETRIES + 1 - retries_left)
  194. delay = min(delay, 60)
  195. delay *= random.uniform(0.8, 1.4)
  196. else:
  197. delay = 0.5 * 2 ** (MAX_SHORT_RETRIES - retries_left)
  198. delay = min(delay, 2)
  199. delay *= random.uniform(0.8, 1.4)
  200. yield self.clock.sleep(delay)
  201. retries_left -= 1
  202. else:
  203. raise
  204. finally:
  205. outbound_logger.info(
  206. "{%s} [%s] Result: %s",
  207. txn_id,
  208. destination,
  209. log_result,
  210. )
  211. if 200 <= response.code < 300:
  212. pass
  213. else:
  214. # :'(
  215. # Update transactions table?
  216. with logcontext.PreserveLoggingContext():
  217. body = yield readBody(response)
  218. raise HttpResponseException(
  219. response.code, response.phrase, body
  220. )
  221. defer.returnValue(response)
  222. def sign_request(self, destination, method, url_bytes, headers_dict,
  223. content=None, destination_is=None):
  224. """
  225. Signs a request by adding an Authorization header to headers_dict
  226. Args:
  227. destination (bytes|None): The desination home server of the request.
  228. May be None if the destination is an identity server, in which case
  229. destination_is must be non-None.
  230. method (bytes): The HTTP method of the request
  231. url_bytes (bytes): The URI path of the request
  232. headers_dict (dict): Dictionary of request headers to append to
  233. content (bytes): The body of the request
  234. destination_is (bytes): As 'destination', but if the destination is an
  235. identity server
  236. Returns:
  237. None
  238. """
  239. request = {
  240. "method": method,
  241. "uri": url_bytes,
  242. "origin": self.server_name,
  243. }
  244. if destination is not None:
  245. request["destination"] = destination
  246. if destination_is is not None:
  247. request["destination_is"] = destination_is
  248. if content is not None:
  249. request["content"] = content
  250. request = sign_json(request, self.server_name, self.signing_key)
  251. auth_headers = []
  252. for key, sig in request["signatures"][self.server_name].items():
  253. auth_headers.append(bytes(
  254. "X-Matrix origin=%s,key=\"%s\",sig=\"%s\"" % (
  255. self.server_name, key, sig,
  256. )
  257. ))
  258. headers_dict[b"Authorization"] = auth_headers
  259. @defer.inlineCallbacks
  260. def put_json(self, destination, path, args={}, data={},
  261. json_data_callback=None,
  262. long_retries=False, timeout=None,
  263. ignore_backoff=False,
  264. backoff_on_404=False):
  265. """ Sends the specifed json data using PUT
  266. Args:
  267. destination (str): The remote server to send the HTTP request
  268. to.
  269. path (str): The HTTP path.
  270. args (dict): query params
  271. data (dict): A dict containing the data that will be used as
  272. the request body. This will be encoded as JSON.
  273. json_data_callback (callable): A callable returning the dict to
  274. use as the request body.
  275. long_retries (bool): A boolean that indicates whether we should
  276. retry for a short or long time.
  277. timeout(int): How long to try (in ms) the destination for before
  278. giving up. None indicates no timeout.
  279. ignore_backoff (bool): true to ignore the historical backoff data
  280. and try the request anyway.
  281. backoff_on_404 (bool): True if we should count a 404 response as
  282. a failure of the server (and should therefore back off future
  283. requests)
  284. Returns:
  285. Deferred: Succeeds when we get a 2xx HTTP response. The result
  286. will be the decoded JSON body.
  287. Fails with ``HTTPRequestException`` if we get an HTTP response
  288. code >= 300.
  289. Fails with ``NotRetryingDestination`` if we are not yet ready
  290. to retry this server.
  291. Fails with ``FederationDeniedError`` if this destination
  292. is not on our federation whitelist
  293. """
  294. if not json_data_callback:
  295. def json_data_callback():
  296. return data
  297. def body_callback(method, url_bytes, headers_dict):
  298. json_data = json_data_callback()
  299. self.sign_request(
  300. destination, method, url_bytes, headers_dict, json_data
  301. )
  302. producer = _JsonProducer(json_data)
  303. return producer
  304. response = yield self._request(
  305. destination,
  306. "PUT",
  307. path,
  308. body_callback=body_callback,
  309. headers_dict={"Content-Type": ["application/json"]},
  310. query_bytes=encode_query_args(args),
  311. long_retries=long_retries,
  312. timeout=timeout,
  313. ignore_backoff=ignore_backoff,
  314. backoff_on_404=backoff_on_404,
  315. )
  316. if 200 <= response.code < 300:
  317. # We need to update the transactions table to say it was sent?
  318. check_content_type_is_json(response.headers)
  319. with logcontext.PreserveLoggingContext():
  320. body = yield readBody(response)
  321. defer.returnValue(json.loads(body))
  322. @defer.inlineCallbacks
  323. def post_json(self, destination, path, data={}, long_retries=False,
  324. timeout=None, ignore_backoff=False, args={}):
  325. """ Sends the specifed json data using POST
  326. Args:
  327. destination (str): The remote server to send the HTTP request
  328. to.
  329. path (str): The HTTP path.
  330. data (dict): A dict containing the data that will be used as
  331. the request body. This will be encoded as JSON.
  332. long_retries (bool): A boolean that indicates whether we should
  333. retry for a short or long time.
  334. timeout(int): How long to try (in ms) the destination for before
  335. giving up. None indicates no timeout.
  336. ignore_backoff (bool): true to ignore the historical backoff data and
  337. try the request anyway.
  338. args (dict): query params
  339. Returns:
  340. Deferred: Succeeds when we get a 2xx HTTP response. The result
  341. will be the decoded JSON body.
  342. Fails with ``HTTPRequestException`` if we get an HTTP response
  343. code >= 300.
  344. Fails with ``NotRetryingDestination`` if we are not yet ready
  345. to retry this server.
  346. Fails with ``FederationDeniedError`` if this destination
  347. is not on our federation whitelist
  348. """
  349. def body_callback(method, url_bytes, headers_dict):
  350. self.sign_request(
  351. destination, method, url_bytes, headers_dict, data
  352. )
  353. return _JsonProducer(data)
  354. response = yield self._request(
  355. destination,
  356. "POST",
  357. path,
  358. query_bytes=encode_query_args(args),
  359. body_callback=body_callback,
  360. headers_dict={"Content-Type": ["application/json"]},
  361. long_retries=long_retries,
  362. timeout=timeout,
  363. ignore_backoff=ignore_backoff,
  364. )
  365. if 200 <= response.code < 300:
  366. # We need to update the transactions table to say it was sent?
  367. check_content_type_is_json(response.headers)
  368. with logcontext.PreserveLoggingContext():
  369. body = yield readBody(response)
  370. defer.returnValue(json.loads(body))
  371. @defer.inlineCallbacks
  372. def get_json(self, destination, path, args={}, retry_on_dns_fail=True,
  373. timeout=None, ignore_backoff=False):
  374. """ GETs some json from the given host homeserver and path
  375. Args:
  376. destination (str): The remote server to send the HTTP request
  377. to.
  378. path (str): The HTTP path.
  379. args (dict): A dictionary used to create query strings, defaults to
  380. None.
  381. timeout (int): How long to try (in ms) the destination for before
  382. giving up. None indicates no timeout and that the request will
  383. be retried.
  384. ignore_backoff (bool): true to ignore the historical backoff data
  385. and try the request anyway.
  386. Returns:
  387. Deferred: Succeeds when we get a 2xx HTTP response. The result
  388. will be the decoded JSON body.
  389. Fails with ``HTTPRequestException`` if we get an HTTP response
  390. code >= 300.
  391. Fails with ``NotRetryingDestination`` if we are not yet ready
  392. to retry this server.
  393. Fails with ``FederationDeniedError`` if this destination
  394. is not on our federation whitelist
  395. """
  396. logger.debug("get_json args: %s", args)
  397. logger.debug("Query bytes: %s Retry DNS: %s", args, retry_on_dns_fail)
  398. def body_callback(method, url_bytes, headers_dict):
  399. self.sign_request(destination, method, url_bytes, headers_dict)
  400. return None
  401. response = yield self._request(
  402. destination,
  403. "GET",
  404. path,
  405. query_bytes=encode_query_args(args),
  406. body_callback=body_callback,
  407. retry_on_dns_fail=retry_on_dns_fail,
  408. timeout=timeout,
  409. ignore_backoff=ignore_backoff,
  410. )
  411. if 200 <= response.code < 300:
  412. # We need to update the transactions table to say it was sent?
  413. check_content_type_is_json(response.headers)
  414. with logcontext.PreserveLoggingContext():
  415. body = yield readBody(response)
  416. defer.returnValue(json.loads(body))
  417. @defer.inlineCallbacks
  418. def delete_json(self, destination, path, long_retries=False,
  419. timeout=None, ignore_backoff=False, args={}):
  420. """Send a DELETE request to the remote expecting some json response
  421. Args:
  422. destination (str): The remote server to send the HTTP request
  423. to.
  424. path (str): The HTTP path.
  425. long_retries (bool): A boolean that indicates whether we should
  426. retry for a short or long time.
  427. timeout(int): How long to try (in ms) the destination for before
  428. giving up. None indicates no timeout.
  429. ignore_backoff (bool): true to ignore the historical backoff data and
  430. try the request anyway.
  431. Returns:
  432. Deferred: Succeeds when we get a 2xx HTTP response. The result
  433. will be the decoded JSON body.
  434. Fails with ``HTTPRequestException`` if we get an HTTP response
  435. code >= 300.
  436. Fails with ``NotRetryingDestination`` if we are not yet ready
  437. to retry this server.
  438. Fails with ``FederationDeniedError`` if this destination
  439. is not on our federation whitelist
  440. """
  441. response = yield self._request(
  442. destination,
  443. "DELETE",
  444. path,
  445. query_bytes=encode_query_args(args),
  446. headers_dict={"Content-Type": ["application/json"]},
  447. long_retries=long_retries,
  448. timeout=timeout,
  449. ignore_backoff=ignore_backoff,
  450. )
  451. if 200 <= response.code < 300:
  452. # We need to update the transactions table to say it was sent?
  453. check_content_type_is_json(response.headers)
  454. with logcontext.PreserveLoggingContext():
  455. body = yield readBody(response)
  456. defer.returnValue(json.loads(body))
  457. @defer.inlineCallbacks
  458. def get_file(self, destination, path, output_stream, args={},
  459. retry_on_dns_fail=True, max_size=None,
  460. ignore_backoff=False):
  461. """GETs a file from a given homeserver
  462. Args:
  463. destination (str): The remote server to send the HTTP request to.
  464. path (str): The HTTP path to GET.
  465. output_stream (file): File to write the response body to.
  466. args (dict): Optional dictionary used to create the query string.
  467. ignore_backoff (bool): true to ignore the historical backoff data
  468. and try the request anyway.
  469. Returns:
  470. Deferred: resolves with an (int,dict) tuple of the file length and
  471. a dict of the response headers.
  472. Fails with ``HTTPRequestException`` if we get an HTTP response code
  473. >= 300
  474. Fails with ``NotRetryingDestination`` if we are not yet ready
  475. to retry this server.
  476. Fails with ``FederationDeniedError`` if this destination
  477. is not on our federation whitelist
  478. """
  479. encoded_args = {}
  480. for k, vs in args.items():
  481. if isinstance(vs, string_types):
  482. vs = [vs]
  483. encoded_args[k] = [v.encode("UTF-8") for v in vs]
  484. query_bytes = urllib.urlencode(encoded_args, True)
  485. logger.debug("Query bytes: %s Retry DNS: %s", query_bytes, retry_on_dns_fail)
  486. def body_callback(method, url_bytes, headers_dict):
  487. self.sign_request(destination, method, url_bytes, headers_dict)
  488. return None
  489. response = yield self._request(
  490. destination,
  491. "GET",
  492. path,
  493. query_bytes=query_bytes,
  494. body_callback=body_callback,
  495. retry_on_dns_fail=retry_on_dns_fail,
  496. ignore_backoff=ignore_backoff,
  497. )
  498. headers = dict(response.headers.getAllRawHeaders())
  499. try:
  500. with logcontext.PreserveLoggingContext():
  501. length = yield _readBodyToFile(
  502. response, output_stream, max_size
  503. )
  504. except Exception:
  505. logger.exception("Failed to download body")
  506. raise
  507. defer.returnValue((length, headers))
  508. class _ReadBodyToFileProtocol(protocol.Protocol):
  509. def __init__(self, stream, deferred, max_size):
  510. self.stream = stream
  511. self.deferred = deferred
  512. self.length = 0
  513. self.max_size = max_size
  514. def dataReceived(self, data):
  515. self.stream.write(data)
  516. self.length += len(data)
  517. if self.max_size is not None and self.length >= self.max_size:
  518. self.deferred.errback(SynapseError(
  519. 502,
  520. "Requested file is too large > %r bytes" % (self.max_size,),
  521. Codes.TOO_LARGE,
  522. ))
  523. self.deferred = defer.Deferred()
  524. self.transport.loseConnection()
  525. def connectionLost(self, reason):
  526. if reason.check(ResponseDone):
  527. self.deferred.callback(self.length)
  528. else:
  529. self.deferred.errback(reason)
  530. def _readBodyToFile(response, stream, max_size):
  531. d = defer.Deferred()
  532. response.deliverBody(_ReadBodyToFileProtocol(stream, d, max_size))
  533. return d
  534. class _JsonProducer(object):
  535. """ Used by the twisted http client to create the HTTP body from json
  536. """
  537. def __init__(self, jsn):
  538. self.reset(jsn)
  539. def reset(self, jsn):
  540. self.body = encode_canonical_json(jsn)
  541. self.length = len(self.body)
  542. def startProducing(self, consumer):
  543. consumer.write(self.body)
  544. return defer.succeed(None)
  545. def pauseProducing(self):
  546. pass
  547. def stopProducing(self):
  548. pass
  549. def resumeProducing(self):
  550. pass
  551. def _flatten_response_never_received(e):
  552. if hasattr(e, "reasons"):
  553. reasons = ", ".join(
  554. _flatten_response_never_received(f.value)
  555. for f in e.reasons
  556. )
  557. return "%s:[%s]" % (type(e).__name__, reasons)
  558. else:
  559. return repr(e)
  560. def check_content_type_is_json(headers):
  561. """
  562. Check that a set of HTTP headers have a Content-Type header, and that it
  563. is application/json.
  564. Args:
  565. headers (twisted.web.http_headers.Headers): headers to check
  566. Raises:
  567. RuntimeError if the
  568. """
  569. c_type = headers.getRawHeaders(b"Content-Type")
  570. if c_type is None:
  571. raise RuntimeError(
  572. "No Content-Type header"
  573. )
  574. c_type = c_type[0] # only the first header
  575. val, options = cgi.parse_header(c_type)
  576. if val != "application/json":
  577. raise RuntimeError(
  578. "Content-Type not application/json: was '%s'" % c_type
  579. )
  580. def encode_query_args(args):
  581. encoded_args = {}
  582. for k, vs in args.items():
  583. if isinstance(vs, string_types):
  584. vs = [vs]
  585. encoded_args[k] = [v.encode("UTF-8") for v in vs]
  586. query_bytes = urllib.urlencode(encoded_args, True)
  587. return query_bytes