# retryutils.py

# Copyright 2015, 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import random

import synapse.logging.context
from synapse.api.errors import CodeMessageException

logger = logging.getLogger(__name__)

# the initial backoff, after the first transaction fails
MIN_RETRY_INTERVAL = 10 * 60 * 1000

# how much we multiply the backoff by after each subsequent fail
RETRY_MULTIPLIER = 5

# a cap on the backoff. (Essentially none)
MAX_RETRY_INTERVAL = 2 ** 62
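
# Illustrative sketch only, not used elsewhere in this module: roughly how the
# backoff grows with consecutive failures, ignoring the 0.8-1.4x jitter that
# RetryDestinationLimiter.__exit__ applies below. The progression is 10 min,
# 50 min, ~4.2 h, ~20.8 h, ~4.3 days, ... up to the (effectively unreachable)
# cap.
def _approx_retry_interval(num_failures):
    """Approximate backoff in ms after `num_failures` consecutive failures."""
    if num_failures <= 0:
        return 0
    return min(
        MIN_RETRY_INTERVAL * RETRY_MULTIPLIER ** (num_failures - 1),
        MAX_RETRY_INTERVAL,
    )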


class NotRetryingDestination(Exception):
    def __init__(self, retry_last_ts, retry_interval, destination):
        """Raised by the limiter (and federation client) to indicate that we
        are deliberately not attempting to contact a given server.

        Args:
            retry_last_ts (int): the unix ts in milliseconds of our last attempt
                to contact the server. 0 indicates that the last attempt was
                successful or that we've never actually attempted to connect.
            retry_interval (int): the time in milliseconds to wait until the next
                attempt.
            destination (str): the domain in question
        """
        msg = "Not retrying server %s." % (destination,)
        super().__init__(msg)

        self.retry_last_ts = retry_last_ts
        self.retry_interval = retry_interval
        self.destination = destination
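

# Illustrative caller-side sketch only (an assumption, not part of this
# module's API): the check in get_retry_limiter below allows a new attempt
# once `now >= retry_last_ts + retry_interval`, so a caller catching the
# exception can compute the earliest permissible retry time like this.
def _next_attempt_ts(err):
    """Earliest ms timestamp at which `err.destination` may be retried."""
    return err.retry_last_ts + err.retry_interval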


async def get_retry_limiter(destination, clock, store, ignore_backoff=False, **kwargs):
    """For a given destination check if we have previously failed to
    send a request there and are waiting before retrying the destination.

    If we are not ready to retry the destination, this will raise a
    NotRetryingDestination exception. Otherwise, will return a Context Manager
    that will mark the destination as down if an exception is thrown (excluding
    CodeMessageException with code < 500)

    Args:
        destination (str): name of homeserver
        clock (synapse.util.clock): timing source
        store (synapse.storage.transactions.TransactionStore): datastore
        ignore_backoff (bool): true to ignore the historical backoff data and
            try the request anyway. We will still reset the retry_interval on success.

    Example usage:

        try:
            limiter = await get_retry_limiter(destination, clock, store)
            with limiter:
                response = await do_request()
        except NotRetryingDestination:
            # We aren't ready to retry that destination.
            raise
    """
    failure_ts = None
    retry_last_ts, retry_interval = (0, 0)

    retry_timings = await store.get_destination_retry_timings(destination)

    if retry_timings:
        failure_ts = retry_timings.failure_ts
        retry_last_ts = retry_timings.retry_last_ts
        retry_interval = retry_timings.retry_interval

        now = int(clock.time_msec())

        if not ignore_backoff and retry_last_ts + retry_interval > now:
            raise NotRetryingDestination(
                retry_last_ts=retry_last_ts,
                retry_interval=retry_interval,
                destination=destination,
            )

    # if we are ignoring the backoff data, we should also not increment the backoff
    # when we get another failure - otherwise a server can very quickly reach the
    # maximum backoff even though it might only have been down briefly
    backoff_on_failure = not ignore_backoff

    return RetryDestinationLimiter(
        destination,
        clock,
        store,
        failure_ts,
        retry_interval,
        backoff_on_failure=backoff_on_failure,
        **kwargs,
    )
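

# Illustrative usage sketch only. Assumptions: `destination`, `clock`, `store`
# and the `do_request` coroutine are supplied by the caller; none are defined
# here. Extra keyword arguments (e.g. backoff_on_404) are forwarded to
# RetryDestinationLimiter via **kwargs above.
async def _example_request_with_limiter(destination, clock, store, do_request):
    """Sketch: make one request under the limiter, backing off on a 404 too."""
    limiter = await get_retry_limiter(destination, clock, store, backoff_on_404=True)
    with limiter:
        return await do_request()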


class RetryDestinationLimiter:
    def __init__(
        self,
        destination,
        clock,
        store,
        failure_ts,
        retry_interval,
        backoff_on_404=False,
        backoff_on_failure=True,
    ):
        """Marks the destination as "down" if an exception is thrown in the
        context, except for CodeMessageException with code < 500.

        If no exception is raised, marks the destination as "up".

        Args:
            destination (str)
            clock (Clock)
            store (DataStore)
            failure_ts (int|None): when this destination started failing (in ms since
                the epoch), or None if the last request was successful
            retry_interval (int): The next retry interval taken from the
                database in milliseconds, or zero if the last request was
                successful.
            backoff_on_404 (bool): Back off if we get a 404
            backoff_on_failure (bool): set to False if we should not increase the
                retry interval on a failure.
        """
        self.clock = clock
        self.store = store
        self.destination = destination

        self.failure_ts = failure_ts
        self.retry_interval = retry_interval
        self.backoff_on_404 = backoff_on_404
        self.backoff_on_failure = backoff_on_failure

    def __enter__(self):
        pass

    def __exit__(self, exc_type, exc_val, exc_tb):
        valid_err_code = False
        if exc_type is None:
            valid_err_code = True
        elif not issubclass(exc_type, Exception):
            # avoid treating exceptions which don't derive from Exception as
            # failures; this is mostly so as not to catch defer._DefGen_Return.
            valid_err_code = True
        elif issubclass(exc_type, CodeMessageException):
            # Some error codes are perfectly fine for some APIs, whereas other
            # APIs may expect to never receive e.g. a 404. It's important to
            # handle 404 as some remote servers will return a 404 when the HS
            # has been decommissioned.
            # If we get a 401, then we should probably back off since they
            # won't accept our requests for at least a while.
            # 429 is us being aggressively rate limited, so let's rate limit
            # ourselves.
            if exc_val.code == 404 and self.backoff_on_404:
                valid_err_code = False
            elif exc_val.code in (401, 429):
                valid_err_code = False
            elif exc_val.code < 500:
                valid_err_code = True
            else:
                valid_err_code = False

        if valid_err_code:
            # We connected successfully.
            if not self.retry_interval:
                return

            logger.debug(
                "Connection to %s was successful; clearing backoff", self.destination
            )
            self.failure_ts = None
            retry_last_ts = 0
            self.retry_interval = 0
        elif not self.backoff_on_failure:
            return
        else:
            # We couldn't connect.
            if self.retry_interval:
                self.retry_interval = int(
                    self.retry_interval * RETRY_MULTIPLIER * random.uniform(0.8, 1.4)
                )

                if self.retry_interval >= MAX_RETRY_INTERVAL:
                    self.retry_interval = MAX_RETRY_INTERVAL
            else:
                self.retry_interval = MIN_RETRY_INTERVAL

            logger.info(
                "Connection to %s was unsuccessful (%s(%s)); backoff now %i",
                self.destination,
                exc_type,
                exc_val,
                self.retry_interval,
            )
            retry_last_ts = int(self.clock.time_msec())

            if self.failure_ts is None:
                self.failure_ts = retry_last_ts

        async def store_retry_timings():
            try:
                await self.store.set_destination_retry_timings(
                    self.destination,
                    self.failure_ts,
                    retry_last_ts,
                    self.retry_interval,
                )
            except Exception:
                logger.exception("Failed to store destination_retry_timings")

        # we deliberately do this in the background.
        synapse.logging.context.run_in_background(store_retry_timings)
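

# Illustrative sketch only (an assumed caller, not part of this module): the
# limiter can also be constructed directly, e.g. for a destination with no
# recorded failures. Exceptions raised inside the `with` block are classified
# by __exit__ above: most 4xx responses leave the destination marked as up,
# while 401, 429, 5xx (and 404 when backoff_on_404 is set) extend the backoff.
async def _example_direct_limiter(destination, clock, store, do_request):
    limiter = RetryDestinationLimiter(
        destination, clock, store, failure_ts=None, retry_interval=0
    )
    with limiter:
        return await do_request()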