# retryutils.py
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2015, 2016 OpenMarket Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. import logging
  16. import random
  17. from twisted.internet import defer
  18. import synapse.logging.context
  19. from synapse.api.errors import CodeMessageException
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# the initial backoff, after the first transaction fails
# (10 minutes, expressed in milliseconds)
MIN_RETRY_INTERVAL = 10 * 60 * 1000

# how much we multiply the backoff by after each subsequent fail
RETRY_MULTIPLIER = 5

# a cap on the backoff. (Essentially none: 2**62 ms is far beyond any
# realistic wall-clock horizon.)
MAX_RETRY_INTERVAL = 2 ** 62
  27. class NotRetryingDestination(Exception):
  28. def __init__(self, retry_last_ts, retry_interval, destination):
  29. """Raised by the limiter (and federation client) to indicate that we are
  30. are deliberately not attempting to contact a given server.
  31. Args:
  32. retry_last_ts (int): the unix ts in milliseconds of our last attempt
  33. to contact the server. 0 indicates that the last attempt was
  34. successful or that we've never actually attempted to connect.
  35. retry_interval (int): the time in milliseconds to wait until the next
  36. attempt.
  37. destination (str): the domain in question
  38. """
  39. msg = "Not retrying server %s." % (destination,)
  40. super(NotRetryingDestination, self).__init__(msg)
  41. self.retry_last_ts = retry_last_ts
  42. self.retry_interval = retry_interval
  43. self.destination = destination
  44. @defer.inlineCallbacks
  45. def get_retry_limiter(destination, clock, store, ignore_backoff=False, **kwargs):
  46. """For a given destination check if we have previously failed to
  47. send a request there and are waiting before retrying the destination.
  48. If we are not ready to retry the destination, this will raise a
  49. NotRetryingDestination exception. Otherwise, will return a Context Manager
  50. that will mark the destination as down if an exception is thrown (excluding
  51. CodeMessageException with code < 500)
  52. Args:
  53. destination (str): name of homeserver
  54. clock (synapse.util.clock): timing source
  55. store (synapse.storage.transactions.TransactionStore): datastore
  56. ignore_backoff (bool): true to ignore the historical backoff data and
  57. try the request anyway. We will still reset the retry_interval on success.
  58. Example usage:
  59. try:
  60. limiter = yield get_retry_limiter(destination, clock, store)
  61. with limiter:
  62. response = yield do_request()
  63. except NotRetryingDestination:
  64. # We aren't ready to retry that destination.
  65. raise
  66. """
  67. failure_ts = None
  68. retry_last_ts, retry_interval = (0, 0)
  69. retry_timings = yield store.get_destination_retry_timings(destination)
  70. if retry_timings:
  71. failure_ts = retry_timings["failure_ts"]
  72. retry_last_ts, retry_interval = (
  73. retry_timings["retry_last_ts"],
  74. retry_timings["retry_interval"],
  75. )
  76. now = int(clock.time_msec())
  77. if not ignore_backoff and retry_last_ts + retry_interval > now:
  78. raise NotRetryingDestination(
  79. retry_last_ts=retry_last_ts,
  80. retry_interval=retry_interval,
  81. destination=destination,
  82. )
  83. # if we are ignoring the backoff data, we should also not increment the backoff
  84. # when we get another failure - otherwise a server can very quickly reach the
  85. # maximum backoff even though it might only have been down briefly
  86. backoff_on_failure = not ignore_backoff
  87. return RetryDestinationLimiter(
  88. destination,
  89. clock,
  90. store,
  91. failure_ts,
  92. retry_interval,
  93. backoff_on_failure=backoff_on_failure,
  94. **kwargs
  95. )
class RetryDestinationLimiter(object):
    """Context manager which records the outcome of a request to a remote
    server in the destination retry-timings store.

    On clean exit the destination is marked "up" (backoff cleared); on an
    exception (other than the allowed error codes below) the backoff interval
    is multiplied and persisted. The database write happens in the background.
    """

    def __init__(
        self,
        destination,
        clock,
        store,
        failure_ts,
        retry_interval,
        backoff_on_404=False,
        backoff_on_failure=True,
    ):
        """Marks the destination as "down" if an exception is thrown in the
        context, except for CodeMessageException with code < 500.

        If no exception is raised, marks the destination as "up".

        Args:
            destination (str)
            clock (Clock)
            store (DataStore)
            failure_ts (int|None): when this destination started failing (in ms since
                the epoch), or zero if the last request was successful
            retry_interval (int): The next retry interval taken from the
                database in milliseconds, or zero if the last request was
                successful.
            backoff_on_404 (bool): Back off if we get a 404
            backoff_on_failure (bool): set to False if we should not increase the
                retry interval on a failure.
        """
        self.clock = clock
        self.store = store
        self.destination = destination

        # Mutated by __exit__ and then persisted via the background write.
        self.failure_ts = failure_ts
        self.retry_interval = retry_interval
        self.backoff_on_404 = backoff_on_404
        self.backoff_on_failure = backoff_on_failure

    def __enter__(self):
        # Nothing to set up; all the work happens in __exit__.
        pass

    def __exit__(self, exc_type, exc_val, exc_tb):
        # First decide whether the outcome counts as a success
        # (valid_err_code True) or a failure worth backing off for.
        valid_err_code = False
        if exc_type is None:
            valid_err_code = True
        elif not issubclass(exc_type, Exception):
            # avoid treating exceptions which don't derive from Exception as
            # failures; this is mostly so as not to catch defer._DefGen.
            valid_err_code = True
        elif issubclass(exc_type, CodeMessageException):
            # Some error codes are perfectly fine for some APIs, whereas other
            # APIs may expect to never receive e.g. a 404. It's important to
            # handle 404 as some remote servers will return a 404 when the HS
            # has been decommissioned.
            # If we get a 401, then we should probably back off since they
            # won't accept our requests for at least a while.
            # 429 is us being aggressively rate limited, so lets rate limit
            # ourselves.
            # NOTE: order matters here - the 404 check must come before the
            # generic "< 500" check so that backoff_on_404 can override it.
            if exc_val.code == 404 and self.backoff_on_404:
                valid_err_code = False
            elif exc_val.code in (401, 429):
                valid_err_code = False
            elif exc_val.code < 500:
                valid_err_code = True
            else:
                valid_err_code = False

        if valid_err_code:
            # We connected successfully.
            if not self.retry_interval:
                # No backoff was in effect, so there is nothing to clear and
                # no need to touch the database.
                return

            logger.debug(
                "Connection to %s was successful; clearing backoff", self.destination
            )
            self.failure_ts = None
            # retry_last_ts of 0 records "last attempt succeeded" in the DB.
            retry_last_ts = 0
            self.retry_interval = 0
        elif not self.backoff_on_failure:
            # Caller asked us not to grow the backoff (e.g. ignore_backoff
            # requests) - leave the stored timings untouched.
            return
        else:
            # We couldn't connect.
            if self.retry_interval:
                # Grow the existing interval multiplicatively, with jitter so
                # that retries to a recovering server don't all align.
                self.retry_interval = int(
                    self.retry_interval * RETRY_MULTIPLIER * random.uniform(0.8, 1.4)
                )

                if self.retry_interval >= MAX_RETRY_INTERVAL:
                    self.retry_interval = MAX_RETRY_INTERVAL
            else:
                # First recorded failure: start from the minimum backoff.
                self.retry_interval = MIN_RETRY_INTERVAL

            logger.info(
                "Connection to %s was unsuccessful (%s(%s)); backoff now %i",
                self.destination,
                exc_type,
                exc_val,
                self.retry_interval,
            )
            retry_last_ts = int(self.clock.time_msec())

            # Remember when this run of failures started, if not already set.
            if self.failure_ts is None:
                self.failure_ts = retry_last_ts

        @defer.inlineCallbacks
        def store_retry_timings():
            # Persist the updated timings; failures here are logged but must
            # not propagate, since this runs detached from the caller.
            try:
                yield self.store.set_destination_retry_timings(
                    self.destination,
                    self.failure_ts,
                    retry_last_ts,
                    self.retry_interval,
                )
            except Exception:
                logger.exception("Failed to store destination_retry_timings")

        # we deliberately do this in the background.
        synapse.logging.context.run_in_background(store_retry_timings)
        # Note: returning None (falsy) means any exception still propagates
        # to the caller; this context manager never swallows it.