# test_retryutils.py
# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from unittest import mock

from synapse.notifier import Notifier
from synapse.replication.tcp.handler import ReplicationCommandHandler
from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter

from tests.unittest import HomeserverTestCase


  19. class RetryLimiterTestCase(HomeserverTestCase):
  20. def test_new_destination(self) -> None:
  21. """A happy-path case with a new destination and a successful operation"""
  22. store = self.hs.get_datastores().main
  23. limiter = self.get_success(get_retry_limiter("test_dest", self.clock, store))
  24. # advance the clock a bit before making the request
  25. self.pump(1)
  26. with limiter:
  27. pass
  28. new_timings = self.get_success(store.get_destination_retry_timings("test_dest"))
  29. self.assertIsNone(new_timings)
  30. def test_limiter(self) -> None:
  31. """General test case which walks through the process of a failing request"""
  32. store = self.hs.get_datastores().main
  33. limiter = self.get_success(get_retry_limiter("test_dest", self.clock, store))
  34. min_retry_interval_ms = (
  35. self.hs.config.federation.destination_min_retry_interval_ms
  36. )
  37. retry_multiplier = self.hs.config.federation.destination_retry_multiplier
  38. self.pump(1)
  39. try:
  40. with limiter:
  41. self.pump(1)
  42. failure_ts = self.clock.time_msec()
  43. raise AssertionError("argh")
  44. except AssertionError:
  45. pass
  46. self.pump()
  47. new_timings = self.get_success(store.get_destination_retry_timings("test_dest"))
  48. assert new_timings is not None
  49. self.assertEqual(new_timings.failure_ts, failure_ts)
  50. self.assertEqual(new_timings.retry_last_ts, failure_ts)
  51. self.assertEqual(new_timings.retry_interval, min_retry_interval_ms)
  52. # now if we try again we should get a failure
  53. self.get_failure(
  54. get_retry_limiter("test_dest", self.clock, store), NotRetryingDestination
  55. )
  56. #
  57. # advance the clock and try again
  58. #
  59. self.pump(min_retry_interval_ms)
  60. limiter = self.get_success(get_retry_limiter("test_dest", self.clock, store))
  61. self.pump(1)
  62. try:
  63. with limiter:
  64. self.pump(1)
  65. retry_ts = self.clock.time_msec()
  66. raise AssertionError("argh")
  67. except AssertionError:
  68. pass
  69. self.pump()
  70. new_timings = self.get_success(store.get_destination_retry_timings("test_dest"))
  71. assert new_timings is not None
  72. self.assertEqual(new_timings.failure_ts, failure_ts)
  73. self.assertEqual(new_timings.retry_last_ts, retry_ts)
  74. self.assertGreaterEqual(
  75. new_timings.retry_interval, min_retry_interval_ms * retry_multiplier * 0.5
  76. )
  77. self.assertLessEqual(
  78. new_timings.retry_interval, min_retry_interval_ms * retry_multiplier * 2.0
  79. )
  80. #
  81. # one more go, with success
  82. #
  83. self.reactor.advance(min_retry_interval_ms * retry_multiplier * 2.0)
  84. limiter = self.get_success(get_retry_limiter("test_dest", self.clock, store))
  85. self.pump(1)
  86. with limiter:
  87. self.pump(1)
  88. # wait for the update to land
  89. self.pump()
  90. new_timings = self.get_success(store.get_destination_retry_timings("test_dest"))
  91. self.assertIsNone(new_timings)
  92. def test_notifier_replication(self) -> None:
  93. """Ensure the notifier/replication client is called only when expected."""
  94. store = self.hs.get_datastores().main
  95. notifier = mock.Mock(spec=Notifier)
  96. replication_client = mock.Mock(spec=ReplicationCommandHandler)
  97. limiter = self.get_success(
  98. get_retry_limiter(
  99. "test_dest",
  100. self.clock,
  101. store,
  102. notifier=notifier,
  103. replication_client=replication_client,
  104. )
  105. )
  106. # The server is already up, nothing should occur.
  107. self.pump(1)
  108. with limiter:
  109. pass
  110. self.pump()
  111. new_timings = self.get_success(store.get_destination_retry_timings("test_dest"))
  112. self.assertIsNone(new_timings)
  113. notifier.notify_remote_server_up.assert_not_called()
  114. replication_client.send_remote_server_up.assert_not_called()
  115. # Attempt again, but return an error. This will cause new retry timings, but
  116. # should not trigger server up notifications.
  117. self.pump(1)
  118. try:
  119. with limiter:
  120. raise AssertionError("argh")
  121. except AssertionError:
  122. pass
  123. self.pump()
  124. new_timings = self.get_success(store.get_destination_retry_timings("test_dest"))
  125. # The exact retry timings are tested separately.
  126. self.assertIsNotNone(new_timings)
  127. notifier.notify_remote_server_up.assert_not_called()
  128. replication_client.send_remote_server_up.assert_not_called()
  129. # A second failing request should be treated as the above.
  130. self.pump(1)
  131. try:
  132. with limiter:
  133. raise AssertionError("argh")
  134. except AssertionError:
  135. pass
  136. self.pump()
  137. new_timings = self.get_success(store.get_destination_retry_timings("test_dest"))
  138. # The exact retry timings are tested separately.
  139. self.assertIsNotNone(new_timings)
  140. notifier.notify_remote_server_up.assert_not_called()
  141. replication_client.send_remote_server_up.assert_not_called()
  142. # A final successful attempt should generate a server up notification.
  143. self.pump(1)
  144. with limiter:
  145. pass
  146. self.pump()
  147. new_timings = self.get_success(store.get_destination_retry_timings("test_dest"))
  148. # The exact retry timings are tested separately.
  149. self.assertIsNone(new_timings)
  150. notifier.notify_remote_server_up.assert_called_once_with("test_dest")
  151. replication_client.send_remote_server_up.assert_called_once_with("test_dest")
  152. def test_max_retry_interval(self) -> None:
  153. """Test that `destination_max_retry_interval` setting works as expected"""
  154. store = self.hs.get_datastores().main
  155. destination_max_retry_interval_ms = (
  156. self.hs.config.federation.destination_max_retry_interval_ms
  157. )
  158. self.get_success(get_retry_limiter("test_dest", self.clock, store))
  159. self.pump(1)
  160. failure_ts = self.clock.time_msec()
  161. # Simulate reaching destination_max_retry_interval
  162. self.get_success(
  163. store.set_destination_retry_timings(
  164. "test_dest",
  165. failure_ts=failure_ts,
  166. retry_last_ts=failure_ts,
  167. retry_interval=destination_max_retry_interval_ms,
  168. )
  169. )
  170. # Check it fails
  171. self.get_failure(
  172. get_retry_limiter("test_dest", self.clock, store), NotRetryingDestination
  173. )
  174. # Get past retry_interval and we can try again, and still throw an error to continue the backoff
  175. self.reactor.advance(destination_max_retry_interval_ms / 1000 + 1)
  176. limiter = self.get_success(get_retry_limiter("test_dest", self.clock, store))
  177. self.pump(1)
  178. try:
  179. with limiter:
  180. self.pump(1)
  181. raise AssertionError("argh")
  182. except AssertionError:
  183. pass
  184. self.pump()
  185. # retry_interval does not increase and stays at destination_max_retry_interval_ms
  186. new_timings = self.get_success(store.get_destination_retry_timings("test_dest"))
  187. assert new_timings is not None
  188. self.assertEqual(new_timings.retry_interval, destination_max_retry_interval_ms)
  189. # Check it fails
  190. self.get_failure(
  191. get_retry_limiter("test_dest", self.clock, store), NotRetryingDestination
  192. )