  1. # -*- coding: utf-8 -*-
  2. # Copyright 2014-2016 OpenMarket Ltd
  3. # Copyright 2018 New Vector Ltd
  4. #
  5. # Licensed under the Apache License, Version 2.0 (the "License");
  6. # you may not use this file except in compliance with the License.
  7. # You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing, software
  12. # distributed under the License is distributed on an "AS IS" BASIS,
  13. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. # See the License for the specific language governing permissions and
  15. # limitations under the License.
  16. import logging
  17. from io import BytesIO
  18. from six import raise_from, text_type
  19. from six.moves import urllib
  20. import treq
  21. from canonicaljson import encode_canonical_json, json
  22. from netaddr import IPAddress
  23. from prometheus_client import Counter
  24. from zope.interface import implementer, provider
  25. from OpenSSL import SSL
  26. from OpenSSL.SSL import VERIFY_NONE
  27. from twisted.internet import defer, protocol, ssl
  28. from twisted.internet.interfaces import (
  29. IReactorPluggableNameResolver,
  30. IResolutionReceiver,
  31. )
  32. from twisted.python.failure import Failure
  33. from twisted.web._newclient import ResponseDone
  34. from twisted.web.client import Agent, HTTPConnectionPool, readBody
  35. from twisted.web.http import PotentialDataLoss
  36. from twisted.web.http_headers import Headers
  37. from synapse.api.errors import Codes, HttpResponseException, SynapseError
  38. from synapse.http import (
  39. QuieterFileBodyProducer,
  40. cancelled_to_request_timed_out_error,
  41. redact_uri,
  42. )
  43. from synapse.http.proxyagent import ProxyAgent
  44. from synapse.logging.context import make_deferred_yieldable
  45. from synapse.logging.opentracing import set_tag, start_active_span, tags
  46. from synapse.util.async_helpers import timeout_deferred
  47. from synapse.util.caches import CACHE_SIZE_FACTOR
logger = logging.getLogger(__name__)

# Prometheus metrics: outbound HTTP requests made by this client (labelled by
# method) and the responses received (labelled by method and status code; the
# code label is "ERR" when the request failed without a response).
outgoing_requests_counter = Counter("synapse_http_client_requests", "", ["method"])
incoming_responses_counter = Counter(
    "synapse_http_client_responses", "", ["method", "code"]
)
  53. def check_against_blacklist(ip_address, ip_whitelist, ip_blacklist):
  54. """
  55. Args:
  56. ip_address (netaddr.IPAddress)
  57. ip_whitelist (netaddr.IPSet)
  58. ip_blacklist (netaddr.IPSet)
  59. """
  60. if ip_address in ip_blacklist:
  61. if ip_whitelist is None or ip_address not in ip_whitelist:
  62. return True
  63. return False
class IPBlacklistingResolver(object):
    """
    A proxy for reactor.nameResolver which only produces non-blacklisted IP
    addresses, preventing DNS rebinding attacks on URL preview.
    """

    def __init__(self, reactor, ip_whitelist, ip_blacklist):
        """
        Args:
            reactor (twisted.internet.reactor)
            ip_whitelist (netaddr.IPSet)
            ip_blacklist (netaddr.IPSet)
        """
        self._reactor = reactor
        self._ip_whitelist = ip_whitelist
        self._ip_blacklist = ip_blacklist

    def resolveHostName(self, recv, hostname, portNumber=0):
        # `recv` is a factory for the caller's IResolutionReceiver; we feed the
        # filtered results into the instance it produces.
        r = recv()
        addresses = []

        def _callback():
            # Runs once the real resolution has completed: replay the
            # began/resolved/complete sequence on `r`, but suppress every
            # address if any resolved address was blacklisted.
            r.resolutionBegan(None)

            has_bad_ip = False
            for i in addresses:
                ip_address = IPAddress(i.host)

                if check_against_blacklist(
                    ip_address, self._ip_whitelist, self._ip_blacklist
                ):
                    logger.info(
                        "Dropped %s from DNS resolution to %s due to blacklist"
                        % (ip_address, hostname)
                    )
                    has_bad_ip = True

            # if we have a blacklisted IP, we'd like to raise an error to block the
            # request, but all we can really do from here is claim that there were no
            # valid results.
            if not has_bad_ip:
                for i in addresses:
                    r.addressResolved(i)
            r.resolutionComplete()

        @provider(IResolutionReceiver)
        class EndpointReceiver(object):
            # Receiver handed to the real resolver: it just accumulates the
            # resolved addresses, then defers to _callback for filtering.
            @staticmethod
            def resolutionBegan(resolutionInProgress):
                pass

            @staticmethod
            def addressResolved(address):
                addresses.append(address)

            @staticmethod
            def resolutionComplete():
                _callback()

        self._reactor.nameResolver.resolveHostName(
            EndpointReceiver, hostname, portNumber=portNumber
        )

        return r
  117. class BlacklistingAgentWrapper(Agent):
  118. """
  119. An Agent wrapper which will prevent access to IP addresses being accessed
  120. directly (without an IP address lookup).
  121. """
  122. def __init__(self, agent, reactor, ip_whitelist=None, ip_blacklist=None):
  123. """
  124. Args:
  125. agent (twisted.web.client.Agent): The Agent to wrap.
  126. reactor (twisted.internet.reactor)
  127. ip_whitelist (netaddr.IPSet)
  128. ip_blacklist (netaddr.IPSet)
  129. """
  130. self._agent = agent
  131. self._ip_whitelist = ip_whitelist
  132. self._ip_blacklist = ip_blacklist
  133. def request(self, method, uri, headers=None, bodyProducer=None):
  134. h = urllib.parse.urlparse(uri.decode("ascii"))
  135. try:
  136. ip_address = IPAddress(h.hostname)
  137. if check_against_blacklist(
  138. ip_address, self._ip_whitelist, self._ip_blacklist
  139. ):
  140. logger.info("Blocking access to %s due to blacklist" % (ip_address,))
  141. e = SynapseError(403, "IP address blocked by IP blacklist entry")
  142. return defer.fail(Failure(e))
  143. except Exception:
  144. # Not an IP
  145. pass
  146. return self._agent.request(
  147. method, uri, headers=headers, bodyProducer=bodyProducer
  148. )
  149. class SimpleHttpClient(object):
  150. """
  151. A simple, no-frills HTTP client with methods that wrap up common ways of
  152. using HTTP in Matrix
  153. """
  154. def __init__(
  155. self,
  156. hs,
  157. treq_args={},
  158. ip_whitelist=None,
  159. ip_blacklist=None,
  160. http_proxy=None,
  161. https_proxy=None,
  162. ):
  163. """
  164. Args:
  165. hs (synapse.server.HomeServer)
  166. treq_args (dict): Extra keyword arguments to be given to treq.request.
  167. ip_blacklist (netaddr.IPSet): The IP addresses that are blacklisted that
  168. we may not request.
  169. ip_whitelist (netaddr.IPSet): The whitelisted IP addresses, that we can
  170. request if it were otherwise caught in a blacklist.
  171. http_proxy (bytes): proxy server to use for http connections. host[:port]
  172. https_proxy (bytes): proxy server to use for https connections. host[:port]
  173. """
  174. self.hs = hs
  175. self._ip_whitelist = ip_whitelist
  176. self._ip_blacklist = ip_blacklist
  177. self._extra_treq_args = treq_args
  178. self.user_agent = hs.version_string
  179. self.clock = hs.get_clock()
  180. if hs.config.user_agent_suffix:
  181. self.user_agent = "%s %s" % (self.user_agent, hs.config.user_agent_suffix)
  182. self.user_agent = self.user_agent.encode("ascii")
  183. if self._ip_blacklist:
  184. real_reactor = hs.get_reactor()
  185. # If we have an IP blacklist, we need to use a DNS resolver which
  186. # filters out blacklisted IP addresses, to prevent DNS rebinding.
  187. nameResolver = IPBlacklistingResolver(
  188. real_reactor, self._ip_whitelist, self._ip_blacklist
  189. )
  190. @implementer(IReactorPluggableNameResolver)
  191. class Reactor(object):
  192. def __getattr__(_self, attr):
  193. if attr == "nameResolver":
  194. return nameResolver
  195. else:
  196. return getattr(real_reactor, attr)
  197. self.reactor = Reactor()
  198. else:
  199. self.reactor = hs.get_reactor()
  200. # the pusher makes lots of concurrent SSL connections to sygnal, and
  201. # tends to do so in batches, so we need to allow the pool to keep
  202. # lots of idle connections around.
  203. pool = HTTPConnectionPool(self.reactor)
  204. pool.maxPersistentPerHost = max((100 * CACHE_SIZE_FACTOR, 5))
  205. pool.cachedConnectionTimeout = 2 * 60
  206. self.agent = ProxyAgent(
  207. self.reactor,
  208. connectTimeout=15,
  209. contextFactory=self.hs.get_http_client_context_factory(),
  210. pool=pool,
  211. http_proxy=http_proxy,
  212. https_proxy=https_proxy,
  213. )
  214. if self._ip_blacklist:
  215. # If we have an IP blacklist, we then install the blacklisting Agent
  216. # which prevents direct access to IP addresses, that are not caught
  217. # by the DNS resolution.
  218. self.agent = BlacklistingAgentWrapper(
  219. self.agent,
  220. self.reactor,
  221. ip_whitelist=self._ip_whitelist,
  222. ip_blacklist=self._ip_blacklist,
  223. )
  224. @defer.inlineCallbacks
  225. def request(self, method, uri, data=None, headers=None):
  226. """
  227. Args:
  228. method (str): HTTP method to use.
  229. uri (str): URI to query.
  230. data (bytes): Data to send in the request body, if applicable.
  231. headers (t.w.http_headers.Headers): Request headers.
  232. """
  233. # A small wrapper around self.agent.request() so we can easily attach
  234. # counters to it
  235. outgoing_requests_counter.labels(method).inc()
  236. # log request but strip `access_token` (AS requests for example include this)
  237. logger.info("Sending request %s %s", method, redact_uri(uri))
  238. with start_active_span(
  239. "outgoing-client-request",
  240. tags={
  241. tags.SPAN_KIND: tags.SPAN_KIND_RPC_CLIENT,
  242. tags.HTTP_METHOD: method,
  243. tags.HTTP_URL: uri,
  244. },
  245. finish_on_close=True,
  246. ):
  247. try:
  248. body_producer = None
  249. if data is not None:
  250. body_producer = QuieterFileBodyProducer(BytesIO(data))
  251. request_deferred = treq.request(
  252. method,
  253. uri,
  254. agent=self.agent,
  255. data=body_producer,
  256. headers=headers,
  257. **self._extra_treq_args
  258. )
  259. request_deferred = timeout_deferred(
  260. request_deferred,
  261. 60,
  262. self.hs.get_reactor(),
  263. cancelled_to_request_timed_out_error,
  264. )
  265. response = yield make_deferred_yieldable(request_deferred)
  266. incoming_responses_counter.labels(method, response.code).inc()
  267. logger.info(
  268. "Received response to %s %s: %s",
  269. method,
  270. redact_uri(uri),
  271. response.code,
  272. )
  273. return response
  274. except Exception as e:
  275. incoming_responses_counter.labels(method, "ERR").inc()
  276. logger.info(
  277. "Error sending request to %s %s: %s %s",
  278. method,
  279. redact_uri(uri),
  280. type(e).__name__,
  281. e.args[0],
  282. )
  283. set_tag(tags.ERROR, True)
  284. set_tag("error_reason", e.args[0])
  285. raise
  286. @defer.inlineCallbacks
  287. def post_urlencoded_get_json(self, uri, args={}, headers=None):
  288. """
  289. Args:
  290. uri (str):
  291. args (dict[str, str|List[str]]): query params
  292. headers (dict[str|bytes, List[str|bytes]]|None): If not None, a map from
  293. header name to a list of values for that header
  294. Returns:
  295. Deferred[object]: parsed json
  296. Raises:
  297. HttpResponseException: On a non-2xx HTTP response.
  298. ValueError: if the response was not JSON
  299. """
  300. # TODO: Do we ever want to log message contents?
  301. logger.debug("post_urlencoded_get_json args: %s", args)
  302. query_bytes = urllib.parse.urlencode(encode_urlencode_args(args), True).encode(
  303. "utf8"
  304. )
  305. actual_headers = {
  306. b"Content-Type": [b"application/x-www-form-urlencoded"],
  307. b"User-Agent": [self.user_agent],
  308. b"Accept": [b"application/json"],
  309. }
  310. if headers:
  311. actual_headers.update(headers)
  312. response = yield self.request(
  313. "POST", uri, headers=Headers(actual_headers), data=query_bytes
  314. )
  315. body = yield make_deferred_yieldable(readBody(response))
  316. if 200 <= response.code < 300:
  317. return json.loads(body)
  318. else:
  319. raise HttpResponseException(response.code, response.phrase, body)
  320. @defer.inlineCallbacks
  321. def post_json_get_json(self, uri, post_json, headers=None):
  322. """
  323. Args:
  324. uri (str):
  325. post_json (object):
  326. headers (dict[str|bytes, List[str|bytes]]|None): If not None, a map from
  327. header name to a list of values for that header
  328. Returns:
  329. Deferred[object]: parsed json
  330. Raises:
  331. HttpResponseException: On a non-2xx HTTP response.
  332. ValueError: if the response was not JSON
  333. """
  334. json_str = encode_canonical_json(post_json)
  335. logger.debug("HTTP POST %s -> %s", json_str, uri)
  336. actual_headers = {
  337. b"Content-Type": [b"application/json"],
  338. b"User-Agent": [self.user_agent],
  339. b"Accept": [b"application/json"],
  340. }
  341. if headers:
  342. actual_headers.update(headers)
  343. response = yield self.request(
  344. "POST", uri, headers=Headers(actual_headers), data=json_str
  345. )
  346. body = yield make_deferred_yieldable(readBody(response))
  347. if 200 <= response.code < 300:
  348. return json.loads(body)
  349. else:
  350. raise HttpResponseException(response.code, response.phrase, body)
  351. @defer.inlineCallbacks
  352. def get_json(self, uri, args={}, headers=None):
  353. """ Gets some json from the given URI.
  354. Args:
  355. uri (str): The URI to request, not including query parameters
  356. args (dict): A dictionary used to create query strings, defaults to
  357. None.
  358. **Note**: The value of each key is assumed to be an iterable
  359. and *not* a string.
  360. headers (dict[str|bytes, List[str|bytes]]|None): If not None, a map from
  361. header name to a list of values for that header
  362. Returns:
  363. Deferred: Succeeds when we get *any* 2xx HTTP response, with the
  364. HTTP body as JSON.
  365. Raises:
  366. HttpResponseException On a non-2xx HTTP response.
  367. ValueError: if the response was not JSON
  368. """
  369. actual_headers = {b"Accept": [b"application/json"]}
  370. if headers:
  371. actual_headers.update(headers)
  372. body = yield self.get_raw(uri, args, headers=headers)
  373. return json.loads(body)
  374. @defer.inlineCallbacks
  375. def put_json(self, uri, json_body, args={}, headers=None):
  376. """ Puts some json to the given URI.
  377. Args:
  378. uri (str): The URI to request, not including query parameters
  379. json_body (dict): The JSON to put in the HTTP body,
  380. args (dict): A dictionary used to create query strings, defaults to
  381. None.
  382. **Note**: The value of each key is assumed to be an iterable
  383. and *not* a string.
  384. headers (dict[str|bytes, List[str|bytes]]|None): If not None, a map from
  385. header name to a list of values for that header
  386. Returns:
  387. Deferred: Succeeds when we get *any* 2xx HTTP response, with the
  388. HTTP body as JSON.
  389. Raises:
  390. HttpResponseException On a non-2xx HTTP response.
  391. ValueError: if the response was not JSON
  392. """
  393. if len(args):
  394. query_bytes = urllib.parse.urlencode(args, True)
  395. uri = "%s?%s" % (uri, query_bytes)
  396. json_str = encode_canonical_json(json_body)
  397. actual_headers = {
  398. b"Content-Type": [b"application/json"],
  399. b"User-Agent": [self.user_agent],
  400. b"Accept": [b"application/json"],
  401. }
  402. if headers:
  403. actual_headers.update(headers)
  404. response = yield self.request(
  405. "PUT", uri, headers=Headers(actual_headers), data=json_str
  406. )
  407. body = yield make_deferred_yieldable(readBody(response))
  408. if 200 <= response.code < 300:
  409. return json.loads(body)
  410. else:
  411. raise HttpResponseException(response.code, response.phrase, body)
  412. @defer.inlineCallbacks
  413. def get_raw(self, uri, args={}, headers=None):
  414. """ Gets raw text from the given URI.
  415. Args:
  416. uri (str): The URI to request, not including query parameters
  417. args (dict): A dictionary used to create query strings, defaults to
  418. None.
  419. **Note**: The value of each key is assumed to be an iterable
  420. and *not* a string.
  421. headers (dict[str|bytes, List[str|bytes]]|None): If not None, a map from
  422. header name to a list of values for that header
  423. Returns:
  424. Deferred: Succeeds when we get *any* 2xx HTTP response, with the
  425. HTTP body at text.
  426. Raises:
  427. HttpResponseException on a non-2xx HTTP response.
  428. """
  429. if len(args):
  430. query_bytes = urllib.parse.urlencode(args, True)
  431. uri = "%s?%s" % (uri, query_bytes)
  432. actual_headers = {b"User-Agent": [self.user_agent]}
  433. if headers:
  434. actual_headers.update(headers)
  435. response = yield self.request("GET", uri, headers=Headers(actual_headers))
  436. body = yield make_deferred_yieldable(readBody(response))
  437. if 200 <= response.code < 300:
  438. return body
  439. else:
  440. raise HttpResponseException(response.code, response.phrase, body)
  441. # XXX: FIXME: This is horribly copy-pasted from matrixfederationclient.
  442. # The two should be factored out.
  443. @defer.inlineCallbacks
  444. def get_file(self, url, output_stream, max_size=None, headers=None):
  445. """GETs a file from a given URL
  446. Args:
  447. url (str): The URL to GET
  448. output_stream (file): File to write the response body to.
  449. headers (dict[str|bytes, List[str|bytes]]|None): If not None, a map from
  450. header name to a list of values for that header
  451. Returns:
  452. A (int,dict,string,int) tuple of the file length, dict of the response
  453. headers, absolute URI of the response and HTTP response code.
  454. """
  455. actual_headers = {b"User-Agent": [self.user_agent]}
  456. if headers:
  457. actual_headers.update(headers)
  458. response = yield self.request("GET", url, headers=Headers(actual_headers))
  459. resp_headers = dict(response.headers.getAllRawHeaders())
  460. if (
  461. b"Content-Length" in resp_headers
  462. and int(resp_headers[b"Content-Length"][0]) > max_size
  463. ):
  464. logger.warning("Requested URL is too large > %r bytes" % (self.max_size,))
  465. raise SynapseError(
  466. 502,
  467. "Requested file is too large > %r bytes" % (self.max_size,),
  468. Codes.TOO_LARGE,
  469. )
  470. if response.code > 299:
  471. logger.warning("Got %d when downloading %s" % (response.code, url))
  472. raise SynapseError(502, "Got error %d" % (response.code,), Codes.UNKNOWN)
  473. # TODO: if our Content-Type is HTML or something, just read the first
  474. # N bytes into RAM rather than saving it all to disk only to read it
  475. # straight back in again
  476. try:
  477. length = yield make_deferred_yieldable(
  478. _readBodyToFile(response, output_stream, max_size)
  479. )
  480. except SynapseError:
  481. # This can happen e.g. because the body is too large.
  482. raise
  483. except Exception as e:
  484. raise_from(SynapseError(502, ("Failed to download remote body: %s" % e)), e)
  485. return (
  486. length,
  487. resp_headers,
  488. response.request.absoluteURI.decode("ascii"),
  489. response.code,
  490. )
  491. # XXX: FIXME: This is horribly copy-pasted from matrixfederationclient.
  492. # The two should be factored out.
class _ReadBodyToFileProtocol(protocol.Protocol):
    """Streams a response body into a file-like object, enforcing a size cap."""

    def __init__(self, stream, deferred, max_size):
        """
        Args:
            stream (file): file-like object the body bytes are written to
            deferred (Deferred): fired with the total length on success, or
                errbacked with a SynapseError if max_size is exceeded
            max_size (int|None): maximum number of bytes to accept, or None
                for no limit
        """
        self.stream = stream
        self.deferred = deferred
        self.length = 0
        self.max_size = max_size

    def dataReceived(self, data):
        self.stream.write(data)
        self.length += len(data)
        if self.max_size is not None and self.length >= self.max_size:
            self.deferred.errback(
                SynapseError(
                    502,
                    "Requested file is too large > %r bytes" % (self.max_size,),
                    Codes.TOO_LARGE,
                )
            )
            # Swap in a fresh deferred so that connectionLost (triggered by
            # loseConnection below) doesn't fire the already-errbacked one a
            # second time.
            self.deferred = defer.Deferred()
            self.transport.loseConnection()

    def connectionLost(self, reason):
        if reason.check(ResponseDone):
            self.deferred.callback(self.length)
        elif reason.check(PotentialDataLoss):
            # stolen from https://github.com/twisted/treq/pull/49/files
            # http://twistedmatrix.com/trac/ticket/4840
            self.deferred.callback(self.length)
        else:
            self.deferred.errback(reason)
  521. # XXX: FIXME: This is horribly copy-pasted from matrixfederationclient.
  522. # The two should be factored out.
  523. def _readBodyToFile(response, stream, max_size):
  524. d = defer.Deferred()
  525. response.deliverBody(_ReadBodyToFileProtocol(stream, d, max_size))
  526. return d
  527. def encode_urlencode_args(args):
  528. return {k: encode_urlencode_arg(v) for k, v in args.items()}
  529. def encode_urlencode_arg(arg):
  530. if isinstance(arg, text_type):
  531. return arg.encode("utf-8")
  532. elif isinstance(arg, list):
  533. return [encode_urlencode_arg(i) for i in arg]
  534. else:
  535. return arg
  536. def _print_ex(e):
  537. if hasattr(e, "reasons") and e.reasons:
  538. for ex in e.reasons:
  539. _print_ex(ex)
  540. else:
  541. logger.exception(e)
class InsecureInterceptableContextFactory(ssl.ContextFactory):
    """
    Factory for PyOpenSSL SSL contexts which accepts any certificate for any domain.

    Do not use this since it allows an attacker to intercept your communications.
    """

    def __init__(self):
        self._context = SSL.Context(SSL.SSLv23_METHOD)
        # VERIFY_NONE plus a callback that always returns None: certificate
        # verification is disabled entirely (hence "insecure").
        self._context.set_verify(VERIFY_NONE, lambda *_: None)

    def getContext(self, hostname=None, port=None):
        # Same (unverified) context regardless of the peer.
        return self._context

    def creatorForNetloc(self, hostname, port):
        # IPolicyForHTTPS-style hook: reuse this factory for every host/port.
        return self