test_background_update.py 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. from mock import Mock
  2. from twisted.internet.defer import Deferred, ensureDeferred
  3. from synapse.storage.background_updates import BackgroundUpdater
  4. from tests import unittest
  5. from tests.test_utils import make_awaitable
  6. class BackgroundUpdateTestCase(unittest.HomeserverTestCase):
  7. def prepare(self, reactor, clock, homeserver):
  8. self.updates: BackgroundUpdater = self.hs.get_datastore().db_pool.updates
  9. # the base test class should have run the real bg updates for us
  10. self.assertTrue(
  11. self.get_success(self.updates.has_completed_background_updates())
  12. )
  13. self.update_handler = Mock()
  14. self.updates.register_background_update_handler(
  15. "test_update", self.update_handler
  16. )
  17. def test_do_background_update(self):
  18. # the time we claim it takes to update one item when running the update
  19. duration_ms = 10
  20. # the target runtime for each bg update
  21. target_background_update_duration_ms = 100
  22. store = self.hs.get_datastore()
  23. self.get_success(
  24. store.db_pool.simple_insert(
  25. "background_updates",
  26. values={"update_name": "test_update", "progress_json": '{"my_key": 1}'},
  27. )
  28. )
  29. # first step: make a bit of progress
  30. async def update(progress, count):
  31. await self.clock.sleep((count * duration_ms) / 1000)
  32. progress = {"my_key": progress["my_key"] + 1}
  33. await store.db_pool.runInteraction(
  34. "update_progress",
  35. self.updates._background_update_progress_txn,
  36. "test_update",
  37. progress,
  38. )
  39. return count
  40. self.update_handler.side_effect = update
  41. self.update_handler.reset_mock()
  42. res = self.get_success(
  43. self.updates.do_next_background_update(False),
  44. by=0.01,
  45. )
  46. self.assertFalse(res)
  47. # on the first call, we should get run with the default background update size
  48. self.update_handler.assert_called_once_with(
  49. {"my_key": 1}, self.updates.MINIMUM_BACKGROUND_BATCH_SIZE
  50. )
  51. # second step: complete the update
  52. # we should now get run with a much bigger number of items to update
  53. async def update(progress, count):
  54. self.assertEqual(progress, {"my_key": 2})
  55. self.assertAlmostEqual(
  56. count,
  57. target_background_update_duration_ms / duration_ms,
  58. places=0,
  59. )
  60. await self.updates._end_background_update("test_update")
  61. return count
  62. self.update_handler.side_effect = update
  63. self.update_handler.reset_mock()
  64. result = self.get_success(self.updates.do_next_background_update(False))
  65. self.assertFalse(result)
  66. self.update_handler.assert_called_once()
  67. # third step: we don't expect to be called any more
  68. self.update_handler.reset_mock()
  69. result = self.get_success(self.updates.do_next_background_update(False))
  70. self.assertTrue(result)
  71. self.assertFalse(self.update_handler.called)
  72. class BackgroundUpdateControllerTestCase(unittest.HomeserverTestCase):
  73. def prepare(self, reactor, clock, homeserver):
  74. self.updates: BackgroundUpdater = self.hs.get_datastore().db_pool.updates
  75. # the base test class should have run the real bg updates for us
  76. self.assertTrue(
  77. self.get_success(self.updates.has_completed_background_updates())
  78. )
  79. self.update_deferred = Deferred()
  80. self.update_handler = Mock(return_value=self.update_deferred)
  81. self.updates.register_background_update_handler(
  82. "test_update", self.update_handler
  83. )
  84. # Mock out the AsyncContextManager
  85. self._update_ctx_manager = Mock(spec=["__aenter__", "__aexit__"])
  86. self._update_ctx_manager.__aenter__ = Mock(
  87. return_value=make_awaitable(None),
  88. )
  89. self._update_ctx_manager.__aexit__ = Mock(return_value=make_awaitable(None))
  90. # Mock out the `update_handler` callback
  91. self._on_update = Mock(return_value=self._update_ctx_manager)
  92. # Define a default batch size value that's not the same as the internal default
  93. # value (100).
  94. self._default_batch_size = 500
  95. # Register the callbacks with more mocks
  96. self.hs.get_module_api().register_background_update_controller_callbacks(
  97. on_update=self._on_update,
  98. min_batch_size=Mock(return_value=make_awaitable(self._default_batch_size)),
  99. default_batch_size=Mock(
  100. return_value=make_awaitable(self._default_batch_size),
  101. ),
  102. )
  103. def test_controller(self):
  104. store = self.hs.get_datastore()
  105. self.get_success(
  106. store.db_pool.simple_insert(
  107. "background_updates",
  108. values={"update_name": "test_update", "progress_json": "{}"},
  109. )
  110. )
  111. # Set the return value for the context manager.
  112. enter_defer = Deferred()
  113. self._update_ctx_manager.__aenter__ = Mock(return_value=enter_defer)
  114. # Start the background update.
  115. do_update_d = ensureDeferred(self.updates.do_next_background_update(True))
  116. self.pump()
  117. # `run_update` should have been called, but the update handler won't be
  118. # called until the `enter_defer` (returned by `__aenter__`) is resolved.
  119. self._on_update.assert_called_once_with(
  120. "test_update",
  121. "master",
  122. False,
  123. )
  124. self.assertFalse(do_update_d.called)
  125. self.assertFalse(self.update_deferred.called)
  126. # Resolving the `enter_defer` should call the update handler, which then
  127. # blocks.
  128. enter_defer.callback(100)
  129. self.pump()
  130. self.update_handler.assert_called_once_with({}, self._default_batch_size)
  131. self.assertFalse(self.update_deferred.called)
  132. self._update_ctx_manager.__aexit__.assert_not_called()
  133. # Resolving the update handler deferred should cause the
  134. # `do_next_background_update` to finish and return
  135. self.update_deferred.callback(100)
  136. self.pump()
  137. self._update_ctx_manager.__aexit__.assert_called()
  138. self.get_success(do_update_d)