# tests/replication/test_sharded_event_persister.py
# Copyright 2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
  14. import logging
  15. from unittest.mock import patch
  16. from synapse.api.room_versions import RoomVersion
  17. from synapse.rest import admin
  18. from synapse.rest.client import login, room, sync
  19. from synapse.storage.util.id_generators import MultiWriterIdGenerator
  20. from tests.replication._base import BaseMultiWorkerStreamTestCase
  21. from tests.server import make_request
  22. from tests.utils import USE_POSTGRES_FOR_TESTS
  23. logger = logging.getLogger(__name__)
  24. class EventPersisterShardTestCase(BaseMultiWorkerStreamTestCase):
  25. """Checks event persisting sharding works"""
  26. # Event persister sharding requires postgres (due to needing
  27. # `MultiWriterIdGenerator`).
  28. if not USE_POSTGRES_FOR_TESTS:
  29. skip = "Requires Postgres"
  30. servlets = [
  31. admin.register_servlets_for_client_rest_resource,
  32. room.register_servlets,
  33. login.register_servlets,
  34. sync.register_servlets,
  35. ]
  36. def prepare(self, reactor, clock, hs):
  37. # Register a user who sends a message that we'll get notified about
  38. self.other_user_id = self.register_user("otheruser", "pass")
  39. self.other_access_token = self.login("otheruser", "pass")
  40. self.room_creator = self.hs.get_room_creation_handler()
  41. self.store = hs.get_datastore()
  42. def default_config(self):
  43. conf = super().default_config()
  44. conf["redis"] = {"enabled": "true"}
  45. conf["stream_writers"] = {"events": ["worker1", "worker2"]}
  46. conf["instance_map"] = {
  47. "worker1": {"host": "testserv", "port": 1001},
  48. "worker2": {"host": "testserv", "port": 1002},
  49. }
  50. return conf
  51. def _create_room(self, room_id: str, user_id: str, tok: str):
  52. """Create a room with given room_id"""
  53. # We control the room ID generation by patching out the
  54. # `_generate_room_id` method
  55. async def generate_room(
  56. creator_id: str, is_public: bool, room_version: RoomVersion
  57. ):
  58. await self.store.store_room(
  59. room_id=room_id,
  60. room_creator_user_id=creator_id,
  61. is_public=is_public,
  62. room_version=room_version,
  63. )
  64. return room_id
  65. with patch(
  66. "synapse.handlers.room.RoomCreationHandler._generate_room_id"
  67. ) as mock:
  68. mock.side_effect = generate_room
  69. self.helper.create_room_as(user_id, tok=tok)
  70. def test_basic(self):
  71. """Simple test to ensure that multiple rooms can be created and joined,
  72. and that different rooms get handled by different instances.
  73. """
  74. self.make_worker_hs(
  75. "synapse.app.generic_worker",
  76. {"worker_name": "worker1"},
  77. )
  78. self.make_worker_hs(
  79. "synapse.app.generic_worker",
  80. {"worker_name": "worker2"},
  81. )
  82. persisted_on_1 = False
  83. persisted_on_2 = False
  84. store = self.hs.get_datastore()
  85. user_id = self.register_user("user", "pass")
  86. access_token = self.login("user", "pass")
  87. # Keep making new rooms until we see rooms being persisted on both
  88. # workers.
  89. for _ in range(10):
  90. # Create a room
  91. room = self.helper.create_room_as(user_id, tok=access_token)
  92. # The other user joins
  93. self.helper.join(
  94. room=room, user=self.other_user_id, tok=self.other_access_token
  95. )
  96. # The other user sends some messages
  97. rseponse = self.helper.send(room, body="Hi!", tok=self.other_access_token)
  98. event_id = rseponse["event_id"]
  99. # The event position includes which instance persisted the event.
  100. pos = self.get_success(store.get_position_for_event(event_id))
  101. persisted_on_1 |= pos.instance_name == "worker1"
  102. persisted_on_2 |= pos.instance_name == "worker2"
  103. if persisted_on_1 and persisted_on_2:
  104. break
  105. self.assertTrue(persisted_on_1)
  106. self.assertTrue(persisted_on_2)
  107. def test_vector_clock_token(self):
  108. """Tests that using a stream token with a vector clock component works
  109. correctly with basic /sync and /messages usage.
  110. """
  111. self.make_worker_hs(
  112. "synapse.app.generic_worker",
  113. {"worker_name": "worker1"},
  114. )
  115. worker_hs2 = self.make_worker_hs(
  116. "synapse.app.generic_worker",
  117. {"worker_name": "worker2"},
  118. )
  119. sync_hs = self.make_worker_hs(
  120. "synapse.app.generic_worker",
  121. {"worker_name": "sync"},
  122. )
  123. sync_hs_site = self._hs_to_site[sync_hs]
  124. # Specially selected room IDs that get persisted on different workers.
  125. room_id1 = "!foo:test"
  126. room_id2 = "!baz:test"
  127. self.assertEqual(
  128. self.hs.config.worker.events_shard_config.get_instance(room_id1), "worker1"
  129. )
  130. self.assertEqual(
  131. self.hs.config.worker.events_shard_config.get_instance(room_id2), "worker2"
  132. )
  133. user_id = self.register_user("user", "pass")
  134. access_token = self.login("user", "pass")
  135. store = self.hs.get_datastore()
  136. # Create two room on the different workers.
  137. self._create_room(room_id1, user_id, access_token)
  138. self._create_room(room_id2, user_id, access_token)
  139. # The other user joins
  140. self.helper.join(
  141. room=room_id1, user=self.other_user_id, tok=self.other_access_token
  142. )
  143. self.helper.join(
  144. room=room_id2, user=self.other_user_id, tok=self.other_access_token
  145. )
  146. # Do an initial sync so that we're up to date.
  147. channel = make_request(
  148. self.reactor, sync_hs_site, "GET", "/sync", access_token=access_token
  149. )
  150. next_batch = channel.json_body["next_batch"]
  151. # We now gut wrench into the events stream MultiWriterIdGenerator on
  152. # worker2 to mimic it getting stuck persisting an event. This ensures
  153. # that when we send an event on worker1 we end up in a state where
  154. # worker2 events stream position lags that on worker1, resulting in a
  155. # RoomStreamToken with a non-empty instance map component.
  156. #
  157. # Worker2's event stream position will not advance until we call
  158. # __aexit__ again.
  159. worker_store2 = worker_hs2.get_datastore()
  160. assert isinstance(worker_store2._stream_id_gen, MultiWriterIdGenerator)
  161. actx = worker_store2._stream_id_gen.get_next()
  162. self.get_success(actx.__aenter__())
  163. response = self.helper.send(room_id1, body="Hi!", tok=self.other_access_token)
  164. first_event_in_room1 = response["event_id"]
  165. # Assert that the current stream token has an instance map component, as
  166. # we are trying to test vector clock tokens.
  167. room_stream_token = store.get_room_max_token()
  168. self.assertNotEqual(len(room_stream_token.instance_map), 0)
  169. # Check that syncing still gets the new event, despite the gap in the
  170. # stream IDs.
  171. channel = make_request(
  172. self.reactor,
  173. sync_hs_site,
  174. "GET",
  175. f"/sync?since={next_batch}",
  176. access_token=access_token,
  177. )
  178. # We should only see the new event and nothing else
  179. self.assertIn(room_id1, channel.json_body["rooms"]["join"])
  180. self.assertNotIn(room_id2, channel.json_body["rooms"]["join"])
  181. events = channel.json_body["rooms"]["join"][room_id1]["timeline"]["events"]
  182. self.assertListEqual(
  183. [first_event_in_room1], [event["event_id"] for event in events]
  184. )
  185. # Get the next batch and makes sure its a vector clock style token.
  186. vector_clock_token = channel.json_body["next_batch"]
  187. self.assertTrue(vector_clock_token.startswith("m"))
  188. # Now that we've got a vector clock token we finish the fake persisting
  189. # an event we started above.
  190. self.get_success(actx.__aexit__(None, None, None))
  191. # Now try and send an event to the other rooom so that we can test that
  192. # the vector clock style token works as a `since` token.
  193. response = self.helper.send(room_id2, body="Hi!", tok=self.other_access_token)
  194. first_event_in_room2 = response["event_id"]
  195. channel = make_request(
  196. self.reactor,
  197. sync_hs_site,
  198. "GET",
  199. f"/sync?since={vector_clock_token}",
  200. access_token=access_token,
  201. )
  202. self.assertNotIn(room_id1, channel.json_body["rooms"]["join"])
  203. self.assertIn(room_id2, channel.json_body["rooms"]["join"])
  204. events = channel.json_body["rooms"]["join"][room_id2]["timeline"]["events"]
  205. self.assertListEqual(
  206. [first_event_in_room2], [event["event_id"] for event in events]
  207. )
  208. next_batch = channel.json_body["next_batch"]
  209. # We also want to test that the vector clock style token works with
  210. # pagination. We do this by sending a couple of new events into the room
  211. # and syncing again to get a prev_batch token for each room, then
  212. # paginating from there back to the vector clock token.
  213. self.helper.send(room_id1, body="Hi again!", tok=self.other_access_token)
  214. self.helper.send(room_id2, body="Hi again!", tok=self.other_access_token)
  215. channel = make_request(
  216. self.reactor,
  217. sync_hs_site,
  218. "GET",
  219. f"/sync?since={next_batch}",
  220. access_token=access_token,
  221. )
  222. prev_batch1 = channel.json_body["rooms"]["join"][room_id1]["timeline"][
  223. "prev_batch"
  224. ]
  225. prev_batch2 = channel.json_body["rooms"]["join"][room_id2]["timeline"][
  226. "prev_batch"
  227. ]
  228. # Paginating back in the first room should not produce any results, as
  229. # no events have happened in it. This tests that we are correctly
  230. # filtering results based on the vector clock portion.
  231. channel = make_request(
  232. self.reactor,
  233. sync_hs_site,
  234. "GET",
  235. "/rooms/{}/messages?from={}&to={}&dir=b".format(
  236. room_id1, prev_batch1, vector_clock_token
  237. ),
  238. access_token=access_token,
  239. )
  240. self.assertListEqual([], channel.json_body["chunk"])
  241. # Paginating back on the second room should produce the first event
  242. # again. This tests that pagination isn't completely broken.
  243. channel = make_request(
  244. self.reactor,
  245. sync_hs_site,
  246. "GET",
  247. "/rooms/{}/messages?from={}&to={}&dir=b".format(
  248. room_id2, prev_batch2, vector_clock_token
  249. ),
  250. access_token=access_token,
  251. )
  252. self.assertEqual(len(channel.json_body["chunk"]), 1)
  253. self.assertEqual(
  254. channel.json_body["chunk"][0]["event_id"], first_event_in_room2
  255. )
  256. # Paginating forwards should give the same results
  257. channel = make_request(
  258. self.reactor,
  259. sync_hs_site,
  260. "GET",
  261. "/rooms/{}/messages?from={}&to={}&dir=f".format(
  262. room_id1, vector_clock_token, prev_batch1
  263. ),
  264. access_token=access_token,
  265. )
  266. self.assertListEqual([], channel.json_body["chunk"])
  267. channel = make_request(
  268. self.reactor,
  269. sync_hs_site,
  270. "GET",
  271. "/rooms/{}/messages?from={}&to={}&dir=f".format(
  272. room_id2,
  273. vector_clock_token,
  274. prev_batch2,
  275. ),
  276. access_token=access_token,
  277. )
  278. self.assertEqual(len(channel.json_body["chunk"]), 1)
  279. self.assertEqual(
  280. channel.json_body["chunk"][0]["event_id"], first_event_in_room2
  281. )