123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368 |
- # Copyright 2021 The Matrix.org Foundation C.I.C.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- from http import HTTPStatus
- from typing import Collection
- from parameterized import parameterized
- from twisted.test.proto_helpers import MemoryReactor
- import synapse.rest.admin
- from synapse.api.errors import Codes
- from synapse.rest.client import login
- from synapse.server import HomeServer
- from synapse.storage.background_updates import BackgroundUpdater
- from synapse.types import JsonDict
- from synapse.util import Clock
- from tests import unittest
class BackgroundUpdatesTestCase(unittest.HomeserverTestCase):
    """Tests for the `/_synapse/admin/v1/background_updates/*` admin endpoints."""

    servlets = [
        synapse.rest.admin.register_servlets,
        login.register_servlets,
    ]

    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
        """Register and log in an admin user and keep the handles the tests use."""
        main_store = hs.get_datastores().main
        self.store = main_store

        self.admin_user = self.register_user("admin", "pass", admin=True)
        self.admin_user_tok = self.login("admin", "pass")

        # The tests only read `default_background_batch_size` from this
        # instance when building the expected /status payloads.
        self.updater = BackgroundUpdater(hs, main_store.db_pool)

    # ------------------------------------------------------------------
    # Private helpers
    # ------------------------------------------------------------------

    def _fetch_status(self) -> JsonDict:
        """GET the status endpoint as the admin, require a 200, return the body."""
        channel = self.make_request(
            "GET",
            "/_synapse/admin/v1/background_updates/status",
            access_token=self.admin_user_tok,
        )
        self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
        return channel.json_body

    def _set_enabled(self, enabled: bool) -> None:
        """POST the `enabled` flag as the admin and check it is echoed back."""
        channel = self.make_request(
            "POST",
            "/_synapse/admin/v1/background_updates/enabled",
            content={"enabled": enabled},
            access_token=self.admin_user_tok,
        )
        self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
        self.assertDictEqual(channel.json_body, {"enabled": enabled})

    def _single_batch_status(self, enabled: bool) -> JsonDict:
        """
        Build the /status body the tests expect once `test_update` has
        processed one default-sized batch.

        Args:
            enabled: value expected for the top-level "enabled" key
        """
        return {
            "current_updates": {
                "master": {
                    "name": "test_update",
                    "average_items_per_ms": 0.1,
                    "total_duration_ms": 1000.0,
                    "total_item_count": self.updater.default_background_batch_size,
                }
            },
            "enabled": enabled,
        }

    def _register_bg_update(self) -> None:
        """Adds a bg update but doesn't start it"""

        async def _fake_update(progress: JsonDict, batch_size: int) -> int:
            # Pretend each batch takes a little while, then report the whole
            # batch as processed.
            await self.clock.sleep(0.2)
            return batch_size

        db_pool = self.store.db_pool
        db_pool.updates.register_background_update_handler(
            "test_update",
            _fake_update,
        )

        # Queue the update by inserting its row directly.
        insertion = db_pool.simple_insert(
            table="background_updates",
            values={
                "update_name": "test_update",
                "progress_json": "{}",
            },
        )
        self.get_success(insertion)

    # ------------------------------------------------------------------
    # Tests
    # ------------------------------------------------------------------

    @parameterized.expand(
        [
            ("GET", "/_synapse/admin/v1/background_updates/enabled"),
            ("POST", "/_synapse/admin/v1/background_updates/enabled"),
            ("GET", "/_synapse/admin/v1/background_updates/status"),
            ("POST", "/_synapse/admin/v1/background_updates/start_job"),
        ]
    )
    def test_requester_is_no_admin(self, method: str, url: str) -> None:
        """
        A requester who is not a server admin gets HTTPStatus.FORBIDDEN.
        """
        self.register_user("user", "pass", admin=False)
        non_admin_tok = self.login("user", "pass")

        channel = self.make_request(
            method,
            url,
            content={},
            access_token=non_admin_tok,
        )

        self.assertEqual(HTTPStatus.FORBIDDEN, channel.code, msg=channel.json_body)
        self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])

    def test_invalid_parameter(self) -> None:
        """
        Invalid request bodies for start_job are rejected with an error.
        """
        url = "/_synapse/admin/v1/background_updates/start_job"

        # (request content, expected errcode), checked in this order.
        cases = (
            # empty content
            ({}, Codes.MISSING_PARAM),
            # job_name invalid
            ({"job_name": "unknown"}, Codes.UNKNOWN),
        )

        for content, expected_errcode in cases:
            channel = self.make_request(
                "POST",
                url,
                content=content,
                access_token=self.admin_user_tok,
            )
            self.assertEqual(
                HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body
            )
            self.assertEqual(expected_errcode, channel.json_body["errcode"])

    def test_status_empty(self) -> None:
        """Test the status API when no background update has been queued."""
        body = self._fetch_status()

        # Background updates should be enabled, but none should be running.
        self.assertDictEqual(body, {"current_updates": {}, "enabled": True})

    def test_status_bg_update(self) -> None:
        """Test the status API reports a background update that is running."""
        # Queue a new background update and kick off processing.
        self._register_bg_update()
        self.store.db_pool.updates.start_doing_background_updates()
        self.reactor.pump([1.0, 1.0, 1.0])

        body = self._fetch_status()

        # Background updates should be enabled, and one should be running.
        self.assertDictEqual(body, self._single_batch_status(enabled=True))

    def test_enabled(self) -> None:
        """Test that the enabled API reads and toggles background updates."""
        # Queue a new background update and kick off processing.
        self._register_bg_update()
        self.store.db_pool.updates.start_doing_background_updates()

        # GET should report that background updates are enabled.
        channel = self.make_request(
            "GET",
            "/_synapse/admin/v1/background_updates/enabled",
            access_token=self.admin_user_tok,
        )
        self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)
        self.assertDictEqual(channel.json_body, {"enabled": True})

        # Disable the BG updates.
        self._set_enabled(False)

        # Advancing time lets the in-flight update finish, so the status is
        # fetched twice and the second response must match the first.
        self.reactor.pump([1.0, 1.0])
        self.assertDictEqual(
            self._fetch_status(), self._single_batch_status(enabled=False)
        )

        # Give the BG updates a chance to run, were they (wrongly) still active.
        self.reactor.pump([1.0, 1.0])

        # There should be no change from the previous /status response.
        self.assertDictEqual(
            self._fetch_status(), self._single_batch_status(enabled=False)
        )

        # Re-enable the background updates.
        self._set_enabled(True)

        self.reactor.pump([1.0, 1.0])
        body = self._fetch_status()

        # Background updates should be enabled and making progress.
        # NOTE(review): these figures are hard-coded rather than derived from
        # `default_background_batch_size`; re-check them if that default moves.
        self.assertDictEqual(
            body,
            {
                "current_updates": {
                    "master": {
                        "name": "test_update",
                        "average_items_per_ms": 0.05263157894736842,
                        "total_duration_ms": 2000.0,
                        "total_item_count": 110,
                    }
                },
                "enabled": True,
            },
        )

    @parameterized.expand(
        [
            ("populate_stats_process_rooms", ["populate_stats_process_rooms"]),
            (
                "regenerate_directory",
                [
                    "populate_user_directory_createtables",
                    "populate_user_directory_process_rooms",
                    "populate_user_directory_process_users",
                    "populate_user_directory_cleanup",
                ],
            ),
        ]
    )
    def test_start_backround_job(
        self, job_name: str, updates: Collection[str]
    ) -> None:
        """
        Test that starting a job queues its background updates and that they
        then run to completion.

        Args:
            job_name: name of the job to call with API
            updates: collection of background updates to be started
        """
        bg_updates = self.store.db_pool.updates

        # no background update is waiting
        all_done_before = self.get_success(
            bg_updates.has_completed_background_updates()
        )
        self.assertTrue(all_done_before)

        channel = self.make_request(
            "POST",
            "/_synapse/admin/v1/background_updates/start_job",
            content={"job_name": job_name},
            access_token=self.admin_user_tok,
        )
        self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body)

        # test that each background update is waiting now
        for update_name in updates:
            completed = self.get_success(
                bg_updates.has_completed_background_update(update_name)
            )
            self.assertFalse(completed)

        self.wait_for_background_updates()

        # background updates are done
        all_done_after = self.get_success(
            bg_updates.has_completed_background_updates()
        )
        self.assertTrue(all_done_after)

    def test_start_backround_job_twice(self) -> None:
        """Test that starting a job whose update is already queued is an error."""
        # Put the update in the queue directly, before going through the API.
        queued = self.store.db_pool.simple_insert(
            table="background_updates",
            values={
                "update_name": "populate_stats_process_rooms",
                "progress_json": "{}",
            },
        )
        self.get_success(queued)

        channel = self.make_request(
            "POST",
            "/_synapse/admin/v1/background_updates/start_job",
            content={"job_name": "populate_stats_process_rooms"},
            access_token=self.admin_user_tok,
        )

        self.assertEqual(HTTPStatus.BAD_REQUEST, channel.code, msg=channel.json_body)
|