federation_server.py 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2015, 2016 OpenMarket Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. import logging
  16. import simplejson as json
  17. from twisted.internet import defer
  18. from synapse.api.errors import AuthError, FederationError, SynapseError, NotFoundError
  19. from synapse.crypto.event_signing import compute_event_signature
  20. from synapse.federation.federation_base import (
  21. FederationBase,
  22. event_from_pdu_json,
  23. )
  24. from synapse.federation.persistence import TransactionActions
  25. from synapse.federation.units import Edu, Transaction
  26. import synapse.metrics
  27. from synapse.types import get_domain_from_id
  28. from synapse.util import async
  29. from synapse.util.caches.response_cache import ResponseCache
  30. from synapse.util.logcontext import make_deferred_yieldable, preserve_fn
  31. from synapse.util.logutils import log_function
  32. # when processing incoming transactions, we try to handle multiple rooms in
  33. # parallel, up to this limit.
  34. TRANSACTION_CONCURRENCY_LIMIT = 10
  35. logger = logging.getLogger(__name__)
  36. # synapse.federation.federation_server is a silly name
  37. metrics = synapse.metrics.get_metrics_for("synapse.federation.server")
  38. received_pdus_counter = metrics.register_counter("received_pdus")
  39. received_edus_counter = metrics.register_counter("received_edus")
  40. received_queries_counter = metrics.register_counter("received_queries", labels=["type"])
  41. class FederationServer(FederationBase):
  42. def __init__(self, hs):
  43. super(FederationServer, self).__init__(hs)
  44. self.auth = hs.get_auth()
  45. self.handler = hs.get_handlers().federation_handler
  46. self._server_linearizer = async.Linearizer("fed_server")
  47. self._transaction_linearizer = async.Linearizer("fed_txn_handler")
  48. self.transaction_actions = TransactionActions(self.store)
  49. self.registry = hs.get_federation_registry()
  50. # We cache responses to state queries, as they take a while and often
  51. # come in waves.
  52. self._state_resp_cache = ResponseCache(hs, timeout_ms=30000)
  53. @defer.inlineCallbacks
  54. @log_function
  55. def on_backfill_request(self, origin, room_id, versions, limit):
  56. with (yield self._server_linearizer.queue((origin, room_id))):
  57. pdus = yield self.handler.on_backfill_request(
  58. origin, room_id, versions, limit
  59. )
  60. res = self._transaction_from_pdus(pdus).get_dict()
  61. defer.returnValue((200, res))
  62. @defer.inlineCallbacks
  63. @log_function
  64. def on_incoming_transaction(self, transaction_data):
  65. # keep this as early as possible to make the calculated origin ts as
  66. # accurate as possible.
  67. request_time = self._clock.time_msec()
  68. transaction = Transaction(**transaction_data)
  69. if not transaction.transaction_id:
  70. raise Exception("Transaction missing transaction_id")
  71. if not transaction.origin:
  72. raise Exception("Transaction missing origin")
  73. logger.debug("[%s] Got transaction", transaction.transaction_id)
  74. # use a linearizer to ensure that we don't process the same transaction
  75. # multiple times in parallel.
  76. with (yield self._transaction_linearizer.queue(
  77. (transaction.origin, transaction.transaction_id),
  78. )):
  79. result = yield self._handle_incoming_transaction(
  80. transaction, request_time,
  81. )
  82. defer.returnValue(result)
  83. @defer.inlineCallbacks
  84. def _handle_incoming_transaction(self, transaction, request_time):
  85. """ Process an incoming transaction and return the HTTP response
  86. Args:
  87. transaction (Transaction): incoming transaction
  88. request_time (int): timestamp that the HTTP request arrived at
  89. Returns:
  90. Deferred[(int, object)]: http response code and body
  91. """
  92. response = yield self.transaction_actions.have_responded(transaction)
  93. if response:
  94. logger.debug(
  95. "[%s] We've already responded to this request",
  96. transaction.transaction_id
  97. )
  98. defer.returnValue(response)
  99. return
  100. logger.debug("[%s] Transaction is new", transaction.transaction_id)
  101. received_pdus_counter.inc_by(len(transaction.pdus))
  102. pdus_by_room = {}
  103. for p in transaction.pdus:
  104. if "unsigned" in p:
  105. unsigned = p["unsigned"]
  106. if "age" in unsigned:
  107. p["age"] = unsigned["age"]
  108. if "age" in p:
  109. p["age_ts"] = request_time - int(p["age"])
  110. del p["age"]
  111. event = event_from_pdu_json(p)
  112. room_id = event.room_id
  113. pdus_by_room.setdefault(room_id, []).append(event)
  114. pdu_results = {}
  115. # we can process different rooms in parallel (which is useful if they
  116. # require callouts to other servers to fetch missing events), but
  117. # impose a limit to avoid going too crazy with ram/cpu.
  118. @defer.inlineCallbacks
  119. def process_pdus_for_room(room_id):
  120. logger.debug("Processing PDUs for %s", room_id)
  121. for pdu in pdus_by_room[room_id]:
  122. event_id = pdu.event_id
  123. try:
  124. yield self._handle_received_pdu(
  125. transaction.origin, pdu
  126. )
  127. pdu_results[event_id] = {}
  128. except FederationError as e:
  129. logger.warn("Error handling PDU %s: %s", event_id, e)
  130. pdu_results[event_id] = {"error": str(e)}
  131. except Exception as e:
  132. pdu_results[event_id] = {"error": str(e)}
  133. logger.exception("Failed to handle PDU %s", event_id)
  134. yield async.concurrently_execute(
  135. process_pdus_for_room, pdus_by_room.keys(),
  136. TRANSACTION_CONCURRENCY_LIMIT,
  137. )
  138. if hasattr(transaction, "edus"):
  139. for edu in (Edu(**x) for x in transaction.edus):
  140. yield self.received_edu(
  141. transaction.origin,
  142. edu.edu_type,
  143. edu.content
  144. )
  145. pdu_failures = getattr(transaction, "pdu_failures", [])
  146. for failure in pdu_failures:
  147. logger.info("Got failure %r", failure)
  148. response = {
  149. "pdus": pdu_results,
  150. }
  151. logger.debug("Returning: %s", str(response))
  152. yield self.transaction_actions.set_response(
  153. transaction,
  154. 200, response
  155. )
  156. defer.returnValue((200, response))
  157. @defer.inlineCallbacks
  158. def received_edu(self, origin, edu_type, content):
  159. received_edus_counter.inc()
  160. yield self.registry.on_edu(edu_type, origin, content)
  161. @defer.inlineCallbacks
  162. @log_function
  163. def on_context_state_request(self, origin, room_id, event_id):
  164. if not event_id:
  165. raise NotImplementedError("Specify an event")
  166. in_room = yield self.auth.check_host_in_room(room_id, origin)
  167. if not in_room:
  168. raise AuthError(403, "Host not in room.")
  169. result = self._state_resp_cache.get((room_id, event_id))
  170. if not result:
  171. with (yield self._server_linearizer.queue((origin, room_id))):
  172. d = self._state_resp_cache.set(
  173. (room_id, event_id),
  174. preserve_fn(self._on_context_state_request_compute)(room_id, event_id)
  175. )
  176. resp = yield make_deferred_yieldable(d)
  177. else:
  178. resp = yield make_deferred_yieldable(result)
  179. defer.returnValue((200, resp))
  180. @defer.inlineCallbacks
  181. def on_state_ids_request(self, origin, room_id, event_id):
  182. if not event_id:
  183. raise NotImplementedError("Specify an event")
  184. in_room = yield self.auth.check_host_in_room(room_id, origin)
  185. if not in_room:
  186. raise AuthError(403, "Host not in room.")
  187. state_ids = yield self.handler.get_state_ids_for_pdu(
  188. room_id, event_id,
  189. )
  190. auth_chain_ids = yield self.store.get_auth_chain_ids(state_ids)
  191. defer.returnValue((200, {
  192. "pdu_ids": state_ids,
  193. "auth_chain_ids": auth_chain_ids,
  194. }))
  195. @defer.inlineCallbacks
  196. def _on_context_state_request_compute(self, room_id, event_id):
  197. pdus = yield self.handler.get_state_for_pdu(
  198. room_id, event_id,
  199. )
  200. auth_chain = yield self.store.get_auth_chain(
  201. [pdu.event_id for pdu in pdus]
  202. )
  203. for event in auth_chain:
  204. # We sign these again because there was a bug where we
  205. # incorrectly signed things the first time round
  206. if self.hs.is_mine_id(event.event_id):
  207. event.signatures.update(
  208. compute_event_signature(
  209. event,
  210. self.hs.hostname,
  211. self.hs.config.signing_key[0]
  212. )
  213. )
  214. defer.returnValue({
  215. "pdus": [pdu.get_pdu_json() for pdu in pdus],
  216. "auth_chain": [pdu.get_pdu_json() for pdu in auth_chain],
  217. })
  218. @defer.inlineCallbacks
  219. @log_function
  220. def on_pdu_request(self, origin, event_id):
  221. pdu = yield self._get_persisted_pdu(origin, event_id)
  222. if pdu:
  223. defer.returnValue(
  224. (200, self._transaction_from_pdus([pdu]).get_dict())
  225. )
  226. else:
  227. defer.returnValue((404, ""))
  228. @defer.inlineCallbacks
  229. @log_function
  230. def on_pull_request(self, origin, versions):
  231. raise NotImplementedError("Pull transactions not implemented")
  232. @defer.inlineCallbacks
  233. def on_query_request(self, query_type, args):
  234. received_queries_counter.inc(query_type)
  235. resp = yield self.registry.on_query(query_type, args)
  236. defer.returnValue((200, resp))
  237. @defer.inlineCallbacks
  238. def on_make_join_request(self, room_id, user_id):
  239. pdu = yield self.handler.on_make_join_request(room_id, user_id)
  240. time_now = self._clock.time_msec()
  241. defer.returnValue({"event": pdu.get_pdu_json(time_now)})
  242. @defer.inlineCallbacks
  243. def on_invite_request(self, origin, content):
  244. pdu = event_from_pdu_json(content)
  245. ret_pdu = yield self.handler.on_invite_request(origin, pdu)
  246. time_now = self._clock.time_msec()
  247. defer.returnValue((200, {"event": ret_pdu.get_pdu_json(time_now)}))
  248. @defer.inlineCallbacks
  249. def on_send_join_request(self, origin, content):
  250. logger.debug("on_send_join_request: content: %s", content)
  251. pdu = event_from_pdu_json(content)
  252. logger.debug("on_send_join_request: pdu sigs: %s", pdu.signatures)
  253. res_pdus = yield self.handler.on_send_join_request(origin, pdu)
  254. time_now = self._clock.time_msec()
  255. defer.returnValue((200, {
  256. "state": [p.get_pdu_json(time_now) for p in res_pdus["state"]],
  257. "auth_chain": [
  258. p.get_pdu_json(time_now) for p in res_pdus["auth_chain"]
  259. ],
  260. }))
  261. @defer.inlineCallbacks
  262. def on_make_leave_request(self, room_id, user_id):
  263. pdu = yield self.handler.on_make_leave_request(room_id, user_id)
  264. time_now = self._clock.time_msec()
  265. defer.returnValue({"event": pdu.get_pdu_json(time_now)})
  266. @defer.inlineCallbacks
  267. def on_send_leave_request(self, origin, content):
  268. logger.debug("on_send_leave_request: content: %s", content)
  269. pdu = event_from_pdu_json(content)
  270. logger.debug("on_send_leave_request: pdu sigs: %s", pdu.signatures)
  271. yield self.handler.on_send_leave_request(origin, pdu)
  272. defer.returnValue((200, {}))
  273. @defer.inlineCallbacks
  274. def on_event_auth(self, origin, room_id, event_id):
  275. with (yield self._server_linearizer.queue((origin, room_id))):
  276. time_now = self._clock.time_msec()
  277. auth_pdus = yield self.handler.on_event_auth(event_id)
  278. res = {
  279. "auth_chain": [a.get_pdu_json(time_now) for a in auth_pdus],
  280. }
  281. defer.returnValue((200, res))
  282. @defer.inlineCallbacks
  283. def on_query_auth_request(self, origin, content, room_id, event_id):
  284. """
  285. Content is a dict with keys::
  286. auth_chain (list): A list of events that give the auth chain.
  287. missing (list): A list of event_ids indicating what the other
  288. side (`origin`) think we're missing.
  289. rejects (dict): A mapping from event_id to a 2-tuple of reason
  290. string and a proof (or None) of why the event was rejected.
  291. The keys of this dict give the list of events the `origin` has
  292. rejected.
  293. Args:
  294. origin (str)
  295. content (dict)
  296. event_id (str)
  297. Returns:
  298. Deferred: Results in `dict` with the same format as `content`
  299. """
  300. with (yield self._server_linearizer.queue((origin, room_id))):
  301. auth_chain = [
  302. event_from_pdu_json(e)
  303. for e in content["auth_chain"]
  304. ]
  305. signed_auth = yield self._check_sigs_and_hash_and_fetch(
  306. origin, auth_chain, outlier=True
  307. )
  308. ret = yield self.handler.on_query_auth(
  309. origin,
  310. event_id,
  311. signed_auth,
  312. content.get("rejects", []),
  313. content.get("missing", []),
  314. )
  315. time_now = self._clock.time_msec()
  316. send_content = {
  317. "auth_chain": [
  318. e.get_pdu_json(time_now)
  319. for e in ret["auth_chain"]
  320. ],
  321. "rejects": ret.get("rejects", []),
  322. "missing": ret.get("missing", []),
  323. }
  324. defer.returnValue(
  325. (200, send_content)
  326. )
  327. @log_function
  328. def on_query_client_keys(self, origin, content):
  329. return self.on_query_request("client_keys", content)
  330. def on_query_user_devices(self, origin, user_id):
  331. return self.on_query_request("user_devices", user_id)
  332. @defer.inlineCallbacks
  333. @log_function
  334. def on_claim_client_keys(self, origin, content):
  335. query = []
  336. for user_id, device_keys in content.get("one_time_keys", {}).items():
  337. for device_id, algorithm in device_keys.items():
  338. query.append((user_id, device_id, algorithm))
  339. results = yield self.store.claim_e2e_one_time_keys(query)
  340. json_result = {}
  341. for user_id, device_keys in results.items():
  342. for device_id, keys in device_keys.items():
  343. for key_id, json_bytes in keys.items():
  344. json_result.setdefault(user_id, {})[device_id] = {
  345. key_id: json.loads(json_bytes)
  346. }
  347. logger.info(
  348. "Claimed one-time-keys: %s",
  349. ",".join((
  350. "%s for %s:%s" % (key_id, user_id, device_id)
  351. for user_id, user_keys in json_result.iteritems()
  352. for device_id, device_keys in user_keys.iteritems()
  353. for key_id, _ in device_keys.iteritems()
  354. )),
  355. )
  356. defer.returnValue({"one_time_keys": json_result})
  357. @defer.inlineCallbacks
  358. @log_function
  359. def on_get_missing_events(self, origin, room_id, earliest_events,
  360. latest_events, limit, min_depth):
  361. with (yield self._server_linearizer.queue((origin, room_id))):
  362. logger.info(
  363. "on_get_missing_events: earliest_events: %r, latest_events: %r,"
  364. " limit: %d, min_depth: %d",
  365. earliest_events, latest_events, limit, min_depth
  366. )
  367. missing_events = yield self.handler.on_get_missing_events(
  368. origin, room_id, earliest_events, latest_events, limit, min_depth
  369. )
  370. if len(missing_events) < 5:
  371. logger.info(
  372. "Returning %d events: %r", len(missing_events), missing_events
  373. )
  374. else:
  375. logger.info("Returning %d events", len(missing_events))
  376. time_now = self._clock.time_msec()
  377. defer.returnValue({
  378. "events": [ev.get_pdu_json(time_now) for ev in missing_events],
  379. })
  380. @log_function
  381. def on_openid_userinfo(self, token):
  382. ts_now_ms = self._clock.time_msec()
  383. return self.store.get_user_id_for_open_id_token(token, ts_now_ms)
  384. @log_function
  385. def _get_persisted_pdu(self, origin, event_id, do_auth=True):
  386. """ Get a PDU from the database with given origin and id.
  387. Returns:
  388. Deferred: Results in a `Pdu`.
  389. """
  390. return self.handler.get_persisted_pdu(
  391. origin, event_id, do_auth=do_auth
  392. )
  393. def _transaction_from_pdus(self, pdu_list):
  394. """Returns a new Transaction containing the given PDUs suitable for
  395. transmission.
  396. """
  397. time_now = self._clock.time_msec()
  398. pdus = [p.get_pdu_json(time_now) for p in pdu_list]
  399. return Transaction(
  400. origin=self.server_name,
  401. pdus=pdus,
  402. origin_server_ts=int(time_now),
  403. destination=None,
  404. )
  405. @defer.inlineCallbacks
  406. def _handle_received_pdu(self, origin, pdu):
  407. """ Process a PDU received in a federation /send/ transaction.
  408. Args:
  409. origin (str): server which sent the pdu
  410. pdu (FrozenEvent): received pdu
  411. Returns (Deferred): completes with None
  412. Raises: FederationError if the signatures / hash do not match
  413. """
  414. # check that it's actually being sent from a valid destination to
  415. # workaround bug #1753 in 0.18.5 and 0.18.6
  416. if origin != get_domain_from_id(pdu.event_id):
  417. # We continue to accept join events from any server; this is
  418. # necessary for the federation join dance to work correctly.
  419. # (When we join over federation, the "helper" server is
  420. # responsible for sending out the join event, rather than the
  421. # origin. See bug #1893).
  422. if not (
  423. pdu.type == 'm.room.member' and
  424. pdu.content and
  425. pdu.content.get("membership", None) == 'join'
  426. ):
  427. logger.info(
  428. "Discarding PDU %s from invalid origin %s",
  429. pdu.event_id, origin
  430. )
  431. return
  432. else:
  433. logger.info(
  434. "Accepting join PDU %s from %s",
  435. pdu.event_id, origin
  436. )
  437. # Check signature.
  438. try:
  439. pdu = yield self._check_sigs_and_hash(pdu)
  440. except SynapseError as e:
  441. raise FederationError(
  442. "ERROR",
  443. e.code,
  444. e.msg,
  445. affected=pdu.event_id,
  446. )
  447. yield self.handler.on_receive_pdu(origin, pdu, get_missing=True)
  448. def __str__(self):
  449. return "<ReplicationLayer(%s)>" % self.server_name
  450. @defer.inlineCallbacks
  451. def exchange_third_party_invite(
  452. self,
  453. sender_user_id,
  454. target_user_id,
  455. room_id,
  456. signed,
  457. ):
  458. ret = yield self.handler.exchange_third_party_invite(
  459. sender_user_id,
  460. target_user_id,
  461. room_id,
  462. signed,
  463. )
  464. defer.returnValue(ret)
  465. @defer.inlineCallbacks
  466. def on_exchange_third_party_invite_request(self, origin, room_id, event_dict):
  467. ret = yield self.handler.on_exchange_third_party_invite_request(
  468. origin, room_id, event_dict
  469. )
  470. defer.returnValue(ret)
  471. class FederationHandlerRegistry(object):
  472. """Allows classes to register themselves as handlers for a given EDU or
  473. query type for incoming federation traffic.
  474. """
  475. def __init__(self):
  476. self.edu_handlers = {}
  477. self.query_handlers = {}
  478. def register_edu_handler(self, edu_type, handler):
  479. """Sets the handler callable that will be used to handle an incoming
  480. federation EDU of the given type.
  481. Args:
  482. edu_type (str): The type of the incoming EDU to register handler for
  483. handler (Callable[[str, dict]]): A callable invoked on incoming EDU
  484. of the given type. The arguments are the origin server name and
  485. the EDU contents.
  486. """
  487. if edu_type in self.edu_handlers:
  488. raise KeyError("Already have an EDU handler for %s" % (edu_type,))
  489. self.edu_handlers[edu_type] = handler
  490. def register_query_handler(self, query_type, handler):
  491. """Sets the handler callable that will be used to handle an incoming
  492. federation query of the given type.
  493. Args:
  494. query_type (str): Category name of the query, which should match
  495. the string used by make_query.
  496. handler (Callable[[dict], Deferred[dict]]): Invoked to handle
  497. incoming queries of this type. The return will be yielded
  498. on and the result used as the response to the query request.
  499. """
  500. if query_type in self.query_handlers:
  501. raise KeyError(
  502. "Already have a Query handler for %s" % (query_type,)
  503. )
  504. self.query_handlers[query_type] = handler
  505. @defer.inlineCallbacks
  506. def on_edu(self, edu_type, origin, content):
  507. handler = self.edu_handlers.get(edu_type)
  508. if not handler:
  509. logger.warn("No handler registered for EDU type %s", edu_type)
  510. try:
  511. yield handler(origin, content)
  512. except SynapseError as e:
  513. logger.info("Failed to handle edu %r: %r", edu_type, e)
  514. except Exception as e:
  515. logger.exception("Failed to handle edu %r", edu_type)
  516. def on_query(self, query_type, args):
  517. handler = self.query_handlers.get(query_type)
  518. if not handler:
  519. logger.warn("No handler registered for query type %s", query_type)
  520. raise NotFoundError("No handler for Query type '%s'" % (query_type,))
  521. return handler(args)