matrix_federation_agent.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2019 New Vector Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. import json
  16. import logging
  17. import random
  18. import time
  19. import attr
  20. from netaddr import IPAddress
  21. from zope.interface import implementer
  22. from twisted.internet import defer
  23. from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
  24. from twisted.internet.interfaces import IStreamClientEndpoint
  25. from twisted.web.client import URI, Agent, HTTPConnectionPool, RedirectAgent, readBody
  26. from twisted.web.http import stringToDatetime
  27. from twisted.web.http_headers import Headers
  28. from twisted.web.iweb import IAgent
  29. from synapse.http.federation.srv_resolver import SrvResolver, pick_server_from_list
  30. from synapse.logging.context import make_deferred_yieldable
  31. from synapse.util import Clock
  32. from synapse.util.caches.ttlcache import TTLCache
  33. from synapse.util.metrics import Measure
# All cache periods below are in seconds (they are compared against HTTP
# `max-age` values and against differences of reactor timestamps).

# period to cache .well-known results for by default (i.e. when the response
# carries no usable caching headers)
WELL_KNOWN_DEFAULT_CACHE_PERIOD = 24 * 3600

# jitter to add to the .well-known default cache ttl
WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER = 10 * 60

# period to cache failure to fetch .well-known for
WELL_KNOWN_INVALID_CACHE_PERIOD = 1 * 3600

# cap for .well-known cache period (applied to periods taken from the
# response headers)
WELL_KNOWN_MAX_CACHE_PERIOD = 48 * 3600

logger = logging.getLogger(__name__)

# module-level cache of .well-known lookups; this is the default value of the
# `_well_known_cache` argument of MatrixFederationAgent
well_known_cache = TTLCache("well-known")
  44. @implementer(IAgent)
  45. class MatrixFederationAgent(object):
  46. """An Agent-like thing which provides a `request` method which will look up a matrix
  47. server and send an HTTP request to it.
  48. Doesn't implement any retries. (Those are done in MatrixFederationHttpClient.)
  49. Args:
  50. reactor (IReactor): twisted reactor to use for underlying requests
  51. tls_client_options_factory (ClientTLSOptionsFactory|None):
  52. factory to use for fetching client tls options, or none to disable TLS.
  53. _well_known_tls_policy (IPolicyForHTTPS|None):
  54. TLS policy to use for fetching .well-known files. None to use a default
  55. (browser-like) implementation.
  56. _srv_resolver (SrvResolver|None):
  57. SRVResolver impl to use for looking up SRV records. None to use a default
  58. implementation.
  59. _well_known_cache (TTLCache|None):
  60. TTLCache impl for storing cached well-known lookups. None to use a default
  61. implementation.
  62. """
  63. def __init__(
  64. self,
  65. reactor,
  66. tls_client_options_factory,
  67. _well_known_tls_policy=None,
  68. _srv_resolver=None,
  69. _well_known_cache=well_known_cache,
  70. ):
  71. self._reactor = reactor
  72. self._clock = Clock(reactor)
  73. self._tls_client_options_factory = tls_client_options_factory
  74. if _srv_resolver is None:
  75. _srv_resolver = SrvResolver()
  76. self._srv_resolver = _srv_resolver
  77. self._pool = HTTPConnectionPool(reactor)
  78. self._pool.retryAutomatically = False
  79. self._pool.maxPersistentPerHost = 5
  80. self._pool.cachedConnectionTimeout = 2 * 60
  81. agent_args = {}
  82. if _well_known_tls_policy is not None:
  83. # the param is called 'contextFactory', but actually passing a
  84. # contextfactory is deprecated, and it expects an IPolicyForHTTPS.
  85. agent_args["contextFactory"] = _well_known_tls_policy
  86. _well_known_agent = RedirectAgent(
  87. Agent(self._reactor, pool=self._pool, **agent_args)
  88. )
  89. self._well_known_agent = _well_known_agent
  90. # our cache of .well-known lookup results, mapping from server name
  91. # to delegated name. The values can be:
  92. # `bytes`: a valid server-name
  93. # `None`: there is no (valid) .well-known here
  94. self._well_known_cache = _well_known_cache
  95. @defer.inlineCallbacks
  96. def request(self, method, uri, headers=None, bodyProducer=None):
  97. """
  98. Args:
  99. method (bytes): HTTP method: GET/POST/etc
  100. uri (bytes): Absolute URI to be retrieved
  101. headers (twisted.web.http_headers.Headers|None):
  102. HTTP headers to send with the request, or None to
  103. send no extra headers.
  104. bodyProducer (twisted.web.iweb.IBodyProducer|None):
  105. An object which can generate bytes to make up the
  106. body of this request (for example, the properly encoded contents of
  107. a file for a file upload). Or None if the request is to have
  108. no body.
  109. Returns:
  110. Deferred[twisted.web.iweb.IResponse]:
  111. fires when the header of the response has been received (regardless of the
  112. response status code). Fails if there is any problem which prevents that
  113. response from being received (including problems that prevent the request
  114. from being sent).
  115. """
  116. parsed_uri = URI.fromBytes(uri, defaultPort=-1)
  117. res = yield self._route_matrix_uri(parsed_uri)
  118. # set up the TLS connection params
  119. #
  120. # XXX disabling TLS is really only supported here for the benefit of the
  121. # unit tests. We should make the UTs cope with TLS rather than having to make
  122. # the code support the unit tests.
  123. if self._tls_client_options_factory is None:
  124. tls_options = None
  125. else:
  126. tls_options = self._tls_client_options_factory.get_options(
  127. res.tls_server_name.decode("ascii")
  128. )
  129. # make sure that the Host header is set correctly
  130. if headers is None:
  131. headers = Headers()
  132. else:
  133. headers = headers.copy()
  134. if not headers.hasHeader(b"host"):
  135. headers.addRawHeader(b"host", res.host_header)
  136. class EndpointFactory(object):
  137. @staticmethod
  138. def endpointForURI(_uri):
  139. ep = LoggingHostnameEndpoint(
  140. self._reactor, res.target_host, res.target_port
  141. )
  142. if tls_options is not None:
  143. ep = wrapClientTLS(tls_options, ep)
  144. return ep
  145. agent = Agent.usingEndpointFactory(self._reactor, EndpointFactory(), self._pool)
  146. res = yield make_deferred_yieldable(
  147. agent.request(method, uri, headers, bodyProducer)
  148. )
  149. return res
  150. @defer.inlineCallbacks
  151. def _route_matrix_uri(self, parsed_uri, lookup_well_known=True):
  152. """Helper for `request`: determine the routing for a Matrix URI
  153. Args:
  154. parsed_uri (twisted.web.client.URI): uri to route. Note that it should be
  155. parsed with URI.fromBytes(uri, defaultPort=-1) to set the `port` to -1
  156. if there is no explicit port given.
  157. lookup_well_known (bool): True if we should look up the .well-known file if
  158. there is no SRV record.
  159. Returns:
  160. Deferred[_RoutingResult]
  161. """
  162. # check for an IP literal
  163. try:
  164. ip_address = IPAddress(parsed_uri.host.decode("ascii"))
  165. except Exception:
  166. # not an IP address
  167. ip_address = None
  168. if ip_address:
  169. port = parsed_uri.port
  170. if port == -1:
  171. port = 8448
  172. return _RoutingResult(
  173. host_header=parsed_uri.netloc,
  174. tls_server_name=parsed_uri.host,
  175. target_host=parsed_uri.host,
  176. target_port=port,
  177. )
  178. if parsed_uri.port != -1:
  179. # there is an explicit port
  180. return _RoutingResult(
  181. host_header=parsed_uri.netloc,
  182. tls_server_name=parsed_uri.host,
  183. target_host=parsed_uri.host,
  184. target_port=parsed_uri.port,
  185. )
  186. if lookup_well_known:
  187. # try a .well-known lookup
  188. well_known_server = yield self._get_well_known(parsed_uri.host)
  189. if well_known_server:
  190. # if we found a .well-known, start again, but don't do another
  191. # .well-known lookup.
  192. # parse the server name in the .well-known response into host/port.
  193. # (This code is lifted from twisted.web.client.URI.fromBytes).
  194. if b":" in well_known_server:
  195. well_known_host, well_known_port = well_known_server.rsplit(b":", 1)
  196. try:
  197. well_known_port = int(well_known_port)
  198. except ValueError:
  199. # the part after the colon could not be parsed as an int
  200. # - we assume it is an IPv6 literal with no port (the closing
  201. # ']' stops it being parsed as an int)
  202. well_known_host, well_known_port = well_known_server, -1
  203. else:
  204. well_known_host, well_known_port = well_known_server, -1
  205. new_uri = URI(
  206. scheme=parsed_uri.scheme,
  207. netloc=well_known_server,
  208. host=well_known_host,
  209. port=well_known_port,
  210. path=parsed_uri.path,
  211. params=parsed_uri.params,
  212. query=parsed_uri.query,
  213. fragment=parsed_uri.fragment,
  214. )
  215. res = yield self._route_matrix_uri(new_uri, lookup_well_known=False)
  216. return res
  217. # try a SRV lookup
  218. service_name = b"_matrix._tcp.%s" % (parsed_uri.host,)
  219. server_list = yield self._srv_resolver.resolve_service(service_name)
  220. if not server_list:
  221. target_host = parsed_uri.host
  222. port = 8448
  223. logger.debug(
  224. "No SRV record for %s, using %s:%i",
  225. parsed_uri.host.decode("ascii"),
  226. target_host.decode("ascii"),
  227. port,
  228. )
  229. else:
  230. target_host, port = pick_server_from_list(server_list)
  231. logger.debug(
  232. "Picked %s:%i from SRV records for %s",
  233. target_host.decode("ascii"),
  234. port,
  235. parsed_uri.host.decode("ascii"),
  236. )
  237. return _RoutingResult(
  238. host_header=parsed_uri.netloc,
  239. tls_server_name=parsed_uri.host,
  240. target_host=target_host,
  241. target_port=port,
  242. )
  243. @defer.inlineCallbacks
  244. def _get_well_known(self, server_name):
  245. """Attempt to fetch and parse a .well-known file for the given server
  246. Args:
  247. server_name (bytes): name of the server, from the requested url
  248. Returns:
  249. Deferred[bytes|None]: either the new server name, from the .well-known, or
  250. None if there was no .well-known file.
  251. """
  252. try:
  253. result = self._well_known_cache[server_name]
  254. except KeyError:
  255. # TODO: should we linearise so that we don't end up doing two .well-known
  256. # requests for the same server in parallel?
  257. with Measure(self._clock, "get_well_known"):
  258. result, cache_period = yield self._do_get_well_known(server_name)
  259. if cache_period > 0:
  260. self._well_known_cache.set(server_name, result, cache_period)
  261. return result
  262. @defer.inlineCallbacks
  263. def _do_get_well_known(self, server_name):
  264. """Actually fetch and parse a .well-known, without checking the cache
  265. Args:
  266. server_name (bytes): name of the server, from the requested url
  267. Returns:
  268. Deferred[Tuple[bytes|None|object],int]:
  269. result, cache period, where result is one of:
  270. - the new server name from the .well-known (as a `bytes`)
  271. - None if there was no .well-known file.
  272. - INVALID_WELL_KNOWN if the .well-known was invalid
  273. """
  274. uri = b"https://%s/.well-known/matrix/server" % (server_name,)
  275. uri_str = uri.decode("ascii")
  276. logger.info("Fetching %s", uri_str)
  277. try:
  278. response = yield make_deferred_yieldable(
  279. self._well_known_agent.request(b"GET", uri)
  280. )
  281. body = yield make_deferred_yieldable(readBody(response))
  282. if response.code != 200:
  283. raise Exception("Non-200 response %s" % (response.code,))
  284. parsed_body = json.loads(body.decode("utf-8"))
  285. logger.info("Response from .well-known: %s", parsed_body)
  286. if not isinstance(parsed_body, dict):
  287. raise Exception("not a dict")
  288. if "m.server" not in parsed_body:
  289. raise Exception("Missing key 'm.server'")
  290. except Exception as e:
  291. logger.info("Error fetching %s: %s", uri_str, e)
  292. # add some randomness to the TTL to avoid a stampeding herd every hour
  293. # after startup
  294. cache_period = WELL_KNOWN_INVALID_CACHE_PERIOD
  295. cache_period += random.uniform(0, WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER)
  296. return (None, cache_period)
  297. result = parsed_body["m.server"].encode("ascii")
  298. cache_period = _cache_period_from_headers(
  299. response.headers, time_now=self._reactor.seconds
  300. )
  301. if cache_period is None:
  302. cache_period = WELL_KNOWN_DEFAULT_CACHE_PERIOD
  303. # add some randomness to the TTL to avoid a stampeding herd every 24 hours
  304. # after startup
  305. cache_period += random.uniform(0, WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER)
  306. else:
  307. cache_period = min(cache_period, WELL_KNOWN_MAX_CACHE_PERIOD)
  308. return (result, cache_period)
  309. @implementer(IStreamClientEndpoint)
  310. class LoggingHostnameEndpoint(object):
  311. """A wrapper for HostnameEndpint which logs when it connects"""
  312. def __init__(self, reactor, host, port, *args, **kwargs):
  313. self.host = host
  314. self.port = port
  315. self.ep = HostnameEndpoint(reactor, host, port, *args, **kwargs)
  316. def connect(self, protocol_factory):
  317. logger.info("Connecting to %s:%i", self.host.decode("ascii"), self.port)
  318. return self.ep.connect(protocol_factory)
  319. def _cache_period_from_headers(headers, time_now=time.time):
  320. cache_controls = _parse_cache_control(headers)
  321. if b"no-store" in cache_controls:
  322. return 0
  323. if b"max-age" in cache_controls:
  324. try:
  325. max_age = int(cache_controls[b"max-age"])
  326. return max_age
  327. except ValueError:
  328. pass
  329. expires = headers.getRawHeaders(b"expires")
  330. if expires is not None:
  331. try:
  332. expires_date = stringToDatetime(expires[-1])
  333. return expires_date - time_now()
  334. except ValueError:
  335. # RFC7234 says 'A cache recipient MUST interpret invalid date formats,
  336. # especially the value "0", as representing a time in the past (i.e.,
  337. # "already expired").
  338. return 0
  339. return None
  340. def _parse_cache_control(headers):
  341. cache_controls = {}
  342. for hdr in headers.getRawHeaders(b"cache-control", []):
  343. for directive in hdr.split(b","):
  344. splits = [x.strip() for x in directive.split(b"=", 1)]
  345. k = splits[0].lower()
  346. v = splits[1] if len(splits) > 1 else None
  347. cache_controls[k] = v
  348. return cache_controls
  349. @attr.s
  350. class _RoutingResult(object):
  351. """The result returned by `_route_matrix_uri`.
  352. Contains the parameters needed to direct a federation connection to a particular
  353. server.
  354. Where a SRV record points to several servers, this object contains a single server
  355. chosen from the list.
  356. """
  357. host_header = attr.ib()
  358. """
  359. The value we should assign to the Host header (host:port from the matrix
  360. URI, or .well-known).
  361. :type: bytes
  362. """
  363. tls_server_name = attr.ib()
  364. """
  365. The server name we should set in the SNI (typically host, without port, from the
  366. matrix URI or .well-known)
  367. :type: bytes
  368. """
  369. target_host = attr.ib()
  370. """
  371. The hostname (or IP literal) we should route the TCP connection to (the target of the
  372. SRV record, or the hostname from the URL/.well-known)
  373. :type: bytes
  374. """
  375. target_port = attr.ib()
  376. """
  377. The port we should route the TCP connection to (the target of the SRV record, or
  378. the port from the URL/.well-known, or 8448)
  379. :type: int
  380. """