test_background_updates.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368
# Copyright 2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
  14. from http import HTTPStatus
  15. from typing import Collection
  16. from parameterized import parameterized
  17. from twisted.test.proto_helpers import MemoryReactor
  18. import synapse.rest.admin
  19. from synapse.api.errors import Codes
  20. from synapse.rest.client import login
  21. from synapse.server import HomeServer
  22. from synapse.storage.background_updates import BackgroundUpdater
  23. from synapse.types import JsonDict
  24. from synapse.util import Clock
  25. from tests import unittest
  26. class BackgroundUpdatesTestCase(unittest.HomeserverTestCase):
  27. servlets = [
  28. synapse.rest.admin.register_servlets,
  29. login.register_servlets,
  30. ]
  31. def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
  32. self.store = hs.get_datastores().main
  33. self.admin_user = self.register_user("admin", "pass", admin=True)
  34. self.admin_user_tok = self.login("admin", "pass")
  35. self.updater = BackgroundUpdater(hs, self.store.db_pool)
  36. @parameterized.expand(
  37. [
  38. ("GET", "/_synapse/admin/v1/background_updates/enabled"),
  39. ("POST", "/_synapse/admin/v1/background_updates/enabled"),
  40. ("GET", "/_synapse/admin/v1/background_updates/status"),
  41. ("POST", "/_synapse/admin/v1/background_updates/start_job"),
  42. ]
  43. )
  44. def test_requester_is_no_admin(self, method: str, url: str) -> None:
  45. """
  46. If the user is not a server admin, an error HTTPStatus.FORBIDDEN is returned.
  47. """
  48. self.register_user("user", "pass", admin=False)
  49. other_user_tok = self.login("user", "pass")
  50. channel = self.make_request(
  51. method,
  52. url,
  53. content={},
  54. access_token=other_user_tok,
  55. )
  56. self.assertEqual(HTTPStatus.FORBIDDEN, channel.code, msg=channel.json_body)
  57. self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
  58. def test_invalid_parameter(self) -> None:
  59. """
  60. If parameters are invalid, an error is returned.
  61. """
  62. url = "/_synapse/admin/v1/background_updates/start_job"
  63. # empty content
  64. channel = self.make_request(
  65. "POST",
  66. url,
  67. content={},
  68. access_token=self.admin_user_tok,
  69. )
  70. self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
  71. self.assertEqual(Codes.MISSING_PARAM, channel.json_body["errcode"])
  72. # job_name invalid
  73. channel = self.make_request(
  74. "POST",
  75. url,
  76. content={"job_name": "unknown"},
  77. access_token=self.admin_user_tok,
  78. )
  79. self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
  80. self.assertEqual(Codes.UNKNOWN, channel.json_body["errcode"])
  81. def _register_bg_update(self) -> None:
  82. "Adds a bg update but doesn't start it"
  83. async def _fake_update(progress: JsonDict, batch_size: int) -> int:
  84. await self.clock.sleep(0.2)
  85. return batch_size
  86. self.store.db_pool.updates.register_background_update_handler(
  87. "test_update",
  88. _fake_update,
  89. )
  90. self.get_success(
  91. self.store.db_pool.simple_insert(
  92. table="background_updates",
  93. values={
  94. "update_name": "test_update",
  95. "progress_json": "{}",
  96. },
  97. )
  98. )
  99. def test_status_empty(self) -> None:
  100. """Test the status API works."""
  101. channel = self.make_request(
  102. "GET",
  103. "/_synapse/admin/v1/background_updates/status",
  104. access_token=self.admin_user_tok,
  105. )
  106. self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
  107. # Background updates should be enabled, but none should be running.
  108. self.assertDictEqual(
  109. channel.json_body, {"current_updates": {}, "enabled": True}
  110. )
  111. def test_status_bg_update(self) -> None:
  112. """Test the status API works with a background update."""
  113. # Create a new background update
  114. self._register_bg_update()
  115. self.store.db_pool.updates.start_doing_background_updates()
  116. self.reactor.pump([1.0, 1.0, 1.0])
  117. channel = self.make_request(
  118. "GET",
  119. "/_synapse/admin/v1/background_updates/status",
  120. access_token=self.admin_user_tok,
  121. )
  122. self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
  123. # Background updates should be enabled, and one should be running.
  124. self.assertDictEqual(
  125. channel.json_body,
  126. {
  127. "current_updates": {
  128. "master": {
  129. "name": "test_update",
  130. "average_items_per_ms": 0.1,
  131. "total_duration_ms": 1000.0,
  132. "total_item_count": (
  133. self.updater.default_background_batch_size
  134. ),
  135. }
  136. },
  137. "enabled": True,
  138. },
  139. )
  140. def test_enabled(self) -> None:
  141. """Test the enabled API works."""
  142. # Create a new background update
  143. self._register_bg_update()
  144. self.store.db_pool.updates.start_doing_background_updates()
  145. # Test that GET works and returns enabled is True.
  146. channel = self.make_request(
  147. "GET",
  148. "/_synapse/admin/v1/background_updates/enabled",
  149. access_token=self.admin_user_tok,
  150. )
  151. self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
  152. self.assertDictEqual(channel.json_body, {"enabled": True})
  153. # Disable the BG updates
  154. channel = self.make_request(
  155. "POST",
  156. "/_synapse/admin/v1/background_updates/enabled",
  157. content={"enabled": False},
  158. access_token=self.admin_user_tok,
  159. )
  160. self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
  161. self.assertDictEqual(channel.json_body, {"enabled": False})
  162. # Advance a bit and get the current status, note this will finish the in
  163. # flight background update so we call it the status API twice and check
  164. # there was no change.
  165. self.reactor.pump([1.0, 1.0])
  166. channel = self.make_request(
  167. "GET",
  168. "/_synapse/admin/v1/background_updates/status",
  169. access_token=self.admin_user_tok,
  170. )
  171. self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
  172. self.assertDictEqual(
  173. channel.json_body,
  174. {
  175. "current_updates": {
  176. "master": {
  177. "name": "test_update",
  178. "average_items_per_ms": 0.1,
  179. "total_duration_ms": 1000.0,
  180. "total_item_count": (
  181. self.updater.default_background_batch_size
  182. ),
  183. }
  184. },
  185. "enabled": False,
  186. },
  187. )
  188. # Run the reactor for a bit so the BG updates would have a chance to run
  189. # if they were to.
  190. self.reactor.pump([1.0, 1.0])
  191. channel = self.make_request(
  192. "GET",
  193. "/_synapse/admin/v1/background_updates/status",
  194. access_token=self.admin_user_tok,
  195. )
  196. self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
  197. # There should be no change from the previous /status response.
  198. self.assertDictEqual(
  199. channel.json_body,
  200. {
  201. "current_updates": {
  202. "master": {
  203. "name": "test_update",
  204. "average_items_per_ms": 0.1,
  205. "total_duration_ms": 1000.0,
  206. "total_item_count": (
  207. self.updater.default_background_batch_size
  208. ),
  209. }
  210. },
  211. "enabled": False,
  212. },
  213. )
  214. # Re-enable the background updates.
  215. channel = self.make_request(
  216. "POST",
  217. "/_synapse/admin/v1/background_updates/enabled",
  218. content={"enabled": True},
  219. access_token=self.admin_user_tok,
  220. )
  221. self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
  222. self.assertDictEqual(channel.json_body, {"enabled": True})
  223. self.reactor.pump([1.0, 1.0])
  224. channel = self.make_request(
  225. "GET",
  226. "/_synapse/admin/v1/background_updates/status",
  227. access_token=self.admin_user_tok,
  228. )
  229. self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
  230. # Background updates should be enabled and making progress.
  231. self.assertDictEqual(
  232. channel.json_body,
  233. {
  234. "current_updates": {
  235. "master": {
  236. "name": "test_update",
  237. "average_items_per_ms": 0.05263157894736842,
  238. "total_duration_ms": 2000.0,
  239. "total_item_count": (110),
  240. }
  241. },
  242. "enabled": True,
  243. },
  244. )
  245. @parameterized.expand(
  246. [
  247. ("populate_stats_process_rooms", ["populate_stats_process_rooms"]),
  248. (
  249. "regenerate_directory",
  250. [
  251. "populate_user_directory_createtables",
  252. "populate_user_directory_process_rooms",
  253. "populate_user_directory_process_users",
  254. "populate_user_directory_cleanup",
  255. ],
  256. ),
  257. ]
  258. )
  259. def test_start_backround_job(self, job_name: str, updates: Collection[str]) -> None:
  260. """
  261. Test that background updates add to database and be processed.
  262. Args:
  263. job_name: name of the job to call with API
  264. updates: collection of background updates to be started
  265. """
  266. # no background update is waiting
  267. self.assertTrue(
  268. self.get_success(
  269. self.store.db_pool.updates.has_completed_background_updates()
  270. )
  271. )
  272. channel = self.make_request(
  273. "POST",
  274. "/_synapse/admin/v1/background_updates/start_job",
  275. content={"job_name": job_name},
  276. access_token=self.admin_user_tok,
  277. )
  278. self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
  279. # test that each background update is waiting now
  280. for update in updates:
  281. self.assertFalse(
  282. self.get_success(
  283. self.store.db_pool.updates.has_completed_background_update(update)
  284. )
  285. )
  286. self.wait_for_background_updates()
  287. # background updates are done
  288. self.assertTrue(
  289. self.get_success(
  290. self.store.db_pool.updates.has_completed_background_updates()
  291. )
  292. )
  293. def test_start_backround_job_twice(self) -> None:
  294. """Test that add a background update twice return an error."""
  295. # add job to database
  296. self.get_success(
  297. self.store.db_pool.simple_insert(
  298. table="background_updates",
  299. values={
  300. "update_name": "populate_stats_process_rooms",
  301. "progress_json": "{}",
  302. },
  303. )
  304. )
  305. channel = self.make_request(
  306. "POST",
  307. "/_synapse/admin/v1/background_updates/start_job",
  308. content={"job_name": "populate_stats_process_rooms"},
  309. access_token=self.admin_user_tok,
  310. )
  311. self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)