# transaction_queue.py
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2014-2016 OpenMarket Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. from twisted.internet import defer
  16. from .persistence import TransactionActions
  17. from .units import Transaction, Edu
  18. from synapse.api.errors import HttpResponseException
  19. from synapse.util.async import run_on_reactor
  20. from synapse.util.logcontext import preserve_context_over_fn
  21. from synapse.util.retryutils import (
  22. get_retry_limiter, NotRetryingDestination,
  23. )
  24. from synapse.util.metrics import measure_func
  25. from synapse.types import get_domain_from_id
  26. from synapse.handlers.presence import format_user_presence_state
  27. import synapse.metrics
  28. import logging
  # Module-level logger shared by everything in this file.
  logger = logging.getLogger(__name__)

  # Metrics reported under this module's own name; TransactionQueue
  # registers its "pending_*" callbacks on this object.
  metrics = synapse.metrics.get_metrics_for(__name__)

  # Metrics reported under the shared federation client name.
  client_metrics = synapse.metrics.get_metrics_for("synapse.federation.client")

  # Incremented by the number of destinations each PDU is queued for.
  sent_pdus_destination_dist = client_metrics.register_distribution(
      "sent_pdu_destinations",
  )

  # Incremented once per EDU accepted by TransactionQueue.send_edu.
  sent_edus_counter = client_metrics.register_counter("sent_edus")
  36. class TransactionQueue(object):
  37. """This class makes sure we only have one transaction in flight at
  38. a time for a given destination.
  39. It batches pending PDUs into single transactions.
  40. """
  def __init__(self, hs):
      """Wire up collaborators and create the empty per-destination queues.

      Args:
          hs: The homeserver object. Its hostname, datastore, state
              handler, federation transport client, clock and
              ``is_mine_id`` predicate are captured here.
      """
      self.server_name = hs.hostname
      self.store = hs.get_datastore()
      self.state = hs.get_state_handler()
      self.transaction_actions = TransactionActions(self.store)
      self.transport_layer = hs.get_federation_transport_client()
      self.clock = hs.get_clock()
      self.is_mine_id = hs.is_mine_id

      # destination -> marker value. A destination is present for as long
      # as _attempt_new_transaction is running its send loop for it, which
      # is how we keep to one transaction in flight per server.
      self.pending_transactions = {}

      def count_pending_destinations():
          return len(self.pending_transactions)

      metrics.register_callback(
          "pending_destinations",
          count_pending_destinations,
      )

      # The queue dicts are bound to local names as well as attributes so
      # that the metric callbacks below can close over them directly.

      # destination -> list of (pdu, order) tuples
      pdu_queues = {}
      # destination -> list of EDUs that were queued without a key
      edu_queues = {}
      # destination -> {user_id: presence state}. Presence is kept apart
      # because it is sent as a single aggregate EDU.
      presence_queues = {}
      # destination -> {(edu_type, key): edu}. Queuing another EDU with the
      # same type and key overwrites the one still waiting.
      keyed_edu_queues = {}

      self.pending_pdus_by_dest = pdu_queues
      self.pending_edus_by_dest = edu_queues
      self.pending_presence_by_dest = presence_queues
      self.pending_edus_keyed_by_dest = keyed_edu_queues

      def count_pending_pdus():
          return sum(len(queue) for queue in pdu_queues.values())

      def count_pending_edus():
          return sum(
              len(queue)
              for queues in (edu_queues, presence_queues, keyed_edu_queues)
              for queue in queues.values()
          )

      metrics.register_callback("pending_pdus", count_pending_pdus)
      metrics.register_callback("pending_edus", count_pending_edus)

      # destination -> list of failures waiting to be reported
      self.pending_failures_by_dest = {}

      # destination -> stream_id of last successfully sent to-device
      # message. NB: may be a long or an int.
      self.last_device_stream_id_by_dest = {}

      # destination -> stream_id of last successfully sent device list
      # update.
      self.last_device_list_stream_id_by_dest = {}

      # HACK to get unique tx id: start from the current time in
      # milliseconds and count upwards from there.
      self._next_txn_id = int(self.clock.time_msec())

      # Monotonic counter stamped onto each queued PDU so a batch can be
      # sorted back into queueing order before it is sent.
      self._order = 1

      # State for notify_new_events: whether a drain loop is currently
      # running, and the highest stream id we have been poked with.
      self._is_processing = False
      self._last_poked_id = -1
  def can_send_to(self, destination):
      """Decide whether we are allowed to federate with ``destination``.

      We never send to ourselves. Beyond that, a server whose name starts
      with "localhost" may only talk to other "localhost" servers, and any
      other server may only talk to non-"localhost" servers.

      Args:
          destination(str): The server we are possibly trying to send to.

      Returns:
          bool: True if we can send to the server.
      """
      if destination == self.server_name:
          return False

      # Both sides must agree on being local or being public.
      we_are_local = self.server_name.startswith("localhost")
      they_are_local = destination.startswith("localhost")
      return we_are_local == they_are_local
  @defer.inlineCallbacks
  def notify_new_events(self, current_id):
      """Push newly persisted events out to the servers that should see them.

      Only one drain loop runs at a time. A call that arrives while one is
      running just raises the high-water mark and returns; the running
      loop picks the new mark up on its next pass.

      Args:
          current_id: Stream position we were poked with. Used as the upper
              bound when fetching new events.
      """
      self._last_poked_id = max(current_id, self._last_poked_id)

      if self._is_processing:
          return

      self._is_processing = True
      try:
          while True:
              last_token = yield self.store.get_federation_out_pos("events")
              next_token, events = yield self.store.get_all_new_events_stream(
                  last_token, self._last_poked_id, limit=20,
              )

              logger.debug("Handling %s -> %s", last_token, next_token)

              if not events and next_token >= self._last_poked_id:
                  break

              for event in events:
                  # Only send events for this server: either the event is
                  # ours, or we were asked to send it for someone else.
                  send_on_behalf_of = (
                      event.internal_metadata.get_send_on_behalf_of()
                  )
                  is_mine = self.is_mine_id(event.event_id)
                  if send_on_behalf_of is None and not is_mine:
                      continue

                  # Work out membership from the state *before* the event.
                  # Using the state after it would be wrong: if the last
                  # member on a server is banned, that server would no
                  # longer be in the room and so would never be told
                  # about the ban.
                  prev_event_ids = [
                      prev_id for prev_id, _ in event.prev_events
                  ]
                  users_in_room = yield self.state.get_current_user_in_room(
                      event.room_id, latest_event_ids=prev_event_ids,
                  )

                  destinations = {
                      get_domain_from_id(user_id)
                      for user_id in users_in_room
                  }

                  if send_on_behalf_of is not None:
                      # The server we are sending on behalf of already has
                      # the event, so there is no point sending it back.
                      destinations.discard(send_on_behalf_of)

                  logger.debug("Sending %s to %r", event, destinations)

                  self._send_pdu(event, destinations)

              # Record how far we got before fetching the next batch.
              yield self.store.update_federation_out_pos(
                  "events", next_token
              )
      finally:
          self._is_processing = False
  def _send_pdu(self, pdu, destinations):
      """Queue ``pdu`` for every reachable destination and poke the senders.

      Args:
          pdu: The event to send.
          destinations: Iterable of server names. Names rejected by
              ``can_send_to`` are dropped.
      """
      # Claim an ordering slot up front, even if nothing ends up queued,
      # so batches can later be sorted back into queueing order.
      order = self._order
      self._order += 1

      candidates = set(destinations)
      destinations = {
          dest for dest in candidates if self.can_send_to(dest)
      }

      logger.debug("Sending to: %s", str(destinations))

      if not destinations:
          return

      sent_pdus_destination_dist.inc_by(len(destinations))

      for destination in destinations:
          queue = self.pending_pdus_by_dest.setdefault(destination, [])
          queue.append((pdu, order))

          # If a transaction is already in flight for this destination the
          # attempt returns immediately, and the in-flight loop picks the
          # PDU up from the queue.
          preserve_context_over_fn(
              self._attempt_new_transaction, destination
          )
  def send_presence(self, destination, states):
      """Queue presence updates for ``destination`` and poke the sender.

      Only the most recently queued state per user is kept: states are
      stored by ``user_id``, so a later update overwrites an earlier one
      that has not been sent yet.

      Args:
          destination: Name of the server to send to.
          states: Iterable of presence state objects exposing ``user_id``.
      """
      if not self.can_send_to(destination):
          return

      queued = self.pending_presence_by_dest.setdefault(destination, {})
      latest_by_user = dict((state.user_id, state) for state in states)
      queued.update(latest_by_user)

      preserve_context_over_fn(
          self._attempt_new_transaction, destination
      )
  def send_edu(self, destination, edu_type, content, key=None):
      """Queue an EDU for ``destination`` and poke the sender.

      Args:
          destination: Name of the server to send to.
          edu_type: The EDU type string.
          content: The EDU content.
          key: Optional. When truthy the EDU is stored under
              ``(edu_type, key)``, replacing any unsent EDU with the same
              type and key. Otherwise it is appended to the plain queue.
      """
      # Check the destination first, as the other send_* methods do, so
      # that we never build an Edu only to throw it away.
      if not self.can_send_to(destination):
          return

      edu = Edu(
          origin=self.server_name,
          destination=destination,
          edu_type=edu_type,
          content=content,
      )

      sent_edus_counter.inc()

      if key:
          keyed_queue = self.pending_edus_keyed_by_dest.setdefault(
              destination, {}
          )
          keyed_queue[(edu.edu_type, key)] = edu
      else:
          self.pending_edus_by_dest.setdefault(destination, []).append(edu)

      preserve_context_over_fn(
          self._attempt_new_transaction, destination
      )
  def send_failure(self, failure, destination):
      """Queue a failure report for ``destination`` and poke the sender.

      Nothing is queued when the destination is this server, is exactly
      "localhost", or is rejected by ``can_send_to``.

      Args:
          failure: The failure to report. It must expose ``get_dict()``,
              which is called when the transaction is built.
          destination: Name of the server to send to.
      """
      if (
          destination == self.server_name
          or destination == "localhost"
          or not self.can_send_to(destination)
      ):
          return

      failures = self.pending_failures_by_dest.setdefault(destination, [])
      failures.append(failure)

      preserve_context_over_fn(
          self._attempt_new_transaction, destination
      )
  def send_device_messages(self, destination):
      """Poke the sender for ``destination`` so it picks up device messages.

      Nothing is queued here: the device messages themselves are read
      from the datastore when the transaction is assembled.

      Args:
          destination: Name of the server to send to. Ignored when it is
              this server, exactly "localhost", or rejected by
              ``can_send_to``.
      """
      if (
          destination == self.server_name
          or destination == "localhost"
          or not self.can_send_to(destination)
      ):
          return

      preserve_context_over_fn(
          self._attempt_new_transaction, destination
      )
  def get_current_token(self):
      """Return the current token, which for this queue is always 0.

      NOTE(review): presumably present to satisfy an interface shared with
      other senders; confirm against the callers.
      """
      return 0
  @defer.inlineCallbacks
  def _attempt_new_transaction(self, destination):
      """Drain everything queued for ``destination``, one transaction at a time.

      Returns immediately if a send loop is already running for the
      destination. Otherwise keeps building and sending transactions
      until the queues are empty, a send fails, or the retry limiter
      refuses the destination.

      Args:
          destination: Name of the server to send to.
      """
      if destination in self.pending_transactions:
          # XXX: pending_transactions can get stuck on by a never-ending
          # request at which point pending_pdus_by_dest just keeps growing.
          # we need application-layer timeouts of some flavour of these
          # requests
          logger.debug(
              "TX [%s] Transaction already in progress",
              destination
          )
          return

      try:
          self.pending_transactions[destination] = 1

          # XXX: what's this for?
          yield run_on_reactor()

          while True:
              limiter = yield get_retry_limiter(
                  destination,
                  self.clock,
                  self.store,
                  backoff_on_404=True,  # If we get a 404 the other side has gone
              )

              fetched = yield self._get_new_device_messages(destination)
              device_message_edus, device_stream_id, dev_list_id = fetched

              # BEGIN CRITICAL SECTION
              #
              # From here until we have decided whether there is anything
              # to send, there must be no yield. Otherwise new PDUs or
              # EDUs could be queued in the meantime and then never sent,
              # because we are still holding the pending_transactions
              # flag for this destination.
              pdu_batch = self.pending_pdus_by_dest.pop(destination, [])
              edu_batch = self.pending_edus_by_dest.pop(destination, [])
              presence_batch = self.pending_presence_by_dest.pop(
                  destination, {}
              )
              failure_batch = self.pending_failures_by_dest.pop(
                  destination, []
              )
              keyed_edus = self.pending_edus_keyed_by_dest.pop(
                  destination, {}
              )

              edu_batch.extend(keyed_edus.values())
              edu_batch.extend(device_message_edus)

              if presence_batch:
                  # All queued presence goes out as one aggregate EDU.
                  pushed_states = [
                      format_user_presence_state(
                          state, self.clock.time_msec()
                      )
                      for state in presence_batch.values()
                  ]
                  presence_edu = Edu(
                      origin=self.server_name,
                      destination=destination,
                      edu_type="m.presence",
                      content={"push": pushed_states},
                  )
                  edu_batch.append(presence_edu)

              if pdu_batch:
                  logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d",
                               destination, len(pdu_batch))

              if not (pdu_batch or edu_batch or failure_batch):
                  logger.debug("TX [%s] Nothing to send", destination)
                  self.last_device_stream_id_by_dest[destination] = (
                      device_stream_id
                  )
                  return

              # END CRITICAL SECTION

              success = yield self._send_new_transaction(
                  destination, pdu_batch, edu_batch, failure_batch,
                  limiter=limiter,
              )

              if not success:
                  break

              # Remove the acknowledged device messages from the database.
              # Only bother if we actually sent some device messages.
              # NOTE(review): indentation was lost in the pasted source;
              # both store calls are assumed to sit under this check.
              if device_message_edus:
                  yield self.store.delete_device_msgs_for_remote(
                      destination, device_stream_id
                  )
                  logger.info("Marking as sent %r %r", destination, dev_list_id)
                  yield self.store.mark_as_sent_devices_by_remote(
                      destination, dev_list_id
                  )

              self.last_device_stream_id_by_dest[destination] = device_stream_id
              self.last_device_list_stream_id_by_dest[destination] = dev_list_id
      except NotRetryingDestination:
          logger.debug(
              "TX [%s] not ready for retry yet - "
              "dropping transaction for now",
              destination,
          )
      finally:
          # We want to be *very* sure we delete this after we stop processing
          self.pending_transactions.pop(destination, None)
  @defer.inlineCallbacks
  def _get_new_device_messages(self, destination):
      """Fetch unsent to-device messages and device list updates.

      Both lookups start from the last stream id recorded as sent for
      this destination, or from 0 if none has been recorded.

      Args:
          destination: Name of the server the EDUs are for.

      Returns:
          Deferred firing with a tuple ``(edus, stream_id, now_stream_id)``:
          the "m.direct_to_device" EDUs followed by the
          "m.device_list_update" EDUs, then the stream ids the datastore
          returned from the to-device and device-list lookups.
      """
      def make_edu(edu_type, content):
          return Edu(
              origin=self.server_name,
              destination=destination,
              edu_type=edu_type,
              content=content,
          )

      sent_up_to = self.last_device_stream_id_by_dest.get(destination, 0)
      current_token = self.store.get_to_device_stream_token()
      contents, stream_id = yield self.store.get_new_device_msgs_for_remote(
          destination, sent_up_to, current_token
      )
      edus = [
          make_edu("m.direct_to_device", content) for content in contents
      ]

      list_sent_up_to = self.last_device_list_stream_id_by_dest.get(
          destination, 0
      )
      now_stream_id, results = yield self.store.get_devices_by_remote(
          destination, list_sent_up_to
      )
      edus.extend(
          make_edu("m.device_list_update", content) for content in results
      )

      defer.returnValue((edus, stream_id, now_stream_id))
  @measure_func("_send_new_transaction")
  @defer.inlineCallbacks
  def _send_new_transaction(self, destination, pending_pdus, pending_edus,
                            pending_failures, limiter):
      """Build, persist and send one transaction to ``destination``.

      Args:
          destination: Name of the server to send to.
          pending_pdus: List of ``(pdu, order)`` tuples. Sorted in place by
              ``order`` before sending.
          pending_edus: List of EDUs to include.
          pending_failures: List of failures; ``get_dict()`` is called on
              each one.
          limiter: Context manager wrapped around the HTTP request, so
              that it sees any exception the request raises.

      Returns:
          Deferred firing with True if the remote answered 200 and the
          transaction was recorded as delivered, False otherwise. Any
          exception raised while building or sending is logged and
          reported as False, because nothing listens for this deferred
          failing.
      """
      # Sort based on the order field
      pending_pdus.sort(key=lambda t: t[1])
      pdus = [x[0] for x in pending_pdus]
      edus = pending_edus
      failures = [x.get_dict() for x in pending_failures]

      def log_unsent_pdus():
          for p in pdus:
              logger.info(
                  "Failed to send event %s to %s", p.event_id, destination
              )

      success = True

      try:
          logger.debug("TX [%s] _attempt_new_transaction", destination)

          txn_id = str(self._next_txn_id)

          logger.debug(
              "TX [%s] {%s} Attempting new transaction"
              " (pdus: %d, edus: %d, failures: %d)",
              destination, txn_id,
              len(pdus),
              len(edus),
              len(failures)
          )

          logger.debug("TX [%s] Persisting transaction...", destination)

          transaction = Transaction.create_new(
              origin_server_ts=int(self.clock.time_msec()),
              transaction_id=txn_id,
              origin=self.server_name,
              destination=destination,
              pdus=pdus,
              edus=edus,
              pdu_failures=failures,
          )

          self._next_txn_id += 1

          yield self.transaction_actions.prepare_to_send(transaction)

          logger.debug("TX [%s] Persisted transaction", destination)
          logger.info(
              "TX [%s] {%s} Sending transaction [%s],"
              " (PDUs: %d, EDUs: %d, failures: %d)",
              destination, txn_id,
              transaction.transaction_id,
              len(pdus),
              len(edus),
              len(failures),
          )

          # NOTE(review): indentation was lost in the pasted source;
          # everything down to the "code != 200" check is assumed to sit
          # inside the limiter block. Confirm against the repository copy.
          with limiter:
              # Actually send the transaction

              # FIXME (erikj): This is a bit of a hack to make the Pdu age
              # keys work
              def json_data_cb():
                  # Turn each PDU's absolute "age_ts" into a relative
                  # "unsigned.age", computed when the body is serialised.
                  data = transaction.get_dict()
                  now = int(self.clock.time_msec())
                  if "pdus" in data:
                      for p in data["pdus"]:
                          if "age_ts" in p:
                              unsigned = p.setdefault("unsigned", {})
                              unsigned["age"] = now - int(p["age_ts"])
                              del p["age_ts"]
                  return data

              try:
                  response = yield self.transport_layer.send_transaction(
                      transaction, json_data_cb
                  )
                  code = 200

                  if response:
                      for e_id, r in response.get("pdus", {}).items():
                          if "error" in r:
                              logger.warning(
                                  "Transaction returned error for %s: %s",
                                  e_id, r,
                              )
              except HttpResponseException as e:
                  code = e.code
                  response = e.response

                  # These codes are re-raised so the limiter sees them.
                  # Any other code falls through and is recorded below
                  # as a delivered, unsuccessful transaction.
                  if e.code in (401, 404, 429) or 500 <= e.code:
                      logger.info(
                          "TX [%s] {%s} got %d response",
                          destination, txn_id, code
                      )
                      raise e

              logger.info(
                  "TX [%s] {%s} got %d response",
                  destination, txn_id, code
              )

              logger.debug("TX [%s] Sent transaction", destination)
              logger.debug("TX [%s] Marking as delivered...", destination)

              yield self.transaction_actions.delivered(
                  transaction, code, response
              )

              logger.debug("TX [%s] Marked as delivered", destination)

              if code != 200:
                  log_unsent_pdus()
                  success = False
      except Exception as e:
          # We capture this here as there is nothing actually listening
          # for this function's deferred to finish. RuntimeError used to
          # have its own identical handler; it is a subclass of Exception,
          # so this single handler covers it.
          logger.warning(
              "TX [%s] Problem in _attempt_transaction: %s",
              destination,
              e,
          )

          success = False

          log_unsent_pdus()

      defer.returnValue(success)