# client.py
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2014-2016 OpenMarket Ltd
  3. # Copyright 2018 New Vector Ltd
  4. #
  5. # Licensed under the Apache License, Version 2.0 (the "License");
  6. # you may not use this file except in compliance with the License.
  7. # You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing, software
  12. # distributed under the License is distributed on an "AS IS" BASIS,
  13. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. # See the License for the specific language governing permissions and
  15. # limitations under the License.
  16. import logging
  17. from io import BytesIO
  18. from six import raise_from, text_type
  19. from six.moves import urllib
  20. import treq
  21. from canonicaljson import encode_canonical_json, json
  22. from netaddr import IPAddress
  23. from prometheus_client import Counter
  24. from zope.interface import implementer, provider
  25. from OpenSSL import SSL
  26. from OpenSSL.SSL import VERIFY_NONE
  27. from twisted.internet import defer, protocol, ssl
  28. from twisted.internet.interfaces import (
  29. IReactorPluggableNameResolver,
  30. IResolutionReceiver,
  31. )
  32. from twisted.python.failure import Failure
  33. from twisted.web._newclient import ResponseDone
  34. from twisted.web.client import Agent, HTTPConnectionPool, readBody
  35. from twisted.web.http import PotentialDataLoss
  36. from twisted.web.http_headers import Headers
  37. from synapse.api.errors import Codes, HttpResponseException, SynapseError
  38. from synapse.http import (
  39. QuieterFileBodyProducer,
  40. cancelled_to_request_timed_out_error,
  41. redact_uri,
  42. )
  43. from synapse.http.proxyagent import ProxyAgent
  44. from synapse.logging.context import make_deferred_yieldable
  45. from synapse.logging.opentracing import set_tag, start_active_span, tags
  46. from synapse.util.async_helpers import timeout_deferred
  47. from synapse.util.caches import CACHE_SIZE_FACTOR
logger = logging.getLogger(__name__)

# Prometheus metrics tracking outbound HTTP traffic made through this client:
# requests are labelled by method, responses by method and status code (or
# "ERR" when the request failed before a response was received).
outgoing_requests_counter = Counter("synapse_http_client_requests", "", ["method"])
incoming_responses_counter = Counter(
    "synapse_http_client_responses", "", ["method", "code"]
)
  53. def check_against_blacklist(ip_address, ip_whitelist, ip_blacklist):
  54. """
  55. Args:
  56. ip_address (netaddr.IPAddress)
  57. ip_whitelist (netaddr.IPSet)
  58. ip_blacklist (netaddr.IPSet)
  59. """
  60. if ip_address in ip_blacklist:
  61. if ip_whitelist is None or ip_address not in ip_whitelist:
  62. return True
  63. return False
class IPBlacklistingResolver(object):
    """
    A proxy for reactor.nameResolver which only produces non-blacklisted IP
    addresses, preventing DNS rebinding attacks on URL preview.
    """

    def __init__(self, reactor, ip_whitelist, ip_blacklist):
        """
        Args:
            reactor (twisted.internet.reactor)
            ip_whitelist (netaddr.IPSet)
            ip_blacklist (netaddr.IPSet)
        """
        self._reactor = reactor
        self._ip_whitelist = ip_whitelist
        self._ip_blacklist = ip_blacklist

    def resolveHostName(self, recv, hostname, portNumber=0):
        # Instantiate the caller-supplied receiver; filtered results are
        # relayed to it once the underlying resolution completes.
        r = recv()

        # Buffer of resolved addresses, filled by EndpointReceiver below.
        addresses = []

        def _callback():
            # Runs once the real resolution is complete: filter the collected
            # addresses against the blacklist before relaying them.
            r.resolutionBegan(None)

            has_bad_ip = False
            for i in addresses:
                ip_address = IPAddress(i.host)

                if check_against_blacklist(
                    ip_address, self._ip_whitelist, self._ip_blacklist
                ):
                    logger.info(
                        "Dropped %s from DNS resolution to %s due to blacklist"
                        % (ip_address, hostname)
                    )
                    has_bad_ip = True

            # if we have a blacklisted IP, we'd like to raise an error to block the
            # request, but all we can really do from here is claim that there were no
            # valid results.
            if not has_bad_ip:
                for i in addresses:
                    r.addressResolved(i)
            r.resolutionComplete()

        @provider(IResolutionReceiver)
        class EndpointReceiver(object):
            # Receives results from the real resolver; addresses are buffered
            # and only relayed (filtered) once resolution is complete.
            @staticmethod
            def resolutionBegan(resolutionInProgress):
                pass

            @staticmethod
            def addressResolved(address):
                addresses.append(address)

            @staticmethod
            def resolutionComplete():
                _callback()

        self._reactor.nameResolver.resolveHostName(
            EndpointReceiver, hostname, portNumber=portNumber
        )

        return r
  117. class BlacklistingAgentWrapper(Agent):
  118. """
  119. An Agent wrapper which will prevent access to IP addresses being accessed
  120. directly (without an IP address lookup).
  121. """
  122. def __init__(self, agent, reactor, ip_whitelist=None, ip_blacklist=None):
  123. """
  124. Args:
  125. agent (twisted.web.client.Agent): The Agent to wrap.
  126. reactor (twisted.internet.reactor)
  127. ip_whitelist (netaddr.IPSet)
  128. ip_blacklist (netaddr.IPSet)
  129. """
  130. self._agent = agent
  131. self._ip_whitelist = ip_whitelist
  132. self._ip_blacklist = ip_blacklist
  133. def request(self, method, uri, headers=None, bodyProducer=None):
  134. h = urllib.parse.urlparse(uri.decode("ascii"))
  135. try:
  136. ip_address = IPAddress(h.hostname)
  137. if check_against_blacklist(
  138. ip_address, self._ip_whitelist, self._ip_blacklist
  139. ):
  140. logger.info("Blocking access to %s due to blacklist" % (ip_address,))
  141. e = SynapseError(403, "IP address blocked by IP blacklist entry")
  142. return defer.fail(Failure(e))
  143. except Exception:
  144. # Not an IP
  145. pass
  146. return self._agent.request(
  147. method, uri, headers=headers, bodyProducer=bodyProducer
  148. )
  149. class SimpleHttpClient(object):
  150. """
  151. A simple, no-frills HTTP client with methods that wrap up common ways of
  152. using HTTP in Matrix
  153. """
  154. def __init__(
  155. self,
  156. hs,
  157. treq_args={},
  158. ip_whitelist=None,
  159. ip_blacklist=None,
  160. http_proxy=None,
  161. https_proxy=None,
  162. ):
  163. """
  164. Args:
  165. hs (synapse.server.HomeServer)
  166. treq_args (dict): Extra keyword arguments to be given to treq.request.
  167. ip_blacklist (netaddr.IPSet): The IP addresses that are blacklisted that
  168. we may not request.
  169. ip_whitelist (netaddr.IPSet): The whitelisted IP addresses, that we can
  170. request if it were otherwise caught in a blacklist.
  171. http_proxy (bytes): proxy server to use for http connections. host[:port]
  172. https_proxy (bytes): proxy server to use for https connections. host[:port]
  173. """
  174. self.hs = hs
  175. self._ip_whitelist = ip_whitelist
  176. self._ip_blacklist = ip_blacklist
  177. self._extra_treq_args = treq_args
  178. self.user_agent = hs.version_string
  179. self.clock = hs.get_clock()
  180. if hs.config.user_agent_suffix:
  181. self.user_agent = "%s %s" % (self.user_agent, hs.config.user_agent_suffix)
  182. self.user_agent = self.user_agent.encode("ascii")
  183. if self._ip_blacklist:
  184. real_reactor = hs.get_reactor()
  185. # If we have an IP blacklist, we need to use a DNS resolver which
  186. # filters out blacklisted IP addresses, to prevent DNS rebinding.
  187. nameResolver = IPBlacklistingResolver(
  188. real_reactor, self._ip_whitelist, self._ip_blacklist
  189. )
  190. @implementer(IReactorPluggableNameResolver)
  191. class Reactor(object):
  192. def __getattr__(_self, attr):
  193. if attr == "nameResolver":
  194. return nameResolver
  195. else:
  196. return getattr(real_reactor, attr)
  197. self.reactor = Reactor()
  198. else:
  199. self.reactor = hs.get_reactor()
  200. # the pusher makes lots of concurrent SSL connections to sygnal, and
  201. # tends to do so in batches, so we need to allow the pool to keep
  202. # lots of idle connections around.
  203. pool = HTTPConnectionPool(self.reactor)
  204. pool.maxPersistentPerHost = max((100 * CACHE_SIZE_FACTOR, 5))
  205. pool.cachedConnectionTimeout = 2 * 60
  206. # The default context factory in Twisted 14.0.0 (which we require) is
  207. # BrowserLikePolicyForHTTPS which will do regular cert validation
  208. # 'like a browser'
  209. self.agent = ProxyAgent(
  210. self.reactor,
  211. connectTimeout=15,
  212. contextFactory=self.hs.get_http_client_context_factory(),
  213. pool=pool,
  214. http_proxy=http_proxy,
  215. https_proxy=https_proxy,
  216. )
  217. if self._ip_blacklist:
  218. # If we have an IP blacklist, we then install the blacklisting Agent
  219. # which prevents direct access to IP addresses, that are not caught
  220. # by the DNS resolution.
  221. self.agent = BlacklistingAgentWrapper(
  222. self.agent,
  223. self.reactor,
  224. ip_whitelist=self._ip_whitelist,
  225. ip_blacklist=self._ip_blacklist,
  226. )
  227. @defer.inlineCallbacks
  228. def request(self, method, uri, data=None, headers=None):
  229. """
  230. Args:
  231. method (str): HTTP method to use.
  232. uri (str): URI to query.
  233. data (bytes): Data to send in the request body, if applicable.
  234. headers (t.w.http_headers.Headers): Request headers.
  235. """
  236. # A small wrapper around self.agent.request() so we can easily attach
  237. # counters to it
  238. outgoing_requests_counter.labels(method).inc()
  239. # log request but strip `access_token` (AS requests for example include this)
  240. logger.info("Sending request %s %s", method, redact_uri(uri))
  241. with start_active_span(
  242. "outgoing-client-request",
  243. tags={
  244. tags.SPAN_KIND: tags.SPAN_KIND_RPC_CLIENT,
  245. tags.HTTP_METHOD: method,
  246. tags.HTTP_URL: uri,
  247. },
  248. finish_on_close=True,
  249. ):
  250. try:
  251. body_producer = None
  252. if data is not None:
  253. body_producer = QuieterFileBodyProducer(BytesIO(data))
  254. request_deferred = treq.request(
  255. method,
  256. uri,
  257. agent=self.agent,
  258. data=body_producer,
  259. headers=headers,
  260. **self._extra_treq_args
  261. )
  262. request_deferred = timeout_deferred(
  263. request_deferred,
  264. 60,
  265. self.hs.get_reactor(),
  266. cancelled_to_request_timed_out_error,
  267. )
  268. response = yield make_deferred_yieldable(request_deferred)
  269. incoming_responses_counter.labels(method, response.code).inc()
  270. logger.info(
  271. "Received response to %s %s: %s",
  272. method,
  273. redact_uri(uri),
  274. response.code,
  275. )
  276. return response
  277. except Exception as e:
  278. incoming_responses_counter.labels(method, "ERR").inc()
  279. logger.info(
  280. "Error sending request to %s %s: %s %s",
  281. method,
  282. redact_uri(uri),
  283. type(e).__name__,
  284. e.args[0],
  285. )
  286. set_tag(tags.ERROR, True)
  287. set_tag("error_reason", e.args[0])
  288. raise
  289. @defer.inlineCallbacks
  290. def post_urlencoded_get_json(self, uri, args={}, headers=None):
  291. """
  292. Args:
  293. uri (str):
  294. args (dict[str, str|List[str]]): query params
  295. headers (dict[str|bytes, List[str|bytes]]|None): If not None, a map from
  296. header name to a list of values for that header
  297. Returns:
  298. Deferred[object]: parsed json
  299. Raises:
  300. HttpResponseException: On a non-2xx HTTP response.
  301. ValueError: if the response was not JSON
  302. """
  303. # TODO: Do we ever want to log message contents?
  304. logger.debug("post_urlencoded_get_json args: %s", args)
  305. query_bytes = urllib.parse.urlencode(encode_urlencode_args(args), True).encode(
  306. "utf8"
  307. )
  308. actual_headers = {
  309. b"Content-Type": [b"application/x-www-form-urlencoded"],
  310. b"User-Agent": [self.user_agent],
  311. }
  312. if headers:
  313. actual_headers.update(headers)
  314. response = yield self.request(
  315. "POST", uri, headers=Headers(actual_headers), data=query_bytes
  316. )
  317. body = yield make_deferred_yieldable(readBody(response))
  318. if 200 <= response.code < 300:
  319. return json.loads(body)
  320. else:
  321. raise HttpResponseException(response.code, response.phrase, body)
  322. @defer.inlineCallbacks
  323. def post_json_get_json(self, uri, post_json, headers=None):
  324. """
  325. Args:
  326. uri (str):
  327. post_json (object):
  328. headers (dict[str|bytes, List[str|bytes]]|None): If not None, a map from
  329. header name to a list of values for that header
  330. Returns:
  331. Deferred[object]: parsed json
  332. Raises:
  333. HttpResponseException: On a non-2xx HTTP response.
  334. ValueError: if the response was not JSON
  335. """
  336. json_str = encode_canonical_json(post_json)
  337. logger.debug("HTTP POST %s -> %s", json_str, uri)
  338. actual_headers = {
  339. b"Content-Type": [b"application/json"],
  340. b"User-Agent": [self.user_agent],
  341. }
  342. if headers:
  343. actual_headers.update(headers)
  344. response = yield self.request(
  345. "POST", uri, headers=Headers(actual_headers), data=json_str
  346. )
  347. body = yield make_deferred_yieldable(readBody(response))
  348. if 200 <= response.code < 300:
  349. return json.loads(body)
  350. else:
  351. raise HttpResponseException(response.code, response.phrase, body)
  352. @defer.inlineCallbacks
  353. def get_json(self, uri, args={}, headers=None):
  354. """ Gets some json from the given URI.
  355. Args:
  356. uri (str): The URI to request, not including query parameters
  357. args (dict): A dictionary used to create query strings, defaults to
  358. None.
  359. **Note**: The value of each key is assumed to be an iterable
  360. and *not* a string.
  361. headers (dict[str|bytes, List[str|bytes]]|None): If not None, a map from
  362. header name to a list of values for that header
  363. Returns:
  364. Deferred: Succeeds when we get *any* 2xx HTTP response, with the
  365. HTTP body as JSON.
  366. Raises:
  367. HttpResponseException On a non-2xx HTTP response.
  368. ValueError: if the response was not JSON
  369. """
  370. body = yield self.get_raw(uri, args, headers=headers)
  371. return json.loads(body)
  372. @defer.inlineCallbacks
  373. def put_json(self, uri, json_body, args={}, headers=None):
  374. """ Puts some json to the given URI.
  375. Args:
  376. uri (str): The URI to request, not including query parameters
  377. json_body (dict): The JSON to put in the HTTP body,
  378. args (dict): A dictionary used to create query strings, defaults to
  379. None.
  380. **Note**: The value of each key is assumed to be an iterable
  381. and *not* a string.
  382. headers (dict[str|bytes, List[str|bytes]]|None): If not None, a map from
  383. header name to a list of values for that header
  384. Returns:
  385. Deferred: Succeeds when we get *any* 2xx HTTP response, with the
  386. HTTP body as JSON.
  387. Raises:
  388. HttpResponseException On a non-2xx HTTP response.
  389. ValueError: if the response was not JSON
  390. """
  391. if len(args):
  392. query_bytes = urllib.parse.urlencode(args, True)
  393. uri = "%s?%s" % (uri, query_bytes)
  394. json_str = encode_canonical_json(json_body)
  395. actual_headers = {
  396. b"Content-Type": [b"application/json"],
  397. b"User-Agent": [self.user_agent],
  398. }
  399. if headers:
  400. actual_headers.update(headers)
  401. response = yield self.request(
  402. "PUT", uri, headers=Headers(actual_headers), data=json_str
  403. )
  404. body = yield make_deferred_yieldable(readBody(response))
  405. if 200 <= response.code < 300:
  406. return json.loads(body)
  407. else:
  408. raise HttpResponseException(response.code, response.phrase, body)
  409. @defer.inlineCallbacks
  410. def get_raw(self, uri, args={}, headers=None):
  411. """ Gets raw text from the given URI.
  412. Args:
  413. uri (str): The URI to request, not including query parameters
  414. args (dict): A dictionary used to create query strings, defaults to
  415. None.
  416. **Note**: The value of each key is assumed to be an iterable
  417. and *not* a string.
  418. headers (dict[str|bytes, List[str|bytes]]|None): If not None, a map from
  419. header name to a list of values for that header
  420. Returns:
  421. Deferred: Succeeds when we get *any* 2xx HTTP response, with the
  422. HTTP body at text.
  423. Raises:
  424. HttpResponseException on a non-2xx HTTP response.
  425. """
  426. if len(args):
  427. query_bytes = urllib.parse.urlencode(args, True)
  428. uri = "%s?%s" % (uri, query_bytes)
  429. actual_headers = {b"User-Agent": [self.user_agent]}
  430. if headers:
  431. actual_headers.update(headers)
  432. response = yield self.request("GET", uri, headers=Headers(actual_headers))
  433. body = yield make_deferred_yieldable(readBody(response))
  434. if 200 <= response.code < 300:
  435. return body
  436. else:
  437. raise HttpResponseException(response.code, response.phrase, body)
  438. # XXX: FIXME: This is horribly copy-pasted from matrixfederationclient.
  439. # The two should be factored out.
  440. @defer.inlineCallbacks
  441. def get_file(self, url, output_stream, max_size=None, headers=None):
  442. """GETs a file from a given URL
  443. Args:
  444. url (str): The URL to GET
  445. output_stream (file): File to write the response body to.
  446. headers (dict[str|bytes, List[str|bytes]]|None): If not None, a map from
  447. header name to a list of values for that header
  448. Returns:
  449. A (int,dict,string,int) tuple of the file length, dict of the response
  450. headers, absolute URI of the response and HTTP response code.
  451. """
  452. actual_headers = {b"User-Agent": [self.user_agent]}
  453. if headers:
  454. actual_headers.update(headers)
  455. response = yield self.request("GET", url, headers=Headers(actual_headers))
  456. resp_headers = dict(response.headers.getAllRawHeaders())
  457. if (
  458. b"Content-Length" in resp_headers
  459. and int(resp_headers[b"Content-Length"][0]) > max_size
  460. ):
  461. logger.warning("Requested URL is too large > %r bytes" % (self.max_size,))
  462. raise SynapseError(
  463. 502,
  464. "Requested file is too large > %r bytes" % (self.max_size,),
  465. Codes.TOO_LARGE,
  466. )
  467. if response.code > 299:
  468. logger.warning("Got %d when downloading %s" % (response.code, url))
  469. raise SynapseError(502, "Got error %d" % (response.code,), Codes.UNKNOWN)
  470. # TODO: if our Content-Type is HTML or something, just read the first
  471. # N bytes into RAM rather than saving it all to disk only to read it
  472. # straight back in again
  473. try:
  474. length = yield make_deferred_yieldable(
  475. _readBodyToFile(response, output_stream, max_size)
  476. )
  477. except SynapseError:
  478. # This can happen e.g. because the body is too large.
  479. raise
  480. except Exception as e:
  481. raise_from(SynapseError(502, ("Failed to download remote body: %s" % e)), e)
  482. return (
  483. length,
  484. resp_headers,
  485. response.request.absoluteURI.decode("ascii"),
  486. response.code,
  487. )
  488. # XXX: FIXME: This is horribly copy-pasted from matrixfederationclient.
  489. # The two should be factored out.
  490. class _ReadBodyToFileProtocol(protocol.Protocol):
  491. def __init__(self, stream, deferred, max_size):
  492. self.stream = stream
  493. self.deferred = deferred
  494. self.length = 0
  495. self.max_size = max_size
  496. def dataReceived(self, data):
  497. self.stream.write(data)
  498. self.length += len(data)
  499. if self.max_size is not None and self.length >= self.max_size:
  500. self.deferred.errback(
  501. SynapseError(
  502. 502,
  503. "Requested file is too large > %r bytes" % (self.max_size,),
  504. Codes.TOO_LARGE,
  505. )
  506. )
  507. self.deferred = defer.Deferred()
  508. self.transport.loseConnection()
  509. def connectionLost(self, reason):
  510. if reason.check(ResponseDone):
  511. self.deferred.callback(self.length)
  512. elif reason.check(PotentialDataLoss):
  513. # stolen from https://github.com/twisted/treq/pull/49/files
  514. # http://twistedmatrix.com/trac/ticket/4840
  515. self.deferred.callback(self.length)
  516. else:
  517. self.deferred.errback(reason)
  518. # XXX: FIXME: This is horribly copy-pasted from matrixfederationclient.
  519. # The two should be factored out.
  520. def _readBodyToFile(response, stream, max_size):
  521. d = defer.Deferred()
  522. response.deliverBody(_ReadBodyToFileProtocol(stream, d, max_size))
  523. return d
  524. def encode_urlencode_args(args):
  525. return {k: encode_urlencode_arg(v) for k, v in args.items()}
  526. def encode_urlencode_arg(arg):
  527. if isinstance(arg, text_type):
  528. return arg.encode("utf-8")
  529. elif isinstance(arg, list):
  530. return [encode_urlencode_arg(i) for i in arg]
  531. else:
  532. return arg
  533. def _print_ex(e):
  534. if hasattr(e, "reasons") and e.reasons:
  535. for ex in e.reasons:
  536. _print_ex(ex)
  537. else:
  538. logger.exception(e)
class InsecureInterceptableContextFactory(ssl.ContextFactory):
    """
    Factory for PyOpenSSL SSL contexts which accepts any certificate for any domain.

    Do not use this since it allows an attacker to intercept your communications.
    """

    def __init__(self):
        self._context = SSL.Context(SSL.SSLv23_METHOD)
        # VERIFY_NONE plus an always-accepting callback disables certificate
        # verification entirely.
        self._context.set_verify(VERIFY_NONE, lambda *_: None)

    def getContext(self, hostname=None, port=None):
        return self._context

    def creatorForNetloc(self, hostname, port):
        # IPolicyForHTTPS hook: reuse this factory (and its single permissive
        # context) for every host/port.
        return self