client.py 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2014-2016 OpenMarket Ltd
  3. # Copyright 2018 New Vector Ltd
  4. #
  5. # Licensed under the Apache License, Version 2.0 (the "License");
  6. # you may not use this file except in compliance with the License.
  7. # You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing, software
  12. # distributed under the License is distributed on an "AS IS" BASIS,
  13. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. # See the License for the specific language governing permissions and
  15. # limitations under the License.
  16. import logging
  17. from io import BytesIO
  18. from six import raise_from, text_type
  19. from six.moves import urllib
  20. import treq
  21. from canonicaljson import encode_canonical_json, json
  22. from netaddr import IPAddress
  23. from prometheus_client import Counter
  24. from zope.interface import implementer, provider
  25. from OpenSSL import SSL
  26. from OpenSSL.SSL import VERIFY_NONE
  27. from twisted.internet import defer, protocol, ssl
  28. from twisted.internet.interfaces import (
  29. IReactorPluggableNameResolver,
  30. IResolutionReceiver,
  31. )
  32. from twisted.python.failure import Failure
  33. from twisted.web._newclient import ResponseDone
  34. from twisted.web.client import Agent, HTTPConnectionPool, PartialDownloadError, readBody
  35. from twisted.web.http import PotentialDataLoss
  36. from twisted.web.http_headers import Headers
  37. from synapse.api.errors import Codes, HttpResponseException, SynapseError
  38. from synapse.http import (
  39. QuieterFileBodyProducer,
  40. cancelled_to_request_timed_out_error,
  41. redact_uri,
  42. )
  43. from synapse.logging.context import make_deferred_yieldable
  44. from synapse.util.async_helpers import timeout_deferred
  45. from synapse.util.caches import CACHE_SIZE_FACTOR
  46. logger = logging.getLogger(__name__)
  47. outgoing_requests_counter = Counter("synapse_http_client_requests", "", ["method"])
  48. incoming_responses_counter = Counter(
  49. "synapse_http_client_responses", "", ["method", "code"]
  50. )
  51. def check_against_blacklist(ip_address, ip_whitelist, ip_blacklist):
  52. """
  53. Args:
  54. ip_address (netaddr.IPAddress)
  55. ip_whitelist (netaddr.IPSet)
  56. ip_blacklist (netaddr.IPSet)
  57. """
  58. if ip_address in ip_blacklist:
  59. if ip_whitelist is None or ip_address not in ip_whitelist:
  60. return True
  61. return False
  62. class IPBlacklistingResolver(object):
  63. """
  64. A proxy for reactor.nameResolver which only produces non-blacklisted IP
  65. addresses, preventing DNS rebinding attacks on URL preview.
  66. """
  67. def __init__(self, reactor, ip_whitelist, ip_blacklist):
  68. """
  69. Args:
  70. reactor (twisted.internet.reactor)
  71. ip_whitelist (netaddr.IPSet)
  72. ip_blacklist (netaddr.IPSet)
  73. """
  74. self._reactor = reactor
  75. self._ip_whitelist = ip_whitelist
  76. self._ip_blacklist = ip_blacklist
  77. def resolveHostName(self, recv, hostname, portNumber=0):
  78. r = recv()
  79. addresses = []
  80. def _callback():
  81. r.resolutionBegan(None)
  82. has_bad_ip = False
  83. for i in addresses:
  84. ip_address = IPAddress(i.host)
  85. if check_against_blacklist(
  86. ip_address, self._ip_whitelist, self._ip_blacklist
  87. ):
  88. logger.info(
  89. "Dropped %s from DNS resolution to %s due to blacklist"
  90. % (ip_address, hostname)
  91. )
  92. has_bad_ip = True
  93. # if we have a blacklisted IP, we'd like to raise an error to block the
  94. # request, but all we can really do from here is claim that there were no
  95. # valid results.
  96. if not has_bad_ip:
  97. for i in addresses:
  98. r.addressResolved(i)
  99. r.resolutionComplete()
  100. @provider(IResolutionReceiver)
  101. class EndpointReceiver(object):
  102. @staticmethod
  103. def resolutionBegan(resolutionInProgress):
  104. pass
  105. @staticmethod
  106. def addressResolved(address):
  107. addresses.append(address)
  108. @staticmethod
  109. def resolutionComplete():
  110. _callback()
  111. self._reactor.nameResolver.resolveHostName(
  112. EndpointReceiver, hostname, portNumber=portNumber
  113. )
  114. return r
  115. class BlacklistingAgentWrapper(Agent):
  116. """
  117. An Agent wrapper which will prevent access to IP addresses being accessed
  118. directly (without an IP address lookup).
  119. """
  120. def __init__(self, agent, reactor, ip_whitelist=None, ip_blacklist=None):
  121. """
  122. Args:
  123. agent (twisted.web.client.Agent): The Agent to wrap.
  124. reactor (twisted.internet.reactor)
  125. ip_whitelist (netaddr.IPSet)
  126. ip_blacklist (netaddr.IPSet)
  127. """
  128. self._agent = agent
  129. self._ip_whitelist = ip_whitelist
  130. self._ip_blacklist = ip_blacklist
  131. def request(self, method, uri, headers=None, bodyProducer=None):
  132. h = urllib.parse.urlparse(uri.decode("ascii"))
  133. try:
  134. ip_address = IPAddress(h.hostname)
  135. if check_against_blacklist(
  136. ip_address, self._ip_whitelist, self._ip_blacklist
  137. ):
  138. logger.info("Blocking access to %s due to blacklist" % (ip_address,))
  139. e = SynapseError(403, "IP address blocked by IP blacklist entry")
  140. return defer.fail(Failure(e))
  141. except Exception:
  142. # Not an IP
  143. pass
  144. return self._agent.request(
  145. method, uri, headers=headers, bodyProducer=bodyProducer
  146. )
  147. class SimpleHttpClient(object):
  148. """
  149. A simple, no-frills HTTP client with methods that wrap up common ways of
  150. using HTTP in Matrix
  151. """
  152. def __init__(self, hs, treq_args={}, ip_whitelist=None, ip_blacklist=None):
  153. """
  154. Args:
  155. hs (synapse.server.HomeServer)
  156. treq_args (dict): Extra keyword arguments to be given to treq.request.
  157. ip_blacklist (netaddr.IPSet): The IP addresses that are blacklisted that
  158. we may not request.
  159. ip_whitelist (netaddr.IPSet): The whitelisted IP addresses, that we can
  160. request if it were otherwise caught in a blacklist.
  161. """
  162. self.hs = hs
  163. self._ip_whitelist = ip_whitelist
  164. self._ip_blacklist = ip_blacklist
  165. self._extra_treq_args = treq_args
  166. self.user_agent = hs.version_string
  167. self.clock = hs.get_clock()
  168. if hs.config.user_agent_suffix:
  169. self.user_agent = "%s %s" % (self.user_agent, hs.config.user_agent_suffix)
  170. self.user_agent = self.user_agent.encode("ascii")
  171. if self._ip_blacklist:
  172. real_reactor = hs.get_reactor()
  173. # If we have an IP blacklist, we need to use a DNS resolver which
  174. # filters out blacklisted IP addresses, to prevent DNS rebinding.
  175. nameResolver = IPBlacklistingResolver(
  176. real_reactor, self._ip_whitelist, self._ip_blacklist
  177. )
  178. @implementer(IReactorPluggableNameResolver)
  179. class Reactor(object):
  180. def __getattr__(_self, attr):
  181. if attr == "nameResolver":
  182. return nameResolver
  183. else:
  184. return getattr(real_reactor, attr)
  185. self.reactor = Reactor()
  186. else:
  187. self.reactor = hs.get_reactor()
  188. # the pusher makes lots of concurrent SSL connections to sygnal, and
  189. # tends to do so in batches, so we need to allow the pool to keep
  190. # lots of idle connections around.
  191. pool = HTTPConnectionPool(self.reactor)
  192. pool.maxPersistentPerHost = max((100 * CACHE_SIZE_FACTOR, 5))
  193. pool.cachedConnectionTimeout = 2 * 60
  194. # The default context factory in Twisted 14.0.0 (which we require) is
  195. # BrowserLikePolicyForHTTPS which will do regular cert validation
  196. # 'like a browser'
  197. self.agent = Agent(
  198. self.reactor,
  199. connectTimeout=15,
  200. contextFactory=self.hs.get_http_client_context_factory(),
  201. pool=pool,
  202. )
  203. if self._ip_blacklist:
  204. # If we have an IP blacklist, we then install the blacklisting Agent
  205. # which prevents direct access to IP addresses, that are not caught
  206. # by the DNS resolution.
  207. self.agent = BlacklistingAgentWrapper(
  208. self.agent,
  209. self.reactor,
  210. ip_whitelist=self._ip_whitelist,
  211. ip_blacklist=self._ip_blacklist,
  212. )
  213. @defer.inlineCallbacks
  214. def request(self, method, uri, data=None, headers=None):
  215. """
  216. Args:
  217. method (str): HTTP method to use.
  218. uri (str): URI to query.
  219. data (bytes): Data to send in the request body, if applicable.
  220. headers (t.w.http_headers.Headers): Request headers.
  221. """
  222. # A small wrapper around self.agent.request() so we can easily attach
  223. # counters to it
  224. outgoing_requests_counter.labels(method).inc()
  225. # log request but strip `access_token` (AS requests for example include this)
  226. logger.info("Sending request %s %s", method, redact_uri(uri))
  227. try:
  228. body_producer = None
  229. if data is not None:
  230. body_producer = QuieterFileBodyProducer(BytesIO(data))
  231. request_deferred = treq.request(
  232. method,
  233. uri,
  234. agent=self.agent,
  235. data=body_producer,
  236. headers=headers,
  237. **self._extra_treq_args
  238. )
  239. request_deferred = timeout_deferred(
  240. request_deferred,
  241. 60,
  242. self.hs.get_reactor(),
  243. cancelled_to_request_timed_out_error,
  244. )
  245. response = yield make_deferred_yieldable(request_deferred)
  246. incoming_responses_counter.labels(method, response.code).inc()
  247. logger.info(
  248. "Received response to %s %s: %s", method, redact_uri(uri), response.code
  249. )
  250. return response
  251. except Exception as e:
  252. incoming_responses_counter.labels(method, "ERR").inc()
  253. logger.info(
  254. "Error sending request to %s %s: %s %s",
  255. method,
  256. redact_uri(uri),
  257. type(e).__name__,
  258. e.args[0],
  259. )
  260. raise
  261. @defer.inlineCallbacks
  262. def post_urlencoded_get_json(self, uri, args={}, headers=None):
  263. """
  264. Args:
  265. uri (str):
  266. args (dict[str, str|List[str]]): query params
  267. headers (dict[str, List[str]]|None): If not None, a map from
  268. header name to a list of values for that header
  269. Returns:
  270. Deferred[object]: parsed json
  271. Raises:
  272. HttpResponseException: On a non-2xx HTTP response.
  273. ValueError: if the response was not JSON
  274. """
  275. # TODO: Do we ever want to log message contents?
  276. logger.debug("post_urlencoded_get_json args: %s", args)
  277. query_bytes = urllib.parse.urlencode(encode_urlencode_args(args), True).encode(
  278. "utf8"
  279. )
  280. actual_headers = {
  281. b"Content-Type": [b"application/x-www-form-urlencoded"],
  282. b"User-Agent": [self.user_agent],
  283. }
  284. if headers:
  285. actual_headers.update(headers)
  286. response = yield self.request(
  287. "POST", uri, headers=Headers(actual_headers), data=query_bytes
  288. )
  289. body = yield make_deferred_yieldable(readBody(response))
  290. if 200 <= response.code < 300:
  291. return json.loads(body)
  292. else:
  293. raise HttpResponseException(response.code, response.phrase, body)
  294. @defer.inlineCallbacks
  295. def post_json_get_json(self, uri, post_json, headers=None):
  296. """
  297. Args:
  298. uri (str):
  299. post_json (object):
  300. headers (dict[str, List[str]]|None): If not None, a map from
  301. header name to a list of values for that header
  302. Returns:
  303. Deferred[object]: parsed json
  304. Raises:
  305. HttpResponseException: On a non-2xx HTTP response.
  306. ValueError: if the response was not JSON
  307. """
  308. json_str = encode_canonical_json(post_json)
  309. logger.debug("HTTP POST %s -> %s", json_str, uri)
  310. actual_headers = {
  311. b"Content-Type": [b"application/json"],
  312. b"User-Agent": [self.user_agent],
  313. }
  314. if headers:
  315. actual_headers.update(headers)
  316. response = yield self.request(
  317. "POST", uri, headers=Headers(actual_headers), data=json_str
  318. )
  319. body = yield make_deferred_yieldable(readBody(response))
  320. if 200 <= response.code < 300:
  321. return json.loads(body)
  322. else:
  323. raise HttpResponseException(response.code, response.phrase, body)
  324. @defer.inlineCallbacks
  325. def get_json(self, uri, args={}, headers=None):
  326. """ Gets some json from the given URI.
  327. Args:
  328. uri (str): The URI to request, not including query parameters
  329. args (dict): A dictionary used to create query strings, defaults to
  330. None.
  331. **Note**: The value of each key is assumed to be an iterable
  332. and *not* a string.
  333. headers (dict[str, List[str]]|None): If not None, a map from
  334. header name to a list of values for that header
  335. Returns:
  336. Deferred: Succeeds when we get *any* 2xx HTTP response, with the
  337. HTTP body as JSON.
  338. Raises:
  339. HttpResponseException On a non-2xx HTTP response.
  340. ValueError: if the response was not JSON
  341. """
  342. body = yield self.get_raw(uri, args, headers=headers)
  343. return json.loads(body)
  344. @defer.inlineCallbacks
  345. def put_json(self, uri, json_body, args={}, headers=None):
  346. """ Puts some json to the given URI.
  347. Args:
  348. uri (str): The URI to request, not including query parameters
  349. json_body (dict): The JSON to put in the HTTP body,
  350. args (dict): A dictionary used to create query strings, defaults to
  351. None.
  352. **Note**: The value of each key is assumed to be an iterable
  353. and *not* a string.
  354. headers (dict[str, List[str]]|None): If not None, a map from
  355. header name to a list of values for that header
  356. Returns:
  357. Deferred: Succeeds when we get *any* 2xx HTTP response, with the
  358. HTTP body as JSON.
  359. Raises:
  360. HttpResponseException On a non-2xx HTTP response.
  361. ValueError: if the response was not JSON
  362. """
  363. if len(args):
  364. query_bytes = urllib.parse.urlencode(args, True)
  365. uri = "%s?%s" % (uri, query_bytes)
  366. json_str = encode_canonical_json(json_body)
  367. actual_headers = {
  368. b"Content-Type": [b"application/json"],
  369. b"User-Agent": [self.user_agent],
  370. }
  371. if headers:
  372. actual_headers.update(headers)
  373. response = yield self.request(
  374. "PUT", uri, headers=Headers(actual_headers), data=json_str
  375. )
  376. body = yield make_deferred_yieldable(readBody(response))
  377. if 200 <= response.code < 300:
  378. return json.loads(body)
  379. else:
  380. raise HttpResponseException(response.code, response.phrase, body)
  381. @defer.inlineCallbacks
  382. def get_raw(self, uri, args={}, headers=None):
  383. """ Gets raw text from the given URI.
  384. Args:
  385. uri (str): The URI to request, not including query parameters
  386. args (dict): A dictionary used to create query strings, defaults to
  387. None.
  388. **Note**: The value of each key is assumed to be an iterable
  389. and *not* a string.
  390. headers (dict[str, List[str]]|None): If not None, a map from
  391. header name to a list of values for that header
  392. Returns:
  393. Deferred: Succeeds when we get *any* 2xx HTTP response, with the
  394. HTTP body at text.
  395. Raises:
  396. HttpResponseException on a non-2xx HTTP response.
  397. """
  398. if len(args):
  399. query_bytes = urllib.parse.urlencode(args, True)
  400. uri = "%s?%s" % (uri, query_bytes)
  401. actual_headers = {b"User-Agent": [self.user_agent]}
  402. if headers:
  403. actual_headers.update(headers)
  404. response = yield self.request("GET", uri, headers=Headers(actual_headers))
  405. body = yield make_deferred_yieldable(readBody(response))
  406. if 200 <= response.code < 300:
  407. return body
  408. else:
  409. raise HttpResponseException(response.code, response.phrase, body)
  410. # XXX: FIXME: This is horribly copy-pasted from matrixfederationclient.
  411. # The two should be factored out.
  412. @defer.inlineCallbacks
  413. def get_file(self, url, output_stream, max_size=None, headers=None):
  414. """GETs a file from a given URL
  415. Args:
  416. url (str): The URL to GET
  417. output_stream (file): File to write the response body to.
  418. headers (dict[str, List[str]]|None): If not None, a map from
  419. header name to a list of values for that header
  420. Returns:
  421. A (int,dict,string,int) tuple of the file length, dict of the response
  422. headers, absolute URI of the response and HTTP response code.
  423. """
  424. actual_headers = {b"User-Agent": [self.user_agent]}
  425. if headers:
  426. actual_headers.update(headers)
  427. response = yield self.request("GET", url, headers=Headers(actual_headers))
  428. resp_headers = dict(response.headers.getAllRawHeaders())
  429. if (
  430. b"Content-Length" in resp_headers
  431. and int(resp_headers[b"Content-Length"][0]) > max_size
  432. ):
  433. logger.warn("Requested URL is too large > %r bytes" % (self.max_size,))
  434. raise SynapseError(
  435. 502,
  436. "Requested file is too large > %r bytes" % (self.max_size,),
  437. Codes.TOO_LARGE,
  438. )
  439. if response.code > 299:
  440. logger.warn("Got %d when downloading %s" % (response.code, url))
  441. raise SynapseError(502, "Got error %d" % (response.code,), Codes.UNKNOWN)
  442. # TODO: if our Content-Type is HTML or something, just read the first
  443. # N bytes into RAM rather than saving it all to disk only to read it
  444. # straight back in again
  445. try:
  446. length = yield make_deferred_yieldable(
  447. _readBodyToFile(response, output_stream, max_size)
  448. )
  449. except SynapseError:
  450. # This can happen e.g. because the body is too large.
  451. raise
  452. except Exception as e:
  453. raise_from(SynapseError(502, ("Failed to download remote body: %s" % e)), e)
  454. return (
  455. length,
  456. resp_headers,
  457. response.request.absoluteURI.decode("ascii"),
  458. response.code,
  459. )
  460. # XXX: FIXME: This is horribly copy-pasted from matrixfederationclient.
  461. # The two should be factored out.
  462. class _ReadBodyToFileProtocol(protocol.Protocol):
  463. def __init__(self, stream, deferred, max_size):
  464. self.stream = stream
  465. self.deferred = deferred
  466. self.length = 0
  467. self.max_size = max_size
  468. def dataReceived(self, data):
  469. self.stream.write(data)
  470. self.length += len(data)
  471. if self.max_size is not None and self.length >= self.max_size:
  472. self.deferred.errback(
  473. SynapseError(
  474. 502,
  475. "Requested file is too large > %r bytes" % (self.max_size,),
  476. Codes.TOO_LARGE,
  477. )
  478. )
  479. self.deferred = defer.Deferred()
  480. self.transport.loseConnection()
  481. def connectionLost(self, reason):
  482. if reason.check(ResponseDone):
  483. self.deferred.callback(self.length)
  484. elif reason.check(PotentialDataLoss):
  485. # stolen from https://github.com/twisted/treq/pull/49/files
  486. # http://twistedmatrix.com/trac/ticket/4840
  487. self.deferred.callback(self.length)
  488. else:
  489. self.deferred.errback(reason)
  490. # XXX: FIXME: This is horribly copy-pasted from matrixfederationclient.
  491. # The two should be factored out.
  492. def _readBodyToFile(response, stream, max_size):
  493. d = defer.Deferred()
  494. response.deliverBody(_ReadBodyToFileProtocol(stream, d, max_size))
  495. return d
  496. class CaptchaServerHttpClient(SimpleHttpClient):
  497. """
  498. Separate HTTP client for talking to google's captcha servers
  499. Only slightly special because accepts partial download responses
  500. used only by c/s api v1
  501. """
  502. @defer.inlineCallbacks
  503. def post_urlencoded_get_raw(self, url, args={}):
  504. query_bytes = urllib.parse.urlencode(encode_urlencode_args(args), True)
  505. response = yield self.request(
  506. "POST",
  507. url,
  508. data=query_bytes,
  509. headers=Headers(
  510. {
  511. b"Content-Type": [b"application/x-www-form-urlencoded"],
  512. b"User-Agent": [self.user_agent],
  513. }
  514. ),
  515. )
  516. try:
  517. body = yield make_deferred_yieldable(readBody(response))
  518. return body
  519. except PartialDownloadError as e:
  520. # twisted dislikes google's response, no content length.
  521. return e.response
  522. def encode_urlencode_args(args):
  523. return {k: encode_urlencode_arg(v) for k, v in args.items()}
  524. def encode_urlencode_arg(arg):
  525. if isinstance(arg, text_type):
  526. return arg.encode("utf-8")
  527. elif isinstance(arg, list):
  528. return [encode_urlencode_arg(i) for i in arg]
  529. else:
  530. return arg
  531. def _print_ex(e):
  532. if hasattr(e, "reasons") and e.reasons:
  533. for ex in e.reasons:
  534. _print_ex(ex)
  535. else:
  536. logger.exception(e)
  537. class InsecureInterceptableContextFactory(ssl.ContextFactory):
  538. """
  539. Factory for PyOpenSSL SSL contexts which accepts any certificate for any domain.
  540. Do not use this since it allows an attacker to intercept your communications.
  541. """
  542. def __init__(self):
  543. self._context = SSL.Context(SSL.SSLv23_METHOD)
  544. self._context.set_verify(VERIFY_NONE, lambda *_: None)
  545. def getContext(self, hostname=None, port=None):
  546. return self._context
  547. def creatorForNetloc(self, hostname, port):
  548. return self