123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156 |
- # -*- coding: utf-8 -*-
- # Copyright 2015, 2016 OpenMarket Ltd
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- from twisted.internet import defer
- from synapse.api.errors import CodeMessageException
- import logging
- import random
- logger = logging.getLogger(__name__)
class NotRetryingDestination(Exception):
    """Raised when a destination is still inside its retry back-off window.

    Attributes:
        retry_last_ts (int): the stored timestamp of the last attempt, as
            read from the store by get_retry_limiter.
        retry_interval (int): the stored retry interval for the destination.
        destination (str): the destination that is not being retried.
    """

    def __init__(self, retry_last_ts, retry_interval, destination):
        self.retry_last_ts = retry_last_ts
        self.retry_interval = retry_interval
        self.destination = destination
        message = "Not retrying server %s." % (destination,)
        super(NotRetryingDestination, self).__init__(message)
@defer.inlineCallbacks
def get_retry_limiter(destination, clock, store, **kwargs):
    """Check whether a destination may be contacted, and return a limiter.

    Looks up the stored retry timings for `destination`. If the destination
    failed previously and its back-off window (retry_last_ts +
    retry_interval) has not yet passed according to `clock`, this raises
    NotRetryingDestination. Otherwise it returns a RetryDestinationLimiter
    context manager. The limiter marks the destination as down if an
    exception is thrown inside it (excluding CodeMessageException with
    code < 500).

    Args:
        destination (str): the destination to check.
        clock (Clock): used to read the current time in milliseconds.
        store (DataStore): provides get_destination_retry_timings.
        **kwargs: forwarded unchanged to RetryDestinationLimiter.

    Raises:
        NotRetryingDestination: if we are not ready to retry yet.

    Example usage:

        try:
            limiter = yield get_retry_limiter(destination, clock, store)
            with limiter:
                response = yield do_request()
        except NotRetryingDestination:
            # We aren't ready to retry that destination.
            raise
    """
    timings = yield store.get_destination_retry_timings(destination)

    if timings:
        last_ts = timings["retry_last_ts"]
        interval = timings["retry_interval"]
    else:
        # No stored timings: the destination has never failed.
        last_ts = interval = 0

    now = int(clock.time_msec())
    backoff_ends = last_ts + interval
    if backoff_ends > now:
        raise NotRetryingDestination(
            retry_last_ts=last_ts,
            retry_interval=interval,
            destination=destination,
        )

    limiter = RetryDestinationLimiter(
        destination, clock, store, interval, **kwargs
    )
    defer.returnValue(limiter)
class RetryDestinationLimiter(object):
    """Context manager that updates a destination's stored retry timings.

    On success (no exception, or a CodeMessageException whose code is below
    500) the destination is marked as "up" by storing a zero retry interval.
    On failure the retry interval grows and is stored together with the
    current time:
    - The first failure uses `min_retry_interval`.
    - Later failures multiply the interval by `multiplier_retry_interval`
      and a random jitter factor in [0.8, 1.4].
    - The result is capped at `max_retry_interval`.

    Exceptions raised inside the context are never suppressed.
    """

    def __init__(self, destination, clock, store, retry_interval,
                 min_retry_interval=10 * 60 * 1000,
                 max_retry_interval=24 * 60 * 60 * 1000,
                 multiplier_retry_interval=5,):
        """Marks the destination as "down" if an exception is thrown in the
        context, except for CodeMessageException with code < 500.

        If no exception is raised, marks the destination as "up".

        Args:
            destination (str)
            clock (Clock)
            store (DataStore)
            retry_interval (int): The next retry interval taken from the
                database in milliseconds, or zero if the last request was
                successful.
            min_retry_interval (int): The minimum retry interval to use after
                a failed request, in milliseconds.
            max_retry_interval (int): The maximum retry interval to use after
                a failed request, in milliseconds.
            multiplier_retry_interval (int): The multiplier to use to increase
                the retry interval after a failed request.
        """
        self.clock = clock
        self.store = store
        self.destination = destination

        self.retry_interval = retry_interval
        self.min_retry_interval = min_retry_interval
        self.max_retry_interval = max_retry_interval
        self.multiplier_retry_interval = multiplier_retry_interval

    def __enter__(self):
        pass

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Record the outcome of the request made inside the context.

        Returns None, so any exception raised in the context propagates.
        """
        if self._request_succeeded(exc_type, exc_val):
            # We connected successfully.
            if not self.retry_interval:
                # Already recorded as up; nothing to write.
                return
            retry_last_ts = 0
            self.retry_interval = 0
        else:
            # We couldn't connect.
            self.retry_interval = self._next_failure_interval()
            retry_last_ts = int(self.clock.time_msec())

        def err(failure):
            # The message needs a %s placeholder for failure.value. Passing
            # an extra argument without one makes logging report a
            # formatting error instead of our message.
            logger.error(
                "Failed to store set_destination_retry_timings: %s",
                failure.value,
            )

        self.store.set_destination_retry_timings(
            self.destination, retry_last_ts, self.retry_interval
        ).addErrback(err)

    @staticmethod
    def _request_succeeded(exc_type, exc_val):
        """Return True if the context exit should mark the destination as up."""
        if exc_type is None:
            return True
        if not issubclass(exc_type, Exception):
            # BaseException-only types are treated as control flow, not as a
            # failed connection. NOTE(review): presumably this covers e.g.
            # Twisted's defer.returnValue() used inside the `with` block.
            return True
        if issubclass(exc_type, CodeMessageException):
            # The remote end answered; only 5xx responses count as "down".
            # issubclass (not `is`) so subclasses are handled the same way.
            return 0 <= exc_val.code < 500
        return False

    def _next_failure_interval(self):
        """Return the retry interval (ms) to store after another failure."""
        if not self.retry_interval:
            return self.min_retry_interval
        # Apply the jitter to the product, then truncate. Truncating the
        # jitter factor alone would yield 0 or 1 and could reset the
        # interval to zero.
        interval = int(
            self.retry_interval
            * self.multiplier_retry_interval
            * random.uniform(0.8, 1.4)
        )
        return min(interval, self.max_retry_interval)
|