# tests/replication/tcp/streams/test_events.py
  1. # Copyright 2019 New Vector Ltd
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. from typing import List, Optional
  15. from synapse.api.constants import EventTypes, Membership
  16. from synapse.events import EventBase
  17. from synapse.replication.tcp.commands import RdataCommand
  18. from synapse.replication.tcp.streams._base import _STREAM_UPDATE_TARGET_ROW_COUNT
  19. from synapse.replication.tcp.streams.events import (
  20. EventsStreamCurrentStateRow,
  21. EventsStreamEventRow,
  22. EventsStreamRow,
  23. )
  24. from synapse.rest import admin
  25. from synapse.rest.client import login, room
  26. from tests.replication._base import BaseStreamTestCase
  27. from tests.test_utils.event_injection import inject_event, inject_member_event
class EventsStreamTestCase(BaseStreamTestCase):
    """Tests for the "events" replication stream.

    Exercises replication of event rows ("ev") and current-state rows
    ("state") over the replication TCP protocol, including the batching
    behaviour around _STREAM_UPDATE_TARGET_ROW_COUNT.
    """

    servlets = [
        admin.register_servlets,
        login.register_servlets,
        room.register_servlets,
    ]

    def prepare(self, reactor, clock, hs) -> None:
        """Register a user, connect replication, and create a test room.

        Rows generated during setup (e.g. by room creation) are cleared so
        that each test starts with an empty received_rdata_rows list.
        """
        super().prepare(reactor, clock, hs)
        self.user_id = self.register_user("u1", "pass")
        self.user_tok = self.login("u1", "pass")

        self.reconnect()

        self.room_id = self.helper.create_room_as(tok=self.user_tok)
        # discard the rows produced by room creation
        self.test_handler.received_rdata_rows.clear()

    def test_update_function_event_row_limit(self) -> None:
        """Test replication with many non-state events

        Checks that all events are correctly replicated when there are lots of
        event rows to be replicated.
        """
        # disconnect, so that we can stack up some changes
        self.disconnect()

        # generate lots of non-state events. We inject them using inject_event
        # so that they are not send out over replication until we call self.replicate().
        events = [
            self._inject_test_event()
            for _ in range(_STREAM_UPDATE_TARGET_ROW_COUNT + 1)
        ]

        # also one state event
        state_event = self._inject_state_event()

        # check we're testing what we think we are: no rows should yet have been
        # received
        self.assertEqual([], self.test_handler.received_rdata_rows)

        # now reconnect to pull the updates
        self.reconnect()
        self.replicate()

        # we should have received all the expected rows in the right order (as
        # well as various cache invalidation updates which we ignore)
        received_rows = [
            row for row in self.test_handler.received_rdata_rows if row[0] == "events"
        ]

        # each non-state event should appear as a single "ev" row, in order
        for event in events:
            stream_name, token, row = received_rows.pop(0)
            self.assertEqual("events", stream_name)
            self.assertIsInstance(row, EventsStreamRow)
            self.assertEqual(row.type, "ev")
            self.assertIsInstance(row.data, EventsStreamEventRow)
            self.assertEqual(row.data.event_id, event.event_id)

        # the state event produces an "ev" row...
        # NOTE(review): unlike the pops above/below, stream_name and row.type
        # are not re-asserted here — presumably an oversight, but harmless.
        stream_name, token, row = received_rows.pop(0)
        self.assertIsInstance(row, EventsStreamRow)
        self.assertIsInstance(row.data, EventsStreamEventRow)
        self.assertEqual(row.data.event_id, state_event.event_id)

        # ...followed by a "state" row for the current-state change it caused
        stream_name, token, row = received_rows.pop(0)
        self.assertEqual("events", stream_name)
        self.assertIsInstance(row, EventsStreamRow)
        self.assertEqual(row.type, "state")
        self.assertIsInstance(row.data, EventsStreamCurrentStateRow)
        self.assertEqual(row.data.event_id, state_event.event_id)

        # no further rows should remain
        self.assertEqual([], received_rows)

    def test_update_function_huge_state_change(self) -> None:
        """Test replication with many state events

        Ensures that all events are correctly replicated when there are lots of
        state change rows to be replicated.
        """
        # we want to generate lots of state changes at a single stream ID.
        #
        # We do this by having two branches in the DAG. On one, we have a moderator
        # which that generates lots of state; on the other, we de-op the moderator,
        # thus invalidating all the state.

        OTHER_USER = "@other_user:localhost"

        # have the user join
        self.get_success(
            inject_member_event(self.hs, self.room_id, OTHER_USER, Membership.JOIN)
        )

        # Update existing power levels with mod at PL50
        pls = self.helper.get_state(
            self.room_id, EventTypes.PowerLevels, tok=self.user_tok
        )
        pls["users"][OTHER_USER] = 50
        self.helper.send_state(
            self.room_id,
            EventTypes.PowerLevels,
            pls,
            tok=self.user_tok,
        )

        # this is the point in the DAG where we make a fork
        fork_point: List[str] = self.get_success(
            self.hs.get_datastores().main.get_latest_event_ids_in_room(self.room_id)
        )

        # state events sent by the moderator on one branch of the fork
        events = [
            self._inject_state_event(sender=OTHER_USER)
            for _ in range(_STREAM_UPDATE_TARGET_ROW_COUNT)
        ]

        self.replicate()

        # all those events and state changes should have landed
        self.assertGreaterEqual(
            len(self.test_handler.received_rdata_rows), 2 * len(events)
        )

        # disconnect, so that we can stack up the changes
        self.disconnect()
        self.test_handler.received_rdata_rows.clear()

        # a state event which doesn't get rolled back, to check that the state
        # before the huge update comes through ok
        state1 = self._inject_state_event()

        # roll back all the state by de-modding the user
        prev_events = fork_point
        pls["users"][OTHER_USER] = 0
        pl_event = self.get_success(
            inject_event(
                self.hs,
                prev_event_ids=prev_events,
                type=EventTypes.PowerLevels,
                state_key="",
                sender=self.user_id,
                room_id=self.room_id,
                content=pls,
            )
        )

        # one more bit of state that doesn't get rolled back
        state2 = self._inject_state_event()

        # check we're testing what we think we are: no rows should yet have been
        # received
        self.assertEqual([], self.test_handler.received_rdata_rows)

        # now reconnect to pull the updates
        self.reconnect()
        self.replicate()

        # we should have received all the expected rows in the right order (as
        # well as various cache invalidation updates which we ignore)
        #
        # we expect:
        #
        # - two rows for state1
        # - the PL event row, plus state rows for the PL event and each
        #   of the states that got reverted.
        # - two rows for state2

        received_rows = [
            row for row in self.test_handler.received_rdata_rows if row[0] == "events"
        ]

        # first check the first two rows, which should be state1
        stream_name, token, row = received_rows.pop(0)
        self.assertEqual("events", stream_name)
        self.assertIsInstance(row, EventsStreamRow)
        self.assertEqual(row.type, "ev")
        self.assertIsInstance(row.data, EventsStreamEventRow)
        self.assertEqual(row.data.event_id, state1.event_id)

        stream_name, token, row = received_rows.pop(0)
        self.assertIsInstance(row, EventsStreamRow)
        self.assertEqual(row.type, "state")
        self.assertIsInstance(row.data, EventsStreamCurrentStateRow)
        self.assertEqual(row.data.event_id, state1.event_id)

        # now the last two rows, which should be state2
        stream_name, token, row = received_rows.pop(-2)
        self.assertEqual("events", stream_name)
        self.assertIsInstance(row, EventsStreamRow)
        self.assertEqual(row.type, "ev")
        self.assertIsInstance(row.data, EventsStreamEventRow)
        self.assertEqual(row.data.event_id, state2.event_id)

        stream_name, token, row = received_rows.pop(-1)
        self.assertIsInstance(row, EventsStreamRow)
        self.assertEqual(row.type, "state")
        self.assertIsInstance(row.data, EventsStreamCurrentStateRow)
        self.assertEqual(row.data.event_id, state2.event_id)

        # that should leave us with the rows for the PL event
        # (one "ev" row, plus one "state" row per reverted state and one for
        # the PL event itself: len(events) + 2 in total)
        self.assertEqual(len(received_rows), len(events) + 2)

        stream_name, token, row = received_rows.pop(0)
        self.assertEqual("events", stream_name)
        self.assertIsInstance(row, EventsStreamRow)
        self.assertEqual(row.type, "ev")
        self.assertIsInstance(row.data, EventsStreamEventRow)
        self.assertEqual(row.data.event_id, pl_event.event_id)

        # the state rows are unsorted
        state_rows: List[EventsStreamCurrentStateRow] = []
        for stream_name, _, row in received_rows:
            self.assertEqual("events", stream_name)
            self.assertIsInstance(row, EventsStreamRow)
            self.assertEqual(row.type, "state")
            self.assertIsInstance(row.data, EventsStreamCurrentStateRow)
            state_rows.append(row.data)

        # sort by state_key so the PL row ("" sorts first) leads
        state_rows.sort(key=lambda r: r.state_key)

        sr = state_rows.pop(0)
        self.assertEqual(sr.type, EventTypes.PowerLevels)
        self.assertEqual(sr.event_id, pl_event.event_id)
        for sr in state_rows:
            self.assertEqual(sr.type, "test_state_event")
            # "None" indicates the state has been deleted
            self.assertIsNone(sr.event_id)

    def test_update_function_state_row_limit(self) -> None:
        """Test replication with many state events over several stream ids."""
        # we want to generate lots of state changes, but for this test, we want to
        # spread out the state changes over a few stream IDs.
        #
        # We do this by having two branches in the DAG. On one, we have four moderators,
        # each of which that generates lots of state; on the other, we de-op the users,
        # thus invalidating all the state.

        NUM_USERS = 4
        STATES_PER_USER = _STREAM_UPDATE_TARGET_ROW_COUNT // 4 + 1

        user_ids = ["@user%i:localhost" % (i,) for i in range(NUM_USERS)]

        # have the users join
        for u in user_ids:
            self.get_success(
                inject_member_event(self.hs, self.room_id, u, Membership.JOIN)
            )

        # Update existing power levels with mod at PL50
        pls = self.helper.get_state(
            self.room_id, EventTypes.PowerLevels, tok=self.user_tok
        )
        pls["users"].update({u: 50 for u in user_ids})
        self.helper.send_state(
            self.room_id,
            EventTypes.PowerLevels,
            pls,
            tok=self.user_tok,
        )

        # this is the point in the DAG where we make a fork
        fork_point: List[str] = self.get_success(
            self.hs.get_datastores().main.get_latest_event_ids_in_room(self.room_id)
        )

        events: List[EventBase] = []
        for user in user_ids:
            events.extend(
                self._inject_state_event(sender=user) for _ in range(STATES_PER_USER)
            )

        self.replicate()

        # all those events and state changes should have landed
        self.assertGreaterEqual(
            len(self.test_handler.received_rdata_rows), 2 * len(events)
        )

        # disconnect, so that we can stack up the changes
        self.disconnect()
        self.test_handler.received_rdata_rows.clear()

        # now roll back all that state by de-modding the users
        # (each PL event chains off the previous one, so the reverts land at
        # several distinct stream IDs)
        prev_events = fork_point
        pl_events = []
        for u in user_ids:
            pls["users"][u] = 0
            e = self.get_success(
                inject_event(
                    self.hs,
                    prev_event_ids=prev_events,
                    type=EventTypes.PowerLevels,
                    state_key="",
                    sender=self.user_id,
                    room_id=self.room_id,
                    content=pls,
                )
            )
            prev_events = [e.event_id]
            pl_events.append(e)

        # check we're testing what we think we are: no rows should yet have been
        # received
        self.assertEqual([], self.test_handler.received_rdata_rows)

        # now reconnect to pull the updates
        self.reconnect()
        self.replicate()

        # we should have received all the expected rows in the right order (as
        # well as various cache invalidation updates which we ignore)
        received_rows = [
            row for row in self.test_handler.received_rdata_rows if row[0] == "events"
        ]
        self.assertGreaterEqual(len(received_rows), len(events))

        for i in range(NUM_USERS):
            # for each user, we expect the PL event row, followed by state rows for
            # the PL event and each of the states that got reverted.
            stream_name, token, row = received_rows.pop(0)
            self.assertEqual("events", stream_name)
            self.assertIsInstance(row, EventsStreamRow)
            self.assertEqual(row.type, "ev")
            self.assertIsInstance(row.data, EventsStreamEventRow)
            self.assertEqual(row.data.event_id, pl_events[i].event_id)

            # the state rows are unsorted
            state_rows: List[EventsStreamCurrentStateRow] = []
            for _ in range(STATES_PER_USER + 1):
                stream_name, token, row = received_rows.pop(0)
                self.assertEqual("events", stream_name)
                self.assertIsInstance(row, EventsStreamRow)
                self.assertEqual(row.type, "state")
                self.assertIsInstance(row.data, EventsStreamCurrentStateRow)
                state_rows.append(row.data)

            # sort by state_key so the PL row ("" sorts first) leads
            state_rows.sort(key=lambda r: r.state_key)

            sr = state_rows.pop(0)
            self.assertEqual(sr.type, EventTypes.PowerLevels)
            self.assertEqual(sr.event_id, pl_events[i].event_id)
            for sr in state_rows:
                self.assertEqual(sr.type, "test_state_event")
                # "None" indicates the state has been deleted
                self.assertIsNone(sr.event_id)

        self.assertEqual([], received_rows)

    def test_backwards_stream_id(self) -> None:
        """
        Test that RDATA that comes after the current position should be discarded.
        """
        # disconnect, so that we can stack up some changes
        self.disconnect()

        # Generate an events. We inject them using inject_event so that they are
        # not send out over replication until we call self.replicate().
        event = self._inject_test_event()

        # check we're testing what we think we are: no rows should yet have been
        # received
        self.assertEqual([], self.test_handler.received_rdata_rows)

        # now reconnect to pull the updates
        self.reconnect()
        self.replicate()

        # We should have received the expected single row (as well as various
        # cache invalidation updates which we ignore).
        received_rows = [
            row for row in self.test_handler.received_rdata_rows if row[0] == "events"
        ]

        # There should be a single received row.
        self.assertEqual(len(received_rows), 1)

        stream_name, token, row = received_rows[0]
        self.assertEqual("events", stream_name)
        self.assertIsInstance(row, EventsStreamRow)
        self.assertEqual(row.type, "ev")
        self.assertIsInstance(row.data, EventsStreamEventRow)
        self.assertEqual(row.data.event_id, event.event_id)

        # Reset the data.
        self.test_handler.received_rdata_rows = []

        # Save the current token for later.
        worker_events_stream = self.worker_hs.get_replication_streams()["events"]
        prev_token = worker_events_stream.current_token("master")

        # Manually send an old RDATA command, which should get dropped. This
        # re-uses the row from above, but with an earlier stream token.
        self.hs.get_replication_command_handler().send_command(
            RdataCommand("events", "master", 1, row)
        )

        # No updates have been received (because it was discard as old).
        received_rows = [
            row for row in self.test_handler.received_rdata_rows if row[0] == "events"
        ]
        self.assertEqual(len(received_rows), 0)

        # Ensure the stream has not gone backwards.
        current_token = worker_events_stream.current_token("master")
        self.assertGreaterEqual(current_token, prev_token)

    # counter used to make each injected test event / state key unique
    event_count = 0

    def _inject_test_event(
        self, body: Optional[str] = None, sender: Optional[str] = None, **kwargs
    ) -> EventBase:
        """Inject a non-state "test_event" into the room, bypassing replication.

        Defaults: sender is the test user; body is derived from event_count.
        Extra kwargs are passed through to inject_event.
        """
        if sender is None:
            sender = self.user_id

        if body is None:
            body = "event %i" % (self.event_count,)
            self.event_count += 1

        return self.get_success(
            inject_event(
                self.hs,
                room_id=self.room_id,
                sender=sender,
                type="test_event",
                content={"body": body},
                **kwargs,
            )
        )

    def _inject_state_event(
        self,
        body: Optional[str] = None,
        state_key: Optional[str] = None,
        sender: Optional[str] = None,
    ) -> EventBase:
        """Inject a "test_state_event" into the room, bypassing replication.

        Defaults: sender is the test user; state_key is derived from
        event_count; body is derived from the state_key.
        """
        if sender is None:
            sender = self.user_id

        if state_key is None:
            state_key = "state_%i" % (self.event_count,)
            self.event_count += 1

        if body is None:
            body = "state event %s" % (state_key,)

        return self.get_success(
            inject_event(
                self.hs,
                room_id=self.room_id,
                sender=sender,
                type="test_state_event",
                state_key=state_key,
                content={"body": body},
            )
        )