test_background_update.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406
  1. # Copyright 2021 The Matrix.org Foundation C.I.C.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. from unittest.mock import Mock
  15. import yaml
  16. from twisted.internet.defer import Deferred, ensureDeferred
  17. from twisted.test.proto_helpers import MemoryReactor
  18. from synapse.server import HomeServer
  19. from synapse.storage.background_updates import BackgroundUpdater
  20. from synapse.types import JsonDict
  21. from synapse.util import Clock
  22. from tests import unittest
  23. from tests.test_utils import make_awaitable, simple_async_mock
  24. from tests.unittest import override_config
  25. class BackgroundUpdateTestCase(unittest.HomeserverTestCase):
  26. def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
  27. self.updates: BackgroundUpdater = self.hs.get_datastores().main.db_pool.updates
  28. # the base test class should have run the real bg updates for us
  29. self.assertTrue(
  30. self.get_success(self.updates.has_completed_background_updates())
  31. )
  32. self.update_handler = Mock()
  33. self.updates.register_background_update_handler(
  34. "test_update", self.update_handler
  35. )
  36. self.store = self.hs.get_datastores().main
  37. async def update(self, progress: JsonDict, count: int) -> int:
  38. duration_ms = 10
  39. await self.clock.sleep((count * duration_ms) / 1000)
  40. progress = {"my_key": progress["my_key"] + 1}
  41. await self.store.db_pool.runInteraction(
  42. "update_progress",
  43. self.updates._background_update_progress_txn,
  44. "test_update",
  45. progress,
  46. )
  47. return count
  48. def test_do_background_update(self) -> None:
  49. # the time we claim it takes to update one item when running the update
  50. duration_ms = 10
  51. # the target runtime for each bg update
  52. target_background_update_duration_ms = 100
  53. self.get_success(
  54. self.store.db_pool.simple_insert(
  55. "background_updates",
  56. values={"update_name": "test_update", "progress_json": '{"my_key": 1}'},
  57. )
  58. )
  59. self.update_handler.side_effect = self.update
  60. self.update_handler.reset_mock()
  61. res = self.get_success(
  62. self.updates.do_next_background_update(False),
  63. by=0.02,
  64. )
  65. self.assertFalse(res)
  66. # on the first call, we should get run with the default background update size
  67. self.update_handler.assert_called_once_with(
  68. {"my_key": 1}, self.updates.default_background_batch_size
  69. )
  70. # second step: complete the update
  71. # we should now get run with a much bigger number of items to update
  72. async def update(progress: JsonDict, count: int) -> int:
  73. self.assertEqual(progress, {"my_key": 2})
  74. self.assertAlmostEqual(
  75. count,
  76. target_background_update_duration_ms / duration_ms,
  77. places=0,
  78. )
  79. await self.updates._end_background_update("test_update")
  80. return count
  81. self.update_handler.side_effect = update
  82. self.update_handler.reset_mock()
  83. result = self.get_success(self.updates.do_next_background_update(False))
  84. self.assertFalse(result)
  85. self.update_handler.assert_called_once()
  86. # third step: we don't expect to be called any more
  87. self.update_handler.reset_mock()
  88. result = self.get_success(self.updates.do_next_background_update(False))
  89. self.assertTrue(result)
  90. self.assertFalse(self.update_handler.called)
  91. @override_config(
  92. yaml.safe_load(
  93. """
  94. background_updates:
  95. default_batch_size: 20
  96. """
  97. )
  98. )
  99. def test_background_update_default_batch_set_by_config(self) -> None:
  100. """
  101. Test that the background update is run with the default_batch_size set by the config
  102. """
  103. self.get_success(
  104. self.store.db_pool.simple_insert(
  105. "background_updates",
  106. values={"update_name": "test_update", "progress_json": '{"my_key": 1}'},
  107. )
  108. )
  109. self.update_handler.side_effect = self.update
  110. self.update_handler.reset_mock()
  111. res = self.get_success(
  112. self.updates.do_next_background_update(False),
  113. by=0.01,
  114. )
  115. self.assertFalse(res)
  116. # on the first call, we should get run with the default background update size specified in the config
  117. self.update_handler.assert_called_once_with({"my_key": 1}, 20)
  118. def test_background_update_default_sleep_behavior(self) -> None:
  119. """
  120. Test default background update behavior, which is to sleep
  121. """
  122. self.get_success(
  123. self.store.db_pool.simple_insert(
  124. "background_updates",
  125. values={"update_name": "test_update", "progress_json": '{"my_key": 1}'},
  126. )
  127. )
  128. self.update_handler.side_effect = self.update
  129. self.update_handler.reset_mock()
  130. self.updates.start_doing_background_updates()
  131. # 2: advance the reactor less than the default sleep duration (1000ms)
  132. self.reactor.pump([0.5])
  133. # check that an update has not been run
  134. self.update_handler.assert_not_called()
  135. # advance reactor past default sleep duration
  136. self.reactor.pump([1])
  137. # check that update has been run
  138. self.update_handler.assert_called()
  139. @override_config(
  140. yaml.safe_load(
  141. """
  142. background_updates:
  143. sleep_duration_ms: 500
  144. """
  145. )
  146. )
  147. def test_background_update_sleep_set_in_config(self) -> None:
  148. """
  149. Test that changing the sleep time in the config changes how long it sleeps
  150. """
  151. self.get_success(
  152. self.store.db_pool.simple_insert(
  153. "background_updates",
  154. values={"update_name": "test_update", "progress_json": '{"my_key": 1}'},
  155. )
  156. )
  157. self.update_handler.side_effect = self.update
  158. self.update_handler.reset_mock()
  159. self.updates.start_doing_background_updates()
  160. # 2: advance the reactor less than the configured sleep duration (500ms)
  161. self.reactor.pump([0.45])
  162. # check that an update has not been run
  163. self.update_handler.assert_not_called()
  164. # advance reactor past config sleep duration but less than default duration
  165. self.reactor.pump([0.75])
  166. # check that update has been run
  167. self.update_handler.assert_called()
  168. @override_config(
  169. yaml.safe_load(
  170. """
  171. background_updates:
  172. sleep_enabled: false
  173. """
  174. )
  175. )
  176. def test_disabling_background_update_sleep(self) -> None:
  177. """
  178. Test that disabling sleep in the config results in bg update not sleeping
  179. """
  180. self.get_success(
  181. self.store.db_pool.simple_insert(
  182. "background_updates",
  183. values={"update_name": "test_update", "progress_json": '{"my_key": 1}'},
  184. )
  185. )
  186. self.update_handler.side_effect = self.update
  187. self.update_handler.reset_mock()
  188. self.updates.start_doing_background_updates()
  189. # 2: advance the reactor very little
  190. self.reactor.pump([0.025])
  191. # check that an update has run
  192. self.update_handler.assert_called()
  193. @override_config(
  194. yaml.safe_load(
  195. """
  196. background_updates:
  197. background_update_duration_ms: 500
  198. """
  199. )
  200. )
  201. def test_background_update_duration_set_in_config(self) -> None:
  202. """
  203. Test that the desired duration set in the config is used in determining batch size
  204. """
  205. # Duration of one background update item
  206. duration_ms = 10
  207. self.get_success(
  208. self.store.db_pool.simple_insert(
  209. "background_updates",
  210. values={"update_name": "test_update", "progress_json": '{"my_key": 1}'},
  211. )
  212. )
  213. self.update_handler.side_effect = self.update
  214. self.update_handler.reset_mock()
  215. res = self.get_success(
  216. self.updates.do_next_background_update(False),
  217. by=0.02,
  218. )
  219. self.assertFalse(res)
  220. # the first update was run with the default batch size, this should be run with 500ms as the
  221. # desired duration
  222. async def update(progress: JsonDict, count: int) -> int:
  223. self.assertEqual(progress, {"my_key": 2})
  224. self.assertAlmostEqual(
  225. count,
  226. 500 / duration_ms,
  227. places=0,
  228. )
  229. await self.updates._end_background_update("test_update")
  230. return count
  231. self.update_handler.side_effect = update
  232. self.get_success(self.updates.do_next_background_update(False))
  233. @override_config(
  234. yaml.safe_load(
  235. """
  236. background_updates:
  237. min_batch_size: 5
  238. """
  239. )
  240. )
  241. def test_background_update_min_batch_set_in_config(self) -> None:
  242. """
  243. Test that the minimum batch size set in the config is used
  244. """
  245. # a very long-running individual update
  246. duration_ms = 50
  247. self.get_success(
  248. self.store.db_pool.simple_insert(
  249. "background_updates",
  250. values={"update_name": "test_update", "progress_json": '{"my_key": 1}'},
  251. )
  252. )
  253. # Run the update with the long-running update item
  254. async def update_long(progress: JsonDict, count: int) -> int:
  255. await self.clock.sleep((count * duration_ms) / 1000)
  256. progress = {"my_key": progress["my_key"] + 1}
  257. await self.store.db_pool.runInteraction(
  258. "update_progress",
  259. self.updates._background_update_progress_txn,
  260. "test_update",
  261. progress,
  262. )
  263. return count
  264. self.update_handler.side_effect = update_long
  265. self.update_handler.reset_mock()
  266. res = self.get_success(
  267. self.updates.do_next_background_update(False),
  268. by=1,
  269. )
  270. self.assertFalse(res)
  271. # the first update was run with the default batch size, this should be run with minimum batch size
  272. # as the first items took a very long time
  273. async def update_short(progress: JsonDict, count: int) -> int:
  274. self.assertEqual(progress, {"my_key": 2})
  275. self.assertEqual(count, 5)
  276. await self.updates._end_background_update("test_update")
  277. return count
  278. self.update_handler.side_effect = update_short
  279. self.get_success(self.updates.do_next_background_update(False))
  280. class BackgroundUpdateControllerTestCase(unittest.HomeserverTestCase):
  281. def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
  282. self.updates: BackgroundUpdater = self.hs.get_datastores().main.db_pool.updates
  283. # the base test class should have run the real bg updates for us
  284. self.assertTrue(
  285. self.get_success(self.updates.has_completed_background_updates())
  286. )
  287. self.update_deferred: Deferred[int] = Deferred()
  288. self.update_handler = Mock(return_value=self.update_deferred)
  289. self.updates.register_background_update_handler(
  290. "test_update", self.update_handler
  291. )
  292. # Mock out the AsyncContextManager
  293. class MockCM:
  294. __aenter__ = simple_async_mock(return_value=None)
  295. __aexit__ = simple_async_mock(return_value=None)
  296. self._update_ctx_manager = MockCM
  297. # Mock out the `update_handler` callback
  298. self._on_update = Mock(return_value=self._update_ctx_manager())
  299. # Define a default batch size value that's not the same as the internal default
  300. # value (100).
  301. self._default_batch_size = 500
  302. # Register the callbacks with more mocks
  303. self.hs.get_module_api().register_background_update_controller_callbacks(
  304. on_update=self._on_update,
  305. min_batch_size=Mock(return_value=make_awaitable(self._default_batch_size)),
  306. default_batch_size=Mock(
  307. return_value=make_awaitable(self._default_batch_size),
  308. ),
  309. )
  310. def test_controller(self) -> None:
  311. store = self.hs.get_datastores().main
  312. self.get_success(
  313. store.db_pool.simple_insert(
  314. "background_updates",
  315. values={"update_name": "test_update", "progress_json": "{}"},
  316. )
  317. )
  318. # Set the return value for the context manager.
  319. enter_defer: Deferred[int] = Deferred()
  320. self._update_ctx_manager.__aenter__ = Mock(return_value=enter_defer)
  321. # Start the background update.
  322. do_update_d = ensureDeferred(self.updates.do_next_background_update(True))
  323. self.pump()
  324. # `run_update` should have been called, but the update handler won't be
  325. # called until the `enter_defer` (returned by `__aenter__`) is resolved.
  326. self._on_update.assert_called_once_with(
  327. "test_update",
  328. "master",
  329. False,
  330. )
  331. self.assertFalse(do_update_d.called)
  332. self.assertFalse(self.update_deferred.called)
  333. # Resolving the `enter_defer` should call the update handler, which then
  334. # blocks.
  335. enter_defer.callback(100)
  336. self.pump()
  337. self.update_handler.assert_called_once_with({}, self._default_batch_size)
  338. self.assertFalse(self.update_deferred.called)
  339. self._update_ctx_manager.__aexit__.assert_not_called()
  340. # Resolving the update handler deferred should cause the
  341. # `do_next_background_update` to finish and return
  342. self.update_deferred.callback(100)
  343. self.pump()
  344. self._update_ctx_manager.__aexit__.assert_called()
  345. self.get_success(do_update_d)