test_events.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2019 New Vector Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. from typing import List, Optional
  16. from synapse.api.constants import EventTypes, Membership
  17. from synapse.events import EventBase
  18. from synapse.replication.tcp.commands import RdataCommand
  19. from synapse.replication.tcp.streams._base import _STREAM_UPDATE_TARGET_ROW_COUNT
  20. from synapse.replication.tcp.streams.events import (
  21. EventsStreamCurrentStateRow,
  22. EventsStreamEventRow,
  23. EventsStreamRow,
  24. )
  25. from synapse.rest import admin
  26. from synapse.rest.client.v1 import login, room
  27. from tests.replication._base import BaseStreamTestCase
  28. from tests.test_utils.event_injection import inject_event, inject_member_event
  29. class EventsStreamTestCase(BaseStreamTestCase):
  30. servlets = [
  31. admin.register_servlets,
  32. login.register_servlets,
  33. room.register_servlets,
  34. ]
  35. def prepare(self, reactor, clock, hs):
  36. super().prepare(reactor, clock, hs)
  37. self.user_id = self.register_user("u1", "pass")
  38. self.user_tok = self.login("u1", "pass")
  39. self.reconnect()
  40. self.room_id = self.helper.create_room_as(tok=self.user_tok)
  41. self.test_handler.received_rdata_rows.clear()
  42. def test_update_function_event_row_limit(self):
  43. """Test replication with many non-state events
  44. Checks that all events are correctly replicated when there are lots of
  45. event rows to be replicated.
  46. """
  47. # disconnect, so that we can stack up some changes
  48. self.disconnect()
  49. # generate lots of non-state events. We inject them using inject_event
  50. # so that they are not send out over replication until we call self.replicate().
  51. events = [
  52. self._inject_test_event()
  53. for _ in range(_STREAM_UPDATE_TARGET_ROW_COUNT + 1)
  54. ]
  55. # also one state event
  56. state_event = self._inject_state_event()
  57. # check we're testing what we think we are: no rows should yet have been
  58. # received
  59. self.assertEqual([], self.test_handler.received_rdata_rows)
  60. # now reconnect to pull the updates
  61. self.reconnect()
  62. self.replicate()
  63. # we should have received all the expected rows in the right order (as
  64. # well as various cache invalidation updates which we ignore)
  65. received_rows = [
  66. row for row in self.test_handler.received_rdata_rows if row[0] == "events"
  67. ]
  68. for event in events:
  69. stream_name, token, row = received_rows.pop(0)
  70. self.assertEqual("events", stream_name)
  71. self.assertIsInstance(row, EventsStreamRow)
  72. self.assertEqual(row.type, "ev")
  73. self.assertIsInstance(row.data, EventsStreamEventRow)
  74. self.assertEqual(row.data.event_id, event.event_id)
  75. stream_name, token, row = received_rows.pop(0)
  76. self.assertIsInstance(row, EventsStreamRow)
  77. self.assertIsInstance(row.data, EventsStreamEventRow)
  78. self.assertEqual(row.data.event_id, state_event.event_id)
  79. stream_name, token, row = received_rows.pop(0)
  80. self.assertEqual("events", stream_name)
  81. self.assertIsInstance(row, EventsStreamRow)
  82. self.assertEqual(row.type, "state")
  83. self.assertIsInstance(row.data, EventsStreamCurrentStateRow)
  84. self.assertEqual(row.data.event_id, state_event.event_id)
  85. self.assertEqual([], received_rows)
  86. def test_update_function_huge_state_change(self):
  87. """Test replication with many state events
  88. Ensures that all events are correctly replicated when there are lots of
  89. state change rows to be replicated.
  90. """
  91. # we want to generate lots of state changes at a single stream ID.
  92. #
  93. # We do this by having two branches in the DAG. On one, we have a moderator
  94. # which that generates lots of state; on the other, we de-op the moderator,
  95. # thus invalidating all the state.
  96. OTHER_USER = "@other_user:localhost"
  97. # have the user join
  98. self.get_success(
  99. inject_member_event(self.hs, self.room_id, OTHER_USER, Membership.JOIN)
  100. )
  101. # Update existing power levels with mod at PL50
  102. pls = self.helper.get_state(
  103. self.room_id, EventTypes.PowerLevels, tok=self.user_tok
  104. )
  105. pls["users"][OTHER_USER] = 50
  106. self.helper.send_state(
  107. self.room_id,
  108. EventTypes.PowerLevels,
  109. pls,
  110. tok=self.user_tok,
  111. )
  112. # this is the point in the DAG where we make a fork
  113. fork_point = self.get_success(
  114. self.hs.get_datastore().get_latest_event_ids_in_room(self.room_id)
  115. ) # type: List[str]
  116. events = [
  117. self._inject_state_event(sender=OTHER_USER)
  118. for _ in range(_STREAM_UPDATE_TARGET_ROW_COUNT)
  119. ]
  120. self.replicate()
  121. # all those events and state changes should have landed
  122. self.assertGreaterEqual(
  123. len(self.test_handler.received_rdata_rows), 2 * len(events)
  124. )
  125. # disconnect, so that we can stack up the changes
  126. self.disconnect()
  127. self.test_handler.received_rdata_rows.clear()
  128. # a state event which doesn't get rolled back, to check that the state
  129. # before the huge update comes through ok
  130. state1 = self._inject_state_event()
  131. # roll back all the state by de-modding the user
  132. prev_events = fork_point
  133. pls["users"][OTHER_USER] = 0
  134. pl_event = self.get_success(
  135. inject_event(
  136. self.hs,
  137. prev_event_ids=prev_events,
  138. type=EventTypes.PowerLevels,
  139. state_key="",
  140. sender=self.user_id,
  141. room_id=self.room_id,
  142. content=pls,
  143. )
  144. )
  145. # one more bit of state that doesn't get rolled back
  146. state2 = self._inject_state_event()
  147. # check we're testing what we think we are: no rows should yet have been
  148. # received
  149. self.assertEqual([], self.test_handler.received_rdata_rows)
  150. # now reconnect to pull the updates
  151. self.reconnect()
  152. self.replicate()
  153. # we should have received all the expected rows in the right order (as
  154. # well as various cache invalidation updates which we ignore)
  155. #
  156. # we expect:
  157. #
  158. # - two rows for state1
  159. # - the PL event row, plus state rows for the PL event and each
  160. # of the states that got reverted.
  161. # - two rows for state2
  162. received_rows = [
  163. row for row in self.test_handler.received_rdata_rows if row[0] == "events"
  164. ]
  165. # first check the first two rows, which should be state1
  166. stream_name, token, row = received_rows.pop(0)
  167. self.assertEqual("events", stream_name)
  168. self.assertIsInstance(row, EventsStreamRow)
  169. self.assertEqual(row.type, "ev")
  170. self.assertIsInstance(row.data, EventsStreamEventRow)
  171. self.assertEqual(row.data.event_id, state1.event_id)
  172. stream_name, token, row = received_rows.pop(0)
  173. self.assertIsInstance(row, EventsStreamRow)
  174. self.assertEqual(row.type, "state")
  175. self.assertIsInstance(row.data, EventsStreamCurrentStateRow)
  176. self.assertEqual(row.data.event_id, state1.event_id)
  177. # now the last two rows, which should be state2
  178. stream_name, token, row = received_rows.pop(-2)
  179. self.assertEqual("events", stream_name)
  180. self.assertIsInstance(row, EventsStreamRow)
  181. self.assertEqual(row.type, "ev")
  182. self.assertIsInstance(row.data, EventsStreamEventRow)
  183. self.assertEqual(row.data.event_id, state2.event_id)
  184. stream_name, token, row = received_rows.pop(-1)
  185. self.assertIsInstance(row, EventsStreamRow)
  186. self.assertEqual(row.type, "state")
  187. self.assertIsInstance(row.data, EventsStreamCurrentStateRow)
  188. self.assertEqual(row.data.event_id, state2.event_id)
  189. # that should leave us with the rows for the PL event
  190. self.assertEqual(len(received_rows), len(events) + 2)
  191. stream_name, token, row = received_rows.pop(0)
  192. self.assertEqual("events", stream_name)
  193. self.assertIsInstance(row, EventsStreamRow)
  194. self.assertEqual(row.type, "ev")
  195. self.assertIsInstance(row.data, EventsStreamEventRow)
  196. self.assertEqual(row.data.event_id, pl_event.event_id)
  197. # the state rows are unsorted
  198. state_rows = [] # type: List[EventsStreamCurrentStateRow]
  199. for stream_name, token, row in received_rows:
  200. self.assertEqual("events", stream_name)
  201. self.assertIsInstance(row, EventsStreamRow)
  202. self.assertEqual(row.type, "state")
  203. self.assertIsInstance(row.data, EventsStreamCurrentStateRow)
  204. state_rows.append(row.data)
  205. state_rows.sort(key=lambda r: r.state_key)
  206. sr = state_rows.pop(0)
  207. self.assertEqual(sr.type, EventTypes.PowerLevels)
  208. self.assertEqual(sr.event_id, pl_event.event_id)
  209. for sr in state_rows:
  210. self.assertEqual(sr.type, "test_state_event")
  211. # "None" indicates the state has been deleted
  212. self.assertIsNone(sr.event_id)
  213. def test_update_function_state_row_limit(self):
  214. """Test replication with many state events over several stream ids."""
  215. # we want to generate lots of state changes, but for this test, we want to
  216. # spread out the state changes over a few stream IDs.
  217. #
  218. # We do this by having two branches in the DAG. On one, we have four moderators,
  219. # each of which that generates lots of state; on the other, we de-op the users,
  220. # thus invalidating all the state.
  221. NUM_USERS = 4
  222. STATES_PER_USER = _STREAM_UPDATE_TARGET_ROW_COUNT // 4 + 1
  223. user_ids = ["@user%i:localhost" % (i,) for i in range(NUM_USERS)]
  224. # have the users join
  225. for u in user_ids:
  226. self.get_success(
  227. inject_member_event(self.hs, self.room_id, u, Membership.JOIN)
  228. )
  229. # Update existing power levels with mod at PL50
  230. pls = self.helper.get_state(
  231. self.room_id, EventTypes.PowerLevels, tok=self.user_tok
  232. )
  233. pls["users"].update({u: 50 for u in user_ids})
  234. self.helper.send_state(
  235. self.room_id,
  236. EventTypes.PowerLevels,
  237. pls,
  238. tok=self.user_tok,
  239. )
  240. # this is the point in the DAG where we make a fork
  241. fork_point = self.get_success(
  242. self.hs.get_datastore().get_latest_event_ids_in_room(self.room_id)
  243. ) # type: List[str]
  244. events = [] # type: List[EventBase]
  245. for user in user_ids:
  246. events.extend(
  247. self._inject_state_event(sender=user) for _ in range(STATES_PER_USER)
  248. )
  249. self.replicate()
  250. # all those events and state changes should have landed
  251. self.assertGreaterEqual(
  252. len(self.test_handler.received_rdata_rows), 2 * len(events)
  253. )
  254. # disconnect, so that we can stack up the changes
  255. self.disconnect()
  256. self.test_handler.received_rdata_rows.clear()
  257. # now roll back all that state by de-modding the users
  258. prev_events = fork_point
  259. pl_events = []
  260. for u in user_ids:
  261. pls["users"][u] = 0
  262. e = self.get_success(
  263. inject_event(
  264. self.hs,
  265. prev_event_ids=prev_events,
  266. type=EventTypes.PowerLevels,
  267. state_key="",
  268. sender=self.user_id,
  269. room_id=self.room_id,
  270. content=pls,
  271. )
  272. )
  273. prev_events = [e.event_id]
  274. pl_events.append(e)
  275. # check we're testing what we think we are: no rows should yet have been
  276. # received
  277. self.assertEqual([], self.test_handler.received_rdata_rows)
  278. # now reconnect to pull the updates
  279. self.reconnect()
  280. self.replicate()
  281. # we should have received all the expected rows in the right order (as
  282. # well as various cache invalidation updates which we ignore)
  283. received_rows = [
  284. row for row in self.test_handler.received_rdata_rows if row[0] == "events"
  285. ]
  286. self.assertGreaterEqual(len(received_rows), len(events))
  287. for i in range(NUM_USERS):
  288. # for each user, we expect the PL event row, followed by state rows for
  289. # the PL event and each of the states that got reverted.
  290. stream_name, token, row = received_rows.pop(0)
  291. self.assertEqual("events", stream_name)
  292. self.assertIsInstance(row, EventsStreamRow)
  293. self.assertEqual(row.type, "ev")
  294. self.assertIsInstance(row.data, EventsStreamEventRow)
  295. self.assertEqual(row.data.event_id, pl_events[i].event_id)
  296. # the state rows are unsorted
  297. state_rows = [] # type: List[EventsStreamCurrentStateRow]
  298. for j in range(STATES_PER_USER + 1):
  299. stream_name, token, row = received_rows.pop(0)
  300. self.assertEqual("events", stream_name)
  301. self.assertIsInstance(row, EventsStreamRow)
  302. self.assertEqual(row.type, "state")
  303. self.assertIsInstance(row.data, EventsStreamCurrentStateRow)
  304. state_rows.append(row.data)
  305. state_rows.sort(key=lambda r: r.state_key)
  306. sr = state_rows.pop(0)
  307. self.assertEqual(sr.type, EventTypes.PowerLevels)
  308. self.assertEqual(sr.event_id, pl_events[i].event_id)
  309. for sr in state_rows:
  310. self.assertEqual(sr.type, "test_state_event")
  311. # "None" indicates the state has been deleted
  312. self.assertIsNone(sr.event_id)
  313. self.assertEqual([], received_rows)
  314. def test_backwards_stream_id(self):
  315. """
  316. Test that RDATA that comes after the current position should be discarded.
  317. """
  318. # disconnect, so that we can stack up some changes
  319. self.disconnect()
  320. # Generate an events. We inject them using inject_event so that they are
  321. # not send out over replication until we call self.replicate().
  322. event = self._inject_test_event()
  323. # check we're testing what we think we are: no rows should yet have been
  324. # received
  325. self.assertEqual([], self.test_handler.received_rdata_rows)
  326. # now reconnect to pull the updates
  327. self.reconnect()
  328. self.replicate()
  329. # We should have received the expected single row (as well as various
  330. # cache invalidation updates which we ignore).
  331. received_rows = [
  332. row for row in self.test_handler.received_rdata_rows if row[0] == "events"
  333. ]
  334. # There should be a single received row.
  335. self.assertEqual(len(received_rows), 1)
  336. stream_name, token, row = received_rows[0]
  337. self.assertEqual("events", stream_name)
  338. self.assertIsInstance(row, EventsStreamRow)
  339. self.assertEqual(row.type, "ev")
  340. self.assertIsInstance(row.data, EventsStreamEventRow)
  341. self.assertEqual(row.data.event_id, event.event_id)
  342. # Reset the data.
  343. self.test_handler.received_rdata_rows = []
  344. # Save the current token for later.
  345. worker_events_stream = self.worker_hs.get_replication_streams()["events"]
  346. prev_token = worker_events_stream.current_token("master")
  347. # Manually send an old RDATA command, which should get dropped. This
  348. # re-uses the row from above, but with an earlier stream token.
  349. self.hs.get_tcp_replication().send_command(
  350. RdataCommand("events", "master", 1, row)
  351. )
  352. # No updates have been received (because it was discard as old).
  353. received_rows = [
  354. row for row in self.test_handler.received_rdata_rows if row[0] == "events"
  355. ]
  356. self.assertEqual(len(received_rows), 0)
  357. # Ensure the stream has not gone backwards.
  358. current_token = worker_events_stream.current_token("master")
  359. self.assertGreaterEqual(current_token, prev_token)
  360. event_count = 0
  361. def _inject_test_event(
  362. self, body: Optional[str] = None, sender: Optional[str] = None, **kwargs
  363. ) -> EventBase:
  364. if sender is None:
  365. sender = self.user_id
  366. if body is None:
  367. body = "event %i" % (self.event_count,)
  368. self.event_count += 1
  369. return self.get_success(
  370. inject_event(
  371. self.hs,
  372. room_id=self.room_id,
  373. sender=sender,
  374. type="test_event",
  375. content={"body": body},
  376. **kwargs,
  377. )
  378. )
  379. def _inject_state_event(
  380. self,
  381. body: Optional[str] = None,
  382. state_key: Optional[str] = None,
  383. sender: Optional[str] = None,
  384. ) -> EventBase:
  385. if sender is None:
  386. sender = self.user_id
  387. if state_key is None:
  388. state_key = "state_%i" % (self.event_count,)
  389. self.event_count += 1
  390. if body is None:
  391. body = "state event %s" % (state_key,)
  392. return self.get_success(
  393. inject_event(
  394. self.hs,
  395. room_id=self.room_id,
  396. sender=sender,
  397. type="test_state_event",
  398. state_key=state_key,
  399. content={"body": body},
  400. )
  401. )