transaction_queue.py

# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime

from twisted.internet import defer

from .persistence import TransactionActions
from .units import Transaction, Edu

from synapse.api.errors import HttpResponseException, FederationDeniedError
from synapse.util import logcontext, PreserveLoggingContext
from synapse.util.async import run_on_reactor
from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
from synapse.util.metrics import measure_func
from synapse.handlers.presence import (
    format_user_presence_state, get_interested_remotes,
)
import synapse.metrics
from synapse.metrics import LaterGauge
from synapse.metrics import (
    sent_edus_counter,
    sent_transactions_counter,
    events_processed_counter,
)

from prometheus_client import Counter

from six import itervalues

import logging

logger = logging.getLogger(__name__)

sent_pdus_destination_dist = Counter(
    "synapse_federation_transaction_queue_sent_pdu_destinations", ""
)


class TransactionQueue(object):
    """This class makes sure we only have one transaction in flight at
    a time for a given destination.

    It batches pending PDUs into single transactions.
    """

    def __init__(self, hs):
        self.server_name = hs.hostname

        self.store = hs.get_datastore()
        self.state = hs.get_state_handler()
        self.transaction_actions = TransactionActions(self.store)

        self.transport_layer = hs.get_federation_transport_client()

        self.clock = hs.get_clock()
        self.is_mine_id = hs.is_mine_id

        # Is a mapping from destinations -> deferreds. Used to keep track
        # of which destinations have transactions in flight and when they are
        # done
        self.pending_transactions = {}

        LaterGauge(
            "synapse_federation_transaction_queue_pending_destinations",
            "",
            [],
            lambda: len(self.pending_transactions),
        )

        # Is a mapping from destination -> list of
        # tuple(pending pdus, deferred, order)
        self.pending_pdus_by_dest = pdus = {}
        # destination -> list of tuple(edu, deferred)
        self.pending_edus_by_dest = edus = {}

        # Map of user_id -> UserPresenceState for all the pending presence
        # to be sent out by user_id. Entries here get processed and put in
        # pending_presence_by_dest
        self.pending_presence = {}

        # Map of destination -> user_id -> UserPresenceState of pending presence
        # to be sent to each destination
        self.pending_presence_by_dest = presence = {}

        # Pending EDUs by their "key". Keyed EDUs are EDUs that get clobbered
        # based on their key (e.g. typing events by room_id)
        # Map of destination -> (edu_type, key) -> Edu
        self.pending_edus_keyed_by_dest = edus_keyed = {}

        LaterGauge(
            "synapse_federation_transaction_queue_pending_pdus",
            "",
            [],
            lambda: sum(map(len, pdus.values())),
        )
        LaterGauge(
            "synapse_federation_transaction_queue_pending_edus",
            "",
            [],
            lambda: (
                sum(map(len, edus.values()))
                + sum(map(len, presence.values()))
                + sum(map(len, edus_keyed.values()))
            ),
        )

        # destination -> list of tuple(failure, deferred)
        self.pending_failures_by_dest = {}

        # destination -> stream_id of last successfully sent to-device message.
        # NB: may be a long or an int.
        self.last_device_stream_id_by_dest = {}

        # destination -> stream_id of last successfully sent device list
        # update.
        self.last_device_list_stream_id_by_dest = {}

        # HACK to get unique tx id
        self._next_txn_id = int(self.clock.time_msec())

        self._order = 1

        self._is_processing = False
        self._last_poked_id = -1

        self._processing_pending_presence = False

    def can_send_to(self, destination):
        """Can we send messages to the given server?

        We can't send messages to ourselves. If we are running on localhost
        then we can only federate with other servers running on localhost.
        Otherwise we only federate with servers on a public domain.

        Args:
            destination(str): The server we are possibly trying to send to.
        Returns:
            bool: True if we can send to the server.
        """

        if destination == self.server_name:
            return False
        if self.server_name.startswith("localhost"):
            return destination.startswith("localhost")
        else:
            return not destination.startswith("localhost")
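
    # For illustration (derived from the checks above), with
    # server_name == "example.com":
    #   can_send_to("example.com")    -> False  (never send to ourselves)
    #   can_send_to("localhost:8480") -> False  (public servers skip localhost)
    #   can_send_to("matrix.org")     -> True
    # whereas a server named "localhost:8080" may only send to other
    # "localhost..." destinations.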

    def notify_new_events(self, current_id):
        """This gets called when we have some new events we might want to
        send out to other servers.
        """
        self._last_poked_id = max(current_id, self._last_poked_id)

        if self._is_processing:
            return

        # fire off a processing loop in the background. It's likely it will
        # outlast the current request, so run it in the sentinel logcontext.
        with PreserveLoggingContext():
            self._process_event_queue_loop()
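
    # Note (illustrative): if the loop is already running, the call above has
    # only bumped self._last_poked_id; the running loop keeps paging events
    # from the store until next_token catches up with that id, so the wake-up
    # is not lost.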

    @defer.inlineCallbacks
    def _process_event_queue_loop(self):
        try:
            self._is_processing = True
            while True:
                last_token = yield self.store.get_federation_out_pos("events")
                next_token, events = yield self.store.get_all_new_events_stream(
                    last_token, self._last_poked_id, limit=100,
                )

                logger.debug("Handling %s -> %s", last_token, next_token)

                if not events and next_token >= self._last_poked_id:
                    break

                @defer.inlineCallbacks
                def handle_event(event):
                    # Only send events for this server.
                    send_on_behalf_of = event.internal_metadata.get_send_on_behalf_of()
                    is_mine = self.is_mine_id(event.event_id)
                    if not is_mine and send_on_behalf_of is None:
                        return

                    try:
                        # Get the state from before the event.
                        # We need to make sure that this is the state from before
                        # the event and not from after it.
                        # Otherwise if the last member on a server in a room is
                        # banned then it won't receive the event because it won't
                        # be in the room after the ban.
                        destinations = yield self.state.get_current_hosts_in_room(
                            event.room_id, latest_event_ids=[
                                prev_id for prev_id, _ in event.prev_events
                            ],
                        )
                    except Exception:
                        logger.exception(
                            "Failed to calculate hosts in room for event: %s",
                            event.event_id,
                        )
                        return

                    destinations = set(destinations)

                    if send_on_behalf_of is not None:
                        # If we are sending the event on behalf of another server
                        # then it already has the event and there is no reason to
                        # send the event to it.
                        destinations.discard(send_on_behalf_of)

                    logger.debug("Sending %s to %r", event, destinations)

                    self._send_pdu(event, destinations)

                @defer.inlineCallbacks
                def handle_room_events(events):
                    for event in events:
                        yield handle_event(event)

                events_by_room = {}
                for event in events:
                    events_by_room.setdefault(event.room_id, []).append(event)

                yield logcontext.make_deferred_yieldable(defer.gatherResults(
                    [
                        logcontext.run_in_background(handle_room_events, evs)
                        for evs in itervalues(events_by_room)
                    ],
                    consumeErrors=True
                ))

                yield self.store.update_federation_out_pos(
                    "events", next_token
                )

                if events:
                    now = self.clock.time_msec()
                    ts = yield self.store.get_received_ts(events[-1].event_id)

                    synapse.metrics.event_processing_lag.labels(
                        "federation_sender").set(now - ts)
                    synapse.metrics.event_processing_last_ts.labels(
                        "federation_sender").set(ts)

                    events_processed_counter.inc(len(events))

                synapse.metrics.event_processing_positions.labels(
                    "federation_sender").set(next_token)

        finally:
            self._is_processing = False

    def _send_pdu(self, pdu, destinations):
        # We loop through all destinations to see whether we already have
        # a transaction in progress. If we do, stick it in the pending_pdus
        # table and we'll get back to it later.

        order = self._order
        self._order += 1

        destinations = set(destinations)
        destinations = set(
            dest for dest in destinations if self.can_send_to(dest)
        )

        logger.debug("Sending to: %s", str(destinations))

        if not destinations:
            return

        sent_pdus_destination_dist.inc(len(destinations))

        for destination in destinations:
            self.pending_pdus_by_dest.setdefault(destination, []).append(
                (pdu, order)
            )

            self._attempt_new_transaction(destination)

    @logcontext.preserve_fn  # the caller should not yield on this
    @defer.inlineCallbacks
    def send_presence(self, states):
        """Send the new presence states to the appropriate destinations.

        This actually queues up the presence states ready for sending and
        triggers a background task to process them and send out the transactions.

        Args:
            states (list(UserPresenceState))
        """

        # First we queue up the new presence by user ID, so multiple presence
        # updates in quick succession are correctly handled.
        # We only want to send presence for our own users, so let's always just
        # filter here just in case.
        self.pending_presence.update({
            state.user_id: state for state in states
            if self.is_mine_id(state.user_id)
        })

        # We then handle the new pending presence in batches, first figuring
        # out the destinations we need to send each state to and then poking it
        # to attempt a new transaction. We linearize this so that we don't
        # accidentally mess up the ordering and send multiple presence updates
        # in the wrong order.
        if self._processing_pending_presence:
            return

        self._processing_pending_presence = True
        try:
            while True:
                states_map = self.pending_presence
                self.pending_presence = {}

                if not states_map:
                    break

                yield self._process_presence_inner(list(states_map.values()))
        except Exception:
            logger.exception("Error sending presence states to servers")
        finally:
            self._processing_pending_presence = False
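
    # Illustrative trace of the linearization above: a second send_presence()
    # call that arrives while the loop is draining simply merges its states
    # into self.pending_presence and returns; the already-running loop picks
    # them up on its next iteration, so per-user ordering is preserved.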

    @measure_func("txnqueue._process_presence")
    @defer.inlineCallbacks
    def _process_presence_inner(self, states):
        """Given a list of states populate self.pending_presence_by_dest and
        poke to send a new transaction to each destination

        Args:
            states (list(UserPresenceState))
        """
        hosts_and_states = yield get_interested_remotes(self.store, states, self.state)

        for destinations, states in hosts_and_states:
            for destination in destinations:
                if not self.can_send_to(destination):
                    continue

                self.pending_presence_by_dest.setdefault(
                    destination, {}
                ).update({
                    state.user_id: state for state in states
                })

                self._attempt_new_transaction(destination)

    def send_edu(self, destination, edu_type, content, key=None):
        edu = Edu(
            origin=self.server_name,
            destination=destination,
            edu_type=edu_type,
            content=content,
        )

        if not self.can_send_to(destination):
            return

        sent_edus_counter.inc()

        if key:
            self.pending_edus_keyed_by_dest.setdefault(
                destination, {}
            )[(edu.edu_type, key)] = edu
        else:
            self.pending_edus_by_dest.setdefault(destination, []).append(edu)

        self._attempt_new_transaction(destination)
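
    # Illustrative examples (the exact EDU types and content dicts here are
    # assumptions, not taken from this file):
    #
    #   # keyed EDU: a later typing update for the same room replaces any
    #   # earlier one still queued for this destination
    #   self.send_edu(dest, "m.typing", {"room_id": room_id}, key=room_id)
    #
    #   # unkeyed EDU: every call is queued and sent
    #   self.send_edu(dest, "m.receipt", receipt_content)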

    def send_failure(self, failure, destination):
        if destination == self.server_name or destination == "localhost":
            return

        if not self.can_send_to(destination):
            return

        self.pending_failures_by_dest.setdefault(
            destination, []
        ).append(failure)

        self._attempt_new_transaction(destination)

    def send_device_messages(self, destination):
        if destination == self.server_name or destination == "localhost":
            return

        if not self.can_send_to(destination):
            return

        self._attempt_new_transaction(destination)

    def get_current_token(self):
        return 0

    def _attempt_new_transaction(self, destination):
        """Try to start a new transaction to this destination

        If there is already a transaction in progress to this destination,
        returns immediately. Otherwise kicks off the process of sending a
        transaction in the background.

        Args:
            destination (str):

        Returns:
            None
        """
        # list of (pending_pdu, deferred, order)
        if destination in self.pending_transactions:
            # XXX: pending_transactions can get stuck on by a never-ending
            # request, at which point pending_pdus_by_dest just keeps growing.
            # We need application-layer timeouts of some flavour for these
            # requests.
            logger.debug(
                "TX [%s] Transaction already in progress",
                destination
            )
            return

        logger.debug("TX [%s] Starting transaction loop", destination)

        # Drop the logcontext before starting the transaction. It doesn't
        # really make sense to log all the outbound transactions against
        # whatever path led us to this point: that's pretty arbitrary really.
        #
        # (this also means we can fire off _perform_transaction without
        # yielding)
        with logcontext.PreserveLoggingContext():
            self._transaction_transmission_loop(destination)

    @defer.inlineCallbacks
    def _transaction_transmission_loop(self, destination):
        pending_pdus = []
        try:
            self.pending_transactions[destination] = 1

            # This will throw if we wouldn't retry. We do this here so we fail
            # quickly, but we will later check this again in the http client,
            # hence why we throw the result away.
            yield get_retry_limiter(destination, self.clock, self.store)

            # XXX: what's this for?
            yield run_on_reactor()

            pending_pdus = []
            while True:
                device_message_edus, device_stream_id, dev_list_id = (
                    yield self._get_new_device_messages(destination)
                )

                # BEGIN CRITICAL SECTION
                #
                # In order to avoid a race condition, we need to make sure that
                # the following code (from popping the queues up to the point
                # where we decide if we actually have any pending messages) is
                # atomic - otherwise new PDUs or EDUs might arrive in the
                # meantime, but not get sent because we hold the
                # pending_transactions flag.

                pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
                pending_edus = self.pending_edus_by_dest.pop(destination, [])
                pending_presence = self.pending_presence_by_dest.pop(destination, {})
                pending_failures = self.pending_failures_by_dest.pop(destination, [])

                pending_edus.extend(
                    self.pending_edus_keyed_by_dest.pop(destination, {}).values()
                )

                pending_edus.extend(device_message_edus)
                if pending_presence:
                    pending_edus.append(
                        Edu(
                            origin=self.server_name,
                            destination=destination,
                            edu_type="m.presence",
                            content={
                                "push": [
                                    format_user_presence_state(
                                        presence, self.clock.time_msec()
                                    )
                                    for presence in pending_presence.values()
                                ]
                            },
                        )
                    )

                if pending_pdus:
                    logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d",
                                 destination, len(pending_pdus))

                if not pending_pdus and not pending_edus and not pending_failures:
                    logger.debug("TX [%s] Nothing to send", destination)
                    self.last_device_stream_id_by_dest[destination] = (
                        device_stream_id
                    )
                    return

                # END CRITICAL SECTION

                success = yield self._send_new_transaction(
                    destination, pending_pdus, pending_edus, pending_failures,
                )
                if success:
                    sent_transactions_counter.inc()
                    # Remove the acknowledged device messages from the database
                    # Only bother if we actually sent some device messages
                    if device_message_edus:
                        yield self.store.delete_device_msgs_for_remote(
                            destination, device_stream_id
                        )
                        logger.info("Marking as sent %r %r", destination, dev_list_id)
                        yield self.store.mark_as_sent_devices_by_remote(
                            destination, dev_list_id
                        )

                    self.last_device_stream_id_by_dest[destination] = device_stream_id
                    self.last_device_list_stream_id_by_dest[destination] = dev_list_id
                else:
                    break
        except NotRetryingDestination as e:
            logger.debug(
                "TX [%s] not ready for retry yet (next retry at %s) - "
                "dropping transaction for now",
                destination,
                datetime.datetime.fromtimestamp(
                    (e.retry_last_ts + e.retry_interval) / 1000.0
                ),
            )
        except FederationDeniedError as e:
            logger.info(e)
        except Exception as e:
            logger.warn(
                "TX [%s] Failed to send transaction: %s",
                destination,
                e,
            )
            for p, _ in pending_pdus:
                logger.info("Failed to send event %s to %s", p.event_id,
                            destination)
        finally:
            # We want to be *very* sure we delete this after we stop processing
            self.pending_transactions.pop(destination, None)

    @defer.inlineCallbacks
    def _get_new_device_messages(self, destination):
        last_device_stream_id = self.last_device_stream_id_by_dest.get(destination, 0)
        to_device_stream_id = self.store.get_to_device_stream_token()
        contents, stream_id = yield self.store.get_new_device_msgs_for_remote(
            destination, last_device_stream_id, to_device_stream_id
        )
        edus = [
            Edu(
                origin=self.server_name,
                destination=destination,
                edu_type="m.direct_to_device",
                content=content,
            )
            for content in contents
        ]

        last_device_list = self.last_device_list_stream_id_by_dest.get(destination, 0)
        now_stream_id, results = yield self.store.get_devices_by_remote(
            destination, last_device_list
        )
        edus.extend(
            Edu(
                origin=self.server_name,
                destination=destination,
                edu_type="m.device_list_update",
                content=content,
            )
            for content in results
        )
        defer.returnValue((edus, stream_id, now_stream_id))

    @measure_func("_send_new_transaction")
    @defer.inlineCallbacks
    def _send_new_transaction(self, destination, pending_pdus, pending_edus,
                              pending_failures):

        # Sort based on the order field
        pending_pdus.sort(key=lambda t: t[1])
        pdus = [x[0] for x in pending_pdus]
        edus = pending_edus
        failures = [x.get_dict() for x in pending_failures]

        success = True

        logger.debug("TX [%s] _attempt_new_transaction", destination)

        txn_id = str(self._next_txn_id)

        logger.debug(
            "TX [%s] {%s} Attempting new transaction"
            " (pdus: %d, edus: %d, failures: %d)",
            destination, txn_id,
            len(pdus),
            len(edus),
            len(failures)
        )

        logger.debug("TX [%s] Persisting transaction...", destination)

        transaction = Transaction.create_new(
            origin_server_ts=int(self.clock.time_msec()),
            transaction_id=txn_id,
            origin=self.server_name,
            destination=destination,
            pdus=pdus,
            edus=edus,
            pdu_failures=failures,
        )

        self._next_txn_id += 1

        yield self.transaction_actions.prepare_to_send(transaction)

        logger.debug("TX [%s] Persisted transaction", destination)
        logger.info(
            "TX [%s] {%s} Sending transaction [%s],"
            " (PDUs: %d, EDUs: %d, failures: %d)",
            destination, txn_id,
            transaction.transaction_id,
            len(pdus),
            len(edus),
            len(failures),
        )

        # Actually send the transaction

        # FIXME (erikj): This is a bit of a hack to make the Pdu age
        # keys work
        def json_data_cb():
            data = transaction.get_dict()
            now = int(self.clock.time_msec())
            if "pdus" in data:
                for p in data["pdus"]:
                    if "age_ts" in p:
                        unsigned = p.setdefault("unsigned", {})
                        unsigned["age"] = now - int(p["age_ts"])
                        del p["age_ts"]
            return data
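
        # Worked example (illustrative): if a PDU has age_ts = 1500000000000
        # and the transaction is sent when now = 1500000060000, the callback
        # rewrites the PDU so that unsigned["age"] == 60000 (ms elapsed since
        # creation). Doing this in the callback means the age is computed at
        # the moment of (re)sending rather than when the transaction was built.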

        try:
            response = yield self.transport_layer.send_transaction(
                transaction, json_data_cb
            )
            code = 200

            if response:
                for e_id, r in response.get("pdus", {}).items():
                    if "error" in r:
                        logger.warn(
                            "Transaction returned error for %s: %s",
                            e_id, r,
                        )
        except HttpResponseException as e:
            code = e.code
            response = e.response

            if e.code in (401, 404, 429) or 500 <= e.code:
                logger.info(
                    "TX [%s] {%s} got %d response",
                    destination, txn_id, code
                )
                raise e

        logger.info(
            "TX [%s] {%s} got %d response",
            destination, txn_id, code
        )

        logger.debug("TX [%s] Sent transaction", destination)
        logger.debug("TX [%s] Marking as delivered...", destination)

        yield self.transaction_actions.delivered(
            transaction, code, response
        )

        logger.debug("TX [%s] Marked as delivered", destination)

        if code != 200:
            for p in pdus:
                logger.info(
                    "Failed to send event %s to %s", p.event_id, destination
                )
            success = False

        defer.returnValue(success)