1
0

test_sharded_event_persister.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336
  1. # Copyright 2020 The Matrix.org Foundation C.I.C.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import logging
  15. from unittest.mock import patch
  16. from synapse.api.room_versions import RoomVersion
  17. from synapse.rest import admin
  18. from synapse.rest.client.v1 import login, room
  19. from synapse.rest.client.v2_alpha import sync
  20. from tests.replication._base import BaseMultiWorkerStreamTestCase
  21. from tests.server import make_request
  22. from tests.utils import USE_POSTGRES_FOR_TESTS
  23. logger = logging.getLogger(__name__)
  24. class EventPersisterShardTestCase(BaseMultiWorkerStreamTestCase):
  25. """Checks event persisting sharding works"""
  26. # Event persister sharding requires postgres (due to needing
  27. # `MultiWriterIdGenerator`).
  28. if not USE_POSTGRES_FOR_TESTS:
  29. skip = "Requires Postgres"
  30. servlets = [
  31. admin.register_servlets_for_client_rest_resource,
  32. room.register_servlets,
  33. login.register_servlets,
  34. sync.register_servlets,
  35. ]
  36. def prepare(self, reactor, clock, hs):
  37. # Register a user who sends a message that we'll get notified about
  38. self.other_user_id = self.register_user("otheruser", "pass")
  39. self.other_access_token = self.login("otheruser", "pass")
  40. self.room_creator = self.hs.get_room_creation_handler()
  41. self.store = hs.get_datastore()
  42. def default_config(self):
  43. conf = super().default_config()
  44. conf["redis"] = {"enabled": "true"}
  45. conf["stream_writers"] = {"events": ["worker1", "worker2"]}
  46. conf["instance_map"] = {
  47. "worker1": {"host": "testserv", "port": 1001},
  48. "worker2": {"host": "testserv", "port": 1002},
  49. }
  50. return conf
  51. def _create_room(self, room_id: str, user_id: str, tok: str):
  52. """Create a room with given room_id"""
  53. # We control the room ID generation by patching out the
  54. # `_generate_room_id` method
  55. async def generate_room(
  56. creator_id: str, is_public: bool, room_version: RoomVersion
  57. ):
  58. await self.store.store_room(
  59. room_id=room_id,
  60. room_creator_user_id=creator_id,
  61. is_public=is_public,
  62. room_version=room_version,
  63. )
  64. return room_id
  65. with patch(
  66. "synapse.handlers.room.RoomCreationHandler._generate_room_id"
  67. ) as mock:
  68. mock.side_effect = generate_room
  69. self.helper.create_room_as(user_id, tok=tok)
  70. def test_basic(self):
  71. """Simple test to ensure that multiple rooms can be created and joined,
  72. and that different rooms get handled by different instances.
  73. """
  74. self.make_worker_hs(
  75. "synapse.app.generic_worker",
  76. {"worker_name": "worker1"},
  77. )
  78. self.make_worker_hs(
  79. "synapse.app.generic_worker",
  80. {"worker_name": "worker2"},
  81. )
  82. persisted_on_1 = False
  83. persisted_on_2 = False
  84. store = self.hs.get_datastore()
  85. user_id = self.register_user("user", "pass")
  86. access_token = self.login("user", "pass")
  87. # Keep making new rooms until we see rooms being persisted on both
  88. # workers.
  89. for _ in range(10):
  90. # Create a room
  91. room = self.helper.create_room_as(user_id, tok=access_token)
  92. # The other user joins
  93. self.helper.join(
  94. room=room, user=self.other_user_id, tok=self.other_access_token
  95. )
  96. # The other user sends some messages
  97. rseponse = self.helper.send(room, body="Hi!", tok=self.other_access_token)
  98. event_id = rseponse["event_id"]
  99. # The event position includes which instance persisted the event.
  100. pos = self.get_success(store.get_position_for_event(event_id))
  101. persisted_on_1 |= pos.instance_name == "worker1"
  102. persisted_on_2 |= pos.instance_name == "worker2"
  103. if persisted_on_1 and persisted_on_2:
  104. break
  105. self.assertTrue(persisted_on_1)
  106. self.assertTrue(persisted_on_2)
  107. def test_vector_clock_token(self):
  108. """Tests that using a stream token with a vector clock component works
  109. correctly with basic /sync and /messages usage.
  110. """
  111. self.make_worker_hs(
  112. "synapse.app.generic_worker",
  113. {"worker_name": "worker1"},
  114. )
  115. worker_hs2 = self.make_worker_hs(
  116. "synapse.app.generic_worker",
  117. {"worker_name": "worker2"},
  118. )
  119. sync_hs = self.make_worker_hs(
  120. "synapse.app.generic_worker",
  121. {"worker_name": "sync"},
  122. )
  123. sync_hs_site = self._hs_to_site[sync_hs]
  124. # Specially selected room IDs that get persisted on different workers.
  125. room_id1 = "!foo:test"
  126. room_id2 = "!baz:test"
  127. self.assertEqual(
  128. self.hs.config.worker.events_shard_config.get_instance(room_id1), "worker1"
  129. )
  130. self.assertEqual(
  131. self.hs.config.worker.events_shard_config.get_instance(room_id2), "worker2"
  132. )
  133. user_id = self.register_user("user", "pass")
  134. access_token = self.login("user", "pass")
  135. store = self.hs.get_datastore()
  136. # Create two room on the different workers.
  137. self._create_room(room_id1, user_id, access_token)
  138. self._create_room(room_id2, user_id, access_token)
  139. # The other user joins
  140. self.helper.join(
  141. room=room_id1, user=self.other_user_id, tok=self.other_access_token
  142. )
  143. self.helper.join(
  144. room=room_id2, user=self.other_user_id, tok=self.other_access_token
  145. )
  146. # Do an initial sync so that we're up to date.
  147. channel = make_request(
  148. self.reactor, sync_hs_site, "GET", "/sync", access_token=access_token
  149. )
  150. next_batch = channel.json_body["next_batch"]
  151. # We now gut wrench into the events stream MultiWriterIdGenerator on
  152. # worker2 to mimic it getting stuck persisting an event. This ensures
  153. # that when we send an event on worker1 we end up in a state where
  154. # worker2 events stream position lags that on worker1, resulting in a
  155. # RoomStreamToken with a non-empty instance map component.
  156. #
  157. # Worker2's event stream position will not advance until we call
  158. # __aexit__ again.
  159. actx = worker_hs2.get_datastore()._stream_id_gen.get_next()
  160. self.get_success(actx.__aenter__())
  161. response = self.helper.send(room_id1, body="Hi!", tok=self.other_access_token)
  162. first_event_in_room1 = response["event_id"]
  163. # Assert that the current stream token has an instance map component, as
  164. # we are trying to test vector clock tokens.
  165. room_stream_token = store.get_room_max_token()
  166. self.assertNotEqual(len(room_stream_token.instance_map), 0)
  167. # Check that syncing still gets the new event, despite the gap in the
  168. # stream IDs.
  169. channel = make_request(
  170. self.reactor,
  171. sync_hs_site,
  172. "GET",
  173. "/sync?since={}".format(next_batch),
  174. access_token=access_token,
  175. )
  176. # We should only see the new event and nothing else
  177. self.assertIn(room_id1, channel.json_body["rooms"]["join"])
  178. self.assertNotIn(room_id2, channel.json_body["rooms"]["join"])
  179. events = channel.json_body["rooms"]["join"][room_id1]["timeline"]["events"]
  180. self.assertListEqual(
  181. [first_event_in_room1], [event["event_id"] for event in events]
  182. )
  183. # Get the next batch and makes sure its a vector clock style token.
  184. vector_clock_token = channel.json_body["next_batch"]
  185. self.assertTrue(vector_clock_token.startswith("m"))
  186. # Now that we've got a vector clock token we finish the fake persisting
  187. # an event we started above.
  188. self.get_success(actx.__aexit__(None, None, None))
  189. # Now try and send an event to the other rooom so that we can test that
  190. # the vector clock style token works as a `since` token.
  191. response = self.helper.send(room_id2, body="Hi!", tok=self.other_access_token)
  192. first_event_in_room2 = response["event_id"]
  193. channel = make_request(
  194. self.reactor,
  195. sync_hs_site,
  196. "GET",
  197. "/sync?since={}".format(vector_clock_token),
  198. access_token=access_token,
  199. )
  200. self.assertNotIn(room_id1, channel.json_body["rooms"]["join"])
  201. self.assertIn(room_id2, channel.json_body["rooms"]["join"])
  202. events = channel.json_body["rooms"]["join"][room_id2]["timeline"]["events"]
  203. self.assertListEqual(
  204. [first_event_in_room2], [event["event_id"] for event in events]
  205. )
  206. next_batch = channel.json_body["next_batch"]
  207. # We also want to test that the vector clock style token works with
  208. # pagination. We do this by sending a couple of new events into the room
  209. # and syncing again to get a prev_batch token for each room, then
  210. # paginating from there back to the vector clock token.
  211. self.helper.send(room_id1, body="Hi again!", tok=self.other_access_token)
  212. self.helper.send(room_id2, body="Hi again!", tok=self.other_access_token)
  213. channel = make_request(
  214. self.reactor,
  215. sync_hs_site,
  216. "GET",
  217. "/sync?since={}".format(next_batch),
  218. access_token=access_token,
  219. )
  220. prev_batch1 = channel.json_body["rooms"]["join"][room_id1]["timeline"][
  221. "prev_batch"
  222. ]
  223. prev_batch2 = channel.json_body["rooms"]["join"][room_id2]["timeline"][
  224. "prev_batch"
  225. ]
  226. # Paginating back in the first room should not produce any results, as
  227. # no events have happened in it. This tests that we are correctly
  228. # filtering results based on the vector clock portion.
  229. channel = make_request(
  230. self.reactor,
  231. sync_hs_site,
  232. "GET",
  233. "/rooms/{}/messages?from={}&to={}&dir=b".format(
  234. room_id1, prev_batch1, vector_clock_token
  235. ),
  236. access_token=access_token,
  237. )
  238. self.assertListEqual([], channel.json_body["chunk"])
  239. # Paginating back on the second room should produce the first event
  240. # again. This tests that pagination isn't completely broken.
  241. channel = make_request(
  242. self.reactor,
  243. sync_hs_site,
  244. "GET",
  245. "/rooms/{}/messages?from={}&to={}&dir=b".format(
  246. room_id2, prev_batch2, vector_clock_token
  247. ),
  248. access_token=access_token,
  249. )
  250. self.assertEqual(len(channel.json_body["chunk"]), 1)
  251. self.assertEqual(
  252. channel.json_body["chunk"][0]["event_id"], first_event_in_room2
  253. )
  254. # Paginating forwards should give the same results
  255. channel = make_request(
  256. self.reactor,
  257. sync_hs_site,
  258. "GET",
  259. "/rooms/{}/messages?from={}&to={}&dir=f".format(
  260. room_id1, vector_clock_token, prev_batch1
  261. ),
  262. access_token=access_token,
  263. )
  264. self.assertListEqual([], channel.json_body["chunk"])
  265. channel = make_request(
  266. self.reactor,
  267. sync_hs_site,
  268. "GET",
  269. "/rooms/{}/messages?from={}&to={}&dir=f".format(
  270. room_id2,
  271. vector_clock_token,
  272. prev_batch2,
  273. ),
  274. access_token=access_token,
  275. )
  276. self.assertEqual(len(channel.json_body["chunk"]), 1)
  277. self.assertEqual(
  278. channel.json_body["chunk"][0]["event_id"], first_event_in_room2
  279. )