client.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2014-2016 OpenMarket Ltd
  3. # Copyright 2018 New Vector Ltd
  4. #
  5. # Licensed under the Apache License, Version 2.0 (the "License");
  6. # you may not use this file except in compliance with the License.
  7. # You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing, software
  12. # distributed under the License is distributed on an "AS IS" BASIS,
  13. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. # See the License for the specific language governing permissions and
  15. # limitations under the License.
  16. import logging
  17. import urllib
  18. from six import StringIO
  19. from canonicaljson import encode_canonical_json, json
  20. from prometheus_client import Counter
  21. from OpenSSL import SSL
  22. from OpenSSL.SSL import VERIFY_NONE
  23. from twisted.internet import defer, protocol, reactor, ssl, task
  24. from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
  25. from twisted.web._newclient import ResponseDone
  26. from twisted.web.client import Agent, BrowserLikeRedirectAgent, ContentDecoderAgent
  27. from twisted.web.client import FileBodyProducer as TwistedFileBodyProducer
  28. from twisted.web.client import (
  29. GzipDecoder,
  30. HTTPConnectionPool,
  31. PartialDownloadError,
  32. readBody,
  33. )
  34. from twisted.web.http import PotentialDataLoss
  35. from twisted.web.http_headers import Headers
  36. from synapse.api.errors import (
  37. CodeMessageException,
  38. Codes,
  39. MatrixCodeMessageException,
  40. SynapseError,
  41. )
  42. from synapse.http import cancelled_to_request_timed_out_error, redact_uri
  43. from synapse.http.endpoint import SpiderEndpoint
  44. from synapse.util.async import add_timeout_to_deferred
  45. from synapse.util.caches import CACHE_SIZE_FACTOR
  46. from synapse.util.logcontext import make_deferred_yieldable
  47. logger = logging.getLogger(__name__)
  48. outgoing_requests_counter = Counter("synapse_http_client_requests", "", ["method"])
  49. incoming_responses_counter = Counter("synapse_http_client_responses", "",
  50. ["method", "code"])
  class SimpleHttpClient(object):
      """
      A simple, no-frills HTTP client with methods that wrap up common ways of
      using HTTP in Matrix
      """

      def __init__(self, hs):
          """
          Args:
              hs: the homeserver object. This constructor reads
                  `hs.get_http_client_context_factory()`, `hs.version_string`,
                  `hs.get_clock()` and `hs.config.user_agent_suffix` from it.
          """
          self.hs = hs

          pool = HTTPConnectionPool(reactor)

          # the pusher makes lots of concurrent SSL connections to sygnal, and
          # tends to do so in batches, so we need to allow the pool to keep lots
          # of idle connections around.
          pool.maxPersistentPerHost = max((100 * CACHE_SIZE_FACTOR, 5))
          pool.cachedConnectionTimeout = 2 * 60

          # The default context factory in Twisted 14.0.0 (which we require) is
          # BrowserLikePolicyForHTTPS which will do regular cert validation
          # 'like a browser'
          self.agent = Agent(
              reactor,
              connectTimeout=15,
              contextFactory=hs.get_http_client_context_factory(),
              pool=pool,
          )
          self.user_agent = hs.version_string
          self.clock = hs.get_clock()
          if hs.config.user_agent_suffix:
              self.user_agent = "%s %s" % (self.user_agent, hs.config.user_agent_suffix,)

      @defer.inlineCallbacks
      def request(self, method, uri, *args, **kwargs):
          """Send a request through `self.agent`, with metrics and logging.

          Args:
              method: HTTP method, also used as the metrics label
              uri: the URI to request; it is passed through `redact_uri`
                  before being logged
              *args, **kwargs: forwarded unchanged to `self.agent.request`

          Returns:
              Deferred: resolves to the response object produced by the agent.

          Raises:
              Whatever the agent (or the timeout wrapper) raises; the error is
              counted and logged, then re-raised unchanged.
          """
          # A small wrapper around self.agent.request() so we can easily attach
          # counters to it
          outgoing_requests_counter.labels(method).inc()

          # log request but strip `access_token` (AS requests for example include this)
          logger.info("Sending request %s %s", method, redact_uri(uri))

          try:
              request_deferred = self.agent.request(
                  method, uri, *args, **kwargs
              )
              # NOTE(review): 60 is presumably a timeout in seconds - confirm
              # against add_timeout_to_deferred.
              add_timeout_to_deferred(
                  request_deferred, 60, self.hs.get_reactor(),
                  cancelled_to_request_timed_out_error,
              )
              response = yield make_deferred_yieldable(request_deferred)

              incoming_responses_counter.labels(method, response.code).inc()
              logger.info(
                  "Received response to %s %s: %s",
                  method, redact_uri(uri), response.code
              )
              defer.returnValue(response)
          except Exception as e:
              incoming_responses_counter.labels(method, "ERR").inc()
              # NOTE: `e.message` only exists on Python 2 exceptions.
              logger.info(
                  "Error sending request to %s %s: %s %s",
                  method, redact_uri(uri), type(e).__name__, e.message
              )
              raise

      @defer.inlineCallbacks
      def post_urlencoded_get_json(self, uri, args={}, headers=None):
          """POSTs url-encoded form data and parses the response body as JSON.

          The HTTP status code of the response is not checked.

          Args:
              uri (str):
              args (dict[str, str|List[str]]): form parameters, sent url-encoded
                  in the request body
              headers (dict[str, List[str]]|None): If not None, a map from
                 header name to a list of values for that header

          Returns:
              Deferred[object]: parsed json
          """

          # TODO: Do we ever want to log message contents?
          logger.debug("post_urlencoded_get_json args: %s", args)

          query_bytes = urllib.urlencode(encode_urlencode_args(args), True)

          actual_headers = {
              b"Content-Type": [b"application/x-www-form-urlencoded"],
              b"User-Agent": [self.user_agent],
          }
          if headers:
              actual_headers.update(headers)

          response = yield self.request(
              "POST",
              uri.encode("ascii"),
              headers=Headers(actual_headers),
              bodyProducer=FileBodyProducer(StringIO(query_bytes))
          )

          body = yield make_deferred_yieldable(readBody(response))

          defer.returnValue(json.loads(body))

      @defer.inlineCallbacks
      def post_json_get_json(self, uri, post_json, headers=None):
          """POSTs canonical JSON and parses the response body as JSON.

          Args:
              uri (str):
              post_json (object):
              headers (dict[str, List[str]]|None): If not None, a map from
                 header name to a list of values for that header

          Returns:
              Deferred[object]: parsed json

          Raises:
              MatrixCodeMessageException or CodeMessageException on a non-2xx
              HTTP response (see `_exceptionFromFailedRequest`).
          """
          json_str = encode_canonical_json(post_json)

          logger.debug("HTTP POST %s -> %s", json_str, uri)

          actual_headers = {
              b"Content-Type": [b"application/json"],
              b"User-Agent": [self.user_agent],
          }
          if headers:
              actual_headers.update(headers)

          response = yield self.request(
              "POST",
              uri.encode("ascii"),
              headers=Headers(actual_headers),
              bodyProducer=FileBodyProducer(StringIO(json_str))
          )

          body = yield make_deferred_yieldable(readBody(response))

          if 200 <= response.code < 300:
              defer.returnValue(json.loads(body))
          else:
              raise self._exceptionFromFailedRequest(response, body)

      @defer.inlineCallbacks
      def get_json(self, uri, args={}, headers=None):
          """ Gets some json from the given URI.

          Args:
              uri (str): The URI to request, not including query parameters
              args (dict): A dictionary used to create query strings, defaults to
                  an empty dict.
                  **Note**: The value of each key is assumed to be an iterable
                  and *not* a string.
              headers (dict[str, List[str]]|None): If not None, a map from
                 header name to a list of values for that header
          Returns:
              Deferred: Succeeds when we get *any* 2xx HTTP response, with the
              HTTP body as JSON.
          Raises:
              On a non-2xx HTTP response. The response body will be used as the
              error message.
          """
          try:
              body = yield self.get_raw(uri, args, headers=headers)
              defer.returnValue(json.loads(body))
          except CodeMessageException as e:
              # Only the status code and body survive on the exception, so pass
              # the code itself; the helper accepts either a response or a code.
              raise self._exceptionFromFailedRequest(e.code, e.msg)

      @defer.inlineCallbacks
      def put_json(self, uri, json_body, args={}, headers=None):
          """ Puts some json to the given URI.

          Args:
              uri (str): The URI to request, not including query parameters
              json_body (dict): The JSON to put in the HTTP body,
              args (dict): A dictionary used to create query strings, defaults to
                  an empty dict.
                  **Note**: The value of each key is assumed to be an iterable
                  and *not* a string.
              headers (dict[str, List[str]]|None): If not None, a map from
                 header name to a list of values for that header
          Returns:
              Deferred: Succeeds when we get *any* 2xx HTTP response, with the
              HTTP body as JSON.
          Raises:
              CodeMessageException: on a non-2xx HTTP response.
          """
          if args:
              query_bytes = urllib.urlencode(args, True)
              uri = "%s?%s" % (uri, query_bytes)

          json_str = encode_canonical_json(json_body)

          actual_headers = {
              b"Content-Type": [b"application/json"],
              b"User-Agent": [self.user_agent],
          }
          if headers:
              actual_headers.update(headers)

          response = yield self.request(
              "PUT",
              uri.encode("ascii"),
              headers=Headers(actual_headers),
              bodyProducer=FileBodyProducer(StringIO(json_str))
          )

          body = yield make_deferred_yieldable(readBody(response))

          if 200 <= response.code < 300:
              defer.returnValue(json.loads(body))
          else:
              # NB: This is explicitly not json.loads(body)'d because the contract
              # of CodeMessageException is a *string* message. Callers can always
              # load it into JSON if they want.
              raise CodeMessageException(response.code, body)

      @defer.inlineCallbacks
      def get_raw(self, uri, args={}, headers=None):
          """ Gets raw text from the given URI.

          Args:
              uri (str): The URI to request, not including query parameters
              args (dict): A dictionary used to create query strings, defaults to
                  an empty dict.
                  **Note**: The value of each key is assumed to be an iterable
                  and *not* a string.
              headers (dict[str, List[str]]|None): If not None, a map from
                 header name to a list of values for that header
          Returns:
              Deferred: Succeeds when we get *any* 2xx HTTP response, with the
              HTTP body as text.
          Raises:
              CodeMessageException: on a non-2xx HTTP response. The response
              body will be used as the error message.
          """
          if args:
              query_bytes = urllib.urlencode(args, True)
              uri = "%s?%s" % (uri, query_bytes)

          actual_headers = {
              b"User-Agent": [self.user_agent],
          }
          if headers:
              actual_headers.update(headers)

          response = yield self.request(
              "GET",
              uri.encode("ascii"),
              headers=Headers(actual_headers),
          )

          body = yield make_deferred_yieldable(readBody(response))

          if 200 <= response.code < 300:
              defer.returnValue(body)
          else:
              raise CodeMessageException(response.code, body)

      def _exceptionFromFailedRequest(self, response, body):
          """Builds (does not raise) the exception describing a failed request.

          Args:
              response: either a response object exposing a `code` attribute, or
                  the integer HTTP status code itself
              body: the response body

          Returns:
              MatrixCodeMessageException if the body is JSON containing both
              'errcode' and 'error'; otherwise CodeMessageException carrying the
              raw body.
          """
          # get_json only has the status code to hand, not the response object.
          code = getattr(response, "code", response)
          try:
              jsonBody = json.loads(body)
              errcode = jsonBody['errcode']
              error = jsonBody['error']
              return MatrixCodeMessageException(code, error, errcode)
          except (ValueError, KeyError):
              return CodeMessageException(code, body)

      # XXX: FIXME: This is horribly copy-pasted from matrixfederationclient.
      # The two should be factored out.

      @defer.inlineCallbacks
      def get_file(self, url, output_stream, max_size=None, headers=None):
          """GETs a file from a given URL
          Args:
              url (str): The URL to GET
              output_stream (file): File to write the response body to.
              max_size (int|None): If not None, the download is rejected when the
                  advertised Content-Length exceeds it, and aborted once the
                  received data reaches it.
              headers (dict[str, List[str]]|None): If not None, a map from
                 header name to a list of values for that header
          Returns:
              A (int,dict,string,int) tuple of the file length, dict of the response
              headers, absolute URI of the response and HTTP response code.
          Raises:
              SynapseError: (502) if the file is too large, the response code is
              above 299, or reading the body fails.
          """

          actual_headers = {
              b"User-Agent": [self.user_agent],
          }
          if headers:
              actual_headers.update(headers)

          response = yield self.request(
              "GET",
              url.encode("ascii"),
              headers=Headers(actual_headers),
          )

          resp_headers = dict(response.headers.getAllRawHeaders())

          # Raw header values are lists of strings, so the first value has to be
          # parsed before it can be compared with the limit; with no limit there
          # is nothing to compare against.
          content_length = resp_headers.get(b'Content-Length')
          if (
              max_size is not None
              and content_length
              and int(content_length[0]) > max_size
          ):
              logger.warn("Requested URL is too large > %r bytes" % (max_size,))
              raise SynapseError(
                  502,
                  "Requested file is too large > %r bytes" % (max_size,),
                  Codes.TOO_LARGE,
              )

          if response.code > 299:
              logger.warn("Got %d when downloading %s" % (response.code, url))
              raise SynapseError(
                  502,
                  "Got error %d" % (response.code,),
                  Codes.UNKNOWN,
              )

          # TODO: if our Content-Type is HTML or something, just read the first
          # N bytes into RAM rather than saving it all to disk only to read it
          # straight back in again

          try:
              length = yield make_deferred_yieldable(_readBodyToFile(
                  response, output_stream, max_size,
              ))
          except Exception as e:
              logger.exception("Failed to download body")
              raise SynapseError(
                  502,
                  ("Failed to download remote body: %s" % e),
                  Codes.UNKNOWN,
              )

          defer.returnValue(
              (length, resp_headers, response.request.absoluteURI, response.code),
          )
  330. # XXX: FIXME: This is horribly copy-pasted from matrixfederationclient.
  331. # The two should be factored out.
  class _ReadBodyToFileProtocol(protocol.Protocol):
      """Protocol that streams a response body into a file-like object.

      The supplied deferred is called back with the number of bytes written, or
      errbacked if the size limit is hit or the connection fails.
      """

      def __init__(self, stream, deferred, max_size):
          """
          Args:
              stream: object with a `write` method that receives each chunk
              deferred: deferred fired with the final outcome
              max_size: byte limit, or None for no limit
          """
          self.max_size = max_size
          self.length = 0
          self.deferred = deferred
          self.stream = stream

      def dataReceived(self, data):
          """Write a chunk to the stream and enforce the size limit."""
          self.stream.write(data)
          self.length += len(data)

          limit = self.max_size
          if limit is None or self.length < limit:
              return

          too_large = SynapseError(
              502,
              "Requested file is too large > %r bytes" % (self.max_size,),
              Codes.TOO_LARGE,
          )
          self.deferred.errback(too_large)
          # Swap in a fresh deferred so that the connectionLost which follows
          # fires that one rather than the deferred we have just errbacked.
          self.deferred = defer.Deferred()
          self.transport.loseConnection()

      def connectionLost(self, reason):
          """Report the final length, or the failure, via the deferred."""
          # PotentialDataLoss is treated like a clean finish:
          # stolen from https://github.com/twisted/treq/pull/49/files
          # http://twistedmatrix.com/trac/ticket/4840
          if reason.check(ResponseDone, PotentialDataLoss):
              self.deferred.callback(self.length)
          else:
              self.deferred.errback(reason)
  358. # XXX: FIXME: This is horribly copy-pasted from matrixfederationclient.
  359. # The two should be factored out.
  def _readBodyToFile(response, stream, max_size):
      """Deliver the body of `response` into `stream`.

      Args:
          response: object whose `deliverBody` accepts a protocol
          stream: destination handed to `_ReadBodyToFileProtocol`
          max_size: size limit handed to `_ReadBodyToFileProtocol`

      Returns:
          Deferred: the deferred given to the protocol, which fires it when the
          transfer ends.
      """
      finished = defer.Deferred()
      body_protocol = _ReadBodyToFileProtocol(stream, finished, max_size)
      response.deliverBody(body_protocol)
      return finished
  class CaptchaServerHttpClient(SimpleHttpClient):
      """
      HTTP client dedicated to google's captcha servers.

      It differs from the base client only in tolerating partial download
      responses. Used only by c/s api v1.
      """

      @defer.inlineCallbacks
      def post_urlencoded_get_raw(self, url, args={}):
          """POSTs url-encoded form data and returns the raw response body.

          Args:
              url (str): the URL to POST to
              args (dict): form fields, url-encoded into the request body

          Returns:
              Deferred: the response body, or the `response` attribute of the
              PartialDownloadError if the body could only be partially read.
          """
          form_body = urllib.urlencode(encode_urlencode_args(args), True)
          request_headers = Headers({
              b"Content-Type": [b"application/x-www-form-urlencoded"],
              b"User-Agent": [self.user_agent],
          })

          response = yield self.request(
              "POST",
              url.encode("ascii"),
              bodyProducer=FileBodyProducer(StringIO(form_body)),
              headers=request_headers,
          )

          try:
              result = yield make_deferred_yieldable(readBody(response))
          except PartialDownloadError as e:
              # twisted dislikes google's response, no content length.
              result = e.response

          defer.returnValue(result)
  class SpiderEndpointFactory(object):
      """Builds `SpiderEndpoint`s for http/https URIs, passing along the URL
      preview IP range blacklist and whitelist from the homeserver config.
      """

      def __init__(self, hs):
          """
          Args:
              hs: homeserver object supplying the url_preview IP range lists and
                  the HTTP client context factory
          """
          config = hs.config
          self.blacklist = config.url_preview_ip_range_blacklist
          self.whitelist = config.url_preview_ip_range_whitelist
          self.policyForHTTPS = hs.get_http_client_context_factory()

      def endpointForURI(self, uri):
          """Return an endpoint for `uri`, or None if its scheme is unsupported.

          Args:
              uri: URI object exposing `scheme`, `host`, `port` and `toBytes()`

          Returns:
              SpiderEndpoint for "http" and "https" URIs; None otherwise.
          """
          logger.info("Getting endpoint for %s", uri.toBytes())

          scheme = uri.scheme
          if scheme not in ("http", "https"):
              logger.warn("Can't get endpoint for unrecognised scheme %s", uri.scheme)
              return None

          if scheme == "https":
              tlsCreator = self.policyForHTTPS.creatorForNetloc(uri.host, uri.port)

              def endpoint_factory(reactor, host, port, **kw):
                  # Plain hostname endpoint, wrapped in client-side TLS.
                  plain_endpoint = HostnameEndpoint(reactor, host, port, **kw)
                  return wrapClientTLS(tlsCreator, plain_endpoint)
          else:
              endpoint_factory = HostnameEndpoint

          return SpiderEndpoint(
              reactor, uri.host, uri.port, self.blacklist, self.whitelist,
              endpoint=endpoint_factory, endpoint_kw_args=dict(timeout=15),
          )
  class SpiderHttpClient(SimpleHttpClient):
      """
      HTTP client for spidering arbitrary URLs.

      Its agent connects through SpiderEndpointFactory, follows redirects like
      a browser and decodes gzip-encoded responses.
      Used by the preview_url endpoint in the content repo.
      """

      def __init__(self, hs):
          """
          Args:
              hs: homeserver object, passed to the base class and to
                  SpiderEndpointFactory
          """
          SimpleHttpClient.__init__(self, hs)

          # Replace the base class's agent, building the chain inside-out:
          # spider endpoints -> browser-like redirects -> gzip decoding.
          spider_agent = Agent.usingEndpointFactory(
              reactor,
              SpiderEndpointFactory(hs),
          )
          redirecting_agent = BrowserLikeRedirectAgent(spider_agent)
          self.agent = ContentDecoderAgent(
              redirecting_agent, [(b'gzip', GzipDecoder)]
          )

          # The user agent is left as set by the base class.
          # We could look like Chrome:
          # self.user_agent = ("Mozilla/5.0 (%s) (KHTML, like Gecko)
          #                   Chrome Safari" % hs.version_string)
  def encode_urlencode_args(args):
      """Return a copy of `args` with each value run through
      `encode_urlencode_arg`; keys are left as they are.
      """
      return dict(
          (name, encode_urlencode_arg(value))
          for name, value in args.items()
      )
  def encode_urlencode_arg(arg):
      """Encode unicode text to UTF-8 bytes, recursing into lists.

      Args:
          arg: a unicode string, a list (possibly nested) of values, or anything
              else

      Returns:
          UTF-8 encoded bytes for unicode text, a new list with each item
          encoded for a list, and `arg` itself for any other type.
      """
      # `unicode` only exists on Python 2; on Python 3 all text is `str`.
      try:
          text_type = unicode  # noqa: F821
      except NameError:
          text_type = str

      if isinstance(arg, text_type):
          return arg.encode('utf-8')
      if isinstance(arg, list):
          return [encode_urlencode_arg(item) for item in arg]
      return arg
  def _print_ex(e):
      """Log `e` via `logger.exception`, or, when it has a non-empty `reasons`
      attribute, recursively log each of those reasons instead.
      """
      has_reasons = hasattr(e, "reasons") and e.reasons
      if not has_reasons:
          logger.exception(e)
          return

      for nested in e.reasons:
          _print_ex(nested)
  class InsecureInterceptableContextFactory(ssl.ContextFactory):
      """
      PyOpenSSL context factory that accepts any certificate for any domain.

      Do not use this since it allows an attacker to intercept your communications.
      """

      def __init__(self):
          # A single context with verification disabled, shared by every caller.
          insecure_context = SSL.Context(SSL.SSLv23_METHOD)
          insecure_context.set_verify(VERIFY_NONE, lambda *_: None)
          self._context = insecure_context

      def getContext(self, hostname=None, port=None):
          """Return the shared context; `hostname` and `port` are ignored."""
          return self._context

      def creatorForNetloc(self, hostname, port):
          """Return this factory itself, whatever the hostname and port."""
          return self
  class FileBodyProducer(TwistedFileBodyProducer):
      """Workaround for https://twistedmatrix.com/trac/ticket/8473

      Twisted's FileBodyProducer, with pauseProducing and resumeProducing
      overridden so that they do not raise exceptions if the task has already
      completed.
      """

      def pauseProducing(self):
          """Pause production; a task that has already completed is ignored."""
          parent_pause = super(FileBodyProducer, self).pauseProducing
          try:
              parent_pause()
          except task.TaskDone:
              # task has already completed
              return

      def resumeProducing(self):
          """Resume production; a task that was not paused is ignored."""
          parent_resume = super(FileBodyProducer, self).resumeProducing
          try:
              parent_resume()
          except task.NotPaused:
              # task was not paused (probably because it had already completed)
              return