test_background_updates.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367
  1. # Copyright 2021 The Matrix.org Foundation C.I.C.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. from typing import Collection
  15. from parameterized import parameterized
  16. from twisted.test.proto_helpers import MemoryReactor
  17. import synapse.rest.admin
  18. from synapse.api.errors import Codes
  19. from synapse.rest.client import login
  20. from synapse.server import HomeServer
  21. from synapse.storage.background_updates import BackgroundUpdater
  22. from synapse.types import JsonDict
  23. from synapse.util import Clock
  24. from tests import unittest
  25. class BackgroundUpdatesTestCase(unittest.HomeserverTestCase):
  26. servlets = [
  27. synapse.rest.admin.register_servlets,
  28. login.register_servlets,
  29. ]
  30. def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
  31. self.store = hs.get_datastores().main
  32. self.admin_user = self.register_user("admin", "pass", admin=True)
  33. self.admin_user_tok = self.login("admin", "pass")
  34. self.updater = BackgroundUpdater(hs, self.store.db_pool)
  35. @parameterized.expand(
  36. [
  37. ("GET", "/_synapse/admin/v1/background_updates/enabled"),
  38. ("POST", "/_synapse/admin/v1/background_updates/enabled"),
  39. ("GET", "/_synapse/admin/v1/background_updates/status"),
  40. ("POST", "/_synapse/admin/v1/background_updates/start_job"),
  41. ]
  42. )
  43. def test_requester_is_no_admin(self, method: str, url: str) -> None:
  44. """
  45. If the user is not a server admin, an error 403 is returned.
  46. """
  47. self.register_user("user", "pass", admin=False)
  48. other_user_tok = self.login("user", "pass")
  49. channel = self.make_request(
  50. method,
  51. url,
  52. content={},
  53. access_token=other_user_tok,
  54. )
  55. self.assertEqual(403, channel.code, msg=channel.json_body)
  56. self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
  57. def test_invalid_parameter(self) -> None:
  58. """
  59. If parameters are invalid, an error is returned.
  60. """
  61. url = "/_synapse/admin/v1/background_updates/start_job"
  62. # empty content
  63. channel = self.make_request(
  64. "POST",
  65. url,
  66. content={},
  67. access_token=self.admin_user_tok,
  68. )
  69. self.assertEqual(400, channel.code, msg=channel.json_body)
  70. self.assertEqual(Codes.MISSING_PARAM, channel.json_body["errcode"])
  71. # job_name invalid
  72. channel = self.make_request(
  73. "POST",
  74. url,
  75. content={"job_name": "unknown"},
  76. access_token=self.admin_user_tok,
  77. )
  78. self.assertEqual(400, channel.code, msg=channel.json_body)
  79. self.assertEqual(Codes.UNKNOWN, channel.json_body["errcode"])
  80. def _register_bg_update(self) -> None:
  81. "Adds a bg update but doesn't start it"
  82. async def _fake_update(progress: JsonDict, batch_size: int) -> int:
  83. await self.clock.sleep(0.2)
  84. return batch_size
  85. self.store.db_pool.updates.register_background_update_handler(
  86. "test_update",
  87. _fake_update,
  88. )
  89. self.get_success(
  90. self.store.db_pool.simple_insert(
  91. table="background_updates",
  92. values={
  93. "update_name": "test_update",
  94. "progress_json": "{}",
  95. },
  96. )
  97. )
  98. def test_status_empty(self) -> None:
  99. """Test the status API works."""
  100. channel = self.make_request(
  101. "GET",
  102. "/_synapse/admin/v1/background_updates/status",
  103. access_token=self.admin_user_tok,
  104. )
  105. self.assertEqual(200, channel.code, msg=channel.json_body)
  106. # Background updates should be enabled, but none should be running.
  107. self.assertDictEqual(
  108. channel.json_body, {"current_updates": {}, "enabled": True}
  109. )
  110. def test_status_bg_update(self) -> None:
  111. """Test the status API works with a background update."""
  112. # Create a new background update
  113. self._register_bg_update()
  114. self.store.db_pool.updates.start_doing_background_updates()
  115. self.reactor.pump([1.0, 1.0, 1.0])
  116. channel = self.make_request(
  117. "GET",
  118. "/_synapse/admin/v1/background_updates/status",
  119. access_token=self.admin_user_tok,
  120. )
  121. self.assertEqual(200, channel.code, msg=channel.json_body)
  122. # Background updates should be enabled, and one should be running.
  123. self.assertDictEqual(
  124. channel.json_body,
  125. {
  126. "current_updates": {
  127. "master": {
  128. "name": "test_update",
  129. "average_items_per_ms": 0.1,
  130. "total_duration_ms": 1000.0,
  131. "total_item_count": (
  132. self.updater.default_background_batch_size
  133. ),
  134. }
  135. },
  136. "enabled": True,
  137. },
  138. )
  139. def test_enabled(self) -> None:
  140. """Test the enabled API works."""
  141. # Create a new background update
  142. self._register_bg_update()
  143. self.store.db_pool.updates.start_doing_background_updates()
  144. # Test that GET works and returns enabled is True.
  145. channel = self.make_request(
  146. "GET",
  147. "/_synapse/admin/v1/background_updates/enabled",
  148. access_token=self.admin_user_tok,
  149. )
  150. self.assertEqual(200, channel.code, msg=channel.json_body)
  151. self.assertDictEqual(channel.json_body, {"enabled": True})
  152. # Disable the BG updates
  153. channel = self.make_request(
  154. "POST",
  155. "/_synapse/admin/v1/background_updates/enabled",
  156. content={"enabled": False},
  157. access_token=self.admin_user_tok,
  158. )
  159. self.assertEqual(200, channel.code, msg=channel.json_body)
  160. self.assertDictEqual(channel.json_body, {"enabled": False})
  161. # Advance a bit and get the current status, note this will finish the in
  162. # flight background update so we call it the status API twice and check
  163. # there was no change.
  164. self.reactor.pump([1.0, 1.0])
  165. channel = self.make_request(
  166. "GET",
  167. "/_synapse/admin/v1/background_updates/status",
  168. access_token=self.admin_user_tok,
  169. )
  170. self.assertEqual(200, channel.code, msg=channel.json_body)
  171. self.assertDictEqual(
  172. channel.json_body,
  173. {
  174. "current_updates": {
  175. "master": {
  176. "name": "test_update",
  177. "average_items_per_ms": 0.1,
  178. "total_duration_ms": 1000.0,
  179. "total_item_count": (
  180. self.updater.default_background_batch_size
  181. ),
  182. }
  183. },
  184. "enabled": False,
  185. },
  186. )
  187. # Run the reactor for a bit so the BG updates would have a chance to run
  188. # if they were to.
  189. self.reactor.pump([1.0, 1.0])
  190. channel = self.make_request(
  191. "GET",
  192. "/_synapse/admin/v1/background_updates/status",
  193. access_token=self.admin_user_tok,
  194. )
  195. self.assertEqual(200, channel.code, msg=channel.json_body)
  196. # There should be no change from the previous /status response.
  197. self.assertDictEqual(
  198. channel.json_body,
  199. {
  200. "current_updates": {
  201. "master": {
  202. "name": "test_update",
  203. "average_items_per_ms": 0.1,
  204. "total_duration_ms": 1000.0,
  205. "total_item_count": (
  206. self.updater.default_background_batch_size
  207. ),
  208. }
  209. },
  210. "enabled": False,
  211. },
  212. )
  213. # Re-enable the background updates.
  214. channel = self.make_request(
  215. "POST",
  216. "/_synapse/admin/v1/background_updates/enabled",
  217. content={"enabled": True},
  218. access_token=self.admin_user_tok,
  219. )
  220. self.assertEqual(200, channel.code, msg=channel.json_body)
  221. self.assertDictEqual(channel.json_body, {"enabled": True})
  222. self.reactor.pump([1.0, 1.0])
  223. channel = self.make_request(
  224. "GET",
  225. "/_synapse/admin/v1/background_updates/status",
  226. access_token=self.admin_user_tok,
  227. )
  228. self.assertEqual(200, channel.code, msg=channel.json_body)
  229. # Background updates should be enabled and making progress.
  230. self.assertDictEqual(
  231. channel.json_body,
  232. {
  233. "current_updates": {
  234. "master": {
  235. "name": "test_update",
  236. "average_items_per_ms": 0.05263157894736842,
  237. "total_duration_ms": 2000.0,
  238. "total_item_count": (110),
  239. }
  240. },
  241. "enabled": True,
  242. },
  243. )
  244. @parameterized.expand(
  245. [
  246. ("populate_stats_process_rooms", ["populate_stats_process_rooms"]),
  247. (
  248. "regenerate_directory",
  249. [
  250. "populate_user_directory_createtables",
  251. "populate_user_directory_process_rooms",
  252. "populate_user_directory_process_users",
  253. "populate_user_directory_cleanup",
  254. ],
  255. ),
  256. ]
  257. )
  258. def test_start_backround_job(self, job_name: str, updates: Collection[str]) -> None:
  259. """
  260. Test that background updates add to database and be processed.
  261. Args:
  262. job_name: name of the job to call with API
  263. updates: collection of background updates to be started
  264. """
  265. # no background update is waiting
  266. self.assertTrue(
  267. self.get_success(
  268. self.store.db_pool.updates.has_completed_background_updates()
  269. )
  270. )
  271. channel = self.make_request(
  272. "POST",
  273. "/_synapse/admin/v1/background_updates/start_job",
  274. content={"job_name": job_name},
  275. access_token=self.admin_user_tok,
  276. )
  277. self.assertEqual(200, channel.code, msg=channel.json_body)
  278. # test that each background update is waiting now
  279. for update in updates:
  280. self.assertFalse(
  281. self.get_success(
  282. self.store.db_pool.updates.has_completed_background_update(update)
  283. )
  284. )
  285. self.wait_for_background_updates()
  286. # background updates are done
  287. self.assertTrue(
  288. self.get_success(
  289. self.store.db_pool.updates.has_completed_background_updates()
  290. )
  291. )
  292. def test_start_backround_job_twice(self) -> None:
  293. """Test that add a background update twice return an error."""
  294. # add job to database
  295. self.get_success(
  296. self.store.db_pool.simple_insert(
  297. table="background_updates",
  298. values={
  299. "update_name": "populate_stats_process_rooms",
  300. "progress_json": "{}",
  301. },
  302. )
  303. )
  304. channel = self.make_request(
  305. "POST",
  306. "/_synapse/admin/v1/background_updates/start_job",
  307. content={"job_name": "populate_stats_process_rooms"},
  308. access_token=self.admin_user_tok,
  309. )
  310. self.assertEqual(400, channel.code, msg=channel.json_body)