test_background_update.py 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181
  1. # Copyright 2021 The Matrix.org Foundation C.I.C.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. from unittest.mock import Mock
  15. from twisted.internet.defer import Deferred, ensureDeferred
  16. from synapse.storage.background_updates import BackgroundUpdater
  17. from tests import unittest
  18. from tests.test_utils import make_awaitable, simple_async_mock
  class BackgroundUpdateTestCase(unittest.HomeserverTestCase):
      """Checks how `do_next_background_update` drives a registered handler.

      A mock handler is registered under the name "test_update"; the test then
      swaps the mock's `side_effect` between steps to script what each run of
      the update does.
      """

      def prepare(self, reactor, clock, homeserver):
          """Fetch the updater from the datastore and register the mock handler."""
          db_pool = self.hs.get_datastore().db_pool
          self.updates: BackgroundUpdater = db_pool.updates

          # The base test class should already have run the real background
          # updates, so nothing may be outstanding at this point.
          already_complete = self.get_success(
              self.updates.has_completed_background_updates()
          )
          self.assertTrue(already_complete)

          handler = Mock()
          self.update_handler = handler
          self.updates.register_background_update_handler("test_update", handler)

      def test_do_background_update(self):
          """Run one update through three calls: progress, completion, idle."""
          # How long we pretend a single item takes when the update runs.
          per_item_ms = 10

          # The runtime each background update batch is expected to aim for.
          target_batch_ms = 100

          datastore = self.hs.get_datastore()
          pending_row = {
              "update_name": "test_update",
              "progress_json": '{"my_key": 1}',
          }
          self.get_success(
              datastore.db_pool.simple_insert("background_updates", values=pending_row)
          )

          # Step one: make a little progress and persist it.
          async def advance_progress(progress, count):
              # Simulate the per-item cost on the test clock.
              await self.clock.sleep((count * per_item_ms) / 1000)
              bumped = {"my_key": progress["my_key"] + 1}
              await datastore.db_pool.runInteraction(
                  "update_progress",
                  self.updates._background_update_progress_txn,
                  "test_update",
                  bumped,
              )
              return count

          self.update_handler.side_effect = advance_progress
          self.update_handler.reset_mock()
          first_outcome = self.get_success(
              self.updates.do_next_background_update(False),
              by=0.01,
          )
          self.assertFalse(first_outcome)

          # The very first run is handed the stored progress together with the
          # updater's MINIMUM_BACKGROUND_BATCH_SIZE.
          self.update_handler.assert_called_once_with(
              {"my_key": 1}, self.updates.MINIMUM_BACKGROUND_BATCH_SIZE
          )

          # Step two: finish the update. This time the handler should be asked
          # to process a much larger batch, sized from the timings above.
          async def finish_update(progress, count):
              self.assertEqual(progress, {"my_key": 2})
              expected_items = target_batch_ms / per_item_ms
              self.assertAlmostEqual(count, expected_items, places=0)
              await self.updates._end_background_update("test_update")
              return count

          self.update_handler.side_effect = finish_update
          self.update_handler.reset_mock()
          second_outcome = self.get_success(
              self.updates.do_next_background_update(False)
          )
          self.assertFalse(second_outcome)
          self.update_handler.assert_called_once()

          # Step three: nothing is left, so the handler must not be invoked.
          self.update_handler.reset_mock()
          third_outcome = self.get_success(
              self.updates.do_next_background_update(False)
          )
          self.assertTrue(third_outcome)
          self.assertFalse(self.update_handler.called)
  class BackgroundUpdateControllerTestCase(unittest.HomeserverTestCase):
      """Checks that registered update-controller callbacks are used.

      The callbacks are mocks registered through the module API. The update
      handler and the context manager's `__aenter__` both return Deferreds so
      the test can release each stage by hand.
      """

      def prepare(self, reactor, clock, homeserver):
          """Register a blocking update handler and mock controller callbacks."""
          db_pool = self.hs.get_datastore().db_pool
          self.updates: BackgroundUpdater = db_pool.updates

          # The base test class should already have run the real background
          # updates, so nothing may be outstanding at this point.
          already_complete = self.get_success(
              self.updates.has_completed_background_updates()
          )
          self.assertTrue(already_complete)

          # The handler returns a Deferred that only the test resolves.
          self.update_deferred = Deferred()
          self.update_handler = Mock(return_value=self.update_deferred)
          self.updates.register_background_update_handler(
              "test_update",
              self.update_handler,
          )

          # Stand-in async context manager. The class itself is kept so a test
          # can replace `__aenter__` / `__aexit__` on it later.
          class StubContextManager:
              __aenter__ = simple_async_mock(return_value=None)
              __aexit__ = simple_async_mock(return_value=None)

          self._update_ctx_manager = StubContextManager

          # Mock for the `on_update` callback; it hands back an instance of the
          # stand-in context manager.
          self._on_update = Mock(return_value=self._update_ctx_manager())

          # A batch size deliberately different from the internal default
          # value (100).
          self._default_batch_size = 500

          # Register the callbacks, using further mocks for the batch sizes.
          module_api = self.hs.get_module_api()
          min_size_cb = Mock(return_value=make_awaitable(self._default_batch_size))
          default_size_cb = Mock(
              return_value=make_awaitable(self._default_batch_size),
          )
          module_api.register_background_update_controller_callbacks(
              on_update=self._on_update,
              min_batch_size=min_size_cb,
              default_batch_size=default_size_cb,
          )

      def test_controller(self):
          """Step through a controlled update, releasing each stage in turn."""
          datastore = self.hs.get_datastore()
          pending_row = {"update_name": "test_update", "progress_json": "{}"}
          self.get_success(
              datastore.db_pool.simple_insert("background_updates", values=pending_row)
          )

          # Make entering the context manager block on a Deferred we control.
          enter_gate = Deferred()
          self._update_ctx_manager.__aenter__ = Mock(return_value=enter_gate)

          # Kick off the background update.
          running_update = ensureDeferred(
              self.updates.do_next_background_update(True)
          )
          self.pump()

          # `on_update` has been consulted, but the update handler cannot run
          # until `enter_gate` (returned by `__aenter__`) is resolved.
          self._on_update.assert_called_once_with(
              "test_update",
              "master",
              False,
          )
          self.assertFalse(running_update.called)
          self.assertFalse(self.update_deferred.called)

          # Releasing `enter_gate` lets the update handler be called, and the
          # handler then blocks on its own Deferred.
          enter_gate.callback(100)
          self.pump()

          self.update_handler.assert_called_once_with({}, self._default_batch_size)
          self.assertFalse(self.update_deferred.called)
          self._update_ctx_manager.__aexit__.assert_not_called()

          # Resolving the handler's Deferred lets `do_next_background_update`
          # leave the context manager and finish.
          self.update_deferred.callback(100)
          self.pump()

          self._update_ctx_manager.__aexit__.assert_called()
          self.get_success(running_update)