# federation_client.py
# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
  15. from twisted.internet import defer
  16. from .federation_base import FederationBase
  17. from synapse.api.constants import Membership
  18. from .units import Edu
  19. from synapse.api.errors import (
  20. CodeMessageException, HttpResponseException, SynapseError,
  21. )
  22. from synapse.util import unwrapFirstError
  23. from synapse.util.async import concurrently_execute
  24. from synapse.util.caches.expiringcache import ExpiringCache
  25. from synapse.util.logutils import log_function
  26. from synapse.events import FrozenEvent
  27. import synapse.metrics
  28. from synapse.util.retryutils import get_retry_limiter, NotRetryingDestination
  29. import copy
  30. import itertools
  31. import logging
  32. import random
  33. logger = logging.getLogger(__name__)
  34. # synapse.federation.federation_client is a silly name
  35. metrics = synapse.metrics.get_metrics_for("synapse.federation.client")
  36. sent_pdus_destination_dist = metrics.register_distribution("sent_pdu_destinations")
  37. sent_edus_counter = metrics.register_counter("sent_edus")
  38. sent_queries_counter = metrics.register_counter("sent_queries", labels=["type"])
  39. class FederationClient(FederationBase):
  40. def start_get_pdu_cache(self):
  41. self._get_pdu_cache = ExpiringCache(
  42. cache_name="get_pdu_cache",
  43. clock=self._clock,
  44. max_len=1000,
  45. expiry_ms=120 * 1000,
  46. reset_expiry_on_get=False,
  47. )
  48. self._get_pdu_cache.start()
  49. @log_function
  50. def send_pdu(self, pdu, destinations):
  51. """Informs the replication layer about a new PDU generated within the
  52. home server that should be transmitted to others.
  53. TODO: Figure out when we should actually resolve the deferred.
  54. Args:
  55. pdu (Pdu): The new Pdu.
  56. Returns:
  57. Deferred: Completes when we have successfully processed the PDU
  58. and replicated it to any interested remote home servers.
  59. """
  60. order = self._order
  61. self._order += 1
  62. sent_pdus_destination_dist.inc_by(len(destinations))
  63. logger.debug("[%s] transaction_layer.enqueue_pdu... ", pdu.event_id)
  64. # TODO, add errback, etc.
  65. self._transaction_queue.enqueue_pdu(pdu, destinations, order)
  66. logger.debug(
  67. "[%s] transaction_layer.enqueue_pdu... done",
  68. pdu.event_id
  69. )
  70. @log_function
  71. def send_edu(self, destination, edu_type, content):
  72. edu = Edu(
  73. origin=self.server_name,
  74. destination=destination,
  75. edu_type=edu_type,
  76. content=content,
  77. )
  78. sent_edus_counter.inc()
  79. # TODO, add errback, etc.
  80. self._transaction_queue.enqueue_edu(edu)
  81. return defer.succeed(None)
  82. @log_function
  83. def send_failure(self, failure, destination):
  84. self._transaction_queue.enqueue_failure(failure, destination)
  85. return defer.succeed(None)
  86. @log_function
  87. def make_query(self, destination, query_type, args,
  88. retry_on_dns_fail=False):
  89. """Sends a federation Query to a remote homeserver of the given type
  90. and arguments.
  91. Args:
  92. destination (str): Domain name of the remote homeserver
  93. query_type (str): Category of the query type; should match the
  94. handler name used in register_query_handler().
  95. args (dict): Mapping of strings to strings containing the details
  96. of the query request.
  97. Returns:
  98. a Deferred which will eventually yield a JSON object from the
  99. response
  100. """
  101. sent_queries_counter.inc(query_type)
  102. return self.transport_layer.make_query(
  103. destination, query_type, args, retry_on_dns_fail=retry_on_dns_fail
  104. )
  105. @log_function
  106. def query_client_keys(self, destination, content):
  107. """Query device keys for a device hosted on a remote server.
  108. Args:
  109. destination (str): Domain name of the remote homeserver
  110. content (dict): The query content.
  111. Returns:
  112. a Deferred which will eventually yield a JSON object from the
  113. response
  114. """
  115. sent_queries_counter.inc("client_device_keys")
  116. return self.transport_layer.query_client_keys(destination, content)
  117. @log_function
  118. def claim_client_keys(self, destination, content):
  119. """Claims one-time keys for a device hosted on a remote server.
  120. Args:
  121. destination (str): Domain name of the remote homeserver
  122. content (dict): The query content.
  123. Returns:
  124. a Deferred which will eventually yield a JSON object from the
  125. response
  126. """
  127. sent_queries_counter.inc("client_one_time_keys")
  128. return self.transport_layer.claim_client_keys(destination, content)
  129. @defer.inlineCallbacks
  130. @log_function
  131. def backfill(self, dest, context, limit, extremities):
  132. """Requests some more historic PDUs for the given context from the
  133. given destination server.
  134. Args:
  135. dest (str): The remote home server to ask.
  136. context (str): The context to backfill.
  137. limit (int): The maximum number of PDUs to return.
  138. extremities (list): List of PDU id and origins of the first pdus
  139. we have seen from the context
  140. Returns:
  141. Deferred: Results in the received PDUs.
  142. """
  143. logger.debug("backfill extrem=%s", extremities)
  144. # If there are no extremeties then we've (probably) reached the start.
  145. if not extremities:
  146. return
  147. transaction_data = yield self.transport_layer.backfill(
  148. dest, context, extremities, limit)
  149. logger.debug("backfill transaction_data=%s", repr(transaction_data))
  150. pdus = [
  151. self.event_from_pdu_json(p, outlier=False)
  152. for p in transaction_data["pdus"]
  153. ]
  154. # FIXME: We should handle signature failures more gracefully.
  155. pdus[:] = yield defer.gatherResults(
  156. self._check_sigs_and_hashes(pdus),
  157. consumeErrors=True,
  158. ).addErrback(unwrapFirstError)
  159. defer.returnValue(pdus)
  160. @defer.inlineCallbacks
  161. @log_function
  162. def get_pdu(self, destinations, event_id, outlier=False, timeout=None):
  163. """Requests the PDU with given origin and ID from the remote home
  164. servers.
  165. Will attempt to get the PDU from each destination in the list until
  166. one succeeds.
  167. This will persist the PDU locally upon receipt.
  168. Args:
  169. destinations (list): Which home servers to query
  170. pdu_origin (str): The home server that originally sent the pdu.
  171. event_id (str)
  172. outlier (bool): Indicates whether the PDU is an `outlier`, i.e. if
  173. it's from an arbitary point in the context as opposed to part
  174. of the current block of PDUs. Defaults to `False`
  175. timeout (int): How long to try (in ms) each destination for before
  176. moving to the next destination. None indicates no timeout.
  177. Returns:
  178. Deferred: Results in the requested PDU.
  179. """
  180. # TODO: Rate limit the number of times we try and get the same event.
  181. if self._get_pdu_cache:
  182. e = self._get_pdu_cache.get(event_id)
  183. if e:
  184. defer.returnValue(e)
  185. pdu = None
  186. for destination in destinations:
  187. try:
  188. limiter = yield get_retry_limiter(
  189. destination,
  190. self._clock,
  191. self.store,
  192. )
  193. with limiter:
  194. transaction_data = yield self.transport_layer.get_event(
  195. destination, event_id, timeout=timeout,
  196. )
  197. logger.debug("transaction_data %r", transaction_data)
  198. pdu_list = [
  199. self.event_from_pdu_json(p, outlier=outlier)
  200. for p in transaction_data["pdus"]
  201. ]
  202. if pdu_list and pdu_list[0]:
  203. pdu = pdu_list[0]
  204. # Check signatures are correct.
  205. pdu = yield self._check_sigs_and_hashes([pdu])[0]
  206. break
  207. except SynapseError:
  208. logger.info(
  209. "Failed to get PDU %s from %s because %s",
  210. event_id, destination, e,
  211. )
  212. continue
  213. except CodeMessageException as e:
  214. if 400 <= e.code < 500:
  215. raise
  216. logger.info(
  217. "Failed to get PDU %s from %s because %s",
  218. event_id, destination, e,
  219. )
  220. continue
  221. except NotRetryingDestination as e:
  222. logger.info(e.message)
  223. continue
  224. except Exception as e:
  225. logger.info(
  226. "Failed to get PDU %s from %s because %s",
  227. event_id, destination, e,
  228. )
  229. continue
  230. if self._get_pdu_cache is not None and pdu:
  231. self._get_pdu_cache[event_id] = pdu
  232. defer.returnValue(pdu)
  233. @defer.inlineCallbacks
  234. @log_function
  235. def get_state_for_room(self, destination, room_id, event_id):
  236. """Requests all of the `current` state PDUs for a given room from
  237. a remote home server.
  238. Args:
  239. destination (str): The remote homeserver to query for the state.
  240. room_id (str): The id of the room we're interested in.
  241. event_id (str): The id of the event we want the state at.
  242. Returns:
  243. Deferred: Results in a list of PDUs.
  244. """
  245. result = yield self.transport_layer.get_room_state(
  246. destination, room_id, event_id=event_id,
  247. )
  248. pdus = [
  249. self.event_from_pdu_json(p, outlier=True) for p in result["pdus"]
  250. ]
  251. auth_chain = [
  252. self.event_from_pdu_json(p, outlier=True)
  253. for p in result.get("auth_chain", [])
  254. ]
  255. signed_pdus = yield self._check_sigs_and_hash_and_fetch(
  256. destination, pdus, outlier=True
  257. )
  258. signed_auth = yield self._check_sigs_and_hash_and_fetch(
  259. destination, auth_chain, outlier=True
  260. )
  261. signed_auth.sort(key=lambda e: e.depth)
  262. defer.returnValue((signed_pdus, signed_auth))
  263. @defer.inlineCallbacks
  264. @log_function
  265. def get_event_auth(self, destination, room_id, event_id):
  266. res = yield self.transport_layer.get_event_auth(
  267. destination, room_id, event_id,
  268. )
  269. auth_chain = [
  270. self.event_from_pdu_json(p, outlier=True)
  271. for p in res["auth_chain"]
  272. ]
  273. signed_auth = yield self._check_sigs_and_hash_and_fetch(
  274. destination, auth_chain, outlier=True
  275. )
  276. signed_auth.sort(key=lambda e: e.depth)
  277. defer.returnValue(signed_auth)
  278. @defer.inlineCallbacks
  279. def make_membership_event(self, destinations, room_id, user_id, membership,
  280. content={},):
  281. """
  282. Creates an m.room.member event, with context, without participating in the room.
  283. Does so by asking one of the already participating servers to create an
  284. event with proper context.
  285. Note that this does not append any events to any graphs.
  286. Args:
  287. destinations (str): Candidate homeservers which are probably
  288. participating in the room.
  289. room_id (str): The room in which the event will happen.
  290. user_id (str): The user whose membership is being evented.
  291. membership (str): The "membership" property of the event. Must be
  292. one of "join" or "leave".
  293. content (object): Any additional data to put into the content field
  294. of the event.
  295. Return:
  296. A tuple of (origin (str), event (object)) where origin is the remote
  297. homeserver which generated the event.
  298. """
  299. valid_memberships = {Membership.JOIN, Membership.LEAVE}
  300. if membership not in valid_memberships:
  301. raise RuntimeError(
  302. "make_membership_event called with membership='%s', must be one of %s" %
  303. (membership, ",".join(valid_memberships))
  304. )
  305. for destination in destinations:
  306. if destination == self.server_name:
  307. continue
  308. try:
  309. ret = yield self.transport_layer.make_membership_event(
  310. destination, room_id, user_id, membership
  311. )
  312. pdu_dict = ret["event"]
  313. logger.debug("Got response to make_%s: %s", membership, pdu_dict)
  314. pdu_dict["content"].update(content)
  315. # The protoevent received over the JSON wire may not have all
  316. # the required fields. Lets just gloss over that because
  317. # there's some we never care about
  318. if "prev_state" not in pdu_dict:
  319. pdu_dict["prev_state"] = []
  320. defer.returnValue(
  321. (destination, self.event_from_pdu_json(pdu_dict))
  322. )
  323. break
  324. except CodeMessageException:
  325. raise
  326. except Exception as e:
  327. logger.warn(
  328. "Failed to make_%s via %s: %s",
  329. membership, destination, e.message
  330. )
  331. raise
  332. raise RuntimeError("Failed to send to any server.")
  333. @defer.inlineCallbacks
  334. def send_join(self, destinations, pdu):
  335. for destination in destinations:
  336. if destination == self.server_name:
  337. continue
  338. try:
  339. time_now = self._clock.time_msec()
  340. _, content = yield self.transport_layer.send_join(
  341. destination=destination,
  342. room_id=pdu.room_id,
  343. event_id=pdu.event_id,
  344. content=pdu.get_pdu_json(time_now),
  345. )
  346. logger.debug("Got content: %s", content)
  347. state = [
  348. self.event_from_pdu_json(p, outlier=True)
  349. for p in content.get("state", [])
  350. ]
  351. auth_chain = [
  352. self.event_from_pdu_json(p, outlier=True)
  353. for p in content.get("auth_chain", [])
  354. ]
  355. pdus = {
  356. p.event_id: p
  357. for p in itertools.chain(state, auth_chain)
  358. }
  359. valid_pdus = yield self._check_sigs_and_hash_and_fetch(
  360. destination, pdus.values(),
  361. outlier=True,
  362. )
  363. valid_pdus_map = {
  364. p.event_id: p
  365. for p in valid_pdus
  366. }
  367. # NB: We *need* to copy to ensure that we don't have multiple
  368. # references being passed on, as that causes... issues.
  369. signed_state = [
  370. copy.copy(valid_pdus_map[p.event_id])
  371. for p in state
  372. if p.event_id in valid_pdus_map
  373. ]
  374. signed_auth = [
  375. valid_pdus_map[p.event_id]
  376. for p in auth_chain
  377. if p.event_id in valid_pdus_map
  378. ]
  379. # NB: We *need* to copy to ensure that we don't have multiple
  380. # references being passed on, as that causes... issues.
  381. for s in signed_state:
  382. s.internal_metadata = copy.deepcopy(s.internal_metadata)
  383. auth_chain.sort(key=lambda e: e.depth)
  384. defer.returnValue({
  385. "state": signed_state,
  386. "auth_chain": signed_auth,
  387. "origin": destination,
  388. })
  389. except CodeMessageException:
  390. raise
  391. except Exception as e:
  392. logger.exception(
  393. "Failed to send_join via %s: %s",
  394. destination, e.message
  395. )
  396. raise RuntimeError("Failed to send to any server.")
  397. @defer.inlineCallbacks
  398. def send_invite(self, destination, room_id, event_id, pdu):
  399. time_now = self._clock.time_msec()
  400. code, content = yield self.transport_layer.send_invite(
  401. destination=destination,
  402. room_id=room_id,
  403. event_id=event_id,
  404. content=pdu.get_pdu_json(time_now),
  405. )
  406. pdu_dict = content["event"]
  407. logger.debug("Got response to send_invite: %s", pdu_dict)
  408. pdu = self.event_from_pdu_json(pdu_dict)
  409. # Check signatures are correct.
  410. pdu = yield self._check_sigs_and_hash(pdu)
  411. # FIXME: We should handle signature failures more gracefully.
  412. defer.returnValue(pdu)
  413. @defer.inlineCallbacks
  414. def send_leave(self, destinations, pdu):
  415. for destination in destinations:
  416. if destination == self.server_name:
  417. continue
  418. try:
  419. time_now = self._clock.time_msec()
  420. _, content = yield self.transport_layer.send_leave(
  421. destination=destination,
  422. room_id=pdu.room_id,
  423. event_id=pdu.event_id,
  424. content=pdu.get_pdu_json(time_now),
  425. )
  426. logger.debug("Got content: %s", content)
  427. defer.returnValue(None)
  428. except CodeMessageException:
  429. raise
  430. except Exception as e:
  431. logger.exception(
  432. "Failed to send_leave via %s: %s",
  433. destination, e.message
  434. )
  435. raise RuntimeError("Failed to send to any server.")
  436. @defer.inlineCallbacks
  437. def get_public_rooms(self, destinations):
  438. results_by_server = {}
  439. @defer.inlineCallbacks
  440. def _get_result(s):
  441. if s == self.server_name:
  442. defer.returnValue()
  443. try:
  444. result = yield self.transport_layer.get_public_rooms(s)
  445. results_by_server[s] = result
  446. except:
  447. logger.exception("Error getting room list from server %r", s)
  448. yield concurrently_execute(_get_result, destinations, 3)
  449. defer.returnValue(results_by_server)
  450. @defer.inlineCallbacks
  451. def query_auth(self, destination, room_id, event_id, local_auth):
  452. """
  453. Params:
  454. destination (str)
  455. event_it (str)
  456. local_auth (list)
  457. """
  458. time_now = self._clock.time_msec()
  459. send_content = {
  460. "auth_chain": [e.get_pdu_json(time_now) for e in local_auth],
  461. }
  462. code, content = yield self.transport_layer.send_query_auth(
  463. destination=destination,
  464. room_id=room_id,
  465. event_id=event_id,
  466. content=send_content,
  467. )
  468. auth_chain = [
  469. self.event_from_pdu_json(e)
  470. for e in content["auth_chain"]
  471. ]
  472. signed_auth = yield self._check_sigs_and_hash_and_fetch(
  473. destination, auth_chain, outlier=True
  474. )
  475. signed_auth.sort(key=lambda e: e.depth)
  476. ret = {
  477. "auth_chain": signed_auth,
  478. "rejects": content.get("rejects", []),
  479. "missing": content.get("missing", []),
  480. }
  481. defer.returnValue(ret)
  482. @defer.inlineCallbacks
  483. def get_missing_events(self, destination, room_id, earliest_events_ids,
  484. latest_events, limit, min_depth):
  485. """Tries to fetch events we are missing. This is called when we receive
  486. an event without having received all of its ancestors.
  487. Args:
  488. destination (str)
  489. room_id (str)
  490. earliest_events_ids (list): List of event ids. Effectively the
  491. events we expected to receive, but haven't. `get_missing_events`
  492. should only return events that didn't happen before these.
  493. latest_events (list): List of events we have received that we don't
  494. have all previous events for.
  495. limit (int): Maximum number of events to return.
  496. min_depth (int): Minimum depth of events tor return.
  497. """
  498. try:
  499. content = yield self.transport_layer.get_missing_events(
  500. destination=destination,
  501. room_id=room_id,
  502. earliest_events=earliest_events_ids,
  503. latest_events=[e.event_id for e in latest_events],
  504. limit=limit,
  505. min_depth=min_depth,
  506. )
  507. events = [
  508. self.event_from_pdu_json(e)
  509. for e in content.get("events", [])
  510. ]
  511. signed_events = yield self._check_sigs_and_hash_and_fetch(
  512. destination, events, outlier=False
  513. )
  514. have_gotten_all_from_destination = True
  515. except HttpResponseException as e:
  516. if not e.code == 400:
  517. raise
  518. # We are probably hitting an old server that doesn't support
  519. # get_missing_events
  520. signed_events = []
  521. have_gotten_all_from_destination = False
  522. if len(signed_events) >= limit:
  523. defer.returnValue(signed_events)
  524. servers = yield self.store.get_joined_hosts_for_room(room_id)
  525. servers = set(servers)
  526. servers.discard(self.server_name)
  527. failed_to_fetch = set()
  528. while len(signed_events) < limit:
  529. # Are we missing any?
  530. seen_events = set(earliest_events_ids)
  531. seen_events.update(e.event_id for e in signed_events if e)
  532. missing_events = {}
  533. for e in itertools.chain(latest_events, signed_events):
  534. if e.depth > min_depth:
  535. missing_events.update({
  536. e_id: e.depth for e_id, _ in e.prev_events
  537. if e_id not in seen_events
  538. and e_id not in failed_to_fetch
  539. })
  540. if not missing_events:
  541. break
  542. have_seen = yield self.store.have_events(missing_events)
  543. for k in have_seen:
  544. missing_events.pop(k, None)
  545. if not missing_events:
  546. break
  547. # Okay, we haven't gotten everything yet. Lets get them.
  548. ordered_missing = sorted(missing_events.items(), key=lambda x: x[0])
  549. if have_gotten_all_from_destination:
  550. servers.discard(destination)
  551. def random_server_list():
  552. srvs = list(servers)
  553. random.shuffle(srvs)
  554. return srvs
  555. deferreds = [
  556. self.get_pdu(
  557. destinations=random_server_list(),
  558. event_id=e_id,
  559. )
  560. for e_id, depth in ordered_missing[:limit - len(signed_events)]
  561. ]
  562. res = yield defer.DeferredList(deferreds, consumeErrors=True)
  563. for (result, val), (e_id, _) in zip(res, ordered_missing):
  564. if result and val:
  565. signed_events.append(val)
  566. else:
  567. failed_to_fetch.add(e_id)
  568. defer.returnValue(signed_events)
  569. def event_from_pdu_json(self, pdu_json, outlier=False):
  570. event = FrozenEvent(
  571. pdu_json
  572. )
  573. event.internal_metadata.outlier = outlier
  574. return event
  575. @defer.inlineCallbacks
  576. def forward_third_party_invite(self, destinations, room_id, event_dict):
  577. for destination in destinations:
  578. if destination == self.server_name:
  579. continue
  580. try:
  581. yield self.transport_layer.exchange_third_party_invite(
  582. destination=destination,
  583. room_id=room_id,
  584. event_dict=event_dict,
  585. )
  586. defer.returnValue(None)
  587. except CodeMessageException:
  588. raise
  589. except Exception as e:
  590. logger.exception(
  591. "Failed to send_third_party_invite via %s: %s",
  592. destination, e.message
  593. )
  594. raise RuntimeError("Failed to send to any server.")