matrix_federation_agent.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432
  1. # Copyright 2019 New Vector Ltd
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import logging
  15. import urllib.parse
  16. from typing import Any, Generator, List, Optional
  17. from urllib.request import ( # type: ignore[attr-defined]
  18. getproxies_environment,
  19. proxy_bypass_environment,
  20. )
  21. from netaddr import AddrFormatError, IPAddress, IPSet
  22. from zope.interface import implementer
  23. from twisted.internet import defer
  24. from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
  25. from twisted.internet.interfaces import (
  26. IProtocol,
  27. IProtocolFactory,
  28. IReactorCore,
  29. IStreamClientEndpoint,
  30. )
  31. from twisted.web.client import URI, Agent, HTTPConnectionPool
  32. from twisted.web.http_headers import Headers
  33. from twisted.web.iweb import IAgent, IAgentEndpointFactory, IBodyProducer, IResponse
  34. from synapse.crypto.context_factory import FederationPolicyForHTTPS
  35. from synapse.http import proxyagent
  36. from synapse.http.client import BlacklistingAgentWrapper, BlacklistingReactorWrapper
  37. from synapse.http.connectproxyclient import HTTPConnectProxyEndpoint
  38. from synapse.http.federation.srv_resolver import Server, SrvResolver
  39. from synapse.http.federation.well_known_resolver import WellKnownResolver
  40. from synapse.http.proxyagent import ProxyAgent
  41. from synapse.logging.context import make_deferred_yieldable, run_in_background
  42. from synapse.types import ISynapseReactor
  43. from synapse.util import Clock
  44. logger = logging.getLogger(__name__)
  45. @implementer(IAgent)
  46. class MatrixFederationAgent:
  47. """An Agent-like thing which provides a `request` method which correctly
  48. handles resolving matrix server names when using matrix://. Handles standard
  49. https URIs as normal.
  50. Doesn't implement any retries. (Those are done in MatrixFederationHttpClient.)
  51. Args:
  52. reactor: twisted reactor to use for underlying requests
  53. tls_client_options_factory:
  54. factory to use for fetching client tls options, or none to disable TLS.
  55. user_agent:
  56. The user agent header to use for federation requests.
  57. ip_whitelist: Allowed IP addresses.
  58. ip_blacklist: Disallowed IP addresses.
  59. proxy_reactor: twisted reactor to use for connections to the proxy server
  60. reactor might have some blacklisting applied (i.e. for DNS queries),
  61. but we need unblocked access to the proxy.
  62. _srv_resolver:
  63. SrvResolver implementation to use for looking up SRV records. None
  64. to use a default implementation.
  65. _well_known_resolver:
  66. WellKnownResolver to use to perform well-known lookups. None to use a
  67. default implementation.
  68. """
  69. def __init__(
  70. self,
  71. reactor: ISynapseReactor,
  72. tls_client_options_factory: Optional[FederationPolicyForHTTPS],
  73. user_agent: bytes,
  74. ip_whitelist: IPSet,
  75. ip_blacklist: IPSet,
  76. _srv_resolver: Optional[SrvResolver] = None,
  77. _well_known_resolver: Optional[WellKnownResolver] = None,
  78. ):
  79. # proxy_reactor is not blacklisted
  80. proxy_reactor = reactor
  81. # We need to use a DNS resolver which filters out blacklisted IP
  82. # addresses, to prevent DNS rebinding.
  83. reactor = BlacklistingReactorWrapper(reactor, ip_whitelist, ip_blacklist)
  84. self._clock = Clock(reactor)
  85. self._pool = HTTPConnectionPool(reactor)
  86. self._pool.retryAutomatically = False
  87. self._pool.maxPersistentPerHost = 5
  88. self._pool.cachedConnectionTimeout = 2 * 60
  89. self._agent = Agent.usingEndpointFactory(
  90. reactor,
  91. MatrixHostnameEndpointFactory(
  92. reactor,
  93. proxy_reactor,
  94. tls_client_options_factory,
  95. _srv_resolver,
  96. ),
  97. pool=self._pool,
  98. )
  99. self.user_agent = user_agent
  100. if _well_known_resolver is None:
  101. _well_known_resolver = WellKnownResolver(
  102. reactor,
  103. agent=BlacklistingAgentWrapper(
  104. ProxyAgent(
  105. reactor,
  106. proxy_reactor,
  107. pool=self._pool,
  108. contextFactory=tls_client_options_factory,
  109. use_proxy=True,
  110. ),
  111. ip_blacklist=ip_blacklist,
  112. ),
  113. user_agent=self.user_agent,
  114. )
  115. self._well_known_resolver = _well_known_resolver
  116. @defer.inlineCallbacks
  117. def request(
  118. self,
  119. method: bytes,
  120. uri: bytes,
  121. headers: Optional[Headers] = None,
  122. bodyProducer: Optional[IBodyProducer] = None,
  123. ) -> Generator[defer.Deferred, Any, IResponse]:
  124. """
  125. Args:
  126. method: HTTP method: GET/POST/etc
  127. uri: Absolute URI to be retrieved
  128. headers:
  129. HTTP headers to send with the request, or None to send no extra headers.
  130. bodyProducer:
  131. An object which can generate bytes to make up the
  132. body of this request (for example, the properly encoded contents of
  133. a file for a file upload). Or None if the request is to have
  134. no body.
  135. Returns:
  136. A deferred which fires when the header of the response has been received
  137. (regardless of the response status code). Fails if there is any problem
  138. which prevents that response from being received (including problems that
  139. prevent the request from being sent).
  140. """
  141. # We use urlparse as that will set `port` to None if there is no
  142. # explicit port.
  143. parsed_uri = urllib.parse.urlparse(uri)
  144. # There must be a valid hostname.
  145. assert parsed_uri.hostname
  146. # If this is a matrix:// URI check if the server has delegated matrix
  147. # traffic using well-known delegation.
  148. #
  149. # We have to do this here and not in the endpoint as we need to rewrite
  150. # the host header with the delegated server name.
  151. delegated_server = None
  152. if (
  153. parsed_uri.scheme == b"matrix"
  154. and not _is_ip_literal(parsed_uri.hostname)
  155. and not parsed_uri.port
  156. ):
  157. well_known_result = yield defer.ensureDeferred(
  158. self._well_known_resolver.get_well_known(parsed_uri.hostname)
  159. )
  160. delegated_server = well_known_result.delegated_server
  161. if delegated_server:
  162. # Ok, the server has delegated matrix traffic to somewhere else, so
  163. # lets rewrite the URL to replace the server with the delegated
  164. # server name.
  165. uri = urllib.parse.urlunparse(
  166. (
  167. parsed_uri.scheme,
  168. delegated_server,
  169. parsed_uri.path,
  170. parsed_uri.params,
  171. parsed_uri.query,
  172. parsed_uri.fragment,
  173. )
  174. )
  175. parsed_uri = urllib.parse.urlparse(uri)
  176. # We need to make sure the host header is set to the netloc of the
  177. # server and that a user-agent is provided.
  178. if headers is None:
  179. request_headers = Headers()
  180. else:
  181. request_headers = headers.copy()
  182. if not request_headers.hasHeader(b"host"):
  183. request_headers.addRawHeader(b"host", parsed_uri.netloc)
  184. if not request_headers.hasHeader(b"user-agent"):
  185. request_headers.addRawHeader(b"user-agent", self.user_agent)
  186. res = yield make_deferred_yieldable(
  187. self._agent.request(method, uri, request_headers, bodyProducer)
  188. )
  189. return res
  190. @implementer(IAgentEndpointFactory)
  191. class MatrixHostnameEndpointFactory:
  192. """Factory for MatrixHostnameEndpoint for parsing to an Agent."""
  193. def __init__(
  194. self,
  195. reactor: IReactorCore,
  196. proxy_reactor: IReactorCore,
  197. tls_client_options_factory: Optional[FederationPolicyForHTTPS],
  198. srv_resolver: Optional[SrvResolver],
  199. ):
  200. self._reactor = reactor
  201. self._proxy_reactor = proxy_reactor
  202. self._tls_client_options_factory = tls_client_options_factory
  203. if srv_resolver is None:
  204. srv_resolver = SrvResolver()
  205. self._srv_resolver = srv_resolver
  206. def endpointForURI(self, parsed_uri: URI) -> "MatrixHostnameEndpoint":
  207. return MatrixHostnameEndpoint(
  208. self._reactor,
  209. self._proxy_reactor,
  210. self._tls_client_options_factory,
  211. self._srv_resolver,
  212. parsed_uri,
  213. )
  214. @implementer(IStreamClientEndpoint)
  215. class MatrixHostnameEndpoint:
  216. """An endpoint that resolves matrix:// URLs using Matrix server name
  217. resolution (i.e. via SRV). Does not check for well-known delegation.
  218. Args:
  219. reactor: twisted reactor to use for underlying requests
  220. proxy_reactor: twisted reactor to use for connections to the proxy server.
  221. 'reactor' might have some blacklisting applied (i.e. for DNS queries),
  222. but we need unblocked access to the proxy.
  223. tls_client_options_factory:
  224. factory to use for fetching client tls options, or none to disable TLS.
  225. srv_resolver: The SRV resolver to use
  226. parsed_uri: The parsed URI that we're wanting to connect to.
  227. Raises:
  228. ValueError if the environment variables contain an invalid proxy specification.
  229. RuntimeError if no tls_options_factory is given for a https connection
  230. """
  231. def __init__(
  232. self,
  233. reactor: IReactorCore,
  234. proxy_reactor: IReactorCore,
  235. tls_client_options_factory: Optional[FederationPolicyForHTTPS],
  236. srv_resolver: SrvResolver,
  237. parsed_uri: URI,
  238. ):
  239. self._reactor = reactor
  240. self._parsed_uri = parsed_uri
  241. # http_proxy is not needed because federation is always over TLS
  242. proxies = getproxies_environment()
  243. https_proxy = proxies["https"].encode() if "https" in proxies else None
  244. self.no_proxy = proxies["no"] if "no" in proxies else None
  245. # endpoint and credentials to use to connect to the outbound https proxy, if any.
  246. (
  247. self._https_proxy_endpoint,
  248. self._https_proxy_creds,
  249. ) = proxyagent.http_proxy_endpoint(
  250. https_proxy,
  251. proxy_reactor,
  252. tls_client_options_factory,
  253. )
  254. # set up the TLS connection params
  255. #
  256. # XXX disabling TLS is really only supported here for the benefit of the
  257. # unit tests. We should make the UTs cope with TLS rather than having to make
  258. # the code support the unit tests.
  259. if tls_client_options_factory is None:
  260. self._tls_options = None
  261. else:
  262. self._tls_options = tls_client_options_factory.get_options(
  263. self._parsed_uri.host
  264. )
  265. self._srv_resolver = srv_resolver
  266. def connect(
  267. self, protocol_factory: IProtocolFactory
  268. ) -> "defer.Deferred[IProtocol]":
  269. """Implements IStreamClientEndpoint interface"""
  270. return run_in_background(self._do_connect, protocol_factory)
  271. async def _do_connect(self, protocol_factory: IProtocolFactory) -> IProtocol:
  272. first_exception = None
  273. server_list = await self._resolve_server()
  274. for server in server_list:
  275. host = server.host
  276. port = server.port
  277. should_skip_proxy = False
  278. if self.no_proxy is not None:
  279. should_skip_proxy = proxy_bypass_environment(
  280. host.decode(),
  281. proxies={"no": self.no_proxy},
  282. )
  283. endpoint: IStreamClientEndpoint
  284. try:
  285. if self._https_proxy_endpoint and not should_skip_proxy:
  286. logger.debug(
  287. "Connecting to %s:%i via %s",
  288. host.decode("ascii"),
  289. port,
  290. self._https_proxy_endpoint,
  291. )
  292. endpoint = HTTPConnectProxyEndpoint(
  293. self._reactor,
  294. self._https_proxy_endpoint,
  295. host,
  296. port,
  297. proxy_creds=self._https_proxy_creds,
  298. )
  299. else:
  300. logger.debug("Connecting to %s:%i", host.decode("ascii"), port)
  301. # not using a proxy
  302. endpoint = HostnameEndpoint(self._reactor, host, port)
  303. if self._tls_options:
  304. endpoint = wrapClientTLS(self._tls_options, endpoint)
  305. result = await make_deferred_yieldable(
  306. endpoint.connect(protocol_factory)
  307. )
  308. return result
  309. except Exception as e:
  310. logger.info(
  311. "Failed to connect to %s:%i: %s", host.decode("ascii"), port, e
  312. )
  313. if not first_exception:
  314. first_exception = e
  315. # We return the first failure because that's probably the most interesting.
  316. if first_exception:
  317. raise first_exception
  318. # This shouldn't happen as we should always have at least one host/port
  319. # to try and if that doesn't work then we'll have an exception.
  320. raise Exception("Failed to resolve server %r" % (self._parsed_uri.netloc,))
  321. async def _resolve_server(self) -> List[Server]:
  322. """Resolves the server name to a list of hosts and ports to attempt to
  323. connect to.
  324. """
  325. if self._parsed_uri.scheme != b"matrix":
  326. return [Server(host=self._parsed_uri.host, port=self._parsed_uri.port)]
  327. # Note: We don't do well-known lookup as that needs to have happened
  328. # before now, due to needing to rewrite the Host header of the HTTP
  329. # request.
  330. # We reparse the URI so that defaultPort is -1 rather than 80
  331. parsed_uri = urllib.parse.urlparse(self._parsed_uri.toBytes())
  332. host = parsed_uri.hostname
  333. port = parsed_uri.port
  334. # If there is an explicit port or the host is an IP address we bypass
  335. # SRV lookups and just use the given host/port.
  336. if port or _is_ip_literal(host):
  337. return [Server(host, port or 8448)]
  338. logger.debug("Looking up SRV record for %s", host.decode(errors="replace"))
  339. server_list = await self._srv_resolver.resolve_service(b"_matrix._tcp." + host)
  340. if server_list:
  341. logger.debug(
  342. "Got %s from SRV lookup for %s",
  343. ", ".join(map(str, server_list)),
  344. host.decode(errors="replace"),
  345. )
  346. return server_list
  347. # No SRV records, so we fallback to host and 8448
  348. logger.debug("No SRV records for %s", host.decode(errors="replace"))
  349. return [Server(host, 8448)]
  350. def _is_ip_literal(host: bytes) -> bool:
  351. """Test if the given host name is either an IPv4 or IPv6 literal.
  352. Args:
  353. host: The host name to check
  354. Returns:
  355. True if the hostname is an IP address literal.
  356. """
  357. host_str = host.decode("ascii")
  358. try:
  359. IPAddress(host_str)
  360. return True
  361. except AddrFormatError:
  362. return False