test_event_federation.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580
  1. # Copyright 2018 New Vector Ltd
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the 'License');
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an 'AS IS' BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import attr
  15. from parameterized import parameterized
  16. from synapse.api.room_versions import RoomVersions
  17. from synapse.events import _EventInternalMetadata
  18. from synapse.util import json_encoder
  19. import tests.unittest
  20. import tests.utils
  21. class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase):
  22. def prepare(self, reactor, clock, hs):
  23. self.store = hs.get_datastore()
  24. def test_get_prev_events_for_room(self):
  25. room_id = "@ROOM:local"
  26. # add a bunch of events and hashes to act as forward extremities
  27. def insert_event(txn, i):
  28. event_id = "$event_%i:local" % i
  29. txn.execute(
  30. (
  31. "INSERT INTO events ("
  32. " room_id, event_id, type, depth, topological_ordering,"
  33. " content, processed, outlier, stream_ordering) "
  34. "VALUES (?, ?, 'm.test', ?, ?, 'test', ?, ?, ?)"
  35. ),
  36. (room_id, event_id, i, i, True, False, i),
  37. )
  38. txn.execute(
  39. (
  40. "INSERT INTO event_forward_extremities (room_id, event_id) "
  41. "VALUES (?, ?)"
  42. ),
  43. (room_id, event_id),
  44. )
  45. txn.execute(
  46. (
  47. "INSERT INTO event_reference_hashes "
  48. "(event_id, algorithm, hash) "
  49. "VALUES (?, 'sha256', ?)"
  50. ),
  51. (event_id, bytearray(b"ffff")),
  52. )
  53. for i in range(0, 20):
  54. self.get_success(
  55. self.store.db_pool.runInteraction("insert", insert_event, i)
  56. )
  57. # this should get the last ten
  58. r = self.get_success(self.store.get_prev_events_for_room(room_id))
  59. self.assertEqual(10, len(r))
  60. for i in range(0, 10):
  61. self.assertEqual("$event_%i:local" % (19 - i), r[i])
  62. def test_get_rooms_with_many_extremities(self):
  63. room1 = "#room1"
  64. room2 = "#room2"
  65. room3 = "#room3"
  66. def insert_event(txn, i, room_id):
  67. event_id = "$event_%i:local" % i
  68. txn.execute(
  69. (
  70. "INSERT INTO event_forward_extremities (room_id, event_id) "
  71. "VALUES (?, ?)"
  72. ),
  73. (room_id, event_id),
  74. )
  75. for i in range(0, 20):
  76. self.get_success(
  77. self.store.db_pool.runInteraction("insert", insert_event, i, room1)
  78. )
  79. self.get_success(
  80. self.store.db_pool.runInteraction("insert", insert_event, i, room2)
  81. )
  82. self.get_success(
  83. self.store.db_pool.runInteraction("insert", insert_event, i, room3)
  84. )
  85. # Test simple case
  86. r = self.get_success(self.store.get_rooms_with_many_extremities(5, 5, []))
  87. self.assertEqual(len(r), 3)
  88. # Does filter work?
  89. r = self.get_success(self.store.get_rooms_with_many_extremities(5, 5, [room1]))
  90. self.assertTrue(room2 in r)
  91. self.assertTrue(room3 in r)
  92. self.assertEqual(len(r), 2)
  93. r = self.get_success(
  94. self.store.get_rooms_with_many_extremities(5, 5, [room1, room2])
  95. )
  96. self.assertEqual(r, [room3])
  97. # Does filter and limit work?
  98. r = self.get_success(self.store.get_rooms_with_many_extremities(5, 1, [room1]))
  99. self.assertTrue(r == [room2] or r == [room3])
  100. def _setup_auth_chain(self, use_chain_cover_index: bool) -> str:
  101. room_id = "@ROOM:local"
  102. # The silly auth graph we use to test the auth difference algorithm,
  103. # where the top are the most recent events.
  104. #
  105. # A B
  106. # \ /
  107. # D E
  108. # \ |
  109. # ` F C
  110. # | /|
  111. # G ´ |
  112. # | \ |
  113. # H I
  114. # | |
  115. # K J
  116. auth_graph = {
  117. "a": ["e"],
  118. "b": ["e"],
  119. "c": ["g", "i"],
  120. "d": ["f"],
  121. "e": ["f"],
  122. "f": ["g"],
  123. "g": ["h", "i"],
  124. "h": ["k"],
  125. "i": ["j"],
  126. "k": [],
  127. "j": [],
  128. }
  129. depth_map = {
  130. "a": 7,
  131. "b": 7,
  132. "c": 4,
  133. "d": 6,
  134. "e": 6,
  135. "f": 5,
  136. "g": 3,
  137. "h": 2,
  138. "i": 2,
  139. "k": 1,
  140. "j": 1,
  141. }
  142. # Mark the room as maybe having a cover index.
  143. def store_room(txn):
  144. self.store.db_pool.simple_insert_txn(
  145. txn,
  146. "rooms",
  147. {
  148. "room_id": room_id,
  149. "creator": "room_creator_user_id",
  150. "is_public": True,
  151. "room_version": "6",
  152. "has_auth_chain_index": use_chain_cover_index,
  153. },
  154. )
  155. self.get_success(self.store.db_pool.runInteraction("store_room", store_room))
  156. # We rudely fiddle with the appropriate tables directly, as that's much
  157. # easier than constructing events properly.
  158. def insert_event(txn):
  159. stream_ordering = 0
  160. for event_id in auth_graph:
  161. stream_ordering += 1
  162. depth = depth_map[event_id]
  163. self.store.db_pool.simple_insert_txn(
  164. txn,
  165. table="events",
  166. values={
  167. "event_id": event_id,
  168. "room_id": room_id,
  169. "depth": depth,
  170. "topological_ordering": depth,
  171. "type": "m.test",
  172. "processed": True,
  173. "outlier": False,
  174. "stream_ordering": stream_ordering,
  175. },
  176. )
  177. self.hs.datastores.persist_events._persist_event_auth_chain_txn(
  178. txn,
  179. [
  180. FakeEvent(event_id, room_id, auth_graph[event_id])
  181. for event_id in auth_graph
  182. ],
  183. )
  184. self.get_success(
  185. self.store.db_pool.runInteraction(
  186. "insert",
  187. insert_event,
  188. )
  189. )
  190. return room_id
  191. @parameterized.expand([(True,), (False,)])
  192. def test_auth_chain_ids(self, use_chain_cover_index: bool):
  193. room_id = self._setup_auth_chain(use_chain_cover_index)
  194. # a and b have the same auth chain.
  195. auth_chain_ids = self.get_success(self.store.get_auth_chain_ids(room_id, ["a"]))
  196. self.assertCountEqual(auth_chain_ids, ["e", "f", "g", "h", "i", "j", "k"])
  197. auth_chain_ids = self.get_success(self.store.get_auth_chain_ids(room_id, ["b"]))
  198. self.assertCountEqual(auth_chain_ids, ["e", "f", "g", "h", "i", "j", "k"])
  199. auth_chain_ids = self.get_success(
  200. self.store.get_auth_chain_ids(room_id, ["a", "b"])
  201. )
  202. self.assertCountEqual(auth_chain_ids, ["e", "f", "g", "h", "i", "j", "k"])
  203. auth_chain_ids = self.get_success(self.store.get_auth_chain_ids(room_id, ["c"]))
  204. self.assertCountEqual(auth_chain_ids, ["g", "h", "i", "j", "k"])
  205. # d and e have the same auth chain.
  206. auth_chain_ids = self.get_success(self.store.get_auth_chain_ids(room_id, ["d"]))
  207. self.assertCountEqual(auth_chain_ids, ["f", "g", "h", "i", "j", "k"])
  208. auth_chain_ids = self.get_success(self.store.get_auth_chain_ids(room_id, ["e"]))
  209. self.assertCountEqual(auth_chain_ids, ["f", "g", "h", "i", "j", "k"])
  210. auth_chain_ids = self.get_success(self.store.get_auth_chain_ids(room_id, ["f"]))
  211. self.assertCountEqual(auth_chain_ids, ["g", "h", "i", "j", "k"])
  212. auth_chain_ids = self.get_success(self.store.get_auth_chain_ids(room_id, ["g"]))
  213. self.assertCountEqual(auth_chain_ids, ["h", "i", "j", "k"])
  214. auth_chain_ids = self.get_success(self.store.get_auth_chain_ids(room_id, ["h"]))
  215. self.assertEqual(auth_chain_ids, ["k"])
  216. auth_chain_ids = self.get_success(self.store.get_auth_chain_ids(room_id, ["i"]))
  217. self.assertEqual(auth_chain_ids, ["j"])
  218. # j and k have no parents.
  219. auth_chain_ids = self.get_success(self.store.get_auth_chain_ids(room_id, ["j"]))
  220. self.assertEqual(auth_chain_ids, [])
  221. auth_chain_ids = self.get_success(self.store.get_auth_chain_ids(room_id, ["k"]))
  222. self.assertEqual(auth_chain_ids, [])
  223. # More complex input sequences.
  224. auth_chain_ids = self.get_success(
  225. self.store.get_auth_chain_ids(room_id, ["b", "c", "d"])
  226. )
  227. self.assertCountEqual(auth_chain_ids, ["e", "f", "g", "h", "i", "j", "k"])
  228. auth_chain_ids = self.get_success(
  229. self.store.get_auth_chain_ids(room_id, ["h", "i"])
  230. )
  231. self.assertCountEqual(auth_chain_ids, ["k", "j"])
  232. # e gets returned even though include_given is false, but it is in the
  233. # auth chain of b.
  234. auth_chain_ids = self.get_success(
  235. self.store.get_auth_chain_ids(room_id, ["b", "e"])
  236. )
  237. self.assertCountEqual(auth_chain_ids, ["e", "f", "g", "h", "i", "j", "k"])
  238. # Test include_given.
  239. auth_chain_ids = self.get_success(
  240. self.store.get_auth_chain_ids(room_id, ["i"], include_given=True)
  241. )
  242. self.assertCountEqual(auth_chain_ids, ["i", "j"])
  243. @parameterized.expand([(True,), (False,)])
  244. def test_auth_difference(self, use_chain_cover_index: bool):
  245. room_id = self._setup_auth_chain(use_chain_cover_index)
  246. # Now actually test that various combinations give the right result:
  247. difference = self.get_success(
  248. self.store.get_auth_chain_difference(room_id, [{"a"}, {"b"}])
  249. )
  250. self.assertSetEqual(difference, {"a", "b"})
  251. difference = self.get_success(
  252. self.store.get_auth_chain_difference(room_id, [{"a"}, {"b"}, {"c"}])
  253. )
  254. self.assertSetEqual(difference, {"a", "b", "c", "e", "f"})
  255. difference = self.get_success(
  256. self.store.get_auth_chain_difference(room_id, [{"a", "c"}, {"b"}])
  257. )
  258. self.assertSetEqual(difference, {"a", "b", "c"})
  259. difference = self.get_success(
  260. self.store.get_auth_chain_difference(room_id, [{"a", "c"}, {"b", "c"}])
  261. )
  262. self.assertSetEqual(difference, {"a", "b"})
  263. difference = self.get_success(
  264. self.store.get_auth_chain_difference(room_id, [{"a"}, {"b"}, {"d"}])
  265. )
  266. self.assertSetEqual(difference, {"a", "b", "d", "e"})
  267. difference = self.get_success(
  268. self.store.get_auth_chain_difference(room_id, [{"a"}, {"b"}, {"c"}, {"d"}])
  269. )
  270. self.assertSetEqual(difference, {"a", "b", "c", "d", "e", "f"})
  271. difference = self.get_success(
  272. self.store.get_auth_chain_difference(room_id, [{"a"}, {"b"}, {"e"}])
  273. )
  274. self.assertSetEqual(difference, {"a", "b"})
  275. difference = self.get_success(
  276. self.store.get_auth_chain_difference(room_id, [{"a"}])
  277. )
  278. self.assertSetEqual(difference, set())
  279. def test_auth_difference_partial_cover(self):
  280. """Test that we correctly handle rooms where not all events have a chain
  281. cover calculated. This can happen in some obscure edge cases, including
  282. during the background update that calculates the chain cover for old
  283. rooms.
  284. """
  285. room_id = "@ROOM:local"
  286. # The silly auth graph we use to test the auth difference algorithm,
  287. # where the top are the most recent events.
  288. #
  289. # A B
  290. # \ /
  291. # D E
  292. # \ |
  293. # ` F C
  294. # | /|
  295. # G ´ |
  296. # | \ |
  297. # H I
  298. # | |
  299. # K J
  300. auth_graph = {
  301. "a": ["e"],
  302. "b": ["e"],
  303. "c": ["g", "i"],
  304. "d": ["f"],
  305. "e": ["f"],
  306. "f": ["g"],
  307. "g": ["h", "i"],
  308. "h": ["k"],
  309. "i": ["j"],
  310. "k": [],
  311. "j": [],
  312. }
  313. depth_map = {
  314. "a": 7,
  315. "b": 7,
  316. "c": 4,
  317. "d": 6,
  318. "e": 6,
  319. "f": 5,
  320. "g": 3,
  321. "h": 2,
  322. "i": 2,
  323. "k": 1,
  324. "j": 1,
  325. }
  326. # We rudely fiddle with the appropriate tables directly, as that's much
  327. # easier than constructing events properly.
  328. def insert_event(txn):
  329. # First insert the room and mark it as having a chain cover.
  330. self.store.db_pool.simple_insert_txn(
  331. txn,
  332. "rooms",
  333. {
  334. "room_id": room_id,
  335. "creator": "room_creator_user_id",
  336. "is_public": True,
  337. "room_version": "6",
  338. "has_auth_chain_index": True,
  339. },
  340. )
  341. stream_ordering = 0
  342. for event_id in auth_graph:
  343. stream_ordering += 1
  344. depth = depth_map[event_id]
  345. self.store.db_pool.simple_insert_txn(
  346. txn,
  347. table="events",
  348. values={
  349. "event_id": event_id,
  350. "room_id": room_id,
  351. "depth": depth,
  352. "topological_ordering": depth,
  353. "type": "m.test",
  354. "processed": True,
  355. "outlier": False,
  356. "stream_ordering": stream_ordering,
  357. },
  358. )
  359. # Insert all events apart from 'B'
  360. self.hs.datastores.persist_events._persist_event_auth_chain_txn(
  361. txn,
  362. [
  363. FakeEvent(event_id, room_id, auth_graph[event_id])
  364. for event_id in auth_graph
  365. if event_id != "b"
  366. ],
  367. )
  368. # Now we insert the event 'B' without a chain cover, by temporarily
  369. # pretending the room doesn't have a chain cover.
  370. self.store.db_pool.simple_update_txn(
  371. txn,
  372. table="rooms",
  373. keyvalues={"room_id": room_id},
  374. updatevalues={"has_auth_chain_index": False},
  375. )
  376. self.hs.datastores.persist_events._persist_event_auth_chain_txn(
  377. txn,
  378. [FakeEvent("b", room_id, auth_graph["b"])],
  379. )
  380. self.store.db_pool.simple_update_txn(
  381. txn,
  382. table="rooms",
  383. keyvalues={"room_id": room_id},
  384. updatevalues={"has_auth_chain_index": True},
  385. )
  386. self.get_success(
  387. self.store.db_pool.runInteraction(
  388. "insert",
  389. insert_event,
  390. )
  391. )
  392. # Now actually test that various combinations give the right result:
  393. difference = self.get_success(
  394. self.store.get_auth_chain_difference(room_id, [{"a"}, {"b"}])
  395. )
  396. self.assertSetEqual(difference, {"a", "b"})
  397. difference = self.get_success(
  398. self.store.get_auth_chain_difference(room_id, [{"a"}, {"b"}, {"c"}])
  399. )
  400. self.assertSetEqual(difference, {"a", "b", "c", "e", "f"})
  401. difference = self.get_success(
  402. self.store.get_auth_chain_difference(room_id, [{"a", "c"}, {"b"}])
  403. )
  404. self.assertSetEqual(difference, {"a", "b", "c"})
  405. difference = self.get_success(
  406. self.store.get_auth_chain_difference(room_id, [{"a", "c"}, {"b", "c"}])
  407. )
  408. self.assertSetEqual(difference, {"a", "b"})
  409. difference = self.get_success(
  410. self.store.get_auth_chain_difference(room_id, [{"a"}, {"b"}, {"d"}])
  411. )
  412. self.assertSetEqual(difference, {"a", "b", "d", "e"})
  413. difference = self.get_success(
  414. self.store.get_auth_chain_difference(room_id, [{"a"}, {"b"}, {"c"}, {"d"}])
  415. )
  416. self.assertSetEqual(difference, {"a", "b", "c", "d", "e", "f"})
  417. difference = self.get_success(
  418. self.store.get_auth_chain_difference(room_id, [{"a"}, {"b"}, {"e"}])
  419. )
  420. self.assertSetEqual(difference, {"a", "b"})
  421. difference = self.get_success(
  422. self.store.get_auth_chain_difference(room_id, [{"a"}])
  423. )
  424. self.assertSetEqual(difference, set())
  425. def test_prune_inbound_federation_queue(self):
  426. "Test that pruning of inbound federation queues work"
  427. room_id = "some_room_id"
  428. # Insert a bunch of events that all reference the previous one.
  429. self.get_success(
  430. self.store.db_pool.simple_insert_many(
  431. table="federation_inbound_events_staging",
  432. values=[
  433. {
  434. "origin": "some_origin",
  435. "room_id": room_id,
  436. "received_ts": 0,
  437. "event_id": f"$fake_event_id_{i + 1}",
  438. "event_json": json_encoder.encode(
  439. {"prev_events": [f"$fake_event_id_{i}"]}
  440. ),
  441. "internal_metadata": "{}",
  442. }
  443. for i in range(500)
  444. ],
  445. desc="test_prune_inbound_federation_queue",
  446. )
  447. )
  448. # Calling prune once should return True, i.e. a prune happen. The second
  449. # time it shouldn't.
  450. pruned = self.get_success(
  451. self.store.prune_staged_events_in_room(room_id, RoomVersions.V6)
  452. )
  453. self.assertTrue(pruned)
  454. pruned = self.get_success(
  455. self.store.prune_staged_events_in_room(room_id, RoomVersions.V6)
  456. )
  457. self.assertFalse(pruned)
  458. # Assert that we only have a single event left in the queue, and that it
  459. # is the last one.
  460. count = self.get_success(
  461. self.store.db_pool.simple_select_one_onecol(
  462. table="federation_inbound_events_staging",
  463. keyvalues={"room_id": room_id},
  464. retcol="COALESCE(COUNT(*), 0)",
  465. desc="test_prune_inbound_federation_queue",
  466. )
  467. )
  468. self.assertEqual(count, 1)
  469. _, event_id = self.get_success(
  470. self.store.get_next_staged_event_id_for_room(room_id)
  471. )
  472. self.assertEqual(event_id, "$fake_event_id_500")
  473. @attr.s
  474. class FakeEvent:
  475. event_id = attr.ib()
  476. room_id = attr.ib()
  477. auth_events = attr.ib()
  478. type = "foo"
  479. state_key = "foo"
  480. internal_metadata = _EventInternalMetadata({})
  481. def auth_event_ids(self):
  482. return self.auth_events
  483. def is_state(self):
  484. return True