matrixfederationclient.py 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2014-2016 OpenMarket Ltd
  3. # Copyright 2018 New Vector Ltd
  4. #
  5. # Licensed under the Apache License, Version 2.0 (the "License");
  6. # you may not use this file except in compliance with the License.
  7. # You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing, software
  12. # distributed under the License is distributed on an "AS IS" BASIS,
  13. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. # See the License for the specific language governing permissions and
  15. # limitations under the License.
  16. import cgi
  17. import logging
  18. import random
  19. import sys
  20. from six import PY3, string_types
  21. from six.moves import urllib
  22. import treq
  23. from canonicaljson import encode_canonical_json
  24. from prometheus_client import Counter
  25. from signedjson.sign import sign_json
  26. from twisted.internet import defer, protocol, reactor
  27. from twisted.internet.error import DNSLookupError
  28. from twisted.web._newclient import ResponseDone
  29. from twisted.web.client import Agent, HTTPConnectionPool
  30. from twisted.web.http_headers import Headers
  31. import synapse.metrics
  32. import synapse.util.retryutils
  33. from synapse.api.errors import (
  34. Codes,
  35. FederationDeniedError,
  36. HttpResponseException,
  37. SynapseError,
  38. )
  39. from synapse.http import cancelled_to_request_timed_out_error
  40. from synapse.http.endpoint import matrix_federation_endpoint
  41. from synapse.util import logcontext
  42. from synapse.util.async_helpers import add_timeout_to_deferred
  43. from synapse.util.logcontext import make_deferred_yieldable
  44. logger = logging.getLogger(__name__)
  45. outbound_logger = logging.getLogger("synapse.http.outbound")
  46. outgoing_requests_counter = Counter("synapse_http_matrixfederationclient_requests",
  47. "", ["method"])
  48. incoming_responses_counter = Counter("synapse_http_matrixfederationclient_responses",
  49. "", ["method", "code"])
  50. MAX_LONG_RETRIES = 10
  51. MAX_SHORT_RETRIES = 3
  52. if PY3:
  53. MAXINT = sys.maxsize
  54. else:
  55. MAXINT = sys.maxint
  56. class MatrixFederationEndpointFactory(object):
  57. def __init__(self, hs):
  58. self.tls_client_options_factory = hs.tls_client_options_factory
  59. def endpointForURI(self, uri):
  60. destination = uri.netloc.decode('ascii')
  61. return matrix_federation_endpoint(
  62. reactor, destination, timeout=10,
  63. tls_client_options_factory=self.tls_client_options_factory
  64. )
  65. class MatrixFederationHttpClient(object):
  66. """HTTP client used to talk to other homeservers over the federation
  67. protocol. Send client certificates and signs requests.
  68. Attributes:
  69. agent (twisted.web.client.Agent): The twisted Agent used to send the
  70. requests.
  71. """
  72. def __init__(self, hs):
  73. self.hs = hs
  74. self.signing_key = hs.config.signing_key[0]
  75. self.server_name = hs.hostname
  76. pool = HTTPConnectionPool(reactor)
  77. pool.maxPersistentPerHost = 5
  78. pool.cachedConnectionTimeout = 2 * 60
  79. self.agent = Agent.usingEndpointFactory(
  80. reactor, MatrixFederationEndpointFactory(hs), pool=pool
  81. )
  82. self.clock = hs.get_clock()
  83. self._store = hs.get_datastore()
  84. self.version_string = hs.version_string.encode('ascii')
  85. self._next_id = 1
  86. def _create_url(self, destination, path_bytes, param_bytes, query_bytes):
  87. return urllib.parse.urlunparse(
  88. (b"matrix", destination, path_bytes, param_bytes, query_bytes, b"")
  89. )
  90. @defer.inlineCallbacks
  91. def _request(self, destination, method, path,
  92. json=None, json_callback=None,
  93. param_bytes=b"",
  94. query=None, retry_on_dns_fail=True,
  95. timeout=None, long_retries=False,
  96. ignore_backoff=False,
  97. backoff_on_404=False):
  98. """
  99. Creates and sends a request to the given server.
  100. Args:
  101. destination (str): The remote server to send the HTTP request to.
  102. method (str): HTTP method
  103. path (str): The HTTP path
  104. json (dict or None): JSON to send in the body.
  105. json_callback (func or None): A callback to generate the JSON.
  106. query (dict or None): Query arguments.
  107. ignore_backoff (bool): true to ignore the historical backoff data
  108. and try the request anyway.
  109. backoff_on_404 (bool): Back off if we get a 404
  110. Returns:
  111. Deferred: resolves with the http response object on success.
  112. Fails with ``HTTPRequestException``: if we get an HTTP response
  113. code >= 300.
  114. Fails with ``NotRetryingDestination`` if we are not yet ready
  115. to retry this server.
  116. Fails with ``FederationDeniedError`` if this destination
  117. is not on our federation whitelist
  118. (May also fail with plenty of other Exceptions for things like DNS
  119. failures, connection failures, SSL failures.)
  120. """
  121. if (
  122. self.hs.config.federation_domain_whitelist is not None and
  123. destination not in self.hs.config.federation_domain_whitelist
  124. ):
  125. raise FederationDeniedError(destination)
  126. limiter = yield synapse.util.retryutils.get_retry_limiter(
  127. destination,
  128. self.clock,
  129. self._store,
  130. backoff_on_404=backoff_on_404,
  131. ignore_backoff=ignore_backoff,
  132. )
  133. headers_dict = {}
  134. path_bytes = path.encode("ascii")
  135. if query:
  136. query_bytes = encode_query_args(query)
  137. else:
  138. query_bytes = b""
  139. headers_dict = {
  140. "User-Agent": [self.version_string],
  141. "Host": [destination],
  142. }
  143. with limiter:
  144. url = self._create_url(
  145. destination.encode("ascii"), path_bytes, param_bytes, query_bytes
  146. ).decode('ascii')
  147. txn_id = "%s-O-%s" % (method, self._next_id)
  148. self._next_id = (self._next_id + 1) % (MAXINT - 1)
  149. outbound_logger.info(
  150. "{%s} [%s] Sending request: %s %s",
  151. txn_id, destination, method, url
  152. )
  153. # XXX: Would be much nicer to retry only at the transaction-layer
  154. # (once we have reliable transactions in place)
  155. if long_retries:
  156. retries_left = MAX_LONG_RETRIES
  157. else:
  158. retries_left = MAX_SHORT_RETRIES
  159. http_url = urllib.parse.urlunparse(
  160. (b"", b"", path_bytes, param_bytes, query_bytes, b"")
  161. ).decode('ascii')
  162. log_result = None
  163. try:
  164. while True:
  165. try:
  166. if json_callback:
  167. json = json_callback()
  168. if json:
  169. data = encode_canonical_json(json)
  170. headers_dict["Content-Type"] = ["application/json"]
  171. self.sign_request(
  172. destination, method, http_url, headers_dict, json
  173. )
  174. else:
  175. data = None
  176. self.sign_request(destination, method, http_url, headers_dict)
  177. request_deferred = treq.request(
  178. method,
  179. url,
  180. headers=Headers(headers_dict),
  181. data=data,
  182. agent=self.agent,
  183. )
  184. add_timeout_to_deferred(
  185. request_deferred,
  186. timeout / 1000. if timeout else 60,
  187. self.hs.get_reactor(),
  188. cancelled_to_request_timed_out_error,
  189. )
  190. response = yield make_deferred_yieldable(
  191. request_deferred,
  192. )
  193. log_result = "%d %s" % (response.code, response.phrase,)
  194. break
  195. except Exception as e:
  196. if not retry_on_dns_fail and isinstance(e, DNSLookupError):
  197. logger.warn(
  198. "DNS Lookup failed to %s with %s",
  199. destination,
  200. e
  201. )
  202. log_result = "DNS Lookup failed to %s with %s" % (
  203. destination, e
  204. )
  205. raise
  206. logger.warn(
  207. "{%s} Sending request failed to %s: %s %s: %s",
  208. txn_id,
  209. destination,
  210. method,
  211. url,
  212. _flatten_response_never_received(e),
  213. )
  214. log_result = _flatten_response_never_received(e)
  215. if retries_left and not timeout:
  216. if long_retries:
  217. delay = 4 ** (MAX_LONG_RETRIES + 1 - retries_left)
  218. delay = min(delay, 60)
  219. delay *= random.uniform(0.8, 1.4)
  220. else:
  221. delay = 0.5 * 2 ** (MAX_SHORT_RETRIES - retries_left)
  222. delay = min(delay, 2)
  223. delay *= random.uniform(0.8, 1.4)
  224. yield self.clock.sleep(delay)
  225. retries_left -= 1
  226. else:
  227. raise
  228. finally:
  229. outbound_logger.info(
  230. "{%s} [%s] Result: %s",
  231. txn_id,
  232. destination,
  233. log_result,
  234. )
  235. if 200 <= response.code < 300:
  236. pass
  237. else:
  238. # :'(
  239. # Update transactions table?
  240. with logcontext.PreserveLoggingContext():
  241. body = yield treq.content(response)
  242. raise HttpResponseException(
  243. response.code, response.phrase, body
  244. )
  245. defer.returnValue(response)
  246. def sign_request(self, destination, method, url_bytes, headers_dict,
  247. content=None, destination_is=None):
  248. """
  249. Signs a request by adding an Authorization header to headers_dict
  250. Args:
  251. destination (bytes|None): The desination home server of the request.
  252. May be None if the destination is an identity server, in which case
  253. destination_is must be non-None.
  254. method (bytes): The HTTP method of the request
  255. url_bytes (bytes): The URI path of the request
  256. headers_dict (dict): Dictionary of request headers to append to
  257. content (bytes): The body of the request
  258. destination_is (bytes): As 'destination', but if the destination is an
  259. identity server
  260. Returns:
  261. None
  262. """
  263. request = {
  264. "method": method,
  265. "uri": url_bytes,
  266. "origin": self.server_name,
  267. }
  268. if destination is not None:
  269. request["destination"] = destination
  270. if destination_is is not None:
  271. request["destination_is"] = destination_is
  272. if content is not None:
  273. request["content"] = content
  274. request = sign_json(request, self.server_name, self.signing_key)
  275. auth_headers = []
  276. for key, sig in request["signatures"][self.server_name].items():
  277. auth_headers.append((
  278. "X-Matrix origin=%s,key=\"%s\",sig=\"%s\"" % (
  279. self.server_name, key, sig,
  280. )).encode('ascii')
  281. )
  282. headers_dict[b"Authorization"] = auth_headers
  283. @defer.inlineCallbacks
  284. def put_json(self, destination, path, args={}, data={},
  285. json_data_callback=None,
  286. long_retries=False, timeout=None,
  287. ignore_backoff=False,
  288. backoff_on_404=False):
  289. """ Sends the specifed json data using PUT
  290. Args:
  291. destination (str): The remote server to send the HTTP request
  292. to.
  293. path (str): The HTTP path.
  294. args (dict): query params
  295. data (dict): A dict containing the data that will be used as
  296. the request body. This will be encoded as JSON.
  297. json_data_callback (callable): A callable returning the dict to
  298. use as the request body.
  299. long_retries (bool): A boolean that indicates whether we should
  300. retry for a short or long time.
  301. timeout(int): How long to try (in ms) the destination for before
  302. giving up. None indicates no timeout.
  303. ignore_backoff (bool): true to ignore the historical backoff data
  304. and try the request anyway.
  305. backoff_on_404 (bool): True if we should count a 404 response as
  306. a failure of the server (and should therefore back off future
  307. requests)
  308. Returns:
  309. Deferred: Succeeds when we get a 2xx HTTP response. The result
  310. will be the decoded JSON body.
  311. Fails with ``HTTPRequestException`` if we get an HTTP response
  312. code >= 300.
  313. Fails with ``NotRetryingDestination`` if we are not yet ready
  314. to retry this server.
  315. Fails with ``FederationDeniedError`` if this destination
  316. is not on our federation whitelist
  317. """
  318. if not json_data_callback:
  319. json_data_callback = lambda: data
  320. response = yield self._request(
  321. destination,
  322. "PUT",
  323. path,
  324. json_callback=json_data_callback,
  325. query=args,
  326. long_retries=long_retries,
  327. timeout=timeout,
  328. ignore_backoff=ignore_backoff,
  329. backoff_on_404=backoff_on_404,
  330. )
  331. if 200 <= response.code < 300:
  332. # We need to update the transactions table to say it was sent?
  333. check_content_type_is_json(response.headers)
  334. with logcontext.PreserveLoggingContext():
  335. body = yield treq.json_content(response)
  336. defer.returnValue(body)
  337. @defer.inlineCallbacks
  338. def post_json(self, destination, path, data={}, long_retries=False,
  339. timeout=None, ignore_backoff=False, args={}):
  340. """ Sends the specifed json data using POST
  341. Args:
  342. destination (str): The remote server to send the HTTP request
  343. to.
  344. path (str): The HTTP path.
  345. data (dict): A dict containing the data that will be used as
  346. the request body. This will be encoded as JSON.
  347. long_retries (bool): A boolean that indicates whether we should
  348. retry for a short or long time.
  349. timeout(int): How long to try (in ms) the destination for before
  350. giving up. None indicates no timeout.
  351. ignore_backoff (bool): true to ignore the historical backoff data and
  352. try the request anyway.
  353. args (dict): query params
  354. Returns:
  355. Deferred: Succeeds when we get a 2xx HTTP response. The result
  356. will be the decoded JSON body.
  357. Fails with ``HTTPRequestException`` if we get an HTTP response
  358. code >= 300.
  359. Fails with ``NotRetryingDestination`` if we are not yet ready
  360. to retry this server.
  361. Fails with ``FederationDeniedError`` if this destination
  362. is not on our federation whitelist
  363. """
  364. response = yield self._request(
  365. destination,
  366. "POST",
  367. path,
  368. query=args,
  369. json=data,
  370. long_retries=long_retries,
  371. timeout=timeout,
  372. ignore_backoff=ignore_backoff,
  373. )
  374. if 200 <= response.code < 300:
  375. # We need to update the transactions table to say it was sent?
  376. check_content_type_is_json(response.headers)
  377. with logcontext.PreserveLoggingContext():
  378. body = yield treq.json_content(response)
  379. defer.returnValue(body)
  380. @defer.inlineCallbacks
  381. def get_json(self, destination, path, args=None, retry_on_dns_fail=True,
  382. timeout=None, ignore_backoff=False):
  383. """ GETs some json from the given host homeserver and path
  384. Args:
  385. destination (str): The remote server to send the HTTP request
  386. to.
  387. path (str): The HTTP path.
  388. args (dict|None): A dictionary used to create query strings, defaults to
  389. None.
  390. timeout (int): How long to try (in ms) the destination for before
  391. giving up. None indicates no timeout and that the request will
  392. be retried.
  393. ignore_backoff (bool): true to ignore the historical backoff data
  394. and try the request anyway.
  395. Returns:
  396. Deferred: Succeeds when we get a 2xx HTTP response. The result
  397. will be the decoded JSON body.
  398. Fails with ``HTTPRequestException`` if we get an HTTP response
  399. code >= 300.
  400. Fails with ``NotRetryingDestination`` if we are not yet ready
  401. to retry this server.
  402. Fails with ``FederationDeniedError`` if this destination
  403. is not on our federation whitelist
  404. """
  405. logger.debug("get_json args: %s", args)
  406. logger.debug("Query bytes: %s Retry DNS: %s", args, retry_on_dns_fail)
  407. response = yield self._request(
  408. destination,
  409. "GET",
  410. path,
  411. query=args,
  412. retry_on_dns_fail=retry_on_dns_fail,
  413. timeout=timeout,
  414. ignore_backoff=ignore_backoff,
  415. )
  416. if 200 <= response.code < 300:
  417. # We need to update the transactions table to say it was sent?
  418. check_content_type_is_json(response.headers)
  419. with logcontext.PreserveLoggingContext():
  420. body = yield treq.json_content(response)
  421. defer.returnValue(body)
  422. @defer.inlineCallbacks
  423. def delete_json(self, destination, path, long_retries=False,
  424. timeout=None, ignore_backoff=False, args={}):
  425. """Send a DELETE request to the remote expecting some json response
  426. Args:
  427. destination (str): The remote server to send the HTTP request
  428. to.
  429. path (str): The HTTP path.
  430. long_retries (bool): A boolean that indicates whether we should
  431. retry for a short or long time.
  432. timeout(int): How long to try (in ms) the destination for before
  433. giving up. None indicates no timeout.
  434. ignore_backoff (bool): true to ignore the historical backoff data and
  435. try the request anyway.
  436. Returns:
  437. Deferred: Succeeds when we get a 2xx HTTP response. The result
  438. will be the decoded JSON body.
  439. Fails with ``HTTPRequestException`` if we get an HTTP response
  440. code >= 300.
  441. Fails with ``NotRetryingDestination`` if we are not yet ready
  442. to retry this server.
  443. Fails with ``FederationDeniedError`` if this destination
  444. is not on our federation whitelist
  445. """
  446. response = yield self._request(
  447. destination,
  448. "DELETE",
  449. path,
  450. query=args,
  451. long_retries=long_retries,
  452. timeout=timeout,
  453. ignore_backoff=ignore_backoff,
  454. )
  455. if 200 <= response.code < 300:
  456. # We need to update the transactions table to say it was sent?
  457. check_content_type_is_json(response.headers)
  458. with logcontext.PreserveLoggingContext():
  459. body = yield treq.json_content(response)
  460. defer.returnValue(body)
  461. @defer.inlineCallbacks
  462. def get_file(self, destination, path, output_stream, args={},
  463. retry_on_dns_fail=True, max_size=None,
  464. ignore_backoff=False):
  465. """GETs a file from a given homeserver
  466. Args:
  467. destination (str): The remote server to send the HTTP request to.
  468. path (str): The HTTP path to GET.
  469. output_stream (file): File to write the response body to.
  470. args (dict): Optional dictionary used to create the query string.
  471. ignore_backoff (bool): true to ignore the historical backoff data
  472. and try the request anyway.
  473. Returns:
  474. Deferred: resolves with an (int,dict) tuple of the file length and
  475. a dict of the response headers.
  476. Fails with ``HTTPRequestException`` if we get an HTTP response code
  477. >= 300
  478. Fails with ``NotRetryingDestination`` if we are not yet ready
  479. to retry this server.
  480. Fails with ``FederationDeniedError`` if this destination
  481. is not on our federation whitelist
  482. """
  483. response = yield self._request(
  484. destination,
  485. "GET",
  486. path,
  487. query=args,
  488. retry_on_dns_fail=retry_on_dns_fail,
  489. ignore_backoff=ignore_backoff,
  490. )
  491. headers = dict(response.headers.getAllRawHeaders())
  492. try:
  493. with logcontext.PreserveLoggingContext():
  494. length = yield _readBodyToFile(
  495. response, output_stream, max_size
  496. )
  497. except Exception:
  498. logger.exception("Failed to download body")
  499. raise
  500. defer.returnValue((length, headers))
  501. class _ReadBodyToFileProtocol(protocol.Protocol):
  502. def __init__(self, stream, deferred, max_size):
  503. self.stream = stream
  504. self.deferred = deferred
  505. self.length = 0
  506. self.max_size = max_size
  507. def dataReceived(self, data):
  508. self.stream.write(data)
  509. self.length += len(data)
  510. if self.max_size is not None and self.length >= self.max_size:
  511. self.deferred.errback(SynapseError(
  512. 502,
  513. "Requested file is too large > %r bytes" % (self.max_size,),
  514. Codes.TOO_LARGE,
  515. ))
  516. self.deferred = defer.Deferred()
  517. self.transport.loseConnection()
  518. def connectionLost(self, reason):
  519. if reason.check(ResponseDone):
  520. self.deferred.callback(self.length)
  521. else:
  522. self.deferred.errback(reason)
  523. def _readBodyToFile(response, stream, max_size):
  524. d = defer.Deferred()
  525. response.deliverBody(_ReadBodyToFileProtocol(stream, d, max_size))
  526. return d
  527. def _flatten_response_never_received(e):
  528. if hasattr(e, "reasons"):
  529. reasons = ", ".join(
  530. _flatten_response_never_received(f.value)
  531. for f in e.reasons
  532. )
  533. return "%s:[%s]" % (type(e).__name__, reasons)
  534. else:
  535. return repr(e)
  536. def check_content_type_is_json(headers):
  537. """
  538. Check that a set of HTTP headers have a Content-Type header, and that it
  539. is application/json.
  540. Args:
  541. headers (twisted.web.http_headers.Headers): headers to check
  542. Raises:
  543. RuntimeError if the
  544. """
  545. c_type = headers.getRawHeaders(b"Content-Type")
  546. if c_type is None:
  547. raise RuntimeError(
  548. "No Content-Type header"
  549. )
  550. c_type = c_type[0].decode('ascii') # only the first header
  551. val, options = cgi.parse_header(c_type)
  552. if val != "application/json":
  553. raise RuntimeError(
  554. "Content-Type not application/json: was '%s'" % c_type
  555. )
  556. def encode_query_args(args):
  557. if args is None:
  558. return b""
  559. encoded_args = {}
  560. for k, vs in args.items():
  561. if isinstance(vs, string_types):
  562. vs = [vs]
  563. encoded_args[k] = [v.encode("UTF-8") for v in vs]
  564. query_bytes = urllib.parse.urlencode(encoded_args, True)
  565. return query_bytes.encode('utf8')