matrixfederationclient.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2014, 2015 OpenMarket Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. from twisted.internet import defer, reactor, protocol
  16. from twisted.internet.error import DNSLookupError
  17. from twisted.web.client import readBody, _AgentBase, _URI
  18. from twisted.web.http_headers import Headers
  19. from twisted.web._newclient import ResponseDone
  20. from synapse.http.endpoint import matrix_federation_endpoint
  21. from synapse.util.async import sleep
  22. from synapse.util.logcontext import PreserveLoggingContext
  23. from syutil.jsonutil import encode_canonical_json
  24. from synapse.api.errors import (
  25. SynapseError, Codes, HttpResponseException,
  26. )
  27. from syutil.crypto.jsonsign import sign_json
  28. import simplejson as json
  29. import logging
  30. import urllib
  31. import urlparse
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
  33. class MatrixFederationHttpAgent(_AgentBase):
  34. def __init__(self, reactor, pool=None):
  35. _AgentBase.__init__(self, reactor, pool)
  36. def request(self, destination, endpoint, method, path, params, query,
  37. headers, body_producer):
  38. host = b""
  39. port = 0
  40. fragment = b""
  41. parsed_URI = _URI(b"http", destination, host, port, path, params,
  42. query, fragment)
  43. # Set the connection pool key to be the destination.
  44. key = destination
  45. return self._requestWithEndpoint(key, endpoint, method, parsed_URI,
  46. headers, body_producer,
  47. parsed_URI.originForm)
  48. class MatrixFederationHttpClient(object):
  49. """HTTP client used to talk to other homeservers over the federation
  50. protocol. Send client certificates and signs requests.
  51. Attributes:
  52. agent (twisted.web.client.Agent): The twisted Agent used to send the
  53. requests.
  54. """
  55. def __init__(self, hs):
  56. self.hs = hs
  57. self.signing_key = hs.config.signing_key[0]
  58. self.server_name = hs.hostname
  59. self.agent = MatrixFederationHttpAgent(reactor)
  60. self.clock = hs.get_clock()
  61. self.version_string = hs.version_string
  62. @defer.inlineCallbacks
  63. def _create_request(self, destination, method, path_bytes,
  64. body_callback, headers_dict={}, param_bytes=b"",
  65. query_bytes=b"", retry_on_dns_fail=True):
  66. """ Creates and sends a request to the given url
  67. """
  68. headers_dict[b"User-Agent"] = [self.version_string]
  69. headers_dict[b"Host"] = [destination]
  70. url_bytes = urlparse.urlunparse(
  71. ("", "", path_bytes, param_bytes, query_bytes, "",)
  72. )
  73. logger.info("Sending request to %s: %s %s",
  74. destination, method, url_bytes)
  75. logger.debug(
  76. "Types: %s",
  77. [
  78. type(destination), type(method), type(path_bytes),
  79. type(param_bytes),
  80. type(query_bytes)
  81. ]
  82. )
  83. # XXX: Would be much nicer to retry only at the transaction-layer
  84. # (once we have reliable transactions in place)
  85. retries_left = 5
  86. endpoint = self._getEndpoint(reactor, destination)
  87. while True:
  88. producer = None
  89. if body_callback:
  90. producer = body_callback(method, url_bytes, headers_dict)
  91. try:
  92. with PreserveLoggingContext():
  93. request_deferred = self.agent.request(
  94. destination,
  95. endpoint,
  96. method,
  97. path_bytes,
  98. param_bytes,
  99. query_bytes,
  100. Headers(headers_dict),
  101. producer
  102. )
  103. response = yield self.clock.time_bound_deferred(
  104. request_deferred,
  105. time_out=60,
  106. )
  107. logger.debug("Got response to %s", method)
  108. break
  109. except Exception as e:
  110. if not retry_on_dns_fail and isinstance(e, DNSLookupError):
  111. logger.warn(
  112. "DNS Lookup failed to %s with %s",
  113. destination,
  114. e
  115. )
  116. raise
  117. logger.warn(
  118. "Sending request failed to %s: %s %s: %s - %s",
  119. destination,
  120. method,
  121. url_bytes,
  122. type(e).__name__,
  123. _flatten_response_never_received(e),
  124. )
  125. if retries_left:
  126. yield sleep(2 ** (5 - retries_left))
  127. retries_left -= 1
  128. else:
  129. raise
  130. logger.info(
  131. "Received response %d %s for %s: %s %s",
  132. response.code,
  133. response.phrase,
  134. destination,
  135. method,
  136. url_bytes
  137. )
  138. if 200 <= response.code < 300:
  139. pass
  140. else:
  141. # :'(
  142. # Update transactions table?
  143. body = yield readBody(response)
  144. raise HttpResponseException(
  145. response.code, response.phrase, body
  146. )
  147. defer.returnValue(response)
  148. def sign_request(self, destination, method, url_bytes, headers_dict,
  149. content=None):
  150. request = {
  151. "method": method,
  152. "uri": url_bytes,
  153. "origin": self.server_name,
  154. "destination": destination,
  155. }
  156. if content is not None:
  157. request["content"] = content
  158. request = sign_json(request, self.server_name, self.signing_key)
  159. auth_headers = []
  160. for key, sig in request["signatures"][self.server_name].items():
  161. auth_headers.append(bytes(
  162. "X-Matrix origin=%s,key=\"%s\",sig=\"%s\"" % (
  163. self.server_name, key, sig,
  164. )
  165. ))
  166. headers_dict[b"Authorization"] = auth_headers
  167. @defer.inlineCallbacks
  168. def put_json(self, destination, path, data={}, json_data_callback=None):
  169. """ Sends the specifed json data using PUT
  170. Args:
  171. destination (str): The remote server to send the HTTP request
  172. to.
  173. path (str): The HTTP path.
  174. data (dict): A dict containing the data that will be used as
  175. the request body. This will be encoded as JSON.
  176. json_data_callback (callable): A callable returning the dict to
  177. use as the request body.
  178. Returns:
  179. Deferred: Succeeds when we get a 2xx HTTP response. The result
  180. will be the decoded JSON body. On a 4xx or 5xx error response a
  181. CodeMessageException is raised.
  182. """
  183. if not json_data_callback:
  184. def json_data_callback():
  185. return data
  186. def body_callback(method, url_bytes, headers_dict):
  187. json_data = json_data_callback()
  188. self.sign_request(
  189. destination, method, url_bytes, headers_dict, json_data
  190. )
  191. producer = _JsonProducer(json_data)
  192. return producer
  193. response = yield self._create_request(
  194. destination.encode("ascii"),
  195. "PUT",
  196. path.encode("ascii"),
  197. body_callback=body_callback,
  198. headers_dict={"Content-Type": ["application/json"]},
  199. )
  200. if 200 <= response.code < 300:
  201. # We need to update the transactions table to say it was sent?
  202. c_type = response.headers.getRawHeaders("Content-Type")
  203. if "application/json" not in c_type:
  204. raise RuntimeError(
  205. "Content-Type not application/json"
  206. )
  207. logger.debug("Getting resp body")
  208. body = yield readBody(response)
  209. logger.debug("Got resp body")
  210. defer.returnValue(json.loads(body))
  211. @defer.inlineCallbacks
  212. def post_json(self, destination, path, data={}):
  213. """ Sends the specifed json data using POST
  214. Args:
  215. destination (str): The remote server to send the HTTP request
  216. to.
  217. path (str): The HTTP path.
  218. data (dict): A dict containing the data that will be used as
  219. the request body. This will be encoded as JSON.
  220. Returns:
  221. Deferred: Succeeds when we get a 2xx HTTP response. The result
  222. will be the decoded JSON body. On a 4xx or 5xx error response a
  223. CodeMessageException is raised.
  224. """
  225. def body_callback(method, url_bytes, headers_dict):
  226. self.sign_request(
  227. destination, method, url_bytes, headers_dict, data
  228. )
  229. return _JsonProducer(data)
  230. response = yield self._create_request(
  231. destination.encode("ascii"),
  232. "POST",
  233. path.encode("ascii"),
  234. body_callback=body_callback,
  235. headers_dict={"Content-Type": ["application/json"]},
  236. )
  237. if 200 <= response.code < 300:
  238. # We need to update the transactions table to say it was sent?
  239. c_type = response.headers.getRawHeaders("Content-Type")
  240. if "application/json" not in c_type:
  241. raise RuntimeError(
  242. "Content-Type not application/json"
  243. )
  244. logger.debug("Getting resp body")
  245. body = yield readBody(response)
  246. logger.debug("Got resp body")
  247. defer.returnValue(json.loads(body))
  248. @defer.inlineCallbacks
  249. def get_json(self, destination, path, args={}, retry_on_dns_fail=True):
  250. """ GETs some json from the given host homeserver and path
  251. Args:
  252. destination (str): The remote server to send the HTTP request
  253. to.
  254. path (str): The HTTP path.
  255. args (dict): A dictionary used to create query strings, defaults to
  256. None.
  257. Returns:
  258. Deferred: Succeeds when we get *any* HTTP response.
  259. The result of the deferred is a tuple of `(code, response)`,
  260. where `response` is a dict representing the decoded JSON body.
  261. """
  262. logger.debug("get_json args: %s", args)
  263. encoded_args = {}
  264. for k, vs in args.items():
  265. if isinstance(vs, basestring):
  266. vs = [vs]
  267. encoded_args[k] = [v.encode("UTF-8") for v in vs]
  268. query_bytes = urllib.urlencode(encoded_args, True)
  269. logger.debug("Query bytes: %s Retry DNS: %s", args, retry_on_dns_fail)
  270. def body_callback(method, url_bytes, headers_dict):
  271. self.sign_request(destination, method, url_bytes, headers_dict)
  272. return None
  273. response = yield self._create_request(
  274. destination.encode("ascii"),
  275. "GET",
  276. path.encode("ascii"),
  277. query_bytes=query_bytes,
  278. body_callback=body_callback,
  279. retry_on_dns_fail=retry_on_dns_fail
  280. )
  281. if 200 <= response.code < 300:
  282. # We need to update the transactions table to say it was sent?
  283. c_type = response.headers.getRawHeaders("Content-Type")
  284. if "application/json" not in c_type:
  285. raise RuntimeError(
  286. "Content-Type not application/json"
  287. )
  288. logger.debug("Getting resp body")
  289. body = yield readBody(response)
  290. logger.debug("Got resp body")
  291. defer.returnValue(json.loads(body))
  292. @defer.inlineCallbacks
  293. def get_file(self, destination, path, output_stream, args={},
  294. retry_on_dns_fail=True, max_size=None):
  295. """GETs a file from a given homeserver
  296. Args:
  297. destination (str): The remote server to send the HTTP request to.
  298. path (str): The HTTP path to GET.
  299. output_stream (file): File to write the response body to.
  300. args (dict): Optional dictionary used to create the query string.
  301. Returns:
  302. A (int,dict) tuple of the file length and a dict of the response
  303. headers.
  304. """
  305. encoded_args = {}
  306. for k, vs in args.items():
  307. if isinstance(vs, basestring):
  308. vs = [vs]
  309. encoded_args[k] = [v.encode("UTF-8") for v in vs]
  310. query_bytes = urllib.urlencode(encoded_args, True)
  311. logger.debug("Query bytes: %s Retry DNS: %s", args, retry_on_dns_fail)
  312. def body_callback(method, url_bytes, headers_dict):
  313. self.sign_request(destination, method, url_bytes, headers_dict)
  314. return None
  315. response = yield self._create_request(
  316. destination.encode("ascii"),
  317. "GET",
  318. path.encode("ascii"),
  319. query_bytes=query_bytes,
  320. body_callback=body_callback,
  321. retry_on_dns_fail=retry_on_dns_fail
  322. )
  323. headers = dict(response.headers.getAllRawHeaders())
  324. try:
  325. length = yield _readBodyToFile(response, output_stream, max_size)
  326. except:
  327. logger.exception("Failed to download body")
  328. raise
  329. defer.returnValue((length, headers))
  330. def _getEndpoint(self, reactor, destination):
  331. return matrix_federation_endpoint(
  332. reactor, destination, timeout=10,
  333. ssl_context_factory=self.hs.tls_context_factory
  334. )
  335. class _ReadBodyToFileProtocol(protocol.Protocol):
  336. def __init__(self, stream, deferred, max_size):
  337. self.stream = stream
  338. self.deferred = deferred
  339. self.length = 0
  340. self.max_size = max_size
  341. def dataReceived(self, data):
  342. self.stream.write(data)
  343. self.length += len(data)
  344. if self.max_size is not None and self.length >= self.max_size:
  345. self.deferred.errback(SynapseError(
  346. 502,
  347. "Requested file is too large > %r bytes" % (self.max_size,),
  348. Codes.TOO_LARGE,
  349. ))
  350. self.deferred = defer.Deferred()
  351. self.transport.loseConnection()
  352. def connectionLost(self, reason):
  353. if reason.check(ResponseDone):
  354. self.deferred.callback(self.length)
  355. else:
  356. self.deferred.errback(reason)
  357. def _readBodyToFile(response, stream, max_size):
  358. d = defer.Deferred()
  359. response.deliverBody(_ReadBodyToFileProtocol(stream, d, max_size))
  360. return d
  361. class _JsonProducer(object):
  362. """ Used by the twisted http client to create the HTTP body from json
  363. """
  364. def __init__(self, jsn):
  365. self.reset(jsn)
  366. def reset(self, jsn):
  367. self.body = encode_canonical_json(jsn)
  368. self.length = len(self.body)
  369. def startProducing(self, consumer):
  370. consumer.write(self.body)
  371. return defer.succeed(None)
  372. def pauseProducing(self):
  373. pass
  374. def stopProducing(self):
  375. pass
  376. def _flatten_response_never_received(e):
  377. if hasattr(e, "reasons"):
  378. return ", ".join(
  379. _flatten_response_never_received(f.value)
  380. for f in e.reasons
  381. )
  382. else:
  383. return "%s: %s" % (type(e).__name__, e.message,)