retryutils.py

# Copyright 2015, 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import random
from types import TracebackType
from typing import TYPE_CHECKING, Any, Optional, Type

from synapse.api.errors import CodeMessageException
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage import DataStore
from synapse.util import Clock

if TYPE_CHECKING:
    from synapse.notifier import Notifier
    from synapse.replication.tcp.handler import ReplicationCommandHandler

logger = logging.getLogger(__name__)

# the initial backoff, after the first transaction fails
MIN_RETRY_INTERVAL = 10 * 60 * 1000

# how much we multiply the backoff by after each subsequent fail
RETRY_MULTIPLIER = 5

# a cap on the backoff. (Essentially none)
MAX_RETRY_INTERVAL = 2**62
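
# As a rough illustration of the schedule these constants produce (ignoring the
# random jitter that __exit__ applies on each failure): the backoff starts at
# 10 minutes and is multiplied by 5 after every further failure, giving roughly
# 10 min -> 50 min -> ~4 h -> ~21 h -> ~4.3 days -> ..., until it reaches
# MAX_RETRY_INTERVAL.
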
class NotRetryingDestination(Exception):
    def __init__(self, retry_last_ts: int, retry_interval: int, destination: str):
        """Raised by the limiter (and federation client) to indicate that we are
        deliberately not attempting to contact a given server.

        Args:
            retry_last_ts: the unix ts in milliseconds of our last attempt
                to contact the server. 0 indicates that the last attempt was
                successful or that we've never actually attempted to connect.
            retry_interval: the time in milliseconds to wait until the next
                attempt.
            destination: the domain in question
        """
        msg = "Not retrying server %s." % (destination,)
        super().__init__(msg)

        self.retry_last_ts = retry_last_ts
        self.retry_interval = retry_interval
        self.destination = destination

async def get_retry_limiter(
    destination: str,
    clock: Clock,
    store: DataStore,
    ignore_backoff: bool = False,
    **kwargs: Any,
) -> "RetryDestinationLimiter":
    """For a given destination check if we have previously failed to
    send a request there and are waiting before retrying the destination.
    If we are not ready to retry the destination, this will raise a
    NotRetryingDestination exception. Otherwise, returns a Context Manager
    that will mark the destination as down if an exception is thrown
    (excluding CodeMessageException with code < 500).

    Args:
        destination: name of homeserver
        clock: timing source
        store: datastore
        ignore_backoff: true to ignore the historical backoff data and
            try the request anyway. We will still reset the retry_interval on success.

    Example usage:

        try:
            limiter = await get_retry_limiter(destination, clock, store)
            with limiter:
                response = await do_request()
        except NotRetryingDestination:
            # We aren't ready to retry that destination.
            raise
    """
    failure_ts = None
    retry_last_ts, retry_interval = (0, 0)

    retry_timings = await store.get_destination_retry_timings(destination)

    if retry_timings:
        failure_ts = retry_timings.failure_ts
        retry_last_ts = retry_timings.retry_last_ts
        retry_interval = retry_timings.retry_interval

        now = int(clock.time_msec())

        if not ignore_backoff and retry_last_ts + retry_interval > now:
            raise NotRetryingDestination(
                retry_last_ts=retry_last_ts,
                retry_interval=retry_interval,
                destination=destination,
            )

    # if we are ignoring the backoff data, we should also not increment the backoff
    # when we get another failure - otherwise a server can very quickly reach the
    # maximum backoff even though it might only have been down briefly
    backoff_on_failure = not ignore_backoff

    return RetryDestinationLimiter(
        destination,
        clock,
        store,
        failure_ts,
        retry_interval,
        backoff_on_failure=backoff_on_failure,
        **kwargs,
    )
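
# The docstring above shows the success path. As an illustrative sketch of the
# failure path (`destination`, `clock`, `store` and `do_request` are placeholders
# from the caller's scope): an exception escaping the `with limiter:` block
# triggers RetryDestinationLimiter.__exit__, which schedules a longer
# retry_interval to be written back to the store, so subsequent
# get_retry_limiter() calls raise NotRetryingDestination until that interval
# has passed.
#
#     try:
#         limiter = await get_retry_limiter(destination, clock, store)
#         with limiter:
#             response = await do_request()  # e.g. times out
#     except NotRetryingDestination:
#         # still inside the backoff window from an earlier failure
#         raise
#     except Exception:
#         # __exit__ has already kicked off the background backoff update
#         raise
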
class RetryDestinationLimiter:
    def __init__(
        self,
        destination: str,
        clock: Clock,
        store: DataStore,
        failure_ts: Optional[int],
        retry_interval: int,
        backoff_on_404: bool = False,
        backoff_on_failure: bool = True,
        notifier: Optional["Notifier"] = None,
        replication_client: Optional["ReplicationCommandHandler"] = None,
    ):
        """Marks the destination as "down" if an exception is thrown in the
        context, except for CodeMessageException with code < 500.

        If no exception is raised, marks the destination as "up".

        Args:
            destination
            clock
            store
            failure_ts: when this destination started failing (in ms since
                the epoch), or None if the last request was successful
            retry_interval: The next retry interval taken from the
                database in milliseconds, or zero if the last request was
                successful.
            backoff_on_404: Back off if we get a 404
            backoff_on_failure: set to False if we should not increase the
                retry interval on a failure.
        """
        self.clock = clock
        self.store = store
        self.destination = destination

        self.failure_ts = failure_ts
        self.retry_interval = retry_interval
        self.backoff_on_404 = backoff_on_404
        self.backoff_on_failure = backoff_on_failure

        self.notifier = notifier
        self.replication_client = replication_client

    def __enter__(self) -> None:
        pass
    def __exit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        valid_err_code = False
        if exc_type is None:
            valid_err_code = True
        elif not issubclass(exc_type, Exception):
            # avoid treating exceptions which don't derive from Exception as
            # failures; this is mostly so as not to catch defer._DefGen_Return.
            valid_err_code = True
        elif isinstance(exc_val, CodeMessageException):
            # Some error codes are perfectly fine for some APIs, whereas other
            # APIs may expect to never receive e.g. a 404. It's important to
            # handle 404 as some remote servers will return a 404 when the HS
            # has been decommissioned.
            # If we get a 401, then we should probably back off since they
            # won't accept our requests for at least a while.
            # 429 is us being aggressively rate limited, so let's rate limit
            # ourselves.
            if exc_val.code == 404 and self.backoff_on_404:
                valid_err_code = False
            elif exc_val.code in (401, 429):
                valid_err_code = False
            elif exc_val.code < 500:
                valid_err_code = True
            else:
                valid_err_code = False

        if valid_err_code:
            # We connected successfully.
            if not self.retry_interval:
                return

            logger.debug(
                "Connection to %s was successful; clearing backoff", self.destination
            )
            self.failure_ts = None
            retry_last_ts = 0
            self.retry_interval = 0
        elif not self.backoff_on_failure:
            return
        else:
            # We couldn't connect.
            if self.retry_interval:
                self.retry_interval = int(
                    self.retry_interval * RETRY_MULTIPLIER * random.uniform(0.8, 1.4)
                )

                if self.retry_interval >= MAX_RETRY_INTERVAL:
                    self.retry_interval = MAX_RETRY_INTERVAL
            else:
                self.retry_interval = MIN_RETRY_INTERVAL

            logger.info(
                "Connection to %s was unsuccessful (%s(%s)); backoff now %i",
                self.destination,
                exc_type,
                exc_val,
                self.retry_interval,
            )
            retry_last_ts = int(self.clock.time_msec())

            if self.failure_ts is None:
                self.failure_ts = retry_last_ts
        async def store_retry_timings() -> None:
            try:
                await self.store.set_destination_retry_timings(
                    self.destination,
                    self.failure_ts,
                    retry_last_ts,
                    self.retry_interval,
                )

                if self.notifier:
                    # Inform the relevant places that the remote server is back up.
                    self.notifier.notify_remote_server_up(self.destination)

                if self.replication_client:
                    # If we're on a worker we try and inform master about this. The
                    # replication client doesn't hook into the notifier to avoid
                    # infinite loops where we send a `REMOTE_SERVER_UP` command to
                    # master, which then echoes it back to us which in turn pokes
                    # the notifier.
                    self.replication_client.send_remote_server_up(self.destination)
            except Exception:
                logger.exception("Failed to store destination_retry_timings")

        # we deliberately do this in the background.
        run_as_background_process("store_retry_timings", store_retry_timings)
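
# End to end, the lifecycle is roughly: a failed request stores
# (failure_ts, retry_last_ts, retry_interval=MIN_RETRY_INTERVAL) via
# set_destination_retry_timings; get_retry_limiter() raises
# NotRetryingDestination while that window is open; each further failure
# multiplies the interval (with jitter) by RETRY_MULTIPLIER; the first
# successful request clears failure_ts and resets retry_interval to 0.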