test_event_chain.py 25 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735
  1. # Copyright 2020 The Matrix.org Foundation C.I.C.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the 'License');
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an 'AS IS' BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. from typing import Dict, List, Set, Tuple
  15. from twisted.trial import unittest
  16. from synapse.api.constants import EventTypes
  17. from synapse.api.room_versions import RoomVersions
  18. from synapse.events import EventBase
  19. from synapse.rest import admin
  20. from synapse.rest.client import login, room
  21. from synapse.storage.databases.main.events import _LinkMap
  22. from synapse.types import create_requester
  23. from tests.unittest import HomeserverTestCase
  24. class EventChainStoreTestCase(HomeserverTestCase):
  25. def prepare(self, reactor, clock, hs):
  26. self.store = hs.get_datastore()
  27. self._next_stream_ordering = 1
  28. def test_simple(self):
  29. """Test that the example in `docs/auth_chain_difference_algorithm.md`
  30. works.
  31. """
  32. event_factory = self.hs.get_event_builder_factory()
  33. bob = "@creator:test"
  34. alice = "@alice:test"
  35. room_id = "!room:test"
  36. # Ensure that we have a rooms entry so that we generate the chain index.
  37. self.get_success(
  38. self.store.store_room(
  39. room_id=room_id,
  40. room_creator_user_id="",
  41. is_public=True,
  42. room_version=RoomVersions.V6,
  43. )
  44. )
  45. create = self.get_success(
  46. event_factory.for_room_version(
  47. RoomVersions.V6,
  48. {
  49. "type": EventTypes.Create,
  50. "state_key": "",
  51. "sender": bob,
  52. "room_id": room_id,
  53. "content": {"tag": "create"},
  54. },
  55. ).build(prev_event_ids=[], auth_event_ids=[])
  56. )
  57. bob_join = self.get_success(
  58. event_factory.for_room_version(
  59. RoomVersions.V6,
  60. {
  61. "type": EventTypes.Member,
  62. "state_key": bob,
  63. "sender": bob,
  64. "room_id": room_id,
  65. "content": {"tag": "bob_join"},
  66. },
  67. ).build(prev_event_ids=[], auth_event_ids=[create.event_id])
  68. )
  69. power = self.get_success(
  70. event_factory.for_room_version(
  71. RoomVersions.V6,
  72. {
  73. "type": EventTypes.PowerLevels,
  74. "state_key": "",
  75. "sender": bob,
  76. "room_id": room_id,
  77. "content": {"tag": "power"},
  78. },
  79. ).build(
  80. prev_event_ids=[],
  81. auth_event_ids=[create.event_id, bob_join.event_id],
  82. )
  83. )
  84. alice_invite = self.get_success(
  85. event_factory.for_room_version(
  86. RoomVersions.V6,
  87. {
  88. "type": EventTypes.Member,
  89. "state_key": alice,
  90. "sender": bob,
  91. "room_id": room_id,
  92. "content": {"tag": "alice_invite"},
  93. },
  94. ).build(
  95. prev_event_ids=[],
  96. auth_event_ids=[create.event_id, bob_join.event_id, power.event_id],
  97. )
  98. )
  99. alice_join = self.get_success(
  100. event_factory.for_room_version(
  101. RoomVersions.V6,
  102. {
  103. "type": EventTypes.Member,
  104. "state_key": alice,
  105. "sender": alice,
  106. "room_id": room_id,
  107. "content": {"tag": "alice_join"},
  108. },
  109. ).build(
  110. prev_event_ids=[],
  111. auth_event_ids=[create.event_id, alice_invite.event_id, power.event_id],
  112. )
  113. )
  114. power_2 = self.get_success(
  115. event_factory.for_room_version(
  116. RoomVersions.V6,
  117. {
  118. "type": EventTypes.PowerLevels,
  119. "state_key": "",
  120. "sender": bob,
  121. "room_id": room_id,
  122. "content": {"tag": "power_2"},
  123. },
  124. ).build(
  125. prev_event_ids=[],
  126. auth_event_ids=[create.event_id, bob_join.event_id, power.event_id],
  127. )
  128. )
  129. bob_join_2 = self.get_success(
  130. event_factory.for_room_version(
  131. RoomVersions.V6,
  132. {
  133. "type": EventTypes.Member,
  134. "state_key": bob,
  135. "sender": bob,
  136. "room_id": room_id,
  137. "content": {"tag": "bob_join_2"},
  138. },
  139. ).build(
  140. prev_event_ids=[],
  141. auth_event_ids=[create.event_id, bob_join.event_id, power.event_id],
  142. )
  143. )
  144. alice_join2 = self.get_success(
  145. event_factory.for_room_version(
  146. RoomVersions.V6,
  147. {
  148. "type": EventTypes.Member,
  149. "state_key": alice,
  150. "sender": alice,
  151. "room_id": room_id,
  152. "content": {"tag": "alice_join2"},
  153. },
  154. ).build(
  155. prev_event_ids=[],
  156. auth_event_ids=[
  157. create.event_id,
  158. alice_join.event_id,
  159. power_2.event_id,
  160. ],
  161. )
  162. )
  163. events = [
  164. create,
  165. bob_join,
  166. power,
  167. alice_invite,
  168. alice_join,
  169. bob_join_2,
  170. power_2,
  171. alice_join2,
  172. ]
  173. expected_links = [
  174. (bob_join, create),
  175. (power, create),
  176. (power, bob_join),
  177. (alice_invite, create),
  178. (alice_invite, power),
  179. (alice_invite, bob_join),
  180. (bob_join_2, power),
  181. (alice_join2, power_2),
  182. ]
  183. self.persist(events)
  184. chain_map, link_map = self.fetch_chains(events)
  185. # Check that the expected links and only the expected links have been
  186. # added.
  187. self.assertEqual(len(expected_links), len(list(link_map.get_additions())))
  188. for start, end in expected_links:
  189. start_id, start_seq = chain_map[start.event_id]
  190. end_id, end_seq = chain_map[end.event_id]
  191. self.assertIn(
  192. (start_seq, end_seq), list(link_map.get_links_between(start_id, end_id))
  193. )
  194. # Test that everything can reach the create event, but the create event
  195. # can't reach anything.
  196. for event in events[1:]:
  197. self.assertTrue(
  198. link_map.exists_path_from(
  199. chain_map[event.event_id], chain_map[create.event_id]
  200. ),
  201. )
  202. self.assertFalse(
  203. link_map.exists_path_from(
  204. chain_map[create.event_id],
  205. chain_map[event.event_id],
  206. ),
  207. )
  208. def test_out_of_order_events(self):
  209. """Test that we handle persisting events that we don't have the full
  210. auth chain for yet (which should only happen for out of band memberships).
  211. """
  212. event_factory = self.hs.get_event_builder_factory()
  213. bob = "@creator:test"
  214. alice = "@alice:test"
  215. room_id = "!room:test"
  216. # Ensure that we have a rooms entry so that we generate the chain index.
  217. self.get_success(
  218. self.store.store_room(
  219. room_id=room_id,
  220. room_creator_user_id="",
  221. is_public=True,
  222. room_version=RoomVersions.V6,
  223. )
  224. )
  225. # First persist the base room.
  226. create = self.get_success(
  227. event_factory.for_room_version(
  228. RoomVersions.V6,
  229. {
  230. "type": EventTypes.Create,
  231. "state_key": "",
  232. "sender": bob,
  233. "room_id": room_id,
  234. "content": {"tag": "create"},
  235. },
  236. ).build(prev_event_ids=[], auth_event_ids=[])
  237. )
  238. bob_join = self.get_success(
  239. event_factory.for_room_version(
  240. RoomVersions.V6,
  241. {
  242. "type": EventTypes.Member,
  243. "state_key": bob,
  244. "sender": bob,
  245. "room_id": room_id,
  246. "content": {"tag": "bob_join"},
  247. },
  248. ).build(prev_event_ids=[], auth_event_ids=[create.event_id])
  249. )
  250. power = self.get_success(
  251. event_factory.for_room_version(
  252. RoomVersions.V6,
  253. {
  254. "type": EventTypes.PowerLevels,
  255. "state_key": "",
  256. "sender": bob,
  257. "room_id": room_id,
  258. "content": {"tag": "power"},
  259. },
  260. ).build(
  261. prev_event_ids=[],
  262. auth_event_ids=[create.event_id, bob_join.event_id],
  263. )
  264. )
  265. self.persist([create, bob_join, power])
  266. # Now persist an invite and a couple of memberships out of order.
  267. alice_invite = self.get_success(
  268. event_factory.for_room_version(
  269. RoomVersions.V6,
  270. {
  271. "type": EventTypes.Member,
  272. "state_key": alice,
  273. "sender": bob,
  274. "room_id": room_id,
  275. "content": {"tag": "alice_invite"},
  276. },
  277. ).build(
  278. prev_event_ids=[],
  279. auth_event_ids=[create.event_id, bob_join.event_id, power.event_id],
  280. )
  281. )
  282. alice_join = self.get_success(
  283. event_factory.for_room_version(
  284. RoomVersions.V6,
  285. {
  286. "type": EventTypes.Member,
  287. "state_key": alice,
  288. "sender": alice,
  289. "room_id": room_id,
  290. "content": {"tag": "alice_join"},
  291. },
  292. ).build(
  293. prev_event_ids=[],
  294. auth_event_ids=[create.event_id, alice_invite.event_id, power.event_id],
  295. )
  296. )
  297. alice_join2 = self.get_success(
  298. event_factory.for_room_version(
  299. RoomVersions.V6,
  300. {
  301. "type": EventTypes.Member,
  302. "state_key": alice,
  303. "sender": alice,
  304. "room_id": room_id,
  305. "content": {"tag": "alice_join2"},
  306. },
  307. ).build(
  308. prev_event_ids=[],
  309. auth_event_ids=[create.event_id, alice_join.event_id, power.event_id],
  310. )
  311. )
  312. self.persist([alice_join])
  313. self.persist([alice_join2])
  314. self.persist([alice_invite])
  315. # The end result should be sane.
  316. events = [create, bob_join, power, alice_invite, alice_join]
  317. chain_map, link_map = self.fetch_chains(events)
  318. expected_links = [
  319. (bob_join, create),
  320. (power, create),
  321. (power, bob_join),
  322. (alice_invite, create),
  323. (alice_invite, power),
  324. (alice_invite, bob_join),
  325. ]
  326. # Check that the expected links and only the expected links have been
  327. # added.
  328. self.assertEqual(len(expected_links), len(list(link_map.get_additions())))
  329. for start, end in expected_links:
  330. start_id, start_seq = chain_map[start.event_id]
  331. end_id, end_seq = chain_map[end.event_id]
  332. self.assertIn(
  333. (start_seq, end_seq), list(link_map.get_links_between(start_id, end_id))
  334. )
  335. def persist(
  336. self,
  337. events: List[EventBase],
  338. ):
  339. """Persist the given events and check that the links generated match
  340. those given.
  341. """
  342. persist_events_store = self.hs.get_datastores().persist_events
  343. for e in events:
  344. e.internal_metadata.stream_ordering = self._next_stream_ordering
  345. self._next_stream_ordering += 1
  346. def _persist(txn):
  347. # We need to persist the events to the events and state_events
  348. # tables.
  349. persist_events_store._store_event_txn(txn, [(e, {}) for e in events])
  350. # Actually call the function that calculates the auth chain stuff.
  351. persist_events_store._persist_event_auth_chain_txn(txn, events)
  352. self.get_success(
  353. persist_events_store.db_pool.runInteraction(
  354. "_persist",
  355. _persist,
  356. )
  357. )
  358. def fetch_chains(
  359. self, events: List[EventBase]
  360. ) -> Tuple[Dict[str, Tuple[int, int]], _LinkMap]:
  361. # Fetch the map from event ID -> (chain ID, sequence number)
  362. rows = self.get_success(
  363. self.store.db_pool.simple_select_many_batch(
  364. table="event_auth_chains",
  365. column="event_id",
  366. iterable=[e.event_id for e in events],
  367. retcols=("event_id", "chain_id", "sequence_number"),
  368. keyvalues={},
  369. )
  370. )
  371. chain_map = {
  372. row["event_id"]: (row["chain_id"], row["sequence_number"]) for row in rows
  373. }
  374. # Fetch all the links and pass them to the _LinkMap.
  375. rows = self.get_success(
  376. self.store.db_pool.simple_select_many_batch(
  377. table="event_auth_chain_links",
  378. column="origin_chain_id",
  379. iterable=[chain_id for chain_id, _ in chain_map.values()],
  380. retcols=(
  381. "origin_chain_id",
  382. "origin_sequence_number",
  383. "target_chain_id",
  384. "target_sequence_number",
  385. ),
  386. keyvalues={},
  387. )
  388. )
  389. link_map = _LinkMap()
  390. for row in rows:
  391. added = link_map.add_link(
  392. (row["origin_chain_id"], row["origin_sequence_number"]),
  393. (row["target_chain_id"], row["target_sequence_number"]),
  394. )
  395. # We shouldn't have persisted any redundant links
  396. self.assertTrue(added)
  397. return chain_map, link_map
  398. class LinkMapTestCase(unittest.TestCase):
  399. def test_simple(self):
  400. """Basic tests for the LinkMap."""
  401. link_map = _LinkMap()
  402. link_map.add_link((1, 1), (2, 1), new=False)
  403. self.assertCountEqual(link_map.get_links_between(1, 2), [(1, 1)])
  404. self.assertCountEqual(link_map.get_links_from((1, 1)), [(2, 1)])
  405. self.assertCountEqual(link_map.get_additions(), [])
  406. self.assertTrue(link_map.exists_path_from((1, 5), (2, 1)))
  407. self.assertFalse(link_map.exists_path_from((1, 5), (2, 2)))
  408. self.assertTrue(link_map.exists_path_from((1, 5), (1, 1)))
  409. self.assertFalse(link_map.exists_path_from((1, 1), (1, 5)))
  410. # Attempting to add a redundant link is ignored.
  411. self.assertFalse(link_map.add_link((1, 4), (2, 1)))
  412. self.assertCountEqual(link_map.get_links_between(1, 2), [(1, 1)])
  413. # Adding new non-redundant links works
  414. self.assertTrue(link_map.add_link((1, 3), (2, 3)))
  415. self.assertCountEqual(link_map.get_links_between(1, 2), [(1, 1), (3, 3)])
  416. self.assertTrue(link_map.add_link((2, 5), (1, 3)))
  417. self.assertCountEqual(link_map.get_links_between(2, 1), [(5, 3)])
  418. self.assertCountEqual(link_map.get_links_between(1, 2), [(1, 1), (3, 3)])
  419. self.assertCountEqual(link_map.get_additions(), [(1, 3, 2, 3), (2, 5, 1, 3)])
  420. class EventChainBackgroundUpdateTestCase(HomeserverTestCase):
  421. servlets = [
  422. admin.register_servlets,
  423. room.register_servlets,
  424. login.register_servlets,
  425. ]
  426. def prepare(self, reactor, clock, hs):
  427. self.store = hs.get_datastore()
  428. self.user_id = self.register_user("foo", "pass")
  429. self.token = self.login("foo", "pass")
  430. self.requester = create_requester(self.user_id)
  431. def _generate_room(self) -> Tuple[str, List[Set[str]]]:
  432. """Insert a room without a chain cover index."""
  433. room_id = self.helper.create_room_as(self.user_id, tok=self.token)
  434. # Mark the room as not having a chain cover index
  435. self.get_success(
  436. self.store.db_pool.simple_update(
  437. table="rooms",
  438. keyvalues={"room_id": room_id},
  439. updatevalues={"has_auth_chain_index": False},
  440. desc="test",
  441. )
  442. )
  443. # Create a fork in the DAG with different events.
  444. event_handler = self.hs.get_event_creation_handler()
  445. latest_event_ids = self.get_success(
  446. self.store.get_prev_events_for_room(room_id)
  447. )
  448. event, context = self.get_success(
  449. event_handler.create_event(
  450. self.requester,
  451. {
  452. "type": "some_state_type",
  453. "state_key": "",
  454. "content": {},
  455. "room_id": room_id,
  456. "sender": self.user_id,
  457. },
  458. prev_event_ids=latest_event_ids,
  459. )
  460. )
  461. self.get_success(
  462. event_handler.handle_new_client_event(self.requester, event, context)
  463. )
  464. state1 = set(self.get_success(context.get_current_state_ids()).values())
  465. event, context = self.get_success(
  466. event_handler.create_event(
  467. self.requester,
  468. {
  469. "type": "some_state_type",
  470. "state_key": "",
  471. "content": {},
  472. "room_id": room_id,
  473. "sender": self.user_id,
  474. },
  475. prev_event_ids=latest_event_ids,
  476. )
  477. )
  478. self.get_success(
  479. event_handler.handle_new_client_event(self.requester, event, context)
  480. )
  481. state2 = set(self.get_success(context.get_current_state_ids()).values())
  482. # Delete the chain cover info.
  483. def _delete_tables(txn):
  484. txn.execute("DELETE FROM event_auth_chains")
  485. txn.execute("DELETE FROM event_auth_chain_links")
  486. self.get_success(self.store.db_pool.runInteraction("test", _delete_tables))
  487. return room_id, [state1, state2]
  488. def test_background_update_single_room(self):
  489. """Test that the background update to calculate auth chains for historic
  490. rooms works correctly.
  491. """
  492. # Create a room
  493. room_id, states = self._generate_room()
  494. # Insert and run the background update.
  495. self.get_success(
  496. self.store.db_pool.simple_insert(
  497. "background_updates",
  498. {"update_name": "chain_cover", "progress_json": "{}"},
  499. )
  500. )
  501. # Ugh, have to reset this flag
  502. self.store.db_pool.updates._all_done = False
  503. self.wait_for_background_updates()
  504. # Test that the `has_auth_chain_index` has been set
  505. self.assertTrue(self.get_success(self.store.has_auth_chain_index(room_id)))
  506. # Test that calculating the auth chain difference using the newly
  507. # calculated chain cover works.
  508. self.get_success(
  509. self.store.db_pool.runInteraction(
  510. "test",
  511. self.store._get_auth_chain_difference_using_cover_index_txn,
  512. room_id,
  513. states,
  514. )
  515. )
  516. def test_background_update_multiple_rooms(self):
  517. """Test that the background update to calculate auth chains for historic
  518. rooms works correctly.
  519. """
  520. # Create a room
  521. room_id1, states1 = self._generate_room()
  522. room_id2, states2 = self._generate_room()
  523. room_id3, states2 = self._generate_room()
  524. # Insert and run the background update.
  525. self.get_success(
  526. self.store.db_pool.simple_insert(
  527. "background_updates",
  528. {"update_name": "chain_cover", "progress_json": "{}"},
  529. )
  530. )
  531. # Ugh, have to reset this flag
  532. self.store.db_pool.updates._all_done = False
  533. self.wait_for_background_updates()
  534. # Test that the `has_auth_chain_index` has been set
  535. self.assertTrue(self.get_success(self.store.has_auth_chain_index(room_id1)))
  536. self.assertTrue(self.get_success(self.store.has_auth_chain_index(room_id2)))
  537. self.assertTrue(self.get_success(self.store.has_auth_chain_index(room_id3)))
  538. # Test that calculating the auth chain difference using the newly
  539. # calculated chain cover works.
  540. self.get_success(
  541. self.store.db_pool.runInteraction(
  542. "test",
  543. self.store._get_auth_chain_difference_using_cover_index_txn,
  544. room_id1,
  545. states1,
  546. )
  547. )
  548. def test_background_update_single_large_room(self):
  549. """Test that the background update to calculate auth chains for historic
  550. rooms works correctly.
  551. """
  552. # Create a room
  553. room_id, states = self._generate_room()
  554. # Add a bunch of state so that it takes multiple iterations of the
  555. # background update to process the room.
  556. for i in range(0, 150):
  557. self.helper.send_state(
  558. room_id, event_type="m.test", body={"index": i}, tok=self.token
  559. )
  560. # Insert and run the background update.
  561. self.get_success(
  562. self.store.db_pool.simple_insert(
  563. "background_updates",
  564. {"update_name": "chain_cover", "progress_json": "{}"},
  565. )
  566. )
  567. # Ugh, have to reset this flag
  568. self.store.db_pool.updates._all_done = False
  569. iterations = 0
  570. while not self.get_success(
  571. self.store.db_pool.updates.has_completed_background_updates()
  572. ):
  573. iterations += 1
  574. self.get_success(
  575. self.store.db_pool.updates.do_next_background_update(100), by=0.1
  576. )
  577. # Ensure that we did actually take multiple iterations to process the
  578. # room.
  579. self.assertGreater(iterations, 1)
  580. # Test that the `has_auth_chain_index` has been set
  581. self.assertTrue(self.get_success(self.store.has_auth_chain_index(room_id)))
  582. # Test that calculating the auth chain difference using the newly
  583. # calculated chain cover works.
  584. self.get_success(
  585. self.store.db_pool.runInteraction(
  586. "test",
  587. self.store._get_auth_chain_difference_using_cover_index_txn,
  588. room_id,
  589. states,
  590. )
  591. )
  592. def test_background_update_multiple_large_room(self):
  593. """Test that the background update to calculate auth chains for historic
  594. rooms works correctly.
  595. """
  596. # Create the rooms
  597. room_id1, _ = self._generate_room()
  598. room_id2, _ = self._generate_room()
  599. # Add a bunch of state so that it takes multiple iterations of the
  600. # background update to process the room.
  601. for i in range(0, 150):
  602. self.helper.send_state(
  603. room_id1, event_type="m.test", body={"index": i}, tok=self.token
  604. )
  605. for i in range(0, 150):
  606. self.helper.send_state(
  607. room_id2, event_type="m.test", body={"index": i}, tok=self.token
  608. )
  609. # Insert and run the background update.
  610. self.get_success(
  611. self.store.db_pool.simple_insert(
  612. "background_updates",
  613. {"update_name": "chain_cover", "progress_json": "{}"},
  614. )
  615. )
  616. # Ugh, have to reset this flag
  617. self.store.db_pool.updates._all_done = False
  618. iterations = 0
  619. while not self.get_success(
  620. self.store.db_pool.updates.has_completed_background_updates()
  621. ):
  622. iterations += 1
  623. self.get_success(
  624. self.store.db_pool.updates.do_next_background_update(100), by=0.1
  625. )
  626. # Ensure that we did actually take multiple iterations to process the
  627. # room.
  628. self.assertGreater(iterations, 1)
  629. # Test that the `has_auth_chain_index` has been set
  630. self.assertTrue(self.get_success(self.store.has_auth_chain_index(room_id1)))
  631. self.assertTrue(self.get_success(self.store.has_auth_chain_index(room_id2)))