123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254 |
- # -*- coding: utf-8 -*-
- # Copyright 2015, 2016 OpenMarket Ltd
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- """
- This module controls the reliability for application service transactions.
- The nominal flow through this module looks like:
-               __________
- 1---ASa[e]-->|  Service |--> Queue ASa[f]
- 2----ASb[e]->|  Queuer  |
- 3--ASa[f]--->|__________|-----------+ ASa[e], ASb[e]
-                                     V
-       -````````-            +------------+
-       |````````|<--StoreTxn-|Transaction |
-       |Database|            | Controller |---> SEND TO AS
-       `--------`            +------------+
- What happens on SEND TO AS depends on the state of the Application Service:
- - If the AS is marked as DOWN, do nothing.
- - If the AS is marked as UP, send the transaction.
- * SUCCESS : Increment where the AS is up to txn-wise and nuke the txn
- contents from the db.
- * FAILURE : Mark AS as DOWN and start Recoverer.
- Recoverer attempts to recover ASes who have died. The flow for this looks like:
-                 ,--------------------- backoff++ --------------.
-                V                                               |
-   START ---> Wait exp ------> Get oldest txn ID from ----> FAILURE
-              backoff           DB and try to send it
-                  ^                |___________
- Mark AS as       |                            V
- UP & quit        +---------- YES            SUCCESS
-     |            |                            |
-     NO <--- Have more txns? <------ Mark txn success & nuke <-+
-                                       from db; incr AS pos.
-                                          Reset backoff.
- This is all tied together by the ApplicationServiceScheduler which DIs the
- required components.
- """
- from synapse.appservice import ApplicationServiceState
- from twisted.internet import defer
- import logging
- logger = logging.getLogger(__name__)
class ApplicationServiceScheduler(object):
    """Public entry point of this module.

    Pulls the clock, datastore and application service API off the
    homeserver object and uses them to build the transaction controller,
    the per-service queuer and the factory that creates recoverers.
    """

    def __init__(self, hs):
        self.clock = hs.get_clock()
        self.store = hs.get_datastore()
        self.as_api = hs.get_application_service_api()

        def make_recoverer(service, callback):
            # The attributes are looked up when a recoverer is requested,
            # not when the scheduler is constructed.
            return _Recoverer(
                self.clock, self.store, self.as_api, service, callback
            )

        controller = _TransactionController(
            self.clock, self.store, self.as_api, make_recoverer
        )
        self.txn_ctrl = controller
        self.queuer = _ServiceQueuer(controller)

    @defer.inlineCallbacks
    def start(self):
        """Start recoverers for every service already marked as DOWN and
        register them with the transaction controller.
        """
        logger.info("Starting appservice scheduler")
        recovered_callback = self.txn_ctrl.on_recovered
        # check for any DOWN ASes and start recoverers for them.
        started = yield _Recoverer.start(
            self.clock, self.store, self.as_api, recovered_callback
        )
        self.txn_ctrl.add_recoverers(started)

    def submit_event_for_as(self, service, event):
        """Hand a single event for the given service to the queuer."""
        self.queuer.enqueue(service, event)
- class _ServiceQueuer(object):
- """Queues events for the same application service together, sending
- transactions as soon as possible. Once a transaction is sent successfully,
- this schedules any other events in the queue to run.
- """
- def __init__(self, txn_ctrl):
- self.queued_events = {} # dict of {service_id: [events]}
- self.pending_requests = {} # dict of {service_id: Deferred}
- self.txn_ctrl = txn_ctrl
- def enqueue(self, service, event):
- # if this service isn't being sent something
- if not self.pending_requests.get(service.id):
- self._send_request(service, [event])
- else:
- # add to queue for this service
- if service.id not in self.queued_events:
- self.queued_events[service.id] = []
- self.queued_events[service.id].append(event)
- def _send_request(self, service, events):
- # send request and add callbacks
- d = self.txn_ctrl.send(service, events)
- d.addBoth(self._on_request_finish)
- d.addErrback(self._on_request_fail)
- self.pending_requests[service.id] = d
- def _on_request_finish(self, service):
- self.pending_requests[service.id] = None
- # if there are queued events, then send them.
- if (service.id in self.queued_events
- and len(self.queued_events[service.id]) > 0):
- self._send_request(service, self.queued_events[service.id])
- self.queued_events[service.id] = []
- def _on_request_fail(self, err):
- logger.error("AS request failed: %s", err)
class _TransactionController(object):
    """Creates transactions for application services, sends them when the
    service is considered up, and starts a recoverer when sending fails.
    """

    def __init__(self, clock, store, as_api, recoverer_fn):
        """
        Args:
            clock: stored on the instance; not used directly by this class.
            store: datastore used to create transactions and to read/write
                the application service state.
            as_api: passed to ``txn.send`` when delivering a transaction.
            recoverer_fn: callable ``(service, callback)`` returning an
                object with a ``recover()`` method.
        """
        self.clock = clock
        self.store = store
        self.as_api = as_api
        self.recoverer_fn = recoverer_fn
        # keep track of how many recoverers there are
        self.recoverers = []

    @defer.inlineCallbacks
    def send(self, service, events):
        """Create a transaction for ``events`` and, if the service is up,
        try to deliver it.

        Any exception is logged and causes a recoverer to be started; it is
        never propagated to the caller.

        Returns:
            Deferred which fires with ``service`` once the request finished.
        """
        try:
            txn = yield self.store.create_appservice_txn(
                service=service,
                events=events
            )
            service_is_up = yield self._is_service_up(service)
            if service_is_up:
                sent = yield txn.send(self.as_api)
                if sent:
                    # Wait for completion (as _Recoverer.retry does) so that
                    # a failure is handled below rather than being dropped,
                    # and so the request is not reported as finished before
                    # the transaction has been completed.
                    yield txn.complete(self.store)
                else:
                    self._start_recoverer(service)
        except Exception as e:
            logger.exception(e)
            self._start_recoverer(service)
        # request has finished
        defer.returnValue(service)

    @defer.inlineCallbacks
    def on_recovered(self, recoverer):
        """Callback for a recoverer that has finished: forget it and mark
        its service as UP in the store.
        """
        self.recoverers.remove(recoverer)
        logger.info("Successfully recovered application service AS ID %s",
                    recoverer.service.id)
        logger.info("Remaining active recoverers: %s", len(self.recoverers))
        yield self.store.set_appservice_state(
            recoverer.service,
            ApplicationServiceState.UP
        )

    def add_recoverers(self, recoverers):
        """Track additional recoverers; logs the new total if any were added."""
        self.recoverers.extend(recoverers)
        if recoverers:
            logger.info("New active recoverers: %s", len(self.recoverers))

    @defer.inlineCallbacks
    def _start_recoverer(self, service):
        """Mark the service as DOWN and start a recoverer for it.

        Callers do not wait on the returned Deferred, so errors are caught
        and logged here instead of becoming unhandled Deferred failures.
        """
        try:
            yield self.store.set_appservice_state(
                service,
                ApplicationServiceState.DOWN
            )
            logger.info(
                "Application service falling behind. Starting recoverer. AS ID %s",
                service.id
            )
            recoverer = self.recoverer_fn(service, self.on_recovered)
            self.add_recoverers([recoverer])
            recoverer.recover()
        except Exception as e:
            logger.exception(e)

    @defer.inlineCallbacks
    def _is_service_up(self, service):
        """Fire with True when the stored state is UP or no state is stored
        (None); False otherwise.
        """
        state = yield self.store.get_appservice_state(service)
        defer.returnValue(state == ApplicationServiceState.UP or state is None)
class _Recoverer(object):
    """Retries the oldest unsent transaction of one application service,
    with exponential backoff, until no unsent transactions remain; then
    invokes ``callback`` with itself.
    """

    @staticmethod
    @defer.inlineCallbacks
    def start(clock, store, as_api, callback):
        """Create and start a recoverer for every service the store reports
        as DOWN.

        Returns:
            Deferred which fires with the list of started recoverers.
        """
        services = yield store.get_appservices_by_state(
            ApplicationServiceState.DOWN
        )
        recoverers = [
            _Recoverer(clock, store, as_api, s, callback) for s in services
        ]
        for r in recoverers:
            logger.info("Starting recoverer for AS ID %s which was marked as "
                        "DOWN", r.service.id)
            r.recover()
        defer.returnValue(recoverers)

    def __init__(self, clock, store, as_api, service, callback):
        """
        Args:
            clock: object providing ``call_later(delay, fn)``.
            store: datastore queried for the oldest unsent transaction.
            as_api: passed to ``txn.send``.
            service: the application service being recovered.
            callback: called with this recoverer once nothing is left to send.
        """
        self.clock = clock
        self.store = store
        self.as_api = as_api
        self.service = service
        self.callback = callback
        self.backoff_counter = 1

    def recover(self):
        """Schedule the next retry after ``2 ** backoff_counter``."""
        self.clock.call_later((2 ** self.backoff_counter), self.retry)

    def _backoff(self):
        """Increase the backoff (up to the cap) and schedule another retry."""
        # cap the backoff to be around 8.5min => (2^9) = 512 secs
        if self.backoff_counter < 9:
            self.backoff_counter += 1
        self.recover()

    @defer.inlineCallbacks
    def retry(self):
        """Send unsent transactions oldest-first until none remain.

        Stops and backs off as soon as a send reports failure or anything
        raises. Implemented as a loop rather than by re-invoking itself, so
        that a long backlog does not nest one generator per transaction.
        """
        try:
            while True:
                txn = yield self.store.get_oldest_unsent_txn(self.service)
                if not txn:
                    # nothing left to send: the service has caught up
                    self._set_service_recovered()
                    return

                logger.info("Retrying transaction %s for AS ID %s",
                            txn.id, txn.service.id)
                sent = yield txn.send(self.as_api)
                if not sent:
                    break

                yield txn.complete(self.store)
                # reset the backoff counter and move on to the next
                # transaction immediately
                self.backoff_counter = 1
        except Exception as e:
            logger.exception(e)
        # Reached on a failed send or an exception: wait longer and retry.
        self._backoff()

    def _set_service_recovered(self):
        """Notify the owner that this recoverer has finished."""
        self.callback(self)
|