test_background_updates.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365
  1. # Copyright 2021 The Matrix.org Foundation C.I.C.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. from http import HTTPStatus
  15. from typing import Collection
  16. from parameterized import parameterized
  17. import synapse.rest.admin
  18. from synapse.api.errors import Codes
  19. from synapse.rest.client import login
  20. from synapse.server import HomeServer
  21. from synapse.storage.background_updates import BackgroundUpdater
  22. from tests import unittest
  23. class BackgroundUpdatesTestCase(unittest.HomeserverTestCase):
  24. servlets = [
  25. synapse.rest.admin.register_servlets,
  26. login.register_servlets,
  27. ]
  28. def prepare(self, reactor, clock, hs: HomeServer):
  29. self.store = hs.get_datastore()
  30. self.admin_user = self.register_user("admin", "pass", admin=True)
  31. self.admin_user_tok = self.login("admin", "pass")
  32. @parameterized.expand(
  33. [
  34. ("GET", "/_synapse/admin/v1/background_updates/enabled"),
  35. ("POST", "/_synapse/admin/v1/background_updates/enabled"),
  36. ("GET", "/_synapse/admin/v1/background_updates/status"),
  37. ("POST", "/_synapse/admin/v1/background_updates/start_job"),
  38. ]
  39. )
  40. def test_requester_is_no_admin(self, method: str, url: str):
  41. """
  42. If the user is not a server admin, an error 403 is returned.
  43. """
  44. self.register_user("user", "pass", admin=False)
  45. other_user_tok = self.login("user", "pass")
  46. channel = self.make_request(
  47. method,
  48. url,
  49. content={},
  50. access_token=other_user_tok,
  51. )
  52. self.assertEqual(HTTPStatus.FORBIDDEN, channel.code, msg=channel.json_body)
  53. self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
  54. def test_invalid_parameter(self):
  55. """
  56. If parameters are invalid, an error is returned.
  57. """
  58. url = "/_synapse/admin/v1/background_updates/start_job"
  59. # empty content
  60. channel = self.make_request(
  61. "POST",
  62. url,
  63. content={},
  64. access_token=self.admin_user_tok,
  65. )
  66. self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
  67. self.assertEqual(Codes.MISSING_PARAM, channel.json_body["errcode"])
  68. # job_name invalid
  69. channel = self.make_request(
  70. "POST",
  71. url,
  72. content={"job_name": "unknown"},
  73. access_token=self.admin_user_tok,
  74. )
  75. self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
  76. self.assertEqual(Codes.UNKNOWN, channel.json_body["errcode"])
  77. def _register_bg_update(self):
  78. "Adds a bg update but doesn't start it"
  79. async def _fake_update(progress, batch_size) -> int:
  80. await self.clock.sleep(0.2)
  81. return batch_size
  82. self.store.db_pool.updates.register_background_update_handler(
  83. "test_update",
  84. _fake_update,
  85. )
  86. self.get_success(
  87. self.store.db_pool.simple_insert(
  88. table="background_updates",
  89. values={
  90. "update_name": "test_update",
  91. "progress_json": "{}",
  92. },
  93. )
  94. )
  95. def test_status_empty(self):
  96. """Test the status API works."""
  97. channel = self.make_request(
  98. "GET",
  99. "/_synapse/admin/v1/background_updates/status",
  100. access_token=self.admin_user_tok,
  101. )
  102. self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
  103. # Background updates should be enabled, but none should be running.
  104. self.assertDictEqual(
  105. channel.json_body, {"current_updates": {}, "enabled": True}
  106. )
  107. def test_status_bg_update(self):
  108. """Test the status API works with a background update."""
  109. # Create a new background update
  110. self._register_bg_update()
  111. self.store.db_pool.updates.start_doing_background_updates()
  112. self.reactor.pump([1.0, 1.0, 1.0])
  113. channel = self.make_request(
  114. "GET",
  115. "/_synapse/admin/v1/background_updates/status",
  116. access_token=self.admin_user_tok,
  117. )
  118. self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
  119. # Background updates should be enabled, and one should be running.
  120. self.assertDictEqual(
  121. channel.json_body,
  122. {
  123. "current_updates": {
  124. "master": {
  125. "name": "test_update",
  126. "average_items_per_ms": 0.001,
  127. "total_duration_ms": 1000.0,
  128. "total_item_count": (
  129. BackgroundUpdater.MINIMUM_BACKGROUND_BATCH_SIZE
  130. ),
  131. }
  132. },
  133. "enabled": True,
  134. },
  135. )
  136. def test_enabled(self):
  137. """Test the enabled API works."""
  138. # Create a new background update
  139. self._register_bg_update()
  140. self.store.db_pool.updates.start_doing_background_updates()
  141. # Test that GET works and returns enabled is True.
  142. channel = self.make_request(
  143. "GET",
  144. "/_synapse/admin/v1/background_updates/enabled",
  145. access_token=self.admin_user_tok,
  146. )
  147. self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
  148. self.assertDictEqual(channel.json_body, {"enabled": True})
  149. # Disable the BG updates
  150. channel = self.make_request(
  151. "POST",
  152. "/_synapse/admin/v1/background_updates/enabled",
  153. content={"enabled": False},
  154. access_token=self.admin_user_tok,
  155. )
  156. self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
  157. self.assertDictEqual(channel.json_body, {"enabled": False})
  158. # Advance a bit and get the current status, note this will finish the in
  159. # flight background update so we call it the status API twice and check
  160. # there was no change.
  161. self.reactor.pump([1.0, 1.0])
  162. channel = self.make_request(
  163. "GET",
  164. "/_synapse/admin/v1/background_updates/status",
  165. access_token=self.admin_user_tok,
  166. )
  167. self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
  168. self.assertDictEqual(
  169. channel.json_body,
  170. {
  171. "current_updates": {
  172. "master": {
  173. "name": "test_update",
  174. "average_items_per_ms": 0.001,
  175. "total_duration_ms": 1000.0,
  176. "total_item_count": (
  177. BackgroundUpdater.MINIMUM_BACKGROUND_BATCH_SIZE
  178. ),
  179. }
  180. },
  181. "enabled": False,
  182. },
  183. )
  184. # Run the reactor for a bit so the BG updates would have a chance to run
  185. # if they were to.
  186. self.reactor.pump([1.0, 1.0])
  187. channel = self.make_request(
  188. "GET",
  189. "/_synapse/admin/v1/background_updates/status",
  190. access_token=self.admin_user_tok,
  191. )
  192. self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
  193. # There should be no change from the previous /status response.
  194. self.assertDictEqual(
  195. channel.json_body,
  196. {
  197. "current_updates": {
  198. "master": {
  199. "name": "test_update",
  200. "average_items_per_ms": 0.001,
  201. "total_duration_ms": 1000.0,
  202. "total_item_count": (
  203. BackgroundUpdater.MINIMUM_BACKGROUND_BATCH_SIZE
  204. ),
  205. }
  206. },
  207. "enabled": False,
  208. },
  209. )
  210. # Re-enable the background updates.
  211. channel = self.make_request(
  212. "POST",
  213. "/_synapse/admin/v1/background_updates/enabled",
  214. content={"enabled": True},
  215. access_token=self.admin_user_tok,
  216. )
  217. self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
  218. self.assertDictEqual(channel.json_body, {"enabled": True})
  219. self.reactor.pump([1.0, 1.0])
  220. channel = self.make_request(
  221. "GET",
  222. "/_synapse/admin/v1/background_updates/status",
  223. access_token=self.admin_user_tok,
  224. )
  225. self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
  226. # Background updates should be enabled and making progress.
  227. self.assertDictEqual(
  228. channel.json_body,
  229. {
  230. "current_updates": {
  231. "master": {
  232. "name": "test_update",
  233. "average_items_per_ms": 0.001,
  234. "total_duration_ms": 2000.0,
  235. "total_item_count": (
  236. 2 * BackgroundUpdater.MINIMUM_BACKGROUND_BATCH_SIZE
  237. ),
  238. }
  239. },
  240. "enabled": True,
  241. },
  242. )
  243. @parameterized.expand(
  244. [
  245. ("populate_stats_process_rooms", ["populate_stats_process_rooms"]),
  246. (
  247. "regenerate_directory",
  248. [
  249. "populate_user_directory_createtables",
  250. "populate_user_directory_process_rooms",
  251. "populate_user_directory_process_users",
  252. "populate_user_directory_cleanup",
  253. ],
  254. ),
  255. ]
  256. )
  257. def test_start_backround_job(self, job_name: str, updates: Collection[str]):
  258. """
  259. Test that background updates add to database and be processed.
  260. Args:
  261. job_name: name of the job to call with API
  262. updates: collection of background updates to be started
  263. """
  264. # no background update is waiting
  265. self.assertTrue(
  266. self.get_success(
  267. self.store.db_pool.updates.has_completed_background_updates()
  268. )
  269. )
  270. channel = self.make_request(
  271. "POST",
  272. "/_synapse/admin/v1/background_updates/start_job",
  273. content={"job_name": job_name},
  274. access_token=self.admin_user_tok,
  275. )
  276. self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
  277. # test that each background update is waiting now
  278. for update in updates:
  279. self.assertFalse(
  280. self.get_success(
  281. self.store.db_pool.updates.has_completed_background_update(update)
  282. )
  283. )
  284. self.wait_for_background_updates()
  285. # background updates are done
  286. self.assertTrue(
  287. self.get_success(
  288. self.store.db_pool.updates.has_completed_background_updates()
  289. )
  290. )
  291. def test_start_backround_job_twice(self):
  292. """Test that add a background update twice return an error."""
  293. # add job to database
  294. self.get_success(
  295. self.store.db_pool.simple_insert(
  296. table="background_updates",
  297. values={
  298. "update_name": "populate_stats_process_rooms",
  299. "progress_json": "{}",
  300. },
  301. )
  302. )
  303. channel = self.make_request(
  304. "POST",
  305. "/_synapse/admin/v1/background_updates/start_job",
  306. content={"job_name": "populate_stats_process_rooms"},
  307. access_token=self.admin_user_tok,
  308. )
  309. self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)