matrix_federation_agent.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2019 New Vector Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. import logging
  16. import attr
  17. from netaddr import IPAddress
  18. from zope.interface import implementer
  19. from twisted.internet import defer
  20. from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
  21. from twisted.internet.interfaces import IStreamClientEndpoint
  22. from twisted.web.client import URI, Agent, HTTPConnectionPool
  23. from twisted.web.http_headers import Headers
  24. from twisted.web.iweb import IAgent
  25. from synapse.http.federation.srv_resolver import SrvResolver, pick_server_from_list
  26. from synapse.http.federation.well_known_resolver import WellKnownResolver
  27. from synapse.logging.context import make_deferred_yieldable
  28. from synapse.util import Clock
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
  30. @implementer(IAgent)
  31. class MatrixFederationAgent(object):
  32. """An Agent-like thing which provides a `request` method which will look up a matrix
  33. server and send an HTTP request to it.
  34. Doesn't implement any retries. (Those are done in MatrixFederationHttpClient.)
  35. Args:
  36. reactor (IReactor): twisted reactor to use for underlying requests
  37. tls_client_options_factory (ClientTLSOptionsFactory|None):
  38. factory to use for fetching client tls options, or none to disable TLS.
  39. _srv_resolver (SrvResolver|None):
  40. SRVResolver impl to use for looking up SRV records. None to use a default
  41. implementation.
  42. _well_known_cache (TTLCache|None):
  43. TTLCache impl for storing cached well-known lookups. None to use a default
  44. implementation.
  45. """
  46. def __init__(
  47. self,
  48. reactor,
  49. tls_client_options_factory,
  50. _srv_resolver=None,
  51. _well_known_cache=None,
  52. ):
  53. self._reactor = reactor
  54. self._clock = Clock(reactor)
  55. self._tls_client_options_factory = tls_client_options_factory
  56. if _srv_resolver is None:
  57. _srv_resolver = SrvResolver()
  58. self._srv_resolver = _srv_resolver
  59. self._pool = HTTPConnectionPool(reactor)
  60. self._pool.retryAutomatically = False
  61. self._pool.maxPersistentPerHost = 5
  62. self._pool.cachedConnectionTimeout = 2 * 60
  63. self._well_known_resolver = WellKnownResolver(
  64. self._reactor,
  65. agent=Agent(
  66. self._reactor,
  67. pool=self._pool,
  68. contextFactory=tls_client_options_factory,
  69. ),
  70. well_known_cache=_well_known_cache,
  71. )
  72. @defer.inlineCallbacks
  73. def request(self, method, uri, headers=None, bodyProducer=None):
  74. """
  75. Args:
  76. method (bytes): HTTP method: GET/POST/etc
  77. uri (bytes): Absolute URI to be retrieved
  78. headers (twisted.web.http_headers.Headers|None):
  79. HTTP headers to send with the request, or None to
  80. send no extra headers.
  81. bodyProducer (twisted.web.iweb.IBodyProducer|None):
  82. An object which can generate bytes to make up the
  83. body of this request (for example, the properly encoded contents of
  84. a file for a file upload). Or None if the request is to have
  85. no body.
  86. Returns:
  87. Deferred[twisted.web.iweb.IResponse]:
  88. fires when the header of the response has been received (regardless of the
  89. response status code). Fails if there is any problem which prevents that
  90. response from being received (including problems that prevent the request
  91. from being sent).
  92. """
  93. parsed_uri = URI.fromBytes(uri, defaultPort=-1)
  94. res = yield self._route_matrix_uri(parsed_uri)
  95. # set up the TLS connection params
  96. #
  97. # XXX disabling TLS is really only supported here for the benefit of the
  98. # unit tests. We should make the UTs cope with TLS rather than having to make
  99. # the code support the unit tests.
  100. if self._tls_client_options_factory is None:
  101. tls_options = None
  102. else:
  103. tls_options = self._tls_client_options_factory.get_options(
  104. res.tls_server_name.decode("ascii")
  105. )
  106. # make sure that the Host header is set correctly
  107. if headers is None:
  108. headers = Headers()
  109. else:
  110. headers = headers.copy()
  111. if not headers.hasHeader(b"host"):
  112. headers.addRawHeader(b"host", res.host_header)
  113. class EndpointFactory(object):
  114. @staticmethod
  115. def endpointForURI(_uri):
  116. ep = LoggingHostnameEndpoint(
  117. self._reactor, res.target_host, res.target_port
  118. )
  119. if tls_options is not None:
  120. ep = wrapClientTLS(tls_options, ep)
  121. return ep
  122. agent = Agent.usingEndpointFactory(self._reactor, EndpointFactory(), self._pool)
  123. res = yield make_deferred_yieldable(
  124. agent.request(method, uri, headers, bodyProducer)
  125. )
  126. return res
  127. @defer.inlineCallbacks
  128. def _route_matrix_uri(self, parsed_uri, lookup_well_known=True):
  129. """Helper for `request`: determine the routing for a Matrix URI
  130. Args:
  131. parsed_uri (twisted.web.client.URI): uri to route. Note that it should be
  132. parsed with URI.fromBytes(uri, defaultPort=-1) to set the `port` to -1
  133. if there is no explicit port given.
  134. lookup_well_known (bool): True if we should look up the .well-known file if
  135. there is no SRV record.
  136. Returns:
  137. Deferred[_RoutingResult]
  138. """
  139. # check for an IP literal
  140. try:
  141. ip_address = IPAddress(parsed_uri.host.decode("ascii"))
  142. except Exception:
  143. # not an IP address
  144. ip_address = None
  145. if ip_address:
  146. port = parsed_uri.port
  147. if port == -1:
  148. port = 8448
  149. return _RoutingResult(
  150. host_header=parsed_uri.netloc,
  151. tls_server_name=parsed_uri.host,
  152. target_host=parsed_uri.host,
  153. target_port=port,
  154. )
  155. if parsed_uri.port != -1:
  156. # there is an explicit port
  157. return _RoutingResult(
  158. host_header=parsed_uri.netloc,
  159. tls_server_name=parsed_uri.host,
  160. target_host=parsed_uri.host,
  161. target_port=parsed_uri.port,
  162. )
  163. if lookup_well_known:
  164. # try a .well-known lookup
  165. well_known_result = yield self._well_known_resolver.get_well_known(
  166. parsed_uri.host
  167. )
  168. well_known_server = well_known_result.delegated_server
  169. if well_known_server:
  170. # if we found a .well-known, start again, but don't do another
  171. # .well-known lookup.
  172. # parse the server name in the .well-known response into host/port.
  173. # (This code is lifted from twisted.web.client.URI.fromBytes).
  174. if b":" in well_known_server:
  175. well_known_host, well_known_port = well_known_server.rsplit(b":", 1)
  176. try:
  177. well_known_port = int(well_known_port)
  178. except ValueError:
  179. # the part after the colon could not be parsed as an int
  180. # - we assume it is an IPv6 literal with no port (the closing
  181. # ']' stops it being parsed as an int)
  182. well_known_host, well_known_port = well_known_server, -1
  183. else:
  184. well_known_host, well_known_port = well_known_server, -1
  185. new_uri = URI(
  186. scheme=parsed_uri.scheme,
  187. netloc=well_known_server,
  188. host=well_known_host,
  189. port=well_known_port,
  190. path=parsed_uri.path,
  191. params=parsed_uri.params,
  192. query=parsed_uri.query,
  193. fragment=parsed_uri.fragment,
  194. )
  195. res = yield self._route_matrix_uri(new_uri, lookup_well_known=False)
  196. return res
  197. # try a SRV lookup
  198. service_name = b"_matrix._tcp.%s" % (parsed_uri.host,)
  199. server_list = yield self._srv_resolver.resolve_service(service_name)
  200. if not server_list:
  201. target_host = parsed_uri.host
  202. port = 8448
  203. logger.debug(
  204. "No SRV record for %s, using %s:%i",
  205. parsed_uri.host.decode("ascii"),
  206. target_host.decode("ascii"),
  207. port,
  208. )
  209. else:
  210. target_host, port = pick_server_from_list(server_list)
  211. logger.debug(
  212. "Picked %s:%i from SRV records for %s",
  213. target_host.decode("ascii"),
  214. port,
  215. parsed_uri.host.decode("ascii"),
  216. )
  217. return _RoutingResult(
  218. host_header=parsed_uri.netloc,
  219. tls_server_name=parsed_uri.host,
  220. target_host=target_host,
  221. target_port=port,
  222. )
  223. @implementer(IStreamClientEndpoint)
  224. class LoggingHostnameEndpoint(object):
  225. """A wrapper for HostnameEndpint which logs when it connects"""
  226. def __init__(self, reactor, host, port, *args, **kwargs):
  227. self.host = host
  228. self.port = port
  229. self.ep = HostnameEndpoint(reactor, host, port, *args, **kwargs)
  230. def connect(self, protocol_factory):
  231. logger.info("Connecting to %s:%i", self.host.decode("ascii"), self.port)
  232. return self.ep.connect(protocol_factory)
  233. @attr.s
  234. class _RoutingResult(object):
  235. """The result returned by `_route_matrix_uri`.
  236. Contains the parameters needed to direct a federation connection to a particular
  237. server.
  238. Where a SRV record points to several servers, this object contains a single server
  239. chosen from the list.
  240. """
  241. host_header = attr.ib()
  242. """
  243. The value we should assign to the Host header (host:port from the matrix
  244. URI, or .well-known).
  245. :type: bytes
  246. """
  247. tls_server_name = attr.ib()
  248. """
  249. The server name we should set in the SNI (typically host, without port, from the
  250. matrix URI or .well-known)
  251. :type: bytes
  252. """
  253. target_host = attr.ib()
  254. """
  255. The hostname (or IP literal) we should route the TCP connection to (the target of the
  256. SRV record, or the hostname from the URL/.well-known)
  257. :type: bytes
  258. """
  259. target_port = attr.ib()
  260. """
  261. The port we should route the TCP connection to (the target of the SRV record, or
  262. the port from the URL/.well-known, or 8448)
  263. :type: int
  264. """