test_background_update.py

# Copyright 2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
from typing import List, Tuple, cast
from unittest.mock import AsyncMock, Mock

import yaml

from twisted.internet.defer import Deferred, ensureDeferred
from twisted.test.proto_helpers import MemoryReactor

from synapse.server import HomeServer
from synapse.storage.background_updates import (
    BackgroundUpdater,
    ForeignKeyConstraint,
    NotNullConstraint,
    run_validate_constraint_and_delete_rows_schema_delta,
)
from synapse.storage.database import LoggingTransaction
from synapse.storage.engines import PostgresEngine, Sqlite3Engine
from synapse.types import JsonDict
from synapse.util import Clock

from tests import unittest
from tests.unittest import override_config


class BackgroundUpdateTestCase(unittest.HomeserverTestCase):
    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
        self.updates: BackgroundUpdater = self.hs.get_datastores().main.db_pool.updates
        # the base test class should have run the real bg updates for us
        self.assertTrue(
            self.get_success(self.updates.has_completed_background_updates())
        )

        self.update_handler = Mock()
        self.updates.register_background_update_handler(
            "test_update", self.update_handler
        )

        self.store = self.hs.get_datastores().main

    async def update(self, progress: JsonDict, count: int) -> int:
        duration_ms = 10
        await self.clock.sleep((count * duration_ms) / 1000)
        progress = {"my_key": progress["my_key"] + 1}
        await self.store.db_pool.runInteraction(
            "update_progress",
            self.updates._background_update_progress_txn,
            "test_update",
            progress,
        )
        return count
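
    # Note on the helper above (a reading aid, inferred from the assertions below):
    # `do_next_background_update` returns False while more work remains and True once
    # every registered update has finished, and the updater sizes each batch from the
    # measured per-item duration so that a batch lands close to the target runtime,
    # e.g. a 100ms target with 10ms items should yield a batch of roughly 10 items.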

    def test_do_background_update(self) -> None:
        # the time we claim it takes to update one item when running the update
        duration_ms = 10
        # the target runtime for each bg update
        target_background_update_duration_ms = 100

        self.get_success(
            self.store.db_pool.simple_insert(
                "background_updates",
                values={"update_name": "test_update", "progress_json": '{"my_key": 1}'},
            )
        )

        self.update_handler.side_effect = self.update
        self.update_handler.reset_mock()
        res = self.get_success(
            self.updates.do_next_background_update(False),
            by=0.02,
        )
        self.assertFalse(res)

        # on the first call, we should get run with the default background update size
        self.update_handler.assert_called_once_with(
            {"my_key": 1}, self.updates.default_background_batch_size
        )

        # second step: complete the update
        # we should now get run with a much bigger number of items to update
        async def update(progress: JsonDict, count: int) -> int:
            self.assertEqual(progress, {"my_key": 2})
            self.assertAlmostEqual(
                count,
                target_background_update_duration_ms / duration_ms,
                places=0,
            )
            await self.updates._end_background_update("test_update")
            return count

        self.update_handler.side_effect = update
        self.update_handler.reset_mock()
        result = self.get_success(self.updates.do_next_background_update(False))
        self.assertFalse(result)
        self.update_handler.assert_called_once()

        # third step: we don't expect to be called any more
        self.update_handler.reset_mock()
        result = self.get_success(self.updates.do_next_background_update(False))
        self.assertTrue(result)
        self.assertFalse(self.update_handler.called)

    @override_config(
        yaml.safe_load(
            """
            background_updates:
                default_batch_size: 20
            """
        )
    )
    def test_background_update_default_batch_set_by_config(self) -> None:
        """
        Test that the background update is run with the default_batch_size set by the config
        """
        self.get_success(
            self.store.db_pool.simple_insert(
                "background_updates",
                values={"update_name": "test_update", "progress_json": '{"my_key": 1}'},
            )
        )

        self.update_handler.side_effect = self.update
        self.update_handler.reset_mock()
        res = self.get_success(
            self.updates.do_next_background_update(False),
            by=0.01,
        )
        self.assertFalse(res)

        # on the first call, we should get run with the default background update size
        # specified in the config
        self.update_handler.assert_called_once_with({"my_key": 1}, 20)

    def test_background_update_default_sleep_behavior(self) -> None:
        """
        Test default background update behavior, which is to sleep
        """
        self.get_success(
            self.store.db_pool.simple_insert(
                "background_updates",
                values={"update_name": "test_update", "progress_json": '{"my_key": 1}'},
            )
        )

        self.update_handler.side_effect = self.update
        self.update_handler.reset_mock()
        self.updates.start_doing_background_updates()

        # 2: advance the reactor less than the default sleep duration (1000ms)
        self.reactor.pump([0.5])
        # check that an update has not been run
        self.update_handler.assert_not_called()

        # advance reactor past default sleep duration
        self.reactor.pump([1])
        # check that update has been run
        self.update_handler.assert_called()

    @override_config(
        yaml.safe_load(
            """
            background_updates:
                sleep_duration_ms: 500
            """
        )
    )
    def test_background_update_sleep_set_in_config(self) -> None:
        """
        Test that changing the sleep time in the config changes how long it sleeps
        """
        self.get_success(
            self.store.db_pool.simple_insert(
                "background_updates",
                values={"update_name": "test_update", "progress_json": '{"my_key": 1}'},
            )
        )

        self.update_handler.side_effect = self.update
        self.update_handler.reset_mock()
        self.updates.start_doing_background_updates()

        # 2: advance the reactor less than the configured sleep duration (500ms)
        self.reactor.pump([0.45])
        # check that an update has not been run
        self.update_handler.assert_not_called()

        # advance reactor past config sleep duration but less than default duration
        self.reactor.pump([0.75])
        # check that update has been run
        self.update_handler.assert_called()

    @override_config(
        yaml.safe_load(
            """
            background_updates:
                sleep_enabled: false
            """
        )
    )
    def test_disabling_background_update_sleep(self) -> None:
        """
        Test that disabling sleep in the config results in bg update not sleeping
        """
        self.get_success(
            self.store.db_pool.simple_insert(
                "background_updates",
                values={"update_name": "test_update", "progress_json": '{"my_key": 1}'},
            )
        )

        self.update_handler.side_effect = self.update
        self.update_handler.reset_mock()
        self.updates.start_doing_background_updates()

        # 2: advance the reactor very little
        self.reactor.pump([0.025])
        # check that an update has run
        self.update_handler.assert_called()

    @override_config(
        yaml.safe_load(
            """
            background_updates:
                background_update_duration_ms: 500
            """
        )
    )
    def test_background_update_duration_set_in_config(self) -> None:
        """
        Test that the desired duration set in the config is used in determining batch size
        """
        # Duration of one background update item
        duration_ms = 10

        self.get_success(
            self.store.db_pool.simple_insert(
                "background_updates",
                values={"update_name": "test_update", "progress_json": '{"my_key": 1}'},
            )
        )

        self.update_handler.side_effect = self.update
        self.update_handler.reset_mock()
        res = self.get_success(
            self.updates.do_next_background_update(False),
            by=0.02,
        )
        self.assertFalse(res)

        # the first update was run with the default batch size, this should be run with
        # 500ms as the desired duration
        async def update(progress: JsonDict, count: int) -> int:
            self.assertEqual(progress, {"my_key": 2})
            self.assertAlmostEqual(
                count,
                500 / duration_ms,
                places=0,
            )
            await self.updates._end_background_update("test_update")
            return count

        self.update_handler.side_effect = update
        self.get_success(self.updates.do_next_background_update(False))

    @override_config(
        yaml.safe_load(
            """
            background_updates:
                min_batch_size: 5
            """
        )
    )
    def test_background_update_min_batch_set_in_config(self) -> None:
        """
        Test that the minimum batch size set in the config is used
        """
        # a very long-running individual update
        duration_ms = 50

        self.get_success(
            self.store.db_pool.simple_insert(
                "background_updates",
                values={"update_name": "test_update", "progress_json": '{"my_key": 1}'},
            )
        )

        # Run the update with the long-running update item
        async def update_long(progress: JsonDict, count: int) -> int:
            await self.clock.sleep((count * duration_ms) / 1000)
            progress = {"my_key": progress["my_key"] + 1}
            await self.store.db_pool.runInteraction(
                "update_progress",
                self.updates._background_update_progress_txn,
                "test_update",
                progress,
            )
            return count

        self.update_handler.side_effect = update_long
        self.update_handler.reset_mock()
        res = self.get_success(
            self.updates.do_next_background_update(False),
            by=1,
        )
        self.assertFalse(res)

        # the first update was run with the default batch size, this should be run with
        # the minimum batch size as the first items took a very long time
        async def update_short(progress: JsonDict, count: int) -> int:
            self.assertEqual(progress, {"my_key": 2})
            self.assertEqual(count, 5)
            await self.updates._end_background_update("test_update")
            return count

        self.update_handler.side_effect = update_short
        self.get_success(self.updates.do_next_background_update(False))

    def test_failed_update_logs_exception_details(self) -> None:
        needle = "RUH ROH RAGGY"

        def failing_update(progress: JsonDict, count: int) -> int:
            raise Exception(needle)

        self.update_handler.side_effect = failing_update
        self.update_handler.reset_mock()

        self.get_success(
            self.store.db_pool.simple_insert(
                "background_updates",
                values={"update_name": "test_update", "progress_json": "{}"},
            )
        )

        with self.assertLogs(level=logging.ERROR) as logs:
            # Expect a back-to-back RuntimeError to be raised
            self.get_failure(self.updates.run_background_updates(False), RuntimeError)

        self.assertTrue(any(needle in log for log in logs.output), logs.output)


class BackgroundUpdateControllerTestCase(unittest.HomeserverTestCase):
    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
        self.updates: BackgroundUpdater = self.hs.get_datastores().main.db_pool.updates
        # the base test class should have run the real bg updates for us
        self.assertTrue(
            self.get_success(self.updates.has_completed_background_updates())
        )

        self.update_deferred: Deferred[int] = Deferred()
        self.update_handler = Mock(return_value=self.update_deferred)
        self.updates.register_background_update_handler(
            "test_update", self.update_handler
        )

        # Mock out the AsyncContextManager
        class MockCM:
            __aenter__ = AsyncMock(return_value=None)
            __aexit__ = AsyncMock(return_value=None)

        self._update_ctx_manager = MockCM

        # Mock out the `update_handler` callback
        self._on_update = Mock(return_value=self._update_ctx_manager())

        # Define a default batch size value that's not the same as the internal default
        # value (100).
        self._default_batch_size = 500

        # Register the callbacks with more mocks
        self.hs.get_module_api().register_background_update_controller_callbacks(
            on_update=self._on_update,
            min_batch_size=AsyncMock(return_value=self._default_batch_size),
            default_batch_size=AsyncMock(
                return_value=self._default_batch_size,
            ),
        )
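
    # The callbacks registered above hand scheduling over to the (mocked) module
    # controller: `on_update` is expected to return an async context manager that wraps
    # each batch of work, and the batch-size callbacks override the updater's built-in
    # sizing. The test below drives that flow step by step.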

    def test_controller(self) -> None:
        store = self.hs.get_datastores().main
        self.get_success(
            store.db_pool.simple_insert(
                "background_updates",
                values={"update_name": "test_update", "progress_json": "{}"},
            )
        )

        # Set the return value for the context manager.
        enter_defer: Deferred[int] = Deferred()
        self._update_ctx_manager.__aenter__ = Mock(return_value=enter_defer)

        # Start the background update.
        do_update_d = ensureDeferred(self.updates.do_next_background_update(True))

        self.pump()

        # `run_update` should have been called, but the update handler won't be
        # called until the `enter_defer` (returned by `__aenter__`) is resolved.
        self._on_update.assert_called_once_with(
            "test_update",
            "master",
            False,
        )
        self.assertFalse(do_update_d.called)
        self.assertFalse(self.update_deferred.called)

        # Resolving the `enter_defer` should call the update handler, which then
        # blocks.
        enter_defer.callback(100)
        self.pump()
        self.update_handler.assert_called_once_with({}, self._default_batch_size)
        self.assertFalse(self.update_deferred.called)
        self._update_ctx_manager.__aexit__.assert_not_called()

        # Resolving the update handler deferred should cause the
        # `do_next_background_update` to finish and return
        self.update_deferred.callback(100)
        self.pump()
        self._update_ctx_manager.__aexit__.assert_called()
        self.get_success(do_update_d)


class BackgroundUpdateValidateConstraintTestCase(unittest.HomeserverTestCase):
    """Tests the validate constraint and delete background handlers."""

    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
        self.updates: BackgroundUpdater = self.hs.get_datastores().main.db_pool.updates
        # the base test class should have run the real bg updates for us
        self.assertTrue(
            self.get_success(self.updates.has_completed_background_updates())
        )

        self.store = self.hs.get_datastores().main

    def test_not_null_constraint(self) -> None:
        """Tests adding a not null constraint."""

        # Create the initial tables, where we have some invalid data.
        table_sql = """
            CREATE TABLE test_constraint(
                a INT PRIMARY KEY,
                b INT
            );
        """
        self.get_success(
            self.store.db_pool.runInteraction(
                "test_not_null_constraint", lambda txn: txn.execute(table_sql)
            )
        )

        # We add an index so that we can check that it's correctly recreated when
        # using SQLite.
        index_sql = "CREATE INDEX test_index ON test_constraint(a)"
        self.get_success(
            self.store.db_pool.runInteraction(
                "test_not_null_constraint", lambda txn: txn.execute(index_sql)
            )
        )

        self.get_success(
            self.store.db_pool.simple_insert("test_constraint", {"a": 1, "b": 1})
        )
        self.get_success(
            self.store.db_pool.simple_insert("test_constraint", {"a": 2, "b": None})
        )
        self.get_success(
            self.store.db_pool.simple_insert("test_constraint", {"a": 3, "b": 3})
        )

        # Now let's do the migration
        table2_sqlite = """
            CREATE TABLE test_constraint2(
                a INT PRIMARY KEY,
                b INT,
                CONSTRAINT test_constraint_name CHECK (b is NOT NULL)
            );
        """

        def delta(txn: LoggingTransaction) -> None:
            run_validate_constraint_and_delete_rows_schema_delta(
                txn,
                ordering=1000,
                update_name="test_bg_update",
                table="test_constraint",
                constraint_name="test_constraint_name",
                constraint=NotNullConstraint("b"),
                sqlite_table_name="test_constraint2",
                sqlite_table_schema=table2_sqlite,
            )

        self.get_success(
            self.store.db_pool.runInteraction(
                "test_not_null_constraint",
                delta,
            )
        )

        if isinstance(self.store.database_engine, PostgresEngine):
            # Postgres uses a background update
            self.updates.register_background_validate_constraint_and_delete_rows(
                "test_bg_update",
                table="test_constraint",
                constraint_name="test_constraint_name",
                constraint=NotNullConstraint("b"),
                unique_columns=["a"],
            )

            # Tell the DataStore that it hasn't finished all updates yet
            self.store.db_pool.updates._all_done = False

            # Now let's actually drive the updates to completion
            self.wait_for_background_updates()

        # Check the correct values are in the new table.
        rows = cast(
            List[Tuple[int, int]],
            self.get_success(
                self.store.db_pool.simple_select_list(
                    table="test_constraint",
                    keyvalues={},
                    retcols=("a", "b"),
                )
            ),
        )
        self.assertCountEqual(rows, [(1, 1), (3, 3)])

        # And check that invalid rows get correctly rejected.
        self.get_failure(
            self.store.db_pool.simple_insert("test_constraint", {"a": 2, "b": None}),
            exc=self.store.database_engine.module.IntegrityError,
        )

        # Check the index is still there for SQLite.
        if isinstance(self.store.database_engine, Sqlite3Engine):
            # Ensure the index exists in the schema.
            self.get_success(
                self.store.db_pool.simple_select_one_onecol(
                    table="sqlite_master",
                    keyvalues={"tbl_name": "test_constraint"},
                    retcol="name",
                )
            )
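
    # (A rough note on the helper under test, inferred from how these tests behave: on
    # Postgres the schema delta only marks the constraint and the registered background
    # update later deletes the violating rows and validates it, which is why the updates
    # are driven to completion above; on SQLite the delta itself rebuilds the table from
    # the supplied schema, keeping only rows that satisfy the constraint and recreating
    # any indexes. The foreign key test below exercises the same machinery.)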

    def test_foreign_constraint(self) -> None:
        """Tests adding a foreign key constraint."""

        # Create the initial tables, where we have some invalid data.
        base_sql = """
            CREATE TABLE base_table(
                b INT PRIMARY KEY
            );
        """
        table_sql = """
            CREATE TABLE test_constraint(
                a INT PRIMARY KEY,
                b INT NOT NULL
            );
        """
        self.get_success(
            self.store.db_pool.runInteraction(
                "test_foreign_key_constraint", lambda txn: txn.execute(base_sql)
            )
        )
        self.get_success(
            self.store.db_pool.runInteraction(
                "test_foreign_key_constraint", lambda txn: txn.execute(table_sql)
            )
        )

        self.get_success(self.store.db_pool.simple_insert("base_table", {"b": 1}))
        self.get_success(
            self.store.db_pool.simple_insert("test_constraint", {"a": 1, "b": 1})
        )
        self.get_success(
            self.store.db_pool.simple_insert("test_constraint", {"a": 2, "b": 2})
        )
        self.get_success(self.store.db_pool.simple_insert("base_table", {"b": 3}))
        self.get_success(
            self.store.db_pool.simple_insert("test_constraint", {"a": 3, "b": 3})
        )

        table2_sqlite = """
            CREATE TABLE test_constraint2(
                a INT PRIMARY KEY,
                b INT NOT NULL,
                CONSTRAINT test_constraint_name FOREIGN KEY (b) REFERENCES base_table (b)
            );
        """

        def delta(txn: LoggingTransaction) -> None:
            run_validate_constraint_and_delete_rows_schema_delta(
                txn,
                ordering=1000,
                update_name="test_bg_update",
                table="test_constraint",
                constraint_name="test_constraint_name",
                constraint=ForeignKeyConstraint(
                    "base_table", [("b", "b")], deferred=False
                ),
                sqlite_table_name="test_constraint2",
                sqlite_table_schema=table2_sqlite,
            )

        self.get_success(
            self.store.db_pool.runInteraction(
                "test_foreign_key_constraint",
                delta,
            )
        )

        if isinstance(self.store.database_engine, PostgresEngine):
            # Postgres uses a background update
            self.updates.register_background_validate_constraint_and_delete_rows(
                "test_bg_update",
                table="test_constraint",
                constraint_name="test_constraint_name",
                constraint=ForeignKeyConstraint(
                    "base_table", [("b", "b")], deferred=False
                ),
                unique_columns=["a"],
            )

            # Tell the DataStore that it hasn't finished all updates yet
            self.store.db_pool.updates._all_done = False

            # Now let's actually drive the updates to completion
            self.wait_for_background_updates()

        # Check the correct values are in the new table.
        rows = cast(
            List[Tuple[int, int]],
            self.get_success(
                self.store.db_pool.simple_select_list(
                    table="test_constraint",
                    keyvalues={},
                    retcols=("a", "b"),
                )
            ),
        )
        self.assertCountEqual(rows, [(1, 1), (3, 3)])

        # And check that invalid rows get correctly rejected.
        self.get_failure(
            self.store.db_pool.simple_insert("test_constraint", {"a": 2, "b": 2}),
            exc=self.store.database_engine.module.IntegrityError,
        )