client.py 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2014-2016 OpenMarket Ltd
  3. # Copyright 2018 New Vector Ltd
  4. #
  5. # Licensed under the Apache License, Version 2.0 (the "License");
  6. # you may not use this file except in compliance with the License.
  7. # You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing, software
  12. # distributed under the License is distributed on an "AS IS" BASIS,
  13. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. # See the License for the specific language governing permissions and
  15. # limitations under the License.
  16. import logging
  17. from six import text_type
  18. from six.moves import urllib
  19. import treq
  20. from canonicaljson import encode_canonical_json, json
  21. from netaddr import IPAddress
  22. from prometheus_client import Counter
  23. from zope.interface import implementer, provider
  24. from OpenSSL import SSL
  25. from OpenSSL.SSL import VERIFY_NONE
  26. from twisted.internet import defer, protocol, ssl
  27. from twisted.internet.interfaces import (
  28. IReactorPluggableNameResolver,
  29. IResolutionReceiver,
  30. )
  31. from twisted.python.failure import Failure
  32. from twisted.web._newclient import ResponseDone
  33. from twisted.web.client import Agent, HTTPConnectionPool, PartialDownloadError, readBody
  34. from twisted.web.http import PotentialDataLoss
  35. from twisted.web.http_headers import Headers
  36. from synapse.api.errors import Codes, HttpResponseException, SynapseError
  37. from synapse.http import cancelled_to_request_timed_out_error, redact_uri
  38. from synapse.util.async_helpers import timeout_deferred
  39. from synapse.util.caches import CACHE_SIZE_FACTOR
  40. from synapse.util.logcontext import make_deferred_yieldable
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Counts outgoing HTTP requests, labelled by HTTP method.
outgoing_requests_counter = Counter("synapse_http_client_requests", "", ["method"])
# Counts outcomes of those requests, labelled by HTTP method and by response
# code (SimpleHttpClient.request uses the code "ERR" when the request fails).
incoming_responses_counter = Counter(
    "synapse_http_client_responses", "", ["method", "code"]
)
  46. def check_against_blacklist(ip_address, ip_whitelist, ip_blacklist):
  47. """
  48. Args:
  49. ip_address (netaddr.IPAddress)
  50. ip_whitelist (netaddr.IPSet)
  51. ip_blacklist (netaddr.IPSet)
  52. """
  53. if ip_address in ip_blacklist:
  54. if ip_whitelist is None or ip_address not in ip_whitelist:
  55. return True
  56. return False
  57. class IPBlacklistingResolver(object):
  58. """
  59. A proxy for reactor.nameResolver which only produces non-blacklisted IP
  60. addresses, preventing DNS rebinding attacks on URL preview.
  61. """
  62. def __init__(self, reactor, ip_whitelist, ip_blacklist):
  63. """
  64. Args:
  65. reactor (twisted.internet.reactor)
  66. ip_whitelist (netaddr.IPSet)
  67. ip_blacklist (netaddr.IPSet)
  68. """
  69. self._reactor = reactor
  70. self._ip_whitelist = ip_whitelist
  71. self._ip_blacklist = ip_blacklist
  72. def resolveHostName(self, recv, hostname, portNumber=0):
  73. r = recv()
  74. d = defer.Deferred()
  75. addresses = []
  76. @provider(IResolutionReceiver)
  77. class EndpointReceiver(object):
  78. @staticmethod
  79. def resolutionBegan(resolutionInProgress):
  80. pass
  81. @staticmethod
  82. def addressResolved(address):
  83. ip_address = IPAddress(address.host)
  84. if check_against_blacklist(
  85. ip_address, self._ip_whitelist, self._ip_blacklist
  86. ):
  87. logger.info(
  88. "Dropped %s from DNS resolution to %s" % (ip_address, hostname)
  89. )
  90. raise SynapseError(403, "IP address blocked by IP blacklist entry")
  91. addresses.append(address)
  92. @staticmethod
  93. def resolutionComplete():
  94. d.callback(addresses)
  95. self._reactor.nameResolver.resolveHostName(
  96. EndpointReceiver, hostname, portNumber=portNumber
  97. )
  98. def _callback(addrs):
  99. r.resolutionBegan(None)
  100. for i in addrs:
  101. r.addressResolved(i)
  102. r.resolutionComplete()
  103. d.addCallback(_callback)
  104. return r
  105. class BlacklistingAgentWrapper(Agent):
  106. """
  107. An Agent wrapper which will prevent access to IP addresses being accessed
  108. directly (without an IP address lookup).
  109. """
  110. def __init__(self, agent, reactor, ip_whitelist=None, ip_blacklist=None):
  111. """
  112. Args:
  113. agent (twisted.web.client.Agent): The Agent to wrap.
  114. reactor (twisted.internet.reactor)
  115. ip_whitelist (netaddr.IPSet)
  116. ip_blacklist (netaddr.IPSet)
  117. """
  118. self._agent = agent
  119. self._ip_whitelist = ip_whitelist
  120. self._ip_blacklist = ip_blacklist
  121. def request(self, method, uri, headers=None, bodyProducer=None):
  122. h = urllib.parse.urlparse(uri.decode('ascii'))
  123. try:
  124. ip_address = IPAddress(h.hostname)
  125. if check_against_blacklist(
  126. ip_address, self._ip_whitelist, self._ip_blacklist
  127. ):
  128. logger.info(
  129. "Blocking access to %s because of blacklist" % (ip_address,)
  130. )
  131. e = SynapseError(403, "IP address blocked by IP blacklist entry")
  132. return defer.fail(Failure(e))
  133. except Exception:
  134. # Not an IP
  135. pass
  136. return self._agent.request(
  137. method, uri, headers=headers, bodyProducer=bodyProducer
  138. )
  139. class SimpleHttpClient(object):
  140. """
  141. A simple, no-frills HTTP client with methods that wrap up common ways of
  142. using HTTP in Matrix
  143. """
  144. def __init__(self, hs, treq_args={}, ip_whitelist=None, ip_blacklist=None):
  145. """
  146. Args:
  147. hs (synapse.server.HomeServer)
  148. treq_args (dict): Extra keyword arguments to be given to treq.request.
  149. ip_blacklist (netaddr.IPSet): The IP addresses that are blacklisted that
  150. we may not request.
  151. ip_whitelist (netaddr.IPSet): The whitelisted IP addresses, that we can
  152. request if it were otherwise caught in a blacklist.
  153. """
  154. self.hs = hs
  155. self._ip_whitelist = ip_whitelist
  156. self._ip_blacklist = ip_blacklist
  157. self._extra_treq_args = treq_args
  158. self.user_agent = hs.version_string
  159. self.clock = hs.get_clock()
  160. if hs.config.user_agent_suffix:
  161. self.user_agent = "%s %s" % (self.user_agent, hs.config.user_agent_suffix)
  162. self.user_agent = self.user_agent.encode('ascii')
  163. if self._ip_blacklist:
  164. real_reactor = hs.get_reactor()
  165. # If we have an IP blacklist, we need to use a DNS resolver which
  166. # filters out blacklisted IP addresses, to prevent DNS rebinding.
  167. nameResolver = IPBlacklistingResolver(
  168. real_reactor, self._ip_whitelist, self._ip_blacklist
  169. )
  170. @implementer(IReactorPluggableNameResolver)
  171. class Reactor(object):
  172. def __getattr__(_self, attr):
  173. if attr == "nameResolver":
  174. return nameResolver
  175. else:
  176. return getattr(real_reactor, attr)
  177. self.reactor = Reactor()
  178. else:
  179. self.reactor = hs.get_reactor()
  180. # the pusher makes lots of concurrent SSL connections to sygnal, and
  181. # tends to do so in batches, so we need to allow the pool to keep
  182. # lots of idle connections around.
  183. pool = HTTPConnectionPool(self.reactor)
  184. pool.maxPersistentPerHost = max((100 * CACHE_SIZE_FACTOR, 5))
  185. pool.cachedConnectionTimeout = 2 * 60
  186. # The default context factory in Twisted 14.0.0 (which we require) is
  187. # BrowserLikePolicyForHTTPS which will do regular cert validation
  188. # 'like a browser'
  189. self.agent = Agent(
  190. self.reactor,
  191. connectTimeout=15,
  192. contextFactory=self.hs.get_http_client_context_factory(),
  193. pool=pool,
  194. )
  195. if self._ip_blacklist:
  196. # If we have an IP blacklist, we then install the blacklisting Agent
  197. # which prevents direct access to IP addresses, that are not caught
  198. # by the DNS resolution.
  199. self.agent = BlacklistingAgentWrapper(
  200. self.agent,
  201. self.reactor,
  202. ip_whitelist=self._ip_whitelist,
  203. ip_blacklist=self._ip_blacklist,
  204. )
  205. @defer.inlineCallbacks
  206. def request(self, method, uri, data=b'', headers=None):
  207. """
  208. Args:
  209. method (str): HTTP method to use.
  210. uri (str): URI to query.
  211. data (bytes): Data to send in the request body, if applicable.
  212. headers (t.w.http_headers.Headers): Request headers.
  213. Raises:
  214. SynapseError: If the IP is blacklisted.
  215. """
  216. # A small wrapper around self.agent.request() so we can easily attach
  217. # counters to it
  218. outgoing_requests_counter.labels(method).inc()
  219. # log request but strip `access_token` (AS requests for example include this)
  220. logger.info("Sending request %s %s", method, redact_uri(uri))
  221. try:
  222. request_deferred = treq.request(
  223. method,
  224. uri,
  225. agent=self.agent,
  226. data=data,
  227. headers=headers,
  228. **self._extra_treq_args
  229. )
  230. request_deferred = timeout_deferred(
  231. request_deferred,
  232. 60,
  233. self.hs.get_reactor(),
  234. cancelled_to_request_timed_out_error,
  235. )
  236. response = yield make_deferred_yieldable(request_deferred)
  237. incoming_responses_counter.labels(method, response.code).inc()
  238. logger.info(
  239. "Received response to %s %s: %s", method, redact_uri(uri), response.code
  240. )
  241. defer.returnValue(response)
  242. except Exception as e:
  243. incoming_responses_counter.labels(method, "ERR").inc()
  244. logger.info(
  245. "Error sending request to %s %s: %s %s",
  246. method,
  247. redact_uri(uri),
  248. type(e).__name__,
  249. e.args[0],
  250. )
  251. raise
  252. @defer.inlineCallbacks
  253. def post_urlencoded_get_json(self, uri, args={}, headers=None):
  254. """
  255. Args:
  256. uri (str):
  257. args (dict[str, str|List[str]]): query params
  258. headers (dict[str, List[str]]|None): If not None, a map from
  259. header name to a list of values for that header
  260. Returns:
  261. Deferred[object]: parsed json
  262. Raises:
  263. HttpResponseException: On a non-2xx HTTP response.
  264. ValueError: if the response was not JSON
  265. """
  266. # TODO: Do we ever want to log message contents?
  267. logger.debug("post_urlencoded_get_json args: %s", args)
  268. query_bytes = urllib.parse.urlencode(encode_urlencode_args(args), True).encode(
  269. "utf8"
  270. )
  271. actual_headers = {
  272. b"Content-Type": [b"application/x-www-form-urlencoded"],
  273. b"User-Agent": [self.user_agent],
  274. }
  275. if headers:
  276. actual_headers.update(headers)
  277. response = yield self.request(
  278. "POST", uri, headers=Headers(actual_headers), data=query_bytes
  279. )
  280. if 200 <= response.code < 300:
  281. body = yield make_deferred_yieldable(treq.json_content(response))
  282. defer.returnValue(body)
  283. else:
  284. raise HttpResponseException(response.code, response.phrase, body)
  285. @defer.inlineCallbacks
  286. def post_json_get_json(self, uri, post_json, headers=None):
  287. """
  288. Args:
  289. uri (str):
  290. post_json (object):
  291. headers (dict[str, List[str]]|None): If not None, a map from
  292. header name to a list of values for that header
  293. Returns:
  294. Deferred[object]: parsed json
  295. Raises:
  296. HttpResponseException: On a non-2xx HTTP response.
  297. ValueError: if the response was not JSON
  298. """
  299. json_str = encode_canonical_json(post_json)
  300. logger.debug("HTTP POST %s -> %s", json_str, uri)
  301. actual_headers = {
  302. b"Content-Type": [b"application/json"],
  303. b"User-Agent": [self.user_agent],
  304. }
  305. if headers:
  306. actual_headers.update(headers)
  307. response = yield self.request(
  308. "POST", uri, headers=Headers(actual_headers), data=json_str
  309. )
  310. body = yield make_deferred_yieldable(readBody(response))
  311. if 200 <= response.code < 300:
  312. defer.returnValue(json.loads(body))
  313. else:
  314. raise HttpResponseException(response.code, response.phrase, body)
  315. @defer.inlineCallbacks
  316. def get_json(self, uri, args={}, headers=None):
  317. """ Gets some json from the given URI.
  318. Args:
  319. uri (str): The URI to request, not including query parameters
  320. args (dict): A dictionary used to create query strings, defaults to
  321. None.
  322. **Note**: The value of each key is assumed to be an iterable
  323. and *not* a string.
  324. headers (dict[str, List[str]]|None): If not None, a map from
  325. header name to a list of values for that header
  326. Returns:
  327. Deferred: Succeeds when we get *any* 2xx HTTP response, with the
  328. HTTP body as JSON.
  329. Raises:
  330. HttpResponseException On a non-2xx HTTP response.
  331. ValueError: if the response was not JSON
  332. """
  333. body = yield self.get_raw(uri, args, headers=headers)
  334. defer.returnValue(json.loads(body))
  335. @defer.inlineCallbacks
  336. def put_json(self, uri, json_body, args={}, headers=None):
  337. """ Puts some json to the given URI.
  338. Args:
  339. uri (str): The URI to request, not including query parameters
  340. json_body (dict): The JSON to put in the HTTP body,
  341. args (dict): A dictionary used to create query strings, defaults to
  342. None.
  343. **Note**: The value of each key is assumed to be an iterable
  344. and *not* a string.
  345. headers (dict[str, List[str]]|None): If not None, a map from
  346. header name to a list of values for that header
  347. Returns:
  348. Deferred: Succeeds when we get *any* 2xx HTTP response, with the
  349. HTTP body as JSON.
  350. Raises:
  351. HttpResponseException On a non-2xx HTTP response.
  352. ValueError: if the response was not JSON
  353. """
  354. if len(args):
  355. query_bytes = urllib.parse.urlencode(args, True)
  356. uri = "%s?%s" % (uri, query_bytes)
  357. json_str = encode_canonical_json(json_body)
  358. actual_headers = {
  359. b"Content-Type": [b"application/json"],
  360. b"User-Agent": [self.user_agent],
  361. }
  362. if headers:
  363. actual_headers.update(headers)
  364. response = yield self.request(
  365. "PUT", uri, headers=Headers(actual_headers), data=json_str
  366. )
  367. body = yield make_deferred_yieldable(readBody(response))
  368. if 200 <= response.code < 300:
  369. defer.returnValue(json.loads(body))
  370. else:
  371. raise HttpResponseException(response.code, response.phrase, body)
  372. @defer.inlineCallbacks
  373. def get_raw(self, uri, args={}, headers=None):
  374. """ Gets raw text from the given URI.
  375. Args:
  376. uri (str): The URI to request, not including query parameters
  377. args (dict): A dictionary used to create query strings, defaults to
  378. None.
  379. **Note**: The value of each key is assumed to be an iterable
  380. and *not* a string.
  381. headers (dict[str, List[str]]|None): If not None, a map from
  382. header name to a list of values for that header
  383. Returns:
  384. Deferred: Succeeds when we get *any* 2xx HTTP response, with the
  385. HTTP body at text.
  386. Raises:
  387. HttpResponseException on a non-2xx HTTP response.
  388. """
  389. if len(args):
  390. query_bytes = urllib.parse.urlencode(args, True)
  391. uri = "%s?%s" % (uri, query_bytes)
  392. actual_headers = {b"User-Agent": [self.user_agent]}
  393. if headers:
  394. actual_headers.update(headers)
  395. response = yield self.request("GET", uri, headers=Headers(actual_headers))
  396. body = yield make_deferred_yieldable(readBody(response))
  397. if 200 <= response.code < 300:
  398. defer.returnValue(body)
  399. else:
  400. raise HttpResponseException(response.code, response.phrase, body)
  401. # XXX: FIXME: This is horribly copy-pasted from matrixfederationclient.
  402. # The two should be factored out.
  403. @defer.inlineCallbacks
  404. def get_file(self, url, output_stream, max_size=None, headers=None):
  405. """GETs a file from a given URL
  406. Args:
  407. url (str): The URL to GET
  408. output_stream (file): File to write the response body to.
  409. headers (dict[str, List[str]]|None): If not None, a map from
  410. header name to a list of values for that header
  411. Returns:
  412. A (int,dict,string,int) tuple of the file length, dict of the response
  413. headers, absolute URI of the response and HTTP response code.
  414. """
  415. actual_headers = {b"User-Agent": [self.user_agent]}
  416. if headers:
  417. actual_headers.update(headers)
  418. response = yield self.request("GET", url, headers=Headers(actual_headers))
  419. resp_headers = dict(response.headers.getAllRawHeaders())
  420. if (
  421. b'Content-Length' in resp_headers
  422. and int(resp_headers[b'Content-Length'][0]) > max_size
  423. ):
  424. logger.warn("Requested URL is too large > %r bytes" % (self.max_size,))
  425. raise SynapseError(
  426. 502,
  427. "Requested file is too large > %r bytes" % (self.max_size,),
  428. Codes.TOO_LARGE,
  429. )
  430. if response.code > 299:
  431. logger.warn("Got %d when downloading %s" % (response.code, url))
  432. raise SynapseError(502, "Got error %d" % (response.code,), Codes.UNKNOWN)
  433. # TODO: if our Content-Type is HTML or something, just read the first
  434. # N bytes into RAM rather than saving it all to disk only to read it
  435. # straight back in again
  436. try:
  437. length = yield make_deferred_yieldable(
  438. _readBodyToFile(response, output_stream, max_size)
  439. )
  440. except Exception as e:
  441. logger.exception("Failed to download body")
  442. raise SynapseError(
  443. 502, ("Failed to download remote body: %s" % e), Codes.UNKNOWN
  444. )
  445. defer.returnValue(
  446. (
  447. length,
  448. resp_headers,
  449. response.request.absoluteURI.decode('ascii'),
  450. response.code,
  451. )
  452. )
  453. # XXX: FIXME: This is horribly copy-pasted from matrixfederationclient.
  454. # The two should be factored out.
  455. class _ReadBodyToFileProtocol(protocol.Protocol):
  456. def __init__(self, stream, deferred, max_size):
  457. self.stream = stream
  458. self.deferred = deferred
  459. self.length = 0
  460. self.max_size = max_size
  461. def dataReceived(self, data):
  462. self.stream.write(data)
  463. self.length += len(data)
  464. if self.max_size is not None and self.length >= self.max_size:
  465. self.deferred.errback(
  466. SynapseError(
  467. 502,
  468. "Requested file is too large > %r bytes" % (self.max_size,),
  469. Codes.TOO_LARGE,
  470. )
  471. )
  472. self.deferred = defer.Deferred()
  473. self.transport.loseConnection()
  474. def connectionLost(self, reason):
  475. if reason.check(ResponseDone):
  476. self.deferred.callback(self.length)
  477. elif reason.check(PotentialDataLoss):
  478. # stolen from https://github.com/twisted/treq/pull/49/files
  479. # http://twistedmatrix.com/trac/ticket/4840
  480. self.deferred.callback(self.length)
  481. else:
  482. self.deferred.errback(reason)
  483. # XXX: FIXME: This is horribly copy-pasted from matrixfederationclient.
  484. # The two should be factored out.
  485. def _readBodyToFile(response, stream, max_size):
  486. d = defer.Deferred()
  487. response.deliverBody(_ReadBodyToFileProtocol(stream, d, max_size))
  488. return d
  489. class CaptchaServerHttpClient(SimpleHttpClient):
  490. """
  491. Separate HTTP client for talking to google's captcha servers
  492. Only slightly special because accepts partial download responses
  493. used only by c/s api v1
  494. """
  495. @defer.inlineCallbacks
  496. def post_urlencoded_get_raw(self, url, args={}):
  497. query_bytes = urllib.parse.urlencode(encode_urlencode_args(args), True)
  498. response = yield self.request(
  499. "POST",
  500. url,
  501. data=query_bytes,
  502. headers=Headers(
  503. {
  504. b"Content-Type": [b"application/x-www-form-urlencoded"],
  505. b"User-Agent": [self.user_agent],
  506. }
  507. ),
  508. )
  509. try:
  510. body = yield make_deferred_yieldable(readBody(response))
  511. defer.returnValue(body)
  512. except PartialDownloadError as e:
  513. # twisted dislikes google's response, no content length.
  514. defer.returnValue(e.response)
  515. def encode_urlencode_args(args):
  516. return {k: encode_urlencode_arg(v) for k, v in args.items()}
  517. def encode_urlencode_arg(arg):
  518. if isinstance(arg, text_type):
  519. return arg.encode('utf-8')
  520. elif isinstance(arg, list):
  521. return [encode_urlencode_arg(i) for i in arg]
  522. else:
  523. return arg
  524. def _print_ex(e):
  525. if hasattr(e, "reasons") and e.reasons:
  526. for ex in e.reasons:
  527. _print_ex(ex)
  528. else:
  529. logger.exception(e)
  530. class InsecureInterceptableContextFactory(ssl.ContextFactory):
  531. """
  532. Factory for PyOpenSSL SSL contexts which accepts any certificate for any domain.
  533. Do not use this since it allows an attacker to intercept your communications.
  534. """
  535. def __init__(self):
  536. self._context = SSL.Context(SSL.SSLv23_METHOD)
  537. self._context.set_verify(VERIFY_NONE, lambda *_: None)
  538. def getContext(self, hostname=None, port=None):
  539. return self._context
  540. def creatorForNetloc(self, hostname, port):
  541. return self