retryutils.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2015, 2016 OpenMarket Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. from twisted.internet import defer
  16. from synapse.api.errors import CodeMessageException
  17. import logging
  18. import random
  19. logger = logging.getLogger(__name__)
  20. class NotRetryingDestination(Exception):
  21. def __init__(self, retry_last_ts, retry_interval, destination):
  22. msg = "Not retrying server %s." % (destination,)
  23. super(NotRetryingDestination, self).__init__(msg)
  24. self.retry_last_ts = retry_last_ts
  25. self.retry_interval = retry_interval
  26. self.destination = destination
  27. @defer.inlineCallbacks
  28. def get_retry_limiter(destination, clock, store, **kwargs):
  29. """For a given destination check if we have previously failed to
  30. send a request there and are waiting before retrying the destination.
  31. If we are not ready to retry the destination, this will raise a
  32. NotRetryingDestination exception. Otherwise, will return a Context Manager
  33. that will mark the destination as down if an exception is thrown (excluding
  34. CodeMessageException with code < 500)
  35. Example usage:
  36. try:
  37. limiter = yield get_retry_limiter(destination, clock, store)
  38. with limiter:
  39. response = yield do_request()
  40. except NotRetryingDestination:
  41. # We aren't ready to retry that destination.
  42. raise
  43. """
  44. retry_last_ts, retry_interval = (0, 0)
  45. retry_timings = yield store.get_destination_retry_timings(
  46. destination
  47. )
  48. if retry_timings:
  49. retry_last_ts, retry_interval = (
  50. retry_timings["retry_last_ts"], retry_timings["retry_interval"]
  51. )
  52. now = int(clock.time_msec())
  53. if retry_last_ts + retry_interval > now:
  54. raise NotRetryingDestination(
  55. retry_last_ts=retry_last_ts,
  56. retry_interval=retry_interval,
  57. destination=destination,
  58. )
  59. defer.returnValue(
  60. RetryDestinationLimiter(
  61. destination,
  62. clock,
  63. store,
  64. retry_interval,
  65. **kwargs
  66. )
  67. )
  68. class RetryDestinationLimiter(object):
  69. def __init__(self, destination, clock, store, retry_interval,
  70. min_retry_interval=10 * 60 * 1000,
  71. max_retry_interval=24 * 60 * 60 * 1000,
  72. multiplier_retry_interval=5,):
  73. """Marks the destination as "down" if an exception is thrown in the
  74. context, except for CodeMessageException with code < 500.
  75. If no exception is raised, marks the destination as "up".
  76. Args:
  77. destination (str)
  78. clock (Clock)
  79. store (DataStore)
  80. retry_interval (int): The next retry interval taken from the
  81. database in milliseconds, or zero if the last request was
  82. successful.
  83. min_retry_interval (int): The minimum retry interval to use after
  84. a failed request, in milliseconds.
  85. max_retry_interval (int): The maximum retry interval to use after
  86. a failed request, in milliseconds.
  87. multiplier_retry_interval (int): The multiplier to use to increase
  88. the retry interval after a failed request.
  89. """
  90. self.clock = clock
  91. self.store = store
  92. self.destination = destination
  93. self.retry_interval = retry_interval
  94. self.min_retry_interval = min_retry_interval
  95. self.max_retry_interval = max_retry_interval
  96. self.multiplier_retry_interval = multiplier_retry_interval
  97. def __enter__(self):
  98. pass
  99. def __exit__(self, exc_type, exc_val, exc_tb):
  100. def err(failure):
  101. logger.exception(
  102. "Failed to store set_destination_retry_timings",
  103. failure.value
  104. )
  105. valid_err_code = False
  106. if exc_type is CodeMessageException:
  107. valid_err_code = 0 <= exc_val.code < 500
  108. if exc_type is None or valid_err_code:
  109. # We connected successfully.
  110. if not self.retry_interval:
  111. return
  112. retry_last_ts = 0
  113. self.retry_interval = 0
  114. else:
  115. # We couldn't connect.
  116. if self.retry_interval:
  117. self.retry_interval *= self.multiplier_retry_interval
  118. self.retry_interval *= int(random.uniform(0.8, 1.4))
  119. if self.retry_interval >= self.max_retry_interval:
  120. self.retry_interval = self.max_retry_interval
  121. else:
  122. self.retry_interval = self.min_retry_interval
  123. retry_last_ts = int(self.clock.time_msec())
  124. self.store.set_destination_retry_timings(
  125. self.destination, retry_last_ts, self.retry_interval
  126. ).addErrback(err)