transaction_queue.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2014-2016 OpenMarket Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. from twisted.internet import defer
  16. from .persistence import TransactionActions
  17. from .units import Transaction
  18. from synapse.api.errors import HttpResponseException
  19. from synapse.util.async import run_on_reactor
  20. from synapse.util.logutils import log_function
  21. from synapse.util.logcontext import PreserveLoggingContext
  22. from synapse.util.retryutils import (
  23. get_retry_limiter, NotRetryingDestination,
  24. )
  25. import synapse.metrics
  26. import logging
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Metrics handle for this module; TransactionQueue.__init__ registers its
# "pending_destinations", "pending_pdus" and "pending_edus" callback metrics
# on it.
metrics = synapse.metrics.get_metrics_for(__name__)
  29. class TransactionQueue(object):
  30. """This class makes sure we only have one transaction in flight at
  31. a time for a given destination.
  32. It batches pending PDUs into single transactions.
  33. """
  34. def __init__(self, hs, transport_layer):
  35. self.server_name = hs.hostname
  36. self.store = hs.get_datastore()
  37. self.transaction_actions = TransactionActions(self.store)
  38. self.transport_layer = transport_layer
  39. self._clock = hs.get_clock()
  40. # Is a mapping from destinations -> deferreds. Used to keep track
  41. # of which destinations have transactions in flight and when they are
  42. # done
  43. self.pending_transactions = {}
  44. metrics.register_callback(
  45. "pending_destinations",
  46. lambda: len(self.pending_transactions),
  47. )
  48. # Is a mapping from destination -> list of
  49. # tuple(pending pdus, deferred, order)
  50. self.pending_pdus_by_dest = pdus = {}
  51. # destination -> list of tuple(edu, deferred)
  52. self.pending_edus_by_dest = edus = {}
  53. metrics.register_callback(
  54. "pending_pdus",
  55. lambda: sum(map(len, pdus.values())),
  56. )
  57. metrics.register_callback(
  58. "pending_edus",
  59. lambda: sum(map(len, edus.values())),
  60. )
  61. # destination -> list of tuple(failure, deferred)
  62. self.pending_failures_by_dest = {}
  63. # HACK to get unique tx id
  64. self._next_txn_id = int(self._clock.time_msec())
  65. def can_send_to(self, destination):
  66. """Can we send messages to the given server?
  67. We can't send messages to ourselves. If we are running on localhost
  68. then we can only federation with other servers running on localhost.
  69. Otherwise we only federate with servers on a public domain.
  70. Args:
  71. destination(str): The server we are possibly trying to send to.
  72. Returns:
  73. bool: True if we can send to the server.
  74. """
  75. if destination == self.server_name:
  76. return False
  77. if self.server_name.startswith("localhost"):
  78. return destination.startswith("localhost")
  79. else:
  80. return not destination.startswith("localhost")
  81. def enqueue_pdu(self, pdu, destinations, order):
  82. # We loop through all destinations to see whether we already have
  83. # a transaction in progress. If we do, stick it in the pending_pdus
  84. # table and we'll get back to it later.
  85. destinations = set(destinations)
  86. destinations = set(
  87. dest for dest in destinations if self.can_send_to(dest)
  88. )
  89. logger.debug("Sending to: %s", str(destinations))
  90. if not destinations:
  91. return
  92. deferreds = []
  93. for destination in destinations:
  94. deferred = defer.Deferred()
  95. self.pending_pdus_by_dest.setdefault(destination, []).append(
  96. (pdu, deferred, order)
  97. )
  98. def chain(failure):
  99. if not deferred.called:
  100. deferred.errback(failure)
  101. def log_failure(f):
  102. logger.warn("Failed to send pdu to %s: %s", destination, f.value)
  103. deferred.addErrback(log_failure)
  104. with PreserveLoggingContext():
  105. self._attempt_new_transaction(destination).addErrback(chain)
  106. deferreds.append(deferred)
  107. # NO inlineCallbacks
  108. def enqueue_edu(self, edu):
  109. destination = edu.destination
  110. if not self.can_send_to(destination):
  111. return
  112. deferred = defer.Deferred()
  113. self.pending_edus_by_dest.setdefault(destination, []).append(
  114. (edu, deferred)
  115. )
  116. def chain(failure):
  117. if not deferred.called:
  118. deferred.errback(failure)
  119. def log_failure(f):
  120. logger.warn("Failed to send edu to %s: %s", destination, f.value)
  121. deferred.addErrback(log_failure)
  122. with PreserveLoggingContext():
  123. self._attempt_new_transaction(destination).addErrback(chain)
  124. return deferred
  125. @defer.inlineCallbacks
  126. def enqueue_failure(self, failure, destination):
  127. if destination == self.server_name or destination == "localhost":
  128. return
  129. deferred = defer.Deferred()
  130. if not self.can_send_to(destination):
  131. return
  132. self.pending_failures_by_dest.setdefault(
  133. destination, []
  134. ).append(
  135. (failure, deferred)
  136. )
  137. def chain(f):
  138. if not deferred.called:
  139. deferred.errback(f)
  140. def log_failure(f):
  141. logger.warn("Failed to send failure to %s: %s", destination, f.value)
  142. deferred.addErrback(log_failure)
  143. with PreserveLoggingContext():
  144. self._attempt_new_transaction(destination).addErrback(chain)
  145. yield deferred
  146. @defer.inlineCallbacks
  147. @log_function
  148. def _attempt_new_transaction(self, destination):
  149. yield run_on_reactor()
  150. # list of (pending_pdu, deferred, order)
  151. if destination in self.pending_transactions:
  152. # XXX: pending_transactions can get stuck on by a never-ending
  153. # request at which point pending_pdus_by_dest just keeps growing.
  154. # we need application-layer timeouts of some flavour of these
  155. # requests
  156. logger.debug(
  157. "TX [%s] Transaction already in progress",
  158. destination
  159. )
  160. return
  161. pending_pdus = self.pending_pdus_by_dest.pop(destination, [])
  162. pending_edus = self.pending_edus_by_dest.pop(destination, [])
  163. pending_failures = self.pending_failures_by_dest.pop(destination, [])
  164. if pending_pdus:
  165. logger.debug("TX [%s] len(pending_pdus_by_dest[dest]) = %d",
  166. destination, len(pending_pdus))
  167. if not pending_pdus and not pending_edus and not pending_failures:
  168. logger.debug("TX [%s] Nothing to send", destination)
  169. return
  170. try:
  171. self.pending_transactions[destination] = 1
  172. logger.debug("TX [%s] _attempt_new_transaction", destination)
  173. # Sort based on the order field
  174. pending_pdus.sort(key=lambda t: t[2])
  175. pdus = [x[0] for x in pending_pdus]
  176. edus = [x[0] for x in pending_edus]
  177. failures = [x[0].get_dict() for x in pending_failures]
  178. deferreds = [
  179. x[1]
  180. for x in pending_pdus + pending_edus + pending_failures
  181. ]
  182. txn_id = str(self._next_txn_id)
  183. limiter = yield get_retry_limiter(
  184. destination,
  185. self._clock,
  186. self.store,
  187. )
  188. logger.debug(
  189. "TX [%s] {%s} Attempting new transaction"
  190. " (pdus: %d, edus: %d, failures: %d)",
  191. destination, txn_id,
  192. len(pending_pdus),
  193. len(pending_edus),
  194. len(pending_failures)
  195. )
  196. logger.debug("TX [%s] Persisting transaction...", destination)
  197. transaction = Transaction.create_new(
  198. origin_server_ts=int(self._clock.time_msec()),
  199. transaction_id=txn_id,
  200. origin=self.server_name,
  201. destination=destination,
  202. pdus=pdus,
  203. edus=edus,
  204. pdu_failures=failures,
  205. )
  206. self._next_txn_id += 1
  207. yield self.transaction_actions.prepare_to_send(transaction)
  208. logger.debug("TX [%s] Persisted transaction", destination)
  209. logger.info(
  210. "TX [%s] {%s} Sending transaction [%s],"
  211. " (PDUs: %d, EDUs: %d, failures: %d)",
  212. destination, txn_id,
  213. transaction.transaction_id,
  214. len(pending_pdus),
  215. len(pending_edus),
  216. len(pending_failures),
  217. )
  218. with limiter:
  219. # Actually send the transaction
  220. # FIXME (erikj): This is a bit of a hack to make the Pdu age
  221. # keys work
  222. def json_data_cb():
  223. data = transaction.get_dict()
  224. now = int(self._clock.time_msec())
  225. if "pdus" in data:
  226. for p in data["pdus"]:
  227. if "age_ts" in p:
  228. unsigned = p.setdefault("unsigned", {})
  229. unsigned["age"] = now - int(p["age_ts"])
  230. del p["age_ts"]
  231. return data
  232. try:
  233. response = yield self.transport_layer.send_transaction(
  234. transaction, json_data_cb
  235. )
  236. code = 200
  237. if response:
  238. for e_id, r in response.get("pdus", {}).items():
  239. if "error" in r:
  240. logger.warn(
  241. "Transaction returned error for %s: %s",
  242. e_id, r,
  243. )
  244. except HttpResponseException as e:
  245. code = e.code
  246. response = e.response
  247. logger.info(
  248. "TX [%s] {%s} got %d response",
  249. destination, txn_id, code
  250. )
  251. logger.debug("TX [%s] Sent transaction", destination)
  252. logger.debug("TX [%s] Marking as delivered...", destination)
  253. yield self.transaction_actions.delivered(
  254. transaction, code, response
  255. )
  256. logger.debug("TX [%s] Marked as delivered", destination)
  257. logger.debug("TX [%s] Yielding to callbacks...", destination)
  258. for deferred in deferreds:
  259. if code == 200:
  260. deferred.callback(None)
  261. else:
  262. deferred.errback(RuntimeError("Got status %d" % code))
  263. # Ensures we don't continue until all callbacks on that
  264. # deferred have fired
  265. try:
  266. yield deferred
  267. except:
  268. pass
  269. logger.debug("TX [%s] Yielded to callbacks", destination)
  270. except NotRetryingDestination:
  271. logger.info(
  272. "TX [%s] not ready for retry yet - "
  273. "dropping transaction for now",
  274. destination,
  275. )
  276. except RuntimeError as e:
  277. # We capture this here as there as nothing actually listens
  278. # for this finishing functions deferred.
  279. logger.warn(
  280. "TX [%s] Problem in _attempt_transaction: %s",
  281. destination,
  282. e,
  283. )
  284. except Exception as e:
  285. # We capture this here as there as nothing actually listens
  286. # for this finishing functions deferred.
  287. logger.warn(
  288. "TX [%s] Problem in _attempt_transaction: %s",
  289. destination,
  290. e,
  291. )
  292. for deferred in deferreds:
  293. if not deferred.called:
  294. deferred.errback(e)
  295. finally:
  296. # We want to be *very* sure we delete this after we stop processing
  297. self.pending_transactions.pop(destination, None)
  298. # Check to see if there is anything else to send.
  299. self._attempt_new_transaction(destination)