# client.py
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2014-2016 OpenMarket Ltd
  3. # Copyright 2018 New Vector Ltd
  4. #
  5. # Licensed under the Apache License, Version 2.0 (the "License");
  6. # you may not use this file except in compliance with the License.
  7. # You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing, software
  12. # distributed under the License is distributed on an "AS IS" BASIS,
  13. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. # See the License for the specific language governing permissions and
  15. # limitations under the License.
  16. import logging
  17. from io import BytesIO
  18. from six import raise_from, text_type
  19. from six.moves import urllib
  20. import treq
  21. from canonicaljson import encode_canonical_json, json
  22. from netaddr import IPAddress
  23. from prometheus_client import Counter
  24. from zope.interface import implementer, provider
  25. from OpenSSL import SSL
  26. from OpenSSL.SSL import VERIFY_NONE
  27. from twisted.internet import defer, protocol, ssl
  28. from twisted.internet.interfaces import (
  29. IReactorPluggableNameResolver,
  30. IResolutionReceiver,
  31. )
  32. from twisted.python.failure import Failure
  33. from twisted.web._newclient import ResponseDone
  34. from twisted.web.client import Agent, HTTPConnectionPool, readBody
  35. from twisted.web.http import PotentialDataLoss
  36. from twisted.web.http_headers import Headers
  37. from synapse.api.errors import Codes, HttpResponseException, SynapseError
  38. from synapse.http import (
  39. QuieterFileBodyProducer,
  40. cancelled_to_request_timed_out_error,
  41. redact_uri,
  42. )
  43. from synapse.http.proxyagent import ProxyAgent
  44. from synapse.logging.context import make_deferred_yieldable
  45. from synapse.logging.opentracing import set_tag, start_active_span, tags
  46. from synapse.util.async_helpers import timeout_deferred
logger = logging.getLogger(__name__)

# Prometheus metrics for HTTP traffic made through this client, labelled
# by request method (and, for responses, the response code or "ERR").
outgoing_requests_counter = Counter("synapse_http_client_requests", "", ["method"])
incoming_responses_counter = Counter(
    "synapse_http_client_responses", "", ["method", "code"]
)
  52. def check_against_blacklist(ip_address, ip_whitelist, ip_blacklist):
  53. """
  54. Args:
  55. ip_address (netaddr.IPAddress)
  56. ip_whitelist (netaddr.IPSet)
  57. ip_blacklist (netaddr.IPSet)
  58. """
  59. if ip_address in ip_blacklist:
  60. if ip_whitelist is None or ip_address not in ip_whitelist:
  61. return True
  62. return False
class IPBlacklistingResolver(object):
    """
    A proxy for reactor.nameResolver which only produces non-blacklisted IP
    addresses, preventing DNS rebinding attacks on URL preview.
    """

    def __init__(self, reactor, ip_whitelist, ip_blacklist):
        """
        Args:
            reactor (twisted.internet.reactor)
            ip_whitelist (netaddr.IPSet)
            ip_blacklist (netaddr.IPSet)
        """
        self._reactor = reactor
        self._ip_whitelist = ip_whitelist
        self._ip_blacklist = ip_blacklist

    def resolveHostName(self, recv, hostname, portNumber=0):
        # NOTE(review): `recv` appears to be a receiver *factory* here — it
        # is called to obtain the caller's IResolutionReceiver instance,
        # which the filtered results are replayed into below.
        r = recv()
        # Buffer of every address produced by the real resolver; filtered
        # as a whole once resolution completes.
        addresses = []

        def _callback():
            r.resolutionBegan(None)

            has_bad_ip = False
            for i in addresses:
                ip_address = IPAddress(i.host)

                if check_against_blacklist(
                    ip_address, self._ip_whitelist, self._ip_blacklist
                ):
                    logger.info(
                        "Dropped %s from DNS resolution to %s due to blacklist"
                        % (ip_address, hostname)
                    )
                    has_bad_ip = True

            # if we have a blacklisted IP, we'd like to raise an error to block the
            # request, but all we can really do from here is claim that there were no
            # valid results.
            if not has_bad_ip:
                for i in addresses:
                    r.addressResolved(i)
            r.resolutionComplete()

        @provider(IResolutionReceiver)
        class EndpointReceiver(object):
            # Shim receiver handed to the real resolver: collects each
            # resolved address, then triggers the filtered replay above.
            @staticmethod
            def resolutionBegan(resolutionInProgress):
                pass

            @staticmethod
            def addressResolved(address):
                addresses.append(address)

            @staticmethod
            def resolutionComplete():
                _callback()

        # Delegate the actual lookup to the wrapped reactor's resolver.
        self._reactor.nameResolver.resolveHostName(
            EndpointReceiver, hostname, portNumber=portNumber
        )

        return r
class BlacklistingAgentWrapper(Agent):
    """
    An Agent wrapper which will prevent access to IP addresses being accessed
    directly (without an IP address lookup).
    """

    def __init__(self, agent, reactor, ip_whitelist=None, ip_blacklist=None):
        """
        Args:
            agent (twisted.web.client.Agent): The Agent to wrap.
            reactor (twisted.internet.reactor): unused here; kept for
                signature compatibility with callers.
            ip_whitelist (netaddr.IPSet)
            ip_blacklist (netaddr.IPSet)
        """
        # NOTE(review): inherits from Agent (presumably so isinstance
        # checks pass) but deliberately does not call super().__init__();
        # all real work is delegated to the wrapped agent.
        self._agent = agent
        self._ip_whitelist = ip_whitelist
        self._ip_blacklist = ip_blacklist

    def request(self, method, uri, headers=None, bodyProducer=None):
        h = urllib.parse.urlparse(uri.decode("ascii"))

        try:
            # Only catches URIs whose host component is a literal IP
            # address; IPAddress() raises for ordinary hostnames, which
            # fall through to the wrapped agent below.
            ip_address = IPAddress(h.hostname)

            if check_against_blacklist(
                ip_address, self._ip_whitelist, self._ip_blacklist
            ):
                logger.info("Blocking access to %s due to blacklist" % (ip_address,))
                e = SynapseError(403, "IP address blocked by IP blacklist entry")
                return defer.fail(Failure(e))
        except Exception:
            # Not an IP
            pass

        return self._agent.request(
            method, uri, headers=headers, bodyProducer=bodyProducer
        )
  148. class SimpleHttpClient(object):
  149. """
  150. A simple, no-frills HTTP client with methods that wrap up common ways of
  151. using HTTP in Matrix
  152. """
  153. def __init__(
  154. self,
  155. hs,
  156. treq_args={},
  157. ip_whitelist=None,
  158. ip_blacklist=None,
  159. http_proxy=None,
  160. https_proxy=None,
  161. ):
  162. """
  163. Args:
  164. hs (synapse.server.HomeServer)
  165. treq_args (dict): Extra keyword arguments to be given to treq.request.
  166. ip_blacklist (netaddr.IPSet): The IP addresses that are blacklisted that
  167. we may not request.
  168. ip_whitelist (netaddr.IPSet): The whitelisted IP addresses, that we can
  169. request if it were otherwise caught in a blacklist.
  170. http_proxy (bytes): proxy server to use for http connections. host[:port]
  171. https_proxy (bytes): proxy server to use for https connections. host[:port]
  172. """
  173. self.hs = hs
  174. self._ip_whitelist = ip_whitelist
  175. self._ip_blacklist = ip_blacklist
  176. self._extra_treq_args = treq_args
  177. self.user_agent = hs.version_string
  178. self.clock = hs.get_clock()
  179. if hs.config.user_agent_suffix:
  180. self.user_agent = "%s %s" % (self.user_agent, hs.config.user_agent_suffix)
  181. self.user_agent = self.user_agent.encode("ascii")
  182. if self._ip_blacklist:
  183. real_reactor = hs.get_reactor()
  184. # If we have an IP blacklist, we need to use a DNS resolver which
  185. # filters out blacklisted IP addresses, to prevent DNS rebinding.
  186. nameResolver = IPBlacklistingResolver(
  187. real_reactor, self._ip_whitelist, self._ip_blacklist
  188. )
  189. @implementer(IReactorPluggableNameResolver)
  190. class Reactor(object):
  191. def __getattr__(_self, attr):
  192. if attr == "nameResolver":
  193. return nameResolver
  194. else:
  195. return getattr(real_reactor, attr)
  196. self.reactor = Reactor()
  197. else:
  198. self.reactor = hs.get_reactor()
  199. # the pusher makes lots of concurrent SSL connections to sygnal, and
  200. # tends to do so in batches, so we need to allow the pool to keep
  201. # lots of idle connections around.
  202. pool = HTTPConnectionPool(self.reactor)
  203. # XXX: The justification for using the cache factor here is that larger instances
  204. # will need both more cache and more connections.
  205. # Still, this should probably be a separate dial
  206. pool.maxPersistentPerHost = max((100 * hs.config.caches.global_factor, 5))
  207. pool.cachedConnectionTimeout = 2 * 60
  208. self.agent = ProxyAgent(
  209. self.reactor,
  210. connectTimeout=15,
  211. contextFactory=self.hs.get_http_client_context_factory(),
  212. pool=pool,
  213. http_proxy=http_proxy,
  214. https_proxy=https_proxy,
  215. )
  216. if self._ip_blacklist:
  217. # If we have an IP blacklist, we then install the blacklisting Agent
  218. # which prevents direct access to IP addresses, that are not caught
  219. # by the DNS resolution.
  220. self.agent = BlacklistingAgentWrapper(
  221. self.agent,
  222. self.reactor,
  223. ip_whitelist=self._ip_whitelist,
  224. ip_blacklist=self._ip_blacklist,
  225. )
  226. @defer.inlineCallbacks
  227. def request(self, method, uri, data=None, headers=None):
  228. """
  229. Args:
  230. method (str): HTTP method to use.
  231. uri (str): URI to query.
  232. data (bytes): Data to send in the request body, if applicable.
  233. headers (t.w.http_headers.Headers): Request headers.
  234. """
  235. # A small wrapper around self.agent.request() so we can easily attach
  236. # counters to it
  237. outgoing_requests_counter.labels(method).inc()
  238. # log request but strip `access_token` (AS requests for example include this)
  239. logger.info("Sending request %s %s", method, redact_uri(uri))
  240. with start_active_span(
  241. "outgoing-client-request",
  242. tags={
  243. tags.SPAN_KIND: tags.SPAN_KIND_RPC_CLIENT,
  244. tags.HTTP_METHOD: method,
  245. tags.HTTP_URL: uri,
  246. },
  247. finish_on_close=True,
  248. ):
  249. try:
  250. body_producer = None
  251. if data is not None:
  252. body_producer = QuieterFileBodyProducer(BytesIO(data))
  253. request_deferred = treq.request(
  254. method,
  255. uri,
  256. agent=self.agent,
  257. data=body_producer,
  258. headers=headers,
  259. **self._extra_treq_args
  260. )
  261. request_deferred = timeout_deferred(
  262. request_deferred,
  263. 60,
  264. self.hs.get_reactor(),
  265. cancelled_to_request_timed_out_error,
  266. )
  267. response = yield make_deferred_yieldable(request_deferred)
  268. incoming_responses_counter.labels(method, response.code).inc()
  269. logger.info(
  270. "Received response to %s %s: %s",
  271. method,
  272. redact_uri(uri),
  273. response.code,
  274. )
  275. return response
  276. except Exception as e:
  277. incoming_responses_counter.labels(method, "ERR").inc()
  278. logger.info(
  279. "Error sending request to %s %s: %s %s",
  280. method,
  281. redact_uri(uri),
  282. type(e).__name__,
  283. e.args[0],
  284. )
  285. set_tag(tags.ERROR, True)
  286. set_tag("error_reason", e.args[0])
  287. raise
  288. @defer.inlineCallbacks
  289. def post_urlencoded_get_json(self, uri, args={}, headers=None):
  290. """
  291. Args:
  292. uri (str):
  293. args (dict[str, str|List[str]]): query params
  294. headers (dict[str|bytes, List[str|bytes]]|None): If not None, a map from
  295. header name to a list of values for that header
  296. Returns:
  297. Deferred[object]: parsed json
  298. Raises:
  299. HttpResponseException: On a non-2xx HTTP response.
  300. ValueError: if the response was not JSON
  301. """
  302. # TODO: Do we ever want to log message contents?
  303. logger.debug("post_urlencoded_get_json args: %s", args)
  304. query_bytes = urllib.parse.urlencode(encode_urlencode_args(args), True).encode(
  305. "utf8"
  306. )
  307. actual_headers = {
  308. b"Content-Type": [b"application/x-www-form-urlencoded"],
  309. b"User-Agent": [self.user_agent],
  310. b"Accept": [b"application/json"],
  311. }
  312. if headers:
  313. actual_headers.update(headers)
  314. response = yield self.request(
  315. "POST", uri, headers=Headers(actual_headers), data=query_bytes
  316. )
  317. body = yield make_deferred_yieldable(readBody(response))
  318. if 200 <= response.code < 300:
  319. return json.loads(body)
  320. else:
  321. raise HttpResponseException(response.code, response.phrase, body)
  322. @defer.inlineCallbacks
  323. def post_json_get_json(self, uri, post_json, headers=None):
  324. """
  325. Args:
  326. uri (str):
  327. post_json (object):
  328. headers (dict[str|bytes, List[str|bytes]]|None): If not None, a map from
  329. header name to a list of values for that header
  330. Returns:
  331. Deferred[object]: parsed json
  332. Raises:
  333. HttpResponseException: On a non-2xx HTTP response.
  334. ValueError: if the response was not JSON
  335. """
  336. json_str = encode_canonical_json(post_json)
  337. logger.debug("HTTP POST %s -> %s", json_str, uri)
  338. actual_headers = {
  339. b"Content-Type": [b"application/json"],
  340. b"User-Agent": [self.user_agent],
  341. b"Accept": [b"application/json"],
  342. }
  343. if headers:
  344. actual_headers.update(headers)
  345. response = yield self.request(
  346. "POST", uri, headers=Headers(actual_headers), data=json_str
  347. )
  348. body = yield make_deferred_yieldable(readBody(response))
  349. if 200 <= response.code < 300:
  350. return json.loads(body)
  351. else:
  352. raise HttpResponseException(response.code, response.phrase, body)
  353. @defer.inlineCallbacks
  354. def get_json(self, uri, args={}, headers=None):
  355. """ Gets some json from the given URI.
  356. Args:
  357. uri (str): The URI to request, not including query parameters
  358. args (dict): A dictionary used to create query strings, defaults to
  359. None.
  360. **Note**: The value of each key is assumed to be an iterable
  361. and *not* a string.
  362. headers (dict[str|bytes, List[str|bytes]]|None): If not None, a map from
  363. header name to a list of values for that header
  364. Returns:
  365. Deferred: Succeeds when we get *any* 2xx HTTP response, with the
  366. HTTP body as JSON.
  367. Raises:
  368. HttpResponseException On a non-2xx HTTP response.
  369. ValueError: if the response was not JSON
  370. """
  371. actual_headers = {b"Accept": [b"application/json"]}
  372. if headers:
  373. actual_headers.update(headers)
  374. body = yield self.get_raw(uri, args, headers=headers)
  375. return json.loads(body)
  376. @defer.inlineCallbacks
  377. def put_json(self, uri, json_body, args={}, headers=None):
  378. """ Puts some json to the given URI.
  379. Args:
  380. uri (str): The URI to request, not including query parameters
  381. json_body (dict): The JSON to put in the HTTP body,
  382. args (dict): A dictionary used to create query strings, defaults to
  383. None.
  384. **Note**: The value of each key is assumed to be an iterable
  385. and *not* a string.
  386. headers (dict[str|bytes, List[str|bytes]]|None): If not None, a map from
  387. header name to a list of values for that header
  388. Returns:
  389. Deferred: Succeeds when we get *any* 2xx HTTP response, with the
  390. HTTP body as JSON.
  391. Raises:
  392. HttpResponseException On a non-2xx HTTP response.
  393. ValueError: if the response was not JSON
  394. """
  395. if len(args):
  396. query_bytes = urllib.parse.urlencode(args, True)
  397. uri = "%s?%s" % (uri, query_bytes)
  398. json_str = encode_canonical_json(json_body)
  399. actual_headers = {
  400. b"Content-Type": [b"application/json"],
  401. b"User-Agent": [self.user_agent],
  402. b"Accept": [b"application/json"],
  403. }
  404. if headers:
  405. actual_headers.update(headers)
  406. response = yield self.request(
  407. "PUT", uri, headers=Headers(actual_headers), data=json_str
  408. )
  409. body = yield make_deferred_yieldable(readBody(response))
  410. if 200 <= response.code < 300:
  411. return json.loads(body)
  412. else:
  413. raise HttpResponseException(response.code, response.phrase, body)
  414. @defer.inlineCallbacks
  415. def get_raw(self, uri, args={}, headers=None):
  416. """ Gets raw text from the given URI.
  417. Args:
  418. uri (str): The URI to request, not including query parameters
  419. args (dict): A dictionary used to create query strings, defaults to
  420. None.
  421. **Note**: The value of each key is assumed to be an iterable
  422. and *not* a string.
  423. headers (dict[str|bytes, List[str|bytes]]|None): If not None, a map from
  424. header name to a list of values for that header
  425. Returns:
  426. Deferred: Succeeds when we get *any* 2xx HTTP response, with the
  427. HTTP body at text.
  428. Raises:
  429. HttpResponseException on a non-2xx HTTP response.
  430. """
  431. if len(args):
  432. query_bytes = urllib.parse.urlencode(args, True)
  433. uri = "%s?%s" % (uri, query_bytes)
  434. actual_headers = {b"User-Agent": [self.user_agent]}
  435. if headers:
  436. actual_headers.update(headers)
  437. response = yield self.request("GET", uri, headers=Headers(actual_headers))
  438. body = yield make_deferred_yieldable(readBody(response))
  439. if 200 <= response.code < 300:
  440. return body
  441. else:
  442. raise HttpResponseException(response.code, response.phrase, body)
  443. # XXX: FIXME: This is horribly copy-pasted from matrixfederationclient.
  444. # The two should be factored out.
  445. @defer.inlineCallbacks
  446. def get_file(self, url, output_stream, max_size=None, headers=None):
  447. """GETs a file from a given URL
  448. Args:
  449. url (str): The URL to GET
  450. output_stream (file): File to write the response body to.
  451. headers (dict[str|bytes, List[str|bytes]]|None): If not None, a map from
  452. header name to a list of values for that header
  453. Returns:
  454. A (int,dict,string,int) tuple of the file length, dict of the response
  455. headers, absolute URI of the response and HTTP response code.
  456. """
  457. actual_headers = {b"User-Agent": [self.user_agent]}
  458. if headers:
  459. actual_headers.update(headers)
  460. response = yield self.request("GET", url, headers=Headers(actual_headers))
  461. resp_headers = dict(response.headers.getAllRawHeaders())
  462. if (
  463. b"Content-Length" in resp_headers
  464. and int(resp_headers[b"Content-Length"][0]) > max_size
  465. ):
  466. logger.warning("Requested URL is too large > %r bytes" % (self.max_size,))
  467. raise SynapseError(
  468. 502,
  469. "Requested file is too large > %r bytes" % (self.max_size,),
  470. Codes.TOO_LARGE,
  471. )
  472. if response.code > 299:
  473. logger.warning("Got %d when downloading %s" % (response.code, url))
  474. raise SynapseError(502, "Got error %d" % (response.code,), Codes.UNKNOWN)
  475. # TODO: if our Content-Type is HTML or something, just read the first
  476. # N bytes into RAM rather than saving it all to disk only to read it
  477. # straight back in again
  478. try:
  479. length = yield make_deferred_yieldable(
  480. _readBodyToFile(response, output_stream, max_size)
  481. )
  482. except SynapseError:
  483. # This can happen e.g. because the body is too large.
  484. raise
  485. except Exception as e:
  486. raise_from(SynapseError(502, ("Failed to download remote body: %s" % e)), e)
  487. return (
  488. length,
  489. resp_headers,
  490. response.request.absoluteURI.decode("ascii"),
  491. response.code,
  492. )
  493. # XXX: FIXME: This is horribly copy-pasted from matrixfederationclient.
  494. # The two should be factored out.
class _ReadBodyToFileProtocol(protocol.Protocol):
    """Streams a response body into a file-like object, enforcing an
    optional maximum size.
    """

    def __init__(self, stream, deferred, max_size):
        # File-like object the body is written to.
        self.stream = stream
        # Fired with the total byte count on success, or errbacked with a
        # SynapseError if the body exceeds max_size.
        self.deferred = deferred
        self.length = 0
        # Maximum number of bytes to accept, or None for no limit.
        self.max_size = max_size

    def dataReceived(self, data):
        self.stream.write(data)
        self.length += len(data)
        if self.max_size is not None and self.length >= self.max_size:
            self.deferred.errback(
                SynapseError(
                    502,
                    "Requested file is too large > %r bytes" % (self.max_size,),
                    Codes.TOO_LARGE,
                )
            )
            # Swap in a fresh, unobserved deferred so that connectionLost
            # (fired by loseConnection below) does not call back the
            # already-errbacked deferred a second time.
            self.deferred = defer.Deferred()
            self.transport.loseConnection()

    def connectionLost(self, reason):
        if reason.check(ResponseDone):
            self.deferred.callback(self.length)
        elif reason.check(PotentialDataLoss):
            # stolen from https://github.com/twisted/treq/pull/49/files
            # http://twistedmatrix.com/trac/ticket/4840
            self.deferred.callback(self.length)
        else:
            self.deferred.errback(reason)
  523. # XXX: FIXME: This is horribly copy-pasted from matrixfederationclient.
  524. # The two should be factored out.
  525. def _readBodyToFile(response, stream, max_size):
  526. d = defer.Deferred()
  527. response.deliverBody(_ReadBodyToFileProtocol(stream, d, max_size))
  528. return d
  529. def encode_urlencode_args(args):
  530. return {k: encode_urlencode_arg(v) for k, v in args.items()}
  531. def encode_urlencode_arg(arg):
  532. if isinstance(arg, text_type):
  533. return arg.encode("utf-8")
  534. elif isinstance(arg, list):
  535. return [encode_urlencode_arg(i) for i in arg]
  536. else:
  537. return arg
  538. def _print_ex(e):
  539. if hasattr(e, "reasons") and e.reasons:
  540. for ex in e.reasons:
  541. _print_ex(ex)
  542. else:
  543. logger.exception(e)
  544. class InsecureInterceptableContextFactory(ssl.ContextFactory):
  545. """
  546. Factory for PyOpenSSL SSL contexts which accepts any certificate for any domain.
  547. Do not use this since it allows an attacker to intercept your communications.
  548. """
  549. def __init__(self):
  550. self._context = SSL.Context(SSL.SSLv23_METHOD)
  551. self._context.set_verify(VERIFY_NONE, lambda *_: None)
  552. def getContext(self, hostname=None, port=None):
  553. return self._context
  554. def creatorForNetloc(self, hostname, port):
  555. return self