matrixfederationagent.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2019 New Vector Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. from __future__ import absolute_import
  16. import logging
  17. import random
  18. import time
  19. import attr
  20. from netaddr import IPAddress
  21. from zope.interface import implementer
  22. from twisted.internet import defer
  23. from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
  24. from twisted.internet.interfaces import IStreamClientEndpoint
  25. from twisted.web.client import URI, Agent, HTTPConnectionPool, RedirectAgent
  26. from twisted.web.http import stringToDatetime
  27. from twisted.web.http_headers import Headers
  28. from twisted.web.iweb import IAgent
  29. from sydent.http.httpcommon import read_body_with_max_size
  30. from sydent.http.srvresolver import SrvResolver, pick_server_from_list
  31. from sydent.util import json_decoder
  32. from sydent.util.ttlcache import TTLCache
  33. # period to cache .well-known results for by default
  34. WELL_KNOWN_DEFAULT_CACHE_PERIOD = 24 * 3600
  35. # jitter to add to the .well-known default cache ttl
  36. WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER = 10 * 60
  37. # period to cache failure to fetch .well-known for
  38. WELL_KNOWN_INVALID_CACHE_PERIOD = 1 * 3600
  39. # cap for .well-known cache period
  40. WELL_KNOWN_MAX_CACHE_PERIOD = 48 * 3600
  41. # The maximum size (in bytes) to allow a well-known file to be.
  42. WELL_KNOWN_MAX_SIZE = 50 * 1024 # 50 KiB
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Module-level cache of .well-known lookup results. It is the default value of
# MatrixFederationAgent's `_well_known_cache` argument, so agents constructed
# without an explicit cache all share this one instance.
well_known_cache = TTLCache("well-known")
  45. @implementer(IAgent)
  46. class MatrixFederationAgent(object):
  47. """An Agent-like thing which provides a `request` method which will look up a matrix
  48. server and send an HTTP request to it.
  49. Doesn't implement any retries. (Those are done in MatrixFederationHttpClient.)
  50. :param reactor: twisted reactor to use for underlying requests
  51. :type reactor: IReactor
  52. :param tls_client_options_factory: Factory to use for fetching client tls
  53. options, or none to disable TLS.
  54. :type tls_client_options_factory: ClientTLSOptionsFactory, None
  55. :param _well_known_tls_policy: TLS policy to use for fetching .well-known
  56. files. None to use a default (browser-like) implementation.
  57. :type _well_known_tls_policy: IPolicyForHTTPS, None
  58. :param _srv_resolver: SRVResolver impl to use for looking up SRV records.
  59. None to use a default implementation.
  60. :type _srv_resolver: SrvResolver, None
  61. :param _well_known_cache: TTLCache impl for storing cached well-known
  62. lookups. None to use a default implementation.
  63. :type _well_known_cache: TTLCache, None
  64. """
  65. def __init__(
  66. self,
  67. reactor,
  68. tls_client_options_factory,
  69. _well_known_tls_policy=None,
  70. _srv_resolver=None,
  71. _well_known_cache=well_known_cache,
  72. ):
  73. self._reactor = reactor
  74. self._tls_client_options_factory = tls_client_options_factory
  75. if _srv_resolver is None:
  76. _srv_resolver = SrvResolver()
  77. self._srv_resolver = _srv_resolver
  78. self._pool = HTTPConnectionPool(reactor)
  79. self._pool.retryAutomatically = False
  80. self._pool.maxPersistentPerHost = 5
  81. self._pool.cachedConnectionTimeout = 2 * 60
  82. agent_args = {}
  83. if _well_known_tls_policy is not None:
  84. # the param is called 'contextFactory', but actually passing a
  85. # contextfactory is deprecated, and it expects an IPolicyForHTTPS.
  86. agent_args["contextFactory"] = _well_known_tls_policy
  87. _well_known_agent = RedirectAgent(
  88. Agent(self._reactor, pool=self._pool, **agent_args),
  89. )
  90. self._well_known_agent = _well_known_agent
  91. # our cache of .well-known lookup results, mapping from server name
  92. # to delegated name. The values can be:
  93. # `bytes`: a valid server-name
  94. # `None`: there is no (valid) .well-known here
  95. self._well_known_cache = _well_known_cache
  96. @defer.inlineCallbacks
  97. def request(self, method, uri, headers=None, bodyProducer=None):
  98. """
  99. :param method: HTTP method (GET/POST/etc).
  100. :type method: bytes
  101. :param uri: Absolute URI to be retrieved.
  102. :type uri: bytes
  103. :param headers: HTTP headers to send with the request, or None to
  104. send no extra headers.
  105. :type headers: twisted.web.http_headers.Headers, None
  106. :param bodyProducer: An object which can generate bytes to make up the
  107. body of this request (for example, the properly encoded contents of
  108. a file for a file upload). Or None if the request is to have
  109. no body.
  110. :type bodyProducer: twisted.web.iweb.IBodyProducer, None
  111. :returns a deferred that fires when the header of the response has
  112. been received (regardless of the response status code). Fails if
  113. there is any problem which prevents that response from being received
  114. (including problems that prevent the request from being sent).
  115. :rtype: Deferred[twisted.web.iweb.IResponse]
  116. """
  117. parsed_uri = URI.fromBytes(uri, defaultPort=-1)
  118. res = yield self._route_matrix_uri(parsed_uri)
  119. # set up the TLS connection params
  120. #
  121. # XXX disabling TLS is really only supported here for the benefit of the
  122. # unit tests. We should make the UTs cope with TLS rather than having to make
  123. # the code support the unit tests.
  124. if self._tls_client_options_factory is None:
  125. tls_options = None
  126. else:
  127. tls_options = self._tls_client_options_factory.get_options(
  128. res.tls_server_name.decode("ascii")
  129. )
  130. # make sure that the Host header is set correctly
  131. if headers is None:
  132. headers = Headers()
  133. else:
  134. headers = headers.copy()
  135. if not headers.hasHeader(b"host"):
  136. headers.addRawHeader(b"host", res.host_header)
  137. class EndpointFactory(object):
  138. @staticmethod
  139. def endpointForURI(_uri):
  140. ep = LoggingHostnameEndpoint(
  141. self._reactor,
  142. res.target_host,
  143. res.target_port,
  144. )
  145. if tls_options is not None:
  146. ep = wrapClientTLS(tls_options, ep)
  147. return ep
  148. agent = Agent.usingEndpointFactory(self._reactor, EndpointFactory(), self._pool)
  149. res = yield agent.request(method, uri, headers, bodyProducer)
  150. defer.returnValue(res)
  151. @defer.inlineCallbacks
  152. def _route_matrix_uri(self, parsed_uri, lookup_well_known=True):
  153. """Helper for `request`: determine the routing for a Matrix URI
  154. :param parsed_uri: uri to route. Note that it should be parsed with
  155. URI.fromBytes(uri, defaultPort=-1) to set the `port` to -1 if there
  156. is no explicit port given.
  157. :type parsed_uri: twisted.web.client.URI
  158. :param lookup_well_known: True if we should look up the .well-known
  159. file if there is no SRV record.
  160. :type lookup_well_known: bool
  161. :returns a routing result.
  162. :rtype: Deferred[_RoutingResult]
  163. """
  164. # check for an IP literal
  165. try:
  166. ip_address = IPAddress(parsed_uri.host.decode("ascii"))
  167. except Exception:
  168. # not an IP address
  169. ip_address = None
  170. if ip_address:
  171. port = parsed_uri.port
  172. if port == -1:
  173. port = 8448
  174. defer.returnValue(
  175. _RoutingResult(
  176. host_header=parsed_uri.netloc,
  177. tls_server_name=parsed_uri.host,
  178. target_host=parsed_uri.host,
  179. target_port=port,
  180. )
  181. )
  182. if parsed_uri.port != -1:
  183. # there is an explicit port
  184. defer.returnValue(
  185. _RoutingResult(
  186. host_header=parsed_uri.netloc,
  187. tls_server_name=parsed_uri.host,
  188. target_host=parsed_uri.host,
  189. target_port=parsed_uri.port,
  190. )
  191. )
  192. if lookup_well_known:
  193. # try a .well-known lookup
  194. well_known_server = yield self._get_well_known(parsed_uri.host)
  195. if well_known_server:
  196. # if we found a .well-known, start again, but don't do another
  197. # .well-known lookup.
  198. # parse the server name in the .well-known response into host/port.
  199. # (This code is lifted from twisted.web.client.URI.fromBytes).
  200. if b":" in well_known_server:
  201. well_known_host, well_known_port = well_known_server.rsplit(b":", 1)
  202. try:
  203. well_known_port = int(well_known_port)
  204. except ValueError:
  205. # the part after the colon could not be parsed as an int
  206. # - we assume it is an IPv6 literal with no port (the closing
  207. # ']' stops it being parsed as an int)
  208. well_known_host, well_known_port = well_known_server, -1
  209. else:
  210. well_known_host, well_known_port = well_known_server, -1
  211. new_uri = URI(
  212. scheme=parsed_uri.scheme,
  213. netloc=well_known_server,
  214. host=well_known_host,
  215. port=well_known_port,
  216. path=parsed_uri.path,
  217. params=parsed_uri.params,
  218. query=parsed_uri.query,
  219. fragment=parsed_uri.fragment,
  220. )
  221. res = yield self._route_matrix_uri(new_uri, lookup_well_known=False)
  222. defer.returnValue(res)
  223. # try a SRV lookup
  224. service_name = b"_matrix._tcp.%s" % (parsed_uri.host,)
  225. server_list = yield self._srv_resolver.resolve_service(service_name)
  226. if not server_list:
  227. target_host = parsed_uri.host
  228. port = 8448
  229. logger.debug(
  230. "No SRV record for %s, using %s:%i",
  231. parsed_uri.host.decode("ascii"),
  232. target_host.decode("ascii"),
  233. port,
  234. )
  235. else:
  236. target_host, port = pick_server_from_list(server_list)
  237. logger.debug(
  238. "Picked %s:%i from SRV records for %s",
  239. target_host.decode("ascii"),
  240. port,
  241. parsed_uri.host.decode("ascii"),
  242. )
  243. defer.returnValue(
  244. _RoutingResult(
  245. host_header=parsed_uri.netloc,
  246. tls_server_name=parsed_uri.host,
  247. target_host=target_host,
  248. target_port=port,
  249. )
  250. )
  251. @defer.inlineCallbacks
  252. def _get_well_known(self, server_name):
  253. """Attempt to fetch and parse a .well-known file for the given server
  254. :param server_name: Name of the server, from the requested url.
  255. :type server_name: bytes
  256. :returns either the new server name, from the .well-known, or None if
  257. there was no .well-known file.
  258. :rtype: Deferred[bytes|None]
  259. """
  260. try:
  261. result = self._well_known_cache[server_name]
  262. except KeyError:
  263. # TODO: should we linearise so that we don't end up doing two .well-known
  264. # requests for the same server in parallel?
  265. result, cache_period = yield self._do_get_well_known(server_name)
  266. if cache_period > 0:
  267. self._well_known_cache.set(server_name, result, cache_period)
  268. defer.returnValue(result)
  269. @defer.inlineCallbacks
  270. def _do_get_well_known(self, server_name):
  271. """Actually fetch and parse a .well-known, without checking the cache
  272. :param server_name: Name of the server, from the requested url
  273. :type server_name: bytes
  274. :returns a tuple of (result, cache period), where result is one of:
  275. - the new server name from the .well-known (as a `bytes`)
  276. - None if there was no .well-known file.
  277. - INVALID_WELL_KNOWN if the .well-known was invalid
  278. :rtype: Deferred[Tuple[bytes|None|object],int]
  279. """
  280. uri = b"https://%s/.well-known/matrix/server" % (server_name,)
  281. uri_str = uri.decode("ascii")
  282. logger.info("Fetching %s", uri_str)
  283. try:
  284. response = yield self._well_known_agent.request(b"GET", uri)
  285. body = yield read_body_with_max_size(response, WELL_KNOWN_MAX_SIZE)
  286. if response.code != 200:
  287. raise Exception("Non-200 response %s" % (response.code,))
  288. parsed_body = json_decoder.decode(body.decode("utf-8"))
  289. logger.info("Response from .well-known: %s", parsed_body)
  290. if not isinstance(parsed_body, dict):
  291. raise Exception("not a dict")
  292. if "m.server" not in parsed_body:
  293. raise Exception("Missing key 'm.server'")
  294. except Exception as e:
  295. logger.info("Error fetching %s: %s", uri_str, e)
  296. # add some randomness to the TTL to avoid a stampeding herd every hour
  297. # after startup
  298. cache_period = WELL_KNOWN_INVALID_CACHE_PERIOD
  299. cache_period += random.uniform(0, WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER)
  300. defer.returnValue((None, cache_period))
  301. return
  302. result = parsed_body["m.server"].encode("ascii")
  303. cache_period = _cache_period_from_headers(
  304. response.headers,
  305. time_now=self._reactor.seconds,
  306. )
  307. if cache_period is None:
  308. cache_period = WELL_KNOWN_DEFAULT_CACHE_PERIOD
  309. # add some randomness to the TTL to avoid a stampeding herd every 24 hours
  310. # after startup
  311. cache_period += random.uniform(0, WELL_KNOWN_DEFAULT_CACHE_PERIOD_JITTER)
  312. else:
  313. cache_period = min(cache_period, WELL_KNOWN_MAX_CACHE_PERIOD)
  314. defer.returnValue((result, cache_period))
  315. @implementer(IStreamClientEndpoint)
  316. class LoggingHostnameEndpoint(object):
  317. """A wrapper for HostnameEndpint which logs when it connects"""
  318. def __init__(self, reactor, host, port, *args, **kwargs):
  319. self.host = host
  320. self.port = port
  321. self.ep = HostnameEndpoint(reactor, host, port, *args, **kwargs)
  322. logger.info("Endpoint created with %s:%d", host, port)
  323. def connect(self, protocol_factory):
  324. logger.info("Connecting to %s:%i", self.host.decode("ascii"), self.port)
  325. return self.ep.connect(protocol_factory)
  326. def _cache_period_from_headers(headers, time_now=time.time):
  327. cache_controls = _parse_cache_control(headers)
  328. if b"no-store" in cache_controls:
  329. return 0
  330. if b"max-age" in cache_controls:
  331. try:
  332. max_age = int(cache_controls[b"max-age"])
  333. return max_age
  334. except ValueError:
  335. pass
  336. expires = headers.getRawHeaders(b"expires")
  337. if expires is not None:
  338. try:
  339. expires_date = stringToDatetime(expires[-1])
  340. return expires_date - time_now()
  341. except ValueError:
  342. # RFC7234 says 'A cache recipient MUST interpret invalid date formats,
  343. # especially the value "0", as representing a time in the past (i.e.,
  344. # "already expired").
  345. return 0
  346. return None
  347. def _parse_cache_control(headers):
  348. cache_controls = {}
  349. for hdr in headers.getRawHeaders(b"cache-control", []):
  350. for directive in hdr.split(b","):
  351. splits = [x.strip() for x in directive.split(b"=", 1)]
  352. k = splits[0].lower()
  353. v = splits[1] if len(splits) > 1 else None
  354. cache_controls[k] = v
  355. return cache_controls
  356. @attr.s
  357. class _RoutingResult(object):
  358. """The result returned by `_route_matrix_uri`.
  359. Contains the parameters needed to direct a federation connection to a particular
  360. server.
  361. Where a SRV record points to several servers, this object contains a single server
  362. chosen from the list.
  363. """
  364. host_header = attr.ib()
  365. """
  366. The value we should assign to the Host header (host:port from the matrix
  367. URI, or .well-known).
  368. :type: bytes
  369. """
  370. tls_server_name = attr.ib()
  371. """
  372. The server name we should set in the SNI (typically host, without port, from the
  373. matrix URI or .well-known)
  374. :type: bytes
  375. """
  376. target_host = attr.ib()
  377. """
  378. The hostname (or IP literal) we should route the TCP connection to (the target of the
  379. SRV record, or the hostname from the URL/.well-known)
  380. :type: bytes
  381. """
  382. target_port = attr.ib()
  383. """
  384. The port we should route the TCP connection to (the target of the SRV record, or
  385. the port from the URL/.well-known, or 8448)
  386. :type: int
  387. """