scheduler.py

# Copyright 2015, 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
  14. """
  15. This module controls the reliability for application service transactions.
  16. The nominal flow through this module looks like:
  17. __________
  18. 1---ASa[e]-->| Service |--> Queue ASa[f]
  19. 2----ASb[e]->| Queuer |
  20. 3--ASa[f]--->|__________|-----------+ ASa[e], ASb[e]
  21. V
  22. -````````- +------------+
  23. |````````|<--StoreTxn-|Transaction |
  24. |Database| | Controller |---> SEND TO AS
  25. `--------` +------------+
  26. What happens on SEND TO AS depends on the state of the Application Service:
  27. - If the AS is marked as DOWN, do nothing.
  28. - If the AS is marked as UP, send the transaction.
  29. * SUCCESS : Increment where the AS is up to txn-wise and nuke the txn
  30. contents from the db.
  31. * FAILURE : Marked AS as DOWN and start Recoverer.
  32. Recoverer attempts to recover ASes who have died. The flow for this looks like:
  33. ,--------------------- backoff++ --------------.
  34. V |
  35. START ---> Wait exp ------> Get oldest txn ID from ----> FAILURE
  36. backoff DB and try to send it
  37. ^ |___________
  38. Mark AS as | V
  39. UP & quit +---------- YES SUCCESS
  40. | | |
  41. NO <--- Have more txns? <------ Mark txn success & nuke <-+
  42. from db; incr AS pos.
  43. Reset backoff.
  44. This is all tied together by the AppServiceScheduler which DIs the required
  45. components.
  46. """

import logging
from typing import (
    TYPE_CHECKING,
    Awaitable,
    Callable,
    Collection,
    Dict,
    List,
    Optional,
    Set,
)

from synapse.appservice import ApplicationService, ApplicationServiceState
from synapse.appservice.api import ApplicationServiceApi
from synapse.events import EventBase
from synapse.logging.context import run_in_background
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.databases.main import DataStore
from synapse.types import JsonDict
from synapse.util import Clock

if TYPE_CHECKING:
    from synapse.server import HomeServer

logger = logging.getLogger(__name__)

# Maximum number of events to provide in an AS transaction.
MAX_PERSISTENT_EVENTS_PER_TRANSACTION = 100

# Maximum number of ephemeral events to provide in an AS transaction.
MAX_EPHEMERAL_EVENTS_PER_TRANSACTION = 100

# Maximum number of to-device messages to provide in an AS transaction.
MAX_TO_DEVICE_MESSAGES_PER_TRANSACTION = 100


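# An illustrative sketch of typical usage (assumed call sites; in practice the
# appservice handler drives this module):
#
#     scheduler = hs.get_application_service_scheduler()
#     await scheduler.start()  # resume recovery for any ASes marked DOWN
#     scheduler.enqueue_for_appservice(appservice, events=[event])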
class ApplicationServiceScheduler:
    """Public facing API for this module. Does the required DI to tie the
    components together. This also serves as the "event_pool", which in this
    case is a simple array.
    """

    def __init__(self, hs: "HomeServer"):
        self.clock = hs.get_clock()
        self.store = hs.get_datastores().main
        self.as_api = hs.get_application_service_api()

        self.txn_ctrl = _TransactionController(self.clock, self.store, self.as_api)
        self.queuer = _ServiceQueuer(self.txn_ctrl, self.clock)

    async def start(self) -> None:
        logger.info("Starting appservice scheduler")

        # check for any DOWN ASes and start recoverers for them.
        services = await self.store.get_appservices_by_state(
            ApplicationServiceState.DOWN
        )

        for service in services:
            self.txn_ctrl.start_recoverer(service)

    def enqueue_for_appservice(
        self,
        appservice: ApplicationService,
        events: Optional[Collection[EventBase]] = None,
        ephemeral: Optional[Collection[JsonDict]] = None,
        to_device_messages: Optional[Collection[JsonDict]] = None,
    ) -> None:
        """
        Enqueue some data to be sent off to an application service.

        Args:
            appservice: The application service to create and send a transaction to.
            events: The persistent room events to send.
            ephemeral: The ephemeral events to send.
            to_device_messages: The to-device messages to send. These differ from normal
                to-device messages sent to clients, as they have 'to_device_id' and
                'to_user_id' fields.
        """
        # We purposefully allow this method to run with empty events/ephemeral
        # collections, so that callers do not need to check iterable size themselves.
        if not events and not ephemeral and not to_device_messages:
            return

        if events:
            self.queuer.queued_events.setdefault(appservice.id, []).extend(events)
        if ephemeral:
            self.queuer.queued_ephemeral.setdefault(appservice.id, []).extend(ephemeral)
        if to_device_messages:
            self.queuer.queued_to_device_messages.setdefault(appservice.id, []).extend(
                to_device_messages
            )

        # Kick off a new application service transaction
        self.queuer.start_background_request(appservice)


class _ServiceQueuer:
    """Queue of events waiting to be sent to appservices.

    Groups events into transactions per-appservice, and sends them on to the
    TransactionController. Makes sure that we only have one transaction in flight per
    appservice at a given time.
    """

    def __init__(self, txn_ctrl: "_TransactionController", clock: Clock):
        # dict of {service_id: [events]}
        self.queued_events: Dict[str, List[EventBase]] = {}
        # dict of {service_id: [events]}
        self.queued_ephemeral: Dict[str, List[JsonDict]] = {}
        # dict of {service_id: [to_device_message_json]}
        self.queued_to_device_messages: Dict[str, List[JsonDict]] = {}

        # the appservices which currently have a transaction in flight
        self.requests_in_flight: Set[str] = set()
        self.txn_ctrl = txn_ctrl
        self.clock = clock

    def start_background_request(self, service: ApplicationService) -> None:
        # start a sender for this appservice if we don't already have one
        if service.id in self.requests_in_flight:
            return

        run_as_background_process(
            "as-sender-%s" % (service.id,), self._send_request, service
        )

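    # The sender loop below drains this service's queues in batches: each
    # iteration takes at most MAX_*_PER_TRANSACTION items of each kind, sends
    # them as one transaction, and repeats until all three queues are empty.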
    async def _send_request(self, service: ApplicationService) -> None:
        # sanity-check: we shouldn't get here if this service already has a sender
        # running.
        assert service.id not in self.requests_in_flight

        self.requests_in_flight.add(service.id)
        try:
            while True:
                all_events = self.queued_events.get(service.id, [])
                events = all_events[:MAX_PERSISTENT_EVENTS_PER_TRANSACTION]
                del all_events[:MAX_PERSISTENT_EVENTS_PER_TRANSACTION]

                all_events_ephemeral = self.queued_ephemeral.get(service.id, [])
                ephemeral = all_events_ephemeral[:MAX_EPHEMERAL_EVENTS_PER_TRANSACTION]
                del all_events_ephemeral[:MAX_EPHEMERAL_EVENTS_PER_TRANSACTION]

                all_to_device_messages = self.queued_to_device_messages.get(
                    service.id, []
                )
                to_device_messages_to_send = all_to_device_messages[
                    :MAX_TO_DEVICE_MESSAGES_PER_TRANSACTION
                ]
                del all_to_device_messages[:MAX_TO_DEVICE_MESSAGES_PER_TRANSACTION]

                if not events and not ephemeral and not to_device_messages_to_send:
                    return

                try:
                    await self.txn_ctrl.send(
                        service, events, ephemeral, to_device_messages_to_send
                    )
                except Exception:
                    logger.exception("AS request failed")
        finally:
            self.requests_in_flight.discard(service.id)


class _TransactionController:
    """Transaction manager.

    Builds AppServiceTransactions and runs their lifecycle. Also starts a Recoverer
    if a transaction fails.

    (Note that we only have one of these in the homeserver.)
    """

    def __init__(self, clock: Clock, store: DataStore, as_api: ApplicationServiceApi):
        self.clock = clock
        self.store = store
        self.as_api = as_api

        # map from service id to recoverer instance
        self.recoverers: Dict[str, "_Recoverer"] = {}

        # for UTs
        self.RECOVERER_CLASS = _Recoverer

    async def send(
        self,
        service: ApplicationService,
        events: List[EventBase],
        ephemeral: Optional[List[JsonDict]] = None,
        to_device_messages: Optional[List[JsonDict]] = None,
    ) -> None:
        """
        Create a transaction with the given data and send to the provided
        application service.

        Args:
            service: The application service to send the transaction to.
            events: The persistent events to include in the transaction.
            ephemeral: The ephemeral events to include in the transaction.
            to_device_messages: The to-device messages to include in the transaction.
        """
        try:
            txn = await self.store.create_appservice_txn(
                service=service,
                events=events,
                ephemeral=ephemeral or [],
                to_device_messages=to_device_messages or [],
            )
            service_is_up = await self._is_service_up(service)
            # If the service is marked DOWN we don't send anything: the
            # transaction stays in the database for the Recoverer to replay.
            if service_is_up:
                sent = await txn.send(self.as_api)
                if sent:
                    await txn.complete(self.store)
                else:
                    run_in_background(self._on_txn_fail, service)
        except Exception:
            logger.exception("Error creating appservice transaction")
            run_in_background(self._on_txn_fail, service)

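    # on_recovered is passed to each _Recoverer as its completion callback (see
    # start_recoverer below): it runs once the recoverer has drained every
    # outstanding transaction for the service.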
    async def on_recovered(self, recoverer: "_Recoverer") -> None:
        logger.info(
            "Successfully recovered application service AS ID %s", recoverer.service.id
        )
        self.recoverers.pop(recoverer.service.id)
        logger.info("Remaining active recoverers: %s", len(self.recoverers))
        await self.store.set_appservice_state(
            recoverer.service, ApplicationServiceState.UP
        )

    async def _on_txn_fail(self, service: ApplicationService) -> None:
        try:
            await self.store.set_appservice_state(service, ApplicationServiceState.DOWN)
            self.start_recoverer(service)
        except Exception:
            logger.exception("Error starting AS recoverer")

    def start_recoverer(self, service: ApplicationService) -> None:
        """Start a Recoverer for the given service.

        Args:
            service: the application service to start recovering.
        """
        logger.info("Starting recoverer for AS ID %s", service.id)
        assert service.id not in self.recoverers
        recoverer = self.RECOVERER_CLASS(
            self.clock, self.store, self.as_api, service, self.on_recovered
        )
        self.recoverers[service.id] = recoverer
        recoverer.recover()
        logger.info("Now %i active recoverers", len(self.recoverers))

    async def _is_service_up(self, service: ApplicationService) -> bool:
        state = await self.store.get_appservice_state(service)
        # A service with no recorded state has never been marked DOWN, so we
        # treat it as UP.
        return state == ApplicationServiceState.UP or state is None


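# Retry timing: _Recoverer waits 2**backoff_counter seconds between attempts,
# i.e. 2s, 4s, 8s, ..., with _backoff() capping the counter at 9 (512s, roughly
# 8.5 minutes between retries).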
class _Recoverer:
    """Manages retries and backoff for a DOWN appservice.

    We have one of these for each appservice which is currently considered DOWN.

    Args:
        clock (synapse.util.Clock):
        store (synapse.storage.DataStore):
        as_api (synapse.appservice.api.ApplicationServiceApi):
        service (synapse.appservice.ApplicationService): the service we are managing
        callback (callable[_Recoverer]): called once the service recovers.
    """

    def __init__(
        self,
        clock: Clock,
        store: DataStore,
        as_api: ApplicationServiceApi,
        service: ApplicationService,
        callback: Callable[["_Recoverer"], Awaitable[None]],
    ):
        self.clock = clock
        self.store = store
        self.as_api = as_api
        self.service = service
        self.callback = callback
        self.backoff_counter = 1

    def recover(self) -> None:
        def _retry() -> None:
            run_as_background_process(
                "as-recoverer-%s" % (self.service.id,), self.retry
            )

        delay = 2 ** self.backoff_counter
        logger.info("Scheduling retries on %s in %fs", self.service.id, delay)
        self.clock.call_later(delay, _retry)

    def _backoff(self) -> None:
        # cap the backoff to be around 8.5min => (2^9) = 512 secs
        if self.backoff_counter < 9:
            self.backoff_counter += 1
        self.recover()

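    # retry() replays stored transactions oldest-first. It stops at the first
    # transaction that fails to send (or on an unexpected error) and backs off
    # again; once nothing is left unsent it invokes the recovery callback.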
    async def retry(self) -> None:
        logger.info("Starting retries on %s", self.service.id)
        try:
            while True:
                txn = await self.store.get_oldest_unsent_txn(self.service)
                if not txn:
                    # nothing left: we're done!
                    await self.callback(self)
                    return

                logger.info(
                    "Retrying transaction %s for AS ID %s", txn.id, txn.service.id
                )
                sent = await txn.send(self.as_api)
                if not sent:
                    break

                await txn.complete(self.store)

                # reset the backoff counter and then process the next transaction
                self.backoff_counter = 1
        except Exception:
            logger.exception("Unexpected error running retries")

        # we didn't manage to send all of the transactions before we got an error of
        # some flavour: reschedule the next retry.
        self._backoff()