1
0

test_sharded_event_persister.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2020 The Matrix.org Foundation C.I.C.
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. import logging
  16. from mock import patch
  17. from synapse.api.room_versions import RoomVersion
  18. from synapse.rest import admin
  19. from synapse.rest.client.v1 import login, room
  20. from synapse.rest.client.v2_alpha import sync
  21. from tests.replication._base import BaseMultiWorkerStreamTestCase
  22. from tests.server import make_request
  23. from tests.utils import USE_POSTGRES_FOR_TESTS
  24. logger = logging.getLogger(__name__)
  25. class EventPersisterShardTestCase(BaseMultiWorkerStreamTestCase):
  26. """Checks event persisting sharding works
  27. """
  28. # Event persister sharding requires postgres (due to needing
  29. # `MutliWriterIdGenerator`).
  30. if not USE_POSTGRES_FOR_TESTS:
  31. skip = "Requires Postgres"
  32. servlets = [
  33. admin.register_servlets_for_client_rest_resource,
  34. room.register_servlets,
  35. login.register_servlets,
  36. sync.register_servlets,
  37. ]
  38. def prepare(self, reactor, clock, hs):
  39. # Register a user who sends a message that we'll get notified about
  40. self.other_user_id = self.register_user("otheruser", "pass")
  41. self.other_access_token = self.login("otheruser", "pass")
  42. self.room_creator = self.hs.get_room_creation_handler()
  43. self.store = hs.get_datastore()
  44. def default_config(self):
  45. conf = super().default_config()
  46. conf["redis"] = {"enabled": "true"}
  47. conf["stream_writers"] = {"events": ["worker1", "worker2"]}
  48. conf["instance_map"] = {
  49. "worker1": {"host": "testserv", "port": 1001},
  50. "worker2": {"host": "testserv", "port": 1002},
  51. }
  52. return conf
  53. def _create_room(self, room_id: str, user_id: str, tok: str):
  54. """Create a room with given room_id
  55. """
  56. # We control the room ID generation by patching out the
  57. # `_generate_room_id` method
  58. async def generate_room(
  59. creator_id: str, is_public: bool, room_version: RoomVersion
  60. ):
  61. await self.store.store_room(
  62. room_id=room_id,
  63. room_creator_user_id=creator_id,
  64. is_public=is_public,
  65. room_version=room_version,
  66. )
  67. return room_id
  68. with patch(
  69. "synapse.handlers.room.RoomCreationHandler._generate_room_id"
  70. ) as mock:
  71. mock.side_effect = generate_room
  72. self.helper.create_room_as(user_id, tok=tok)
  73. def test_basic(self):
  74. """Simple test to ensure that multiple rooms can be created and joined,
  75. and that different rooms get handled by different instances.
  76. """
  77. self.make_worker_hs(
  78. "synapse.app.generic_worker", {"worker_name": "worker1"},
  79. )
  80. self.make_worker_hs(
  81. "synapse.app.generic_worker", {"worker_name": "worker2"},
  82. )
  83. persisted_on_1 = False
  84. persisted_on_2 = False
  85. store = self.hs.get_datastore()
  86. user_id = self.register_user("user", "pass")
  87. access_token = self.login("user", "pass")
  88. # Keep making new rooms until we see rooms being persisted on both
  89. # workers.
  90. for _ in range(10):
  91. # Create a room
  92. room = self.helper.create_room_as(user_id, tok=access_token)
  93. # The other user joins
  94. self.helper.join(
  95. room=room, user=self.other_user_id, tok=self.other_access_token
  96. )
  97. # The other user sends some messages
  98. rseponse = self.helper.send(room, body="Hi!", tok=self.other_access_token)
  99. event_id = rseponse["event_id"]
  100. # The event position includes which instance persisted the event.
  101. pos = self.get_success(store.get_position_for_event(event_id))
  102. persisted_on_1 |= pos.instance_name == "worker1"
  103. persisted_on_2 |= pos.instance_name == "worker2"
  104. if persisted_on_1 and persisted_on_2:
  105. break
  106. self.assertTrue(persisted_on_1)
  107. self.assertTrue(persisted_on_2)
  108. def test_vector_clock_token(self):
  109. """Tests that using a stream token with a vector clock component works
  110. correctly with basic /sync and /messages usage.
  111. """
  112. self.make_worker_hs(
  113. "synapse.app.generic_worker", {"worker_name": "worker1"},
  114. )
  115. worker_hs2 = self.make_worker_hs(
  116. "synapse.app.generic_worker", {"worker_name": "worker2"},
  117. )
  118. sync_hs = self.make_worker_hs(
  119. "synapse.app.generic_worker", {"worker_name": "sync"},
  120. )
  121. sync_hs_site = self._hs_to_site[sync_hs]
  122. # Specially selected room IDs that get persisted on different workers.
  123. room_id1 = "!foo:test"
  124. room_id2 = "!baz:test"
  125. self.assertEqual(
  126. self.hs.config.worker.events_shard_config.get_instance(room_id1), "worker1"
  127. )
  128. self.assertEqual(
  129. self.hs.config.worker.events_shard_config.get_instance(room_id2), "worker2"
  130. )
  131. user_id = self.register_user("user", "pass")
  132. access_token = self.login("user", "pass")
  133. store = self.hs.get_datastore()
  134. # Create two room on the different workers.
  135. self._create_room(room_id1, user_id, access_token)
  136. self._create_room(room_id2, user_id, access_token)
  137. # The other user joins
  138. self.helper.join(
  139. room=room_id1, user=self.other_user_id, tok=self.other_access_token
  140. )
  141. self.helper.join(
  142. room=room_id2, user=self.other_user_id, tok=self.other_access_token
  143. )
  144. # Do an initial sync so that we're up to date.
  145. channel = make_request(
  146. self.reactor, sync_hs_site, "GET", "/sync", access_token=access_token
  147. )
  148. next_batch = channel.json_body["next_batch"]
  149. # We now gut wrench into the events stream MultiWriterIdGenerator on
  150. # worker2 to mimic it getting stuck persisting an event. This ensures
  151. # that when we send an event on worker1 we end up in a state where
  152. # worker2 events stream position lags that on worker1, resulting in a
  153. # RoomStreamToken with a non-empty instance map component.
  154. #
  155. # Worker2's event stream position will not advance until we call
  156. # __aexit__ again.
  157. actx = worker_hs2.get_datastore()._stream_id_gen.get_next()
  158. self.get_success(actx.__aenter__())
  159. response = self.helper.send(room_id1, body="Hi!", tok=self.other_access_token)
  160. first_event_in_room1 = response["event_id"]
  161. # Assert that the current stream token has an instance map component, as
  162. # we are trying to test vector clock tokens.
  163. room_stream_token = store.get_room_max_token()
  164. self.assertNotEqual(len(room_stream_token.instance_map), 0)
  165. # Check that syncing still gets the new event, despite the gap in the
  166. # stream IDs.
  167. channel = make_request(
  168. self.reactor,
  169. sync_hs_site,
  170. "GET",
  171. "/sync?since={}".format(next_batch),
  172. access_token=access_token,
  173. )
  174. # We should only see the new event and nothing else
  175. self.assertIn(room_id1, channel.json_body["rooms"]["join"])
  176. self.assertNotIn(room_id2, channel.json_body["rooms"]["join"])
  177. events = channel.json_body["rooms"]["join"][room_id1]["timeline"]["events"]
  178. self.assertListEqual(
  179. [first_event_in_room1], [event["event_id"] for event in events]
  180. )
  181. # Get the next batch and makes sure its a vector clock style token.
  182. vector_clock_token = channel.json_body["next_batch"]
  183. self.assertTrue(vector_clock_token.startswith("m"))
  184. # Now that we've got a vector clock token we finish the fake persisting
  185. # an event we started above.
  186. self.get_success(actx.__aexit__(None, None, None))
  187. # Now try and send an event to the other rooom so that we can test that
  188. # the vector clock style token works as a `since` token.
  189. response = self.helper.send(room_id2, body="Hi!", tok=self.other_access_token)
  190. first_event_in_room2 = response["event_id"]
  191. channel = make_request(
  192. self.reactor,
  193. sync_hs_site,
  194. "GET",
  195. "/sync?since={}".format(vector_clock_token),
  196. access_token=access_token,
  197. )
  198. self.assertNotIn(room_id1, channel.json_body["rooms"]["join"])
  199. self.assertIn(room_id2, channel.json_body["rooms"]["join"])
  200. events = channel.json_body["rooms"]["join"][room_id2]["timeline"]["events"]
  201. self.assertListEqual(
  202. [first_event_in_room2], [event["event_id"] for event in events]
  203. )
  204. next_batch = channel.json_body["next_batch"]
  205. # We also want to test that the vector clock style token works with
  206. # pagination. We do this by sending a couple of new events into the room
  207. # and syncing again to get a prev_batch token for each room, then
  208. # paginating from there back to the vector clock token.
  209. self.helper.send(room_id1, body="Hi again!", tok=self.other_access_token)
  210. self.helper.send(room_id2, body="Hi again!", tok=self.other_access_token)
  211. channel = make_request(
  212. self.reactor,
  213. sync_hs_site,
  214. "GET",
  215. "/sync?since={}".format(next_batch),
  216. access_token=access_token,
  217. )
  218. prev_batch1 = channel.json_body["rooms"]["join"][room_id1]["timeline"][
  219. "prev_batch"
  220. ]
  221. prev_batch2 = channel.json_body["rooms"]["join"][room_id2]["timeline"][
  222. "prev_batch"
  223. ]
  224. # Paginating back in the first room should not produce any results, as
  225. # no events have happened in it. This tests that we are correctly
  226. # filtering results based on the vector clock portion.
  227. channel = make_request(
  228. self.reactor,
  229. sync_hs_site,
  230. "GET",
  231. "/rooms/{}/messages?from={}&to={}&dir=b".format(
  232. room_id1, prev_batch1, vector_clock_token
  233. ),
  234. access_token=access_token,
  235. )
  236. self.assertListEqual([], channel.json_body["chunk"])
  237. # Paginating back on the second room should produce the first event
  238. # again. This tests that pagination isn't completely broken.
  239. channel = make_request(
  240. self.reactor,
  241. sync_hs_site,
  242. "GET",
  243. "/rooms/{}/messages?from={}&to={}&dir=b".format(
  244. room_id2, prev_batch2, vector_clock_token
  245. ),
  246. access_token=access_token,
  247. )
  248. self.assertEqual(len(channel.json_body["chunk"]), 1)
  249. self.assertEqual(
  250. channel.json_body["chunk"][0]["event_id"], first_event_in_room2
  251. )
  252. # Paginating forwards should give the same results
  253. channel = make_request(
  254. self.reactor,
  255. sync_hs_site,
  256. "GET",
  257. "/rooms/{}/messages?from={}&to={}&dir=f".format(
  258. room_id1, vector_clock_token, prev_batch1
  259. ),
  260. access_token=access_token,
  261. )
  262. self.assertListEqual([], channel.json_body["chunk"])
  263. channel = make_request(
  264. self.reactor,
  265. sync_hs_site,
  266. "GET",
  267. "/rooms/{}/messages?from={}&to={}&dir=f".format(
  268. room_id2, vector_clock_token, prev_batch2,
  269. ),
  270. access_token=access_token,
  271. )
  272. self.assertEqual(len(channel.json_body["chunk"]), 1)
  273. self.assertEqual(
  274. channel.json_body["chunk"][0]["event_id"], first_event_in_room2
  275. )