client.py 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2014-2016 OpenMarket Ltd
  3. # Copyright 2018 New Vector Ltd
  4. #
  5. # Licensed under the Apache License, Version 2.0 (the "License");
  6. # you may not use this file except in compliance with the License.
  7. # You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing, software
  12. # distributed under the License is distributed on an "AS IS" BASIS,
  13. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. # See the License for the specific language governing permissions and
  15. # limitations under the License.
  16. import logging
  17. import urllib
  18. from io import BytesIO
  19. import treq
  20. from canonicaljson import encode_canonical_json, json
  21. from netaddr import IPAddress
  22. from prometheus_client import Counter
  23. from zope.interface import implementer, provider
  24. from OpenSSL import SSL
  25. from OpenSSL.SSL import VERIFY_NONE
  26. from twisted.internet import defer, protocol, ssl
  27. from twisted.internet.interfaces import (
  28. IReactorPluggableNameResolver,
  29. IResolutionReceiver,
  30. )
  31. from twisted.python.failure import Failure
  32. from twisted.web._newclient import ResponseDone
  33. from twisted.web.client import Agent, HTTPConnectionPool, readBody
  34. from twisted.web.http import PotentialDataLoss
  35. from twisted.web.http_headers import Headers
  36. from synapse.api.errors import Codes, HttpResponseException, SynapseError
  37. from synapse.http import (
  38. QuieterFileBodyProducer,
  39. cancelled_to_request_timed_out_error,
  40. redact_uri,
  41. )
  42. from synapse.http.proxyagent import ProxyAgent
  43. from synapse.logging.context import make_deferred_yieldable
  44. from synapse.logging.opentracing import set_tag, start_active_span, tags
  45. from synapse.util.async_helpers import timeout_deferred
logger = logging.getLogger(__name__)

# Prometheus metrics for this client: outgoing requests are counted per HTTP
# method, and responses per (method, status code) — with the literal code
# "ERR" used for requests that failed without producing a response.
outgoing_requests_counter = Counter("synapse_http_client_requests", "", ["method"])
incoming_responses_counter = Counter(
    "synapse_http_client_responses", "", ["method", "code"]
)
  51. def check_against_blacklist(ip_address, ip_whitelist, ip_blacklist):
  52. """
  53. Args:
  54. ip_address (netaddr.IPAddress)
  55. ip_whitelist (netaddr.IPSet)
  56. ip_blacklist (netaddr.IPSet)
  57. """
  58. if ip_address in ip_blacklist:
  59. if ip_whitelist is None or ip_address not in ip_whitelist:
  60. return True
  61. return False
class IPBlacklistingResolver(object):
    """
    A proxy for reactor.nameResolver which only produces non-blacklisted IP
    addresses, preventing DNS rebinding attacks on URL preview.
    """

    def __init__(self, reactor, ip_whitelist, ip_blacklist):
        """
        Args:
            reactor (twisted.internet.reactor)
            ip_whitelist (netaddr.IPSet)
            ip_blacklist (netaddr.IPSet)
        """
        self._reactor = reactor
        self._ip_whitelist = ip_whitelist
        self._ip_blacklist = ip_blacklist

    def resolveHostName(self, recv, hostname, portNumber=0):
        # Resolve via the real reactor's nameResolver, but buffer every
        # resolved address and only forward the batch to the caller's
        # receiver once resolution completes and no address is blacklisted.
        r = recv()
        addresses = []

        def _callback():
            # Invoked once the underlying resolution has completed: replay
            # the buffered addresses into the caller's receiver (or none of
            # them, if any was blacklisted).
            r.resolutionBegan(None)

            has_bad_ip = False
            for i in addresses:
                ip_address = IPAddress(i.host)

                if check_against_blacklist(
                    ip_address, self._ip_whitelist, self._ip_blacklist
                ):
                    logger.info(
                        "Dropped %s from DNS resolution to %s due to blacklist"
                        % (ip_address, hostname)
                    )
                    has_bad_ip = True

            # if we have a blacklisted IP, we'd like to raise an error to block the
            # request, but all we can really do from here is claim that there were no
            # valid results.
            if not has_bad_ip:
                for i in addresses:
                    r.addressResolved(i)
            r.resolutionComplete()

        # Receiver handed to the real resolver: it just accumulates addresses
        # and defers all reporting to _callback at completion time.
        @provider(IResolutionReceiver)
        class EndpointReceiver(object):
            @staticmethod
            def resolutionBegan(resolutionInProgress):
                pass

            @staticmethod
            def addressResolved(address):
                addresses.append(address)

            @staticmethod
            def resolutionComplete():
                _callback()

        self._reactor.nameResolver.resolveHostName(
            EndpointReceiver, hostname, portNumber=portNumber
        )

        return r
  115. class BlacklistingAgentWrapper(Agent):
  116. """
  117. An Agent wrapper which will prevent access to IP addresses being accessed
  118. directly (without an IP address lookup).
  119. """
  120. def __init__(self, agent, reactor, ip_whitelist=None, ip_blacklist=None):
  121. """
  122. Args:
  123. agent (twisted.web.client.Agent): The Agent to wrap.
  124. reactor (twisted.internet.reactor)
  125. ip_whitelist (netaddr.IPSet)
  126. ip_blacklist (netaddr.IPSet)
  127. """
  128. self._agent = agent
  129. self._ip_whitelist = ip_whitelist
  130. self._ip_blacklist = ip_blacklist
  131. def request(self, method, uri, headers=None, bodyProducer=None):
  132. h = urllib.parse.urlparse(uri.decode("ascii"))
  133. try:
  134. ip_address = IPAddress(h.hostname)
  135. if check_against_blacklist(
  136. ip_address, self._ip_whitelist, self._ip_blacklist
  137. ):
  138. logger.info("Blocking access to %s due to blacklist" % (ip_address,))
  139. e = SynapseError(403, "IP address blocked by IP blacklist entry")
  140. return defer.fail(Failure(e))
  141. except Exception:
  142. # Not an IP
  143. pass
  144. return self._agent.request(
  145. method, uri, headers=headers, bodyProducer=bodyProducer
  146. )
  147. class SimpleHttpClient(object):
  148. """
  149. A simple, no-frills HTTP client with methods that wrap up common ways of
  150. using HTTP in Matrix
  151. """
  152. def __init__(
  153. self,
  154. hs,
  155. treq_args={},
  156. ip_whitelist=None,
  157. ip_blacklist=None,
  158. http_proxy=None,
  159. https_proxy=None,
  160. ):
  161. """
  162. Args:
  163. hs (synapse.server.HomeServer)
  164. treq_args (dict): Extra keyword arguments to be given to treq.request.
  165. ip_blacklist (netaddr.IPSet): The IP addresses that are blacklisted that
  166. we may not request.
  167. ip_whitelist (netaddr.IPSet): The whitelisted IP addresses, that we can
  168. request if it were otherwise caught in a blacklist.
  169. http_proxy (bytes): proxy server to use for http connections. host[:port]
  170. https_proxy (bytes): proxy server to use for https connections. host[:port]
  171. """
  172. self.hs = hs
  173. self._ip_whitelist = ip_whitelist
  174. self._ip_blacklist = ip_blacklist
  175. self._extra_treq_args = treq_args
  176. self.user_agent = hs.version_string
  177. self.clock = hs.get_clock()
  178. if hs.config.user_agent_suffix:
  179. self.user_agent = "%s %s" % (self.user_agent, hs.config.user_agent_suffix)
  180. self.user_agent = self.user_agent.encode("ascii")
  181. if self._ip_blacklist:
  182. real_reactor = hs.get_reactor()
  183. # If we have an IP blacklist, we need to use a DNS resolver which
  184. # filters out blacklisted IP addresses, to prevent DNS rebinding.
  185. nameResolver = IPBlacklistingResolver(
  186. real_reactor, self._ip_whitelist, self._ip_blacklist
  187. )
  188. @implementer(IReactorPluggableNameResolver)
  189. class Reactor(object):
  190. def __getattr__(_self, attr):
  191. if attr == "nameResolver":
  192. return nameResolver
  193. else:
  194. return getattr(real_reactor, attr)
  195. self.reactor = Reactor()
  196. else:
  197. self.reactor = hs.get_reactor()
  198. # the pusher makes lots of concurrent SSL connections to sygnal, and
  199. # tends to do so in batches, so we need to allow the pool to keep
  200. # lots of idle connections around.
  201. pool = HTTPConnectionPool(self.reactor)
  202. # XXX: The justification for using the cache factor here is that larger instances
  203. # will need both more cache and more connections.
  204. # Still, this should probably be a separate dial
  205. pool.maxPersistentPerHost = max((100 * hs.config.caches.global_factor, 5))
  206. pool.cachedConnectionTimeout = 2 * 60
  207. self.agent = ProxyAgent(
  208. self.reactor,
  209. connectTimeout=15,
  210. contextFactory=self.hs.get_http_client_context_factory(),
  211. pool=pool,
  212. http_proxy=http_proxy,
  213. https_proxy=https_proxy,
  214. )
  215. if self._ip_blacklist:
  216. # If we have an IP blacklist, we then install the blacklisting Agent
  217. # which prevents direct access to IP addresses, that are not caught
  218. # by the DNS resolution.
  219. self.agent = BlacklistingAgentWrapper(
  220. self.agent,
  221. self.reactor,
  222. ip_whitelist=self._ip_whitelist,
  223. ip_blacklist=self._ip_blacklist,
  224. )
  225. @defer.inlineCallbacks
  226. def request(self, method, uri, data=None, headers=None):
  227. """
  228. Args:
  229. method (str): HTTP method to use.
  230. uri (str): URI to query.
  231. data (bytes): Data to send in the request body, if applicable.
  232. headers (t.w.http_headers.Headers): Request headers.
  233. """
  234. # A small wrapper around self.agent.request() so we can easily attach
  235. # counters to it
  236. outgoing_requests_counter.labels(method).inc()
  237. # log request but strip `access_token` (AS requests for example include this)
  238. logger.info("Sending request %s %s", method, redact_uri(uri))
  239. with start_active_span(
  240. "outgoing-client-request",
  241. tags={
  242. tags.SPAN_KIND: tags.SPAN_KIND_RPC_CLIENT,
  243. tags.HTTP_METHOD: method,
  244. tags.HTTP_URL: uri,
  245. },
  246. finish_on_close=True,
  247. ):
  248. try:
  249. body_producer = None
  250. if data is not None:
  251. body_producer = QuieterFileBodyProducer(BytesIO(data))
  252. request_deferred = treq.request(
  253. method,
  254. uri,
  255. agent=self.agent,
  256. data=body_producer,
  257. headers=headers,
  258. **self._extra_treq_args
  259. )
  260. request_deferred = timeout_deferred(
  261. request_deferred,
  262. 60,
  263. self.hs.get_reactor(),
  264. cancelled_to_request_timed_out_error,
  265. )
  266. response = yield make_deferred_yieldable(request_deferred)
  267. incoming_responses_counter.labels(method, response.code).inc()
  268. logger.info(
  269. "Received response to %s %s: %s",
  270. method,
  271. redact_uri(uri),
  272. response.code,
  273. )
  274. return response
  275. except Exception as e:
  276. incoming_responses_counter.labels(method, "ERR").inc()
  277. logger.info(
  278. "Error sending request to %s %s: %s %s",
  279. method,
  280. redact_uri(uri),
  281. type(e).__name__,
  282. e.args[0],
  283. )
  284. set_tag(tags.ERROR, True)
  285. set_tag("error_reason", e.args[0])
  286. raise
  287. @defer.inlineCallbacks
  288. def post_urlencoded_get_json(self, uri, args={}, headers=None):
  289. """
  290. Args:
  291. uri (str):
  292. args (dict[str, str|List[str]]): query params
  293. headers (dict[str|bytes, List[str|bytes]]|None): If not None, a map from
  294. header name to a list of values for that header
  295. Returns:
  296. Deferred[object]: parsed json
  297. Raises:
  298. HttpResponseException: On a non-2xx HTTP response.
  299. ValueError: if the response was not JSON
  300. """
  301. # TODO: Do we ever want to log message contents?
  302. logger.debug("post_urlencoded_get_json args: %s", args)
  303. query_bytes = urllib.parse.urlencode(encode_urlencode_args(args), True).encode(
  304. "utf8"
  305. )
  306. actual_headers = {
  307. b"Content-Type": [b"application/x-www-form-urlencoded"],
  308. b"User-Agent": [self.user_agent],
  309. b"Accept": [b"application/json"],
  310. }
  311. if headers:
  312. actual_headers.update(headers)
  313. response = yield self.request(
  314. "POST", uri, headers=Headers(actual_headers), data=query_bytes
  315. )
  316. body = yield make_deferred_yieldable(readBody(response))
  317. if 200 <= response.code < 300:
  318. return json.loads(body)
  319. else:
  320. raise HttpResponseException(response.code, response.phrase, body)
  321. @defer.inlineCallbacks
  322. def post_json_get_json(self, uri, post_json, headers=None):
  323. """
  324. Args:
  325. uri (str):
  326. post_json (object):
  327. headers (dict[str|bytes, List[str|bytes]]|None): If not None, a map from
  328. header name to a list of values for that header
  329. Returns:
  330. Deferred[object]: parsed json
  331. Raises:
  332. HttpResponseException: On a non-2xx HTTP response.
  333. ValueError: if the response was not JSON
  334. """
  335. json_str = encode_canonical_json(post_json)
  336. logger.debug("HTTP POST %s -> %s", json_str, uri)
  337. actual_headers = {
  338. b"Content-Type": [b"application/json"],
  339. b"User-Agent": [self.user_agent],
  340. b"Accept": [b"application/json"],
  341. }
  342. if headers:
  343. actual_headers.update(headers)
  344. response = yield self.request(
  345. "POST", uri, headers=Headers(actual_headers), data=json_str
  346. )
  347. body = yield make_deferred_yieldable(readBody(response))
  348. if 200 <= response.code < 300:
  349. return json.loads(body)
  350. else:
  351. raise HttpResponseException(response.code, response.phrase, body)
  352. @defer.inlineCallbacks
  353. def get_json(self, uri, args={}, headers=None):
  354. """ Gets some json from the given URI.
  355. Args:
  356. uri (str): The URI to request, not including query parameters
  357. args (dict): A dictionary used to create query strings, defaults to
  358. None.
  359. **Note**: The value of each key is assumed to be an iterable
  360. and *not* a string.
  361. headers (dict[str|bytes, List[str|bytes]]|None): If not None, a map from
  362. header name to a list of values for that header
  363. Returns:
  364. Deferred: Succeeds when we get *any* 2xx HTTP response, with the
  365. HTTP body as JSON.
  366. Raises:
  367. HttpResponseException On a non-2xx HTTP response.
  368. ValueError: if the response was not JSON
  369. """
  370. actual_headers = {b"Accept": [b"application/json"]}
  371. if headers:
  372. actual_headers.update(headers)
  373. body = yield self.get_raw(uri, args, headers=headers)
  374. return json.loads(body)
  375. @defer.inlineCallbacks
  376. def put_json(self, uri, json_body, args={}, headers=None):
  377. """ Puts some json to the given URI.
  378. Args:
  379. uri (str): The URI to request, not including query parameters
  380. json_body (dict): The JSON to put in the HTTP body,
  381. args (dict): A dictionary used to create query strings, defaults to
  382. None.
  383. **Note**: The value of each key is assumed to be an iterable
  384. and *not* a string.
  385. headers (dict[str|bytes, List[str|bytes]]|None): If not None, a map from
  386. header name to a list of values for that header
  387. Returns:
  388. Deferred: Succeeds when we get *any* 2xx HTTP response, with the
  389. HTTP body as JSON.
  390. Raises:
  391. HttpResponseException On a non-2xx HTTP response.
  392. ValueError: if the response was not JSON
  393. """
  394. if len(args):
  395. query_bytes = urllib.parse.urlencode(args, True)
  396. uri = "%s?%s" % (uri, query_bytes)
  397. json_str = encode_canonical_json(json_body)
  398. actual_headers = {
  399. b"Content-Type": [b"application/json"],
  400. b"User-Agent": [self.user_agent],
  401. b"Accept": [b"application/json"],
  402. }
  403. if headers:
  404. actual_headers.update(headers)
  405. response = yield self.request(
  406. "PUT", uri, headers=Headers(actual_headers), data=json_str
  407. )
  408. body = yield make_deferred_yieldable(readBody(response))
  409. if 200 <= response.code < 300:
  410. return json.loads(body)
  411. else:
  412. raise HttpResponseException(response.code, response.phrase, body)
  413. @defer.inlineCallbacks
  414. def get_raw(self, uri, args={}, headers=None):
  415. """ Gets raw text from the given URI.
  416. Args:
  417. uri (str): The URI to request, not including query parameters
  418. args (dict): A dictionary used to create query strings, defaults to
  419. None.
  420. **Note**: The value of each key is assumed to be an iterable
  421. and *not* a string.
  422. headers (dict[str|bytes, List[str|bytes]]|None): If not None, a map from
  423. header name to a list of values for that header
  424. Returns:
  425. Deferred: Succeeds when we get *any* 2xx HTTP response, with the
  426. HTTP body at text.
  427. Raises:
  428. HttpResponseException on a non-2xx HTTP response.
  429. """
  430. if len(args):
  431. query_bytes = urllib.parse.urlencode(args, True)
  432. uri = "%s?%s" % (uri, query_bytes)
  433. actual_headers = {b"User-Agent": [self.user_agent]}
  434. if headers:
  435. actual_headers.update(headers)
  436. response = yield self.request("GET", uri, headers=Headers(actual_headers))
  437. body = yield make_deferred_yieldable(readBody(response))
  438. if 200 <= response.code < 300:
  439. return body
  440. else:
  441. raise HttpResponseException(response.code, response.phrase, body)
  442. # XXX: FIXME: This is horribly copy-pasted from matrixfederationclient.
  443. # The two should be factored out.
  444. @defer.inlineCallbacks
  445. def get_file(self, url, output_stream, max_size=None, headers=None):
  446. """GETs a file from a given URL
  447. Args:
  448. url (str): The URL to GET
  449. output_stream (file): File to write the response body to.
  450. headers (dict[str|bytes, List[str|bytes]]|None): If not None, a map from
  451. header name to a list of values for that header
  452. Returns:
  453. A (int,dict,string,int) tuple of the file length, dict of the response
  454. headers, absolute URI of the response and HTTP response code.
  455. """
  456. actual_headers = {b"User-Agent": [self.user_agent]}
  457. if headers:
  458. actual_headers.update(headers)
  459. response = yield self.request("GET", url, headers=Headers(actual_headers))
  460. resp_headers = dict(response.headers.getAllRawHeaders())
  461. if (
  462. b"Content-Length" in resp_headers
  463. and int(resp_headers[b"Content-Length"][0]) > max_size
  464. ):
  465. logger.warning("Requested URL is too large > %r bytes" % (self.max_size,))
  466. raise SynapseError(
  467. 502,
  468. "Requested file is too large > %r bytes" % (self.max_size,),
  469. Codes.TOO_LARGE,
  470. )
  471. if response.code > 299:
  472. logger.warning("Got %d when downloading %s" % (response.code, url))
  473. raise SynapseError(502, "Got error %d" % (response.code,), Codes.UNKNOWN)
  474. # TODO: if our Content-Type is HTML or something, just read the first
  475. # N bytes into RAM rather than saving it all to disk only to read it
  476. # straight back in again
  477. try:
  478. length = yield make_deferred_yieldable(
  479. _readBodyToFile(response, output_stream, max_size)
  480. )
  481. except SynapseError:
  482. # This can happen e.g. because the body is too large.
  483. raise
  484. except Exception as e:
  485. raise SynapseError(502, ("Failed to download remote body: %s" % e)) from e
  486. return (
  487. length,
  488. resp_headers,
  489. response.request.absoluteURI.decode("ascii"),
  490. response.code,
  491. )
  492. # XXX: FIXME: This is horribly copy-pasted from matrixfederationclient.
  493. # The two should be factored out.
class _ReadBodyToFileProtocol(protocol.Protocol):
    """Twisted Protocol which streams a response body into a file-like object,
    erroring out (Codes.TOO_LARGE) once more than ``max_size`` bytes have been
    received.

    The outcome is delivered via ``deferred``: called back with the total
    number of bytes written on success, or erred back on failure.
    """

    def __init__(self, stream, deferred, max_size):
        # file-like object the body is written to
        self.stream = stream
        # Deferred fired with the byte count on completion, or a failure
        self.deferred = deferred
        # running total of bytes received so far
        self.length = 0
        # maximum number of bytes to accept, or None for unlimited
        self.max_size = max_size

    def dataReceived(self, data):
        self.stream.write(data)
        self.length += len(data)
        if self.max_size is not None and self.length >= self.max_size:
            self.deferred.errback(
                SynapseError(
                    502,
                    "Requested file is too large > %r bytes" % (self.max_size,),
                    Codes.TOO_LARGE,
                )
            )
            # Swap in a fresh, never-fired Deferred before dropping the
            # connection: loseConnection() will trigger connectionLost, and
            # without the swap that would fire the already-errbacked Deferred
            # a second time.
            self.deferred = defer.Deferred()
            self.transport.loseConnection()

    def connectionLost(self, reason):
        if reason.check(ResponseDone):
            self.deferred.callback(self.length)
        elif reason.check(PotentialDataLoss):
            # stolen from https://github.com/twisted/treq/pull/49/files
            # http://twistedmatrix.com/trac/ticket/4840
            self.deferred.callback(self.length)
        else:
            self.deferred.errback(reason)
  522. # XXX: FIXME: This is horribly copy-pasted from matrixfederationclient.
  523. # The two should be factored out.
  524. def _readBodyToFile(response, stream, max_size):
  525. d = defer.Deferred()
  526. response.deliverBody(_ReadBodyToFileProtocol(stream, d, max_size))
  527. return d
  528. def encode_urlencode_args(args):
  529. return {k: encode_urlencode_arg(v) for k, v in args.items()}
  530. def encode_urlencode_arg(arg):
  531. if isinstance(arg, str):
  532. return arg.encode("utf-8")
  533. elif isinstance(arg, list):
  534. return [encode_urlencode_arg(i) for i in arg]
  535. else:
  536. return arg
  537. def _print_ex(e):
  538. if hasattr(e, "reasons") and e.reasons:
  539. for ex in e.reasons:
  540. _print_ex(ex)
  541. else:
  542. logger.exception(e)
class InsecureInterceptableContextFactory(ssl.ContextFactory):
    """
    Factory for PyOpenSSL SSL contexts which accepts any certificate for any domain.

    Do not use this since it allows an attacker to intercept your communications.
    """

    def __init__(self):
        self._context = SSL.Context(SSL.SSLv23_METHOD)
        # VERIFY_NONE plus an always-accepting callback: certificate
        # verification is disabled entirely (hence the class name/warning).
        self._context.set_verify(VERIFY_NONE, lambda *_: None)

    def getContext(self, hostname=None, port=None):
        # Same (insecure) context for every host/port.
        return self._context

    def creatorForNetloc(self, hostname, port):
        # Twisted's IPolicyForHTTPS hook: reuse this factory itself as the
        # TLS connection creator for every netloc.
        return self