test_event_federation.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595
  1. # Copyright 2018 New Vector Ltd
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the 'License');
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an 'AS IS' BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. from typing import Tuple, Union
  15. import attr
  16. from parameterized import parameterized
  17. from synapse.api.room_versions import (
  18. KNOWN_ROOM_VERSIONS,
  19. EventFormatVersions,
  20. RoomVersion,
  21. )
  22. from synapse.events import _EventInternalMetadata
  23. from synapse.util import json_encoder
  24. import tests.unittest
  25. import tests.utils
  26. class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase):
  27. def prepare(self, reactor, clock, hs):
  28. self.store = hs.get_datastores().main
  29. def test_get_prev_events_for_room(self):
  30. room_id = "@ROOM:local"
  31. # add a bunch of events and hashes to act as forward extremities
  32. def insert_event(txn, i):
  33. event_id = "$event_%i:local" % i
  34. txn.execute(
  35. (
  36. "INSERT INTO events ("
  37. " room_id, event_id, type, depth, topological_ordering,"
  38. " content, processed, outlier, stream_ordering) "
  39. "VALUES (?, ?, 'm.test', ?, ?, 'test', ?, ?, ?)"
  40. ),
  41. (room_id, event_id, i, i, True, False, i),
  42. )
  43. txn.execute(
  44. (
  45. "INSERT INTO event_forward_extremities (room_id, event_id) "
  46. "VALUES (?, ?)"
  47. ),
  48. (room_id, event_id),
  49. )
  50. for i in range(0, 20):
  51. self.get_success(
  52. self.store.db_pool.runInteraction("insert", insert_event, i)
  53. )
  54. # this should get the last ten
  55. r = self.get_success(self.store.get_prev_events_for_room(room_id))
  56. self.assertEqual(10, len(r))
  57. for i in range(0, 10):
  58. self.assertEqual("$event_%i:local" % (19 - i), r[i])
  59. def test_get_rooms_with_many_extremities(self):
  60. room1 = "#room1"
  61. room2 = "#room2"
  62. room3 = "#room3"
  63. def insert_event(txn, i, room_id):
  64. event_id = "$event_%i:local" % i
  65. txn.execute(
  66. (
  67. "INSERT INTO event_forward_extremities (room_id, event_id) "
  68. "VALUES (?, ?)"
  69. ),
  70. (room_id, event_id),
  71. )
  72. for i in range(0, 20):
  73. self.get_success(
  74. self.store.db_pool.runInteraction("insert", insert_event, i, room1)
  75. )
  76. self.get_success(
  77. self.store.db_pool.runInteraction("insert", insert_event, i, room2)
  78. )
  79. self.get_success(
  80. self.store.db_pool.runInteraction("insert", insert_event, i, room3)
  81. )
  82. # Test simple case
  83. r = self.get_success(self.store.get_rooms_with_many_extremities(5, 5, []))
  84. self.assertEqual(len(r), 3)
  85. # Does filter work?
  86. r = self.get_success(self.store.get_rooms_with_many_extremities(5, 5, [room1]))
  87. self.assertTrue(room2 in r)
  88. self.assertTrue(room3 in r)
  89. self.assertEqual(len(r), 2)
  90. r = self.get_success(
  91. self.store.get_rooms_with_many_extremities(5, 5, [room1, room2])
  92. )
  93. self.assertEqual(r, [room3])
  94. # Does filter and limit work?
  95. r = self.get_success(self.store.get_rooms_with_many_extremities(5, 1, [room1]))
  96. self.assertTrue(r == [room2] or r == [room3])
  97. def _setup_auth_chain(self, use_chain_cover_index: bool) -> str:
  98. room_id = "@ROOM:local"
  99. # The silly auth graph we use to test the auth difference algorithm,
  100. # where the top are the most recent events.
  101. #
  102. # A B
  103. # \ /
  104. # D E
  105. # \ |
  106. # ` F C
  107. # | /|
  108. # G ´ |
  109. # | \ |
  110. # H I
  111. # | |
  112. # K J
  113. auth_graph = {
  114. "a": ["e"],
  115. "b": ["e"],
  116. "c": ["g", "i"],
  117. "d": ["f"],
  118. "e": ["f"],
  119. "f": ["g"],
  120. "g": ["h", "i"],
  121. "h": ["k"],
  122. "i": ["j"],
  123. "k": [],
  124. "j": [],
  125. }
  126. depth_map = {
  127. "a": 7,
  128. "b": 7,
  129. "c": 4,
  130. "d": 6,
  131. "e": 6,
  132. "f": 5,
  133. "g": 3,
  134. "h": 2,
  135. "i": 2,
  136. "k": 1,
  137. "j": 1,
  138. }
  139. # Mark the room as maybe having a cover index.
  140. def store_room(txn):
  141. self.store.db_pool.simple_insert_txn(
  142. txn,
  143. "rooms",
  144. {
  145. "room_id": room_id,
  146. "creator": "room_creator_user_id",
  147. "is_public": True,
  148. "room_version": "6",
  149. "has_auth_chain_index": use_chain_cover_index,
  150. },
  151. )
  152. self.get_success(self.store.db_pool.runInteraction("store_room", store_room))
  153. # We rudely fiddle with the appropriate tables directly, as that's much
  154. # easier than constructing events properly.
  155. def insert_event(txn):
  156. stream_ordering = 0
  157. for event_id in auth_graph:
  158. stream_ordering += 1
  159. depth = depth_map[event_id]
  160. self.store.db_pool.simple_insert_txn(
  161. txn,
  162. table="events",
  163. values={
  164. "event_id": event_id,
  165. "room_id": room_id,
  166. "depth": depth,
  167. "topological_ordering": depth,
  168. "type": "m.test",
  169. "processed": True,
  170. "outlier": False,
  171. "stream_ordering": stream_ordering,
  172. },
  173. )
  174. self.hs.datastores.persist_events._persist_event_auth_chain_txn(
  175. txn,
  176. [
  177. FakeEvent(event_id, room_id, auth_graph[event_id])
  178. for event_id in auth_graph
  179. ],
  180. )
  181. self.get_success(
  182. self.store.db_pool.runInteraction(
  183. "insert",
  184. insert_event,
  185. )
  186. )
  187. return room_id
  188. @parameterized.expand([(True,), (False,)])
  189. def test_auth_chain_ids(self, use_chain_cover_index: bool):
  190. room_id = self._setup_auth_chain(use_chain_cover_index)
  191. # a and b have the same auth chain.
  192. auth_chain_ids = self.get_success(self.store.get_auth_chain_ids(room_id, ["a"]))
  193. self.assertCountEqual(auth_chain_ids, ["e", "f", "g", "h", "i", "j", "k"])
  194. auth_chain_ids = self.get_success(self.store.get_auth_chain_ids(room_id, ["b"]))
  195. self.assertCountEqual(auth_chain_ids, ["e", "f", "g", "h", "i", "j", "k"])
  196. auth_chain_ids = self.get_success(
  197. self.store.get_auth_chain_ids(room_id, ["a", "b"])
  198. )
  199. self.assertCountEqual(auth_chain_ids, ["e", "f", "g", "h", "i", "j", "k"])
  200. auth_chain_ids = self.get_success(self.store.get_auth_chain_ids(room_id, ["c"]))
  201. self.assertCountEqual(auth_chain_ids, ["g", "h", "i", "j", "k"])
  202. # d and e have the same auth chain.
  203. auth_chain_ids = self.get_success(self.store.get_auth_chain_ids(room_id, ["d"]))
  204. self.assertCountEqual(auth_chain_ids, ["f", "g", "h", "i", "j", "k"])
  205. auth_chain_ids = self.get_success(self.store.get_auth_chain_ids(room_id, ["e"]))
  206. self.assertCountEqual(auth_chain_ids, ["f", "g", "h", "i", "j", "k"])
  207. auth_chain_ids = self.get_success(self.store.get_auth_chain_ids(room_id, ["f"]))
  208. self.assertCountEqual(auth_chain_ids, ["g", "h", "i", "j", "k"])
  209. auth_chain_ids = self.get_success(self.store.get_auth_chain_ids(room_id, ["g"]))
  210. self.assertCountEqual(auth_chain_ids, ["h", "i", "j", "k"])
  211. auth_chain_ids = self.get_success(self.store.get_auth_chain_ids(room_id, ["h"]))
  212. self.assertEqual(auth_chain_ids, {"k"})
  213. auth_chain_ids = self.get_success(self.store.get_auth_chain_ids(room_id, ["i"]))
  214. self.assertEqual(auth_chain_ids, {"j"})
  215. # j and k have no parents.
  216. auth_chain_ids = self.get_success(self.store.get_auth_chain_ids(room_id, ["j"]))
  217. self.assertEqual(auth_chain_ids, set())
  218. auth_chain_ids = self.get_success(self.store.get_auth_chain_ids(room_id, ["k"]))
  219. self.assertEqual(auth_chain_ids, set())
  220. # More complex input sequences.
  221. auth_chain_ids = self.get_success(
  222. self.store.get_auth_chain_ids(room_id, ["b", "c", "d"])
  223. )
  224. self.assertCountEqual(auth_chain_ids, ["e", "f", "g", "h", "i", "j", "k"])
  225. auth_chain_ids = self.get_success(
  226. self.store.get_auth_chain_ids(room_id, ["h", "i"])
  227. )
  228. self.assertCountEqual(auth_chain_ids, ["k", "j"])
  229. # e gets returned even though include_given is false, but it is in the
  230. # auth chain of b.
  231. auth_chain_ids = self.get_success(
  232. self.store.get_auth_chain_ids(room_id, ["b", "e"])
  233. )
  234. self.assertCountEqual(auth_chain_ids, ["e", "f", "g", "h", "i", "j", "k"])
  235. # Test include_given.
  236. auth_chain_ids = self.get_success(
  237. self.store.get_auth_chain_ids(room_id, ["i"], include_given=True)
  238. )
  239. self.assertCountEqual(auth_chain_ids, ["i", "j"])
  240. @parameterized.expand([(True,), (False,)])
  241. def test_auth_difference(self, use_chain_cover_index: bool):
  242. room_id = self._setup_auth_chain(use_chain_cover_index)
  243. # Now actually test that various combinations give the right result:
  244. difference = self.get_success(
  245. self.store.get_auth_chain_difference(room_id, [{"a"}, {"b"}])
  246. )
  247. self.assertSetEqual(difference, {"a", "b"})
  248. difference = self.get_success(
  249. self.store.get_auth_chain_difference(room_id, [{"a"}, {"b"}, {"c"}])
  250. )
  251. self.assertSetEqual(difference, {"a", "b", "c", "e", "f"})
  252. difference = self.get_success(
  253. self.store.get_auth_chain_difference(room_id, [{"a", "c"}, {"b"}])
  254. )
  255. self.assertSetEqual(difference, {"a", "b", "c"})
  256. difference = self.get_success(
  257. self.store.get_auth_chain_difference(room_id, [{"a", "c"}, {"b", "c"}])
  258. )
  259. self.assertSetEqual(difference, {"a", "b"})
  260. difference = self.get_success(
  261. self.store.get_auth_chain_difference(room_id, [{"a"}, {"b"}, {"d"}])
  262. )
  263. self.assertSetEqual(difference, {"a", "b", "d", "e"})
  264. difference = self.get_success(
  265. self.store.get_auth_chain_difference(room_id, [{"a"}, {"b"}, {"c"}, {"d"}])
  266. )
  267. self.assertSetEqual(difference, {"a", "b", "c", "d", "e", "f"})
  268. difference = self.get_success(
  269. self.store.get_auth_chain_difference(room_id, [{"a"}, {"b"}, {"e"}])
  270. )
  271. self.assertSetEqual(difference, {"a", "b"})
  272. difference = self.get_success(
  273. self.store.get_auth_chain_difference(room_id, [{"a"}])
  274. )
  275. self.assertSetEqual(difference, set())
  276. def test_auth_difference_partial_cover(self):
  277. """Test that we correctly handle rooms where not all events have a chain
  278. cover calculated. This can happen in some obscure edge cases, including
  279. during the background update that calculates the chain cover for old
  280. rooms.
  281. """
  282. room_id = "@ROOM:local"
  283. # The silly auth graph we use to test the auth difference algorithm,
  284. # where the top are the most recent events.
  285. #
  286. # A B
  287. # \ /
  288. # D E
  289. # \ |
  290. # ` F C
  291. # | /|
  292. # G ´ |
  293. # | \ |
  294. # H I
  295. # | |
  296. # K J
  297. auth_graph = {
  298. "a": ["e"],
  299. "b": ["e"],
  300. "c": ["g", "i"],
  301. "d": ["f"],
  302. "e": ["f"],
  303. "f": ["g"],
  304. "g": ["h", "i"],
  305. "h": ["k"],
  306. "i": ["j"],
  307. "k": [],
  308. "j": [],
  309. }
  310. depth_map = {
  311. "a": 7,
  312. "b": 7,
  313. "c": 4,
  314. "d": 6,
  315. "e": 6,
  316. "f": 5,
  317. "g": 3,
  318. "h": 2,
  319. "i": 2,
  320. "k": 1,
  321. "j": 1,
  322. }
  323. # We rudely fiddle with the appropriate tables directly, as that's much
  324. # easier than constructing events properly.
  325. def insert_event(txn):
  326. # First insert the room and mark it as having a chain cover.
  327. self.store.db_pool.simple_insert_txn(
  328. txn,
  329. "rooms",
  330. {
  331. "room_id": room_id,
  332. "creator": "room_creator_user_id",
  333. "is_public": True,
  334. "room_version": "6",
  335. "has_auth_chain_index": True,
  336. },
  337. )
  338. stream_ordering = 0
  339. for event_id in auth_graph:
  340. stream_ordering += 1
  341. depth = depth_map[event_id]
  342. self.store.db_pool.simple_insert_txn(
  343. txn,
  344. table="events",
  345. values={
  346. "event_id": event_id,
  347. "room_id": room_id,
  348. "depth": depth,
  349. "topological_ordering": depth,
  350. "type": "m.test",
  351. "processed": True,
  352. "outlier": False,
  353. "stream_ordering": stream_ordering,
  354. },
  355. )
  356. # Insert all events apart from 'B'
  357. self.hs.datastores.persist_events._persist_event_auth_chain_txn(
  358. txn,
  359. [
  360. FakeEvent(event_id, room_id, auth_graph[event_id])
  361. for event_id in auth_graph
  362. if event_id != "b"
  363. ],
  364. )
  365. # Now we insert the event 'B' without a chain cover, by temporarily
  366. # pretending the room doesn't have a chain cover.
  367. self.store.db_pool.simple_update_txn(
  368. txn,
  369. table="rooms",
  370. keyvalues={"room_id": room_id},
  371. updatevalues={"has_auth_chain_index": False},
  372. )
  373. self.hs.datastores.persist_events._persist_event_auth_chain_txn(
  374. txn,
  375. [FakeEvent("b", room_id, auth_graph["b"])],
  376. )
  377. self.store.db_pool.simple_update_txn(
  378. txn,
  379. table="rooms",
  380. keyvalues={"room_id": room_id},
  381. updatevalues={"has_auth_chain_index": True},
  382. )
  383. self.get_success(
  384. self.store.db_pool.runInteraction(
  385. "insert",
  386. insert_event,
  387. )
  388. )
  389. # Now actually test that various combinations give the right result:
  390. difference = self.get_success(
  391. self.store.get_auth_chain_difference(room_id, [{"a"}, {"b"}])
  392. )
  393. self.assertSetEqual(difference, {"a", "b"})
  394. difference = self.get_success(
  395. self.store.get_auth_chain_difference(room_id, [{"a"}, {"b"}, {"c"}])
  396. )
  397. self.assertSetEqual(difference, {"a", "b", "c", "e", "f"})
  398. difference = self.get_success(
  399. self.store.get_auth_chain_difference(room_id, [{"a", "c"}, {"b"}])
  400. )
  401. self.assertSetEqual(difference, {"a", "b", "c"})
  402. difference = self.get_success(
  403. self.store.get_auth_chain_difference(room_id, [{"a", "c"}, {"b", "c"}])
  404. )
  405. self.assertSetEqual(difference, {"a", "b"})
  406. difference = self.get_success(
  407. self.store.get_auth_chain_difference(room_id, [{"a"}, {"b"}, {"d"}])
  408. )
  409. self.assertSetEqual(difference, {"a", "b", "d", "e"})
  410. difference = self.get_success(
  411. self.store.get_auth_chain_difference(room_id, [{"a"}, {"b"}, {"c"}, {"d"}])
  412. )
  413. self.assertSetEqual(difference, {"a", "b", "c", "d", "e", "f"})
  414. difference = self.get_success(
  415. self.store.get_auth_chain_difference(room_id, [{"a"}, {"b"}, {"e"}])
  416. )
  417. self.assertSetEqual(difference, {"a", "b"})
  418. difference = self.get_success(
  419. self.store.get_auth_chain_difference(room_id, [{"a"}])
  420. )
  421. self.assertSetEqual(difference, set())
  422. @parameterized.expand(
  423. [(room_version,) for room_version in KNOWN_ROOM_VERSIONS.values()]
  424. )
  425. def test_prune_inbound_federation_queue(self, room_version: RoomVersion):
  426. """Test that pruning of inbound federation queues work"""
  427. room_id = "some_room_id"
  428. def prev_event_format(prev_event_id: str) -> Union[Tuple[str, dict], str]:
  429. """Account for differences in prev_events format across room versions"""
  430. if room_version.event_format == EventFormatVersions.V1:
  431. return prev_event_id, {}
  432. return prev_event_id
  433. # Insert a bunch of events that all reference the previous one.
  434. self.get_success(
  435. self.store.db_pool.simple_insert_many(
  436. table="federation_inbound_events_staging",
  437. keys=(
  438. "origin",
  439. "room_id",
  440. "received_ts",
  441. "event_id",
  442. "event_json",
  443. "internal_metadata",
  444. ),
  445. values=[
  446. (
  447. "some_origin",
  448. room_id,
  449. 0,
  450. f"$fake_event_id_{i + 1}",
  451. json_encoder.encode(
  452. {"prev_events": [prev_event_format(f"$fake_event_id_{i}")]}
  453. ),
  454. "{}",
  455. )
  456. for i in range(500)
  457. ],
  458. desc="test_prune_inbound_federation_queue",
  459. )
  460. )
  461. # Calling prune once should return True, i.e. a prune happen. The second
  462. # time it shouldn't.
  463. pruned = self.get_success(
  464. self.store.prune_staged_events_in_room(room_id, room_version)
  465. )
  466. self.assertTrue(pruned)
  467. pruned = self.get_success(
  468. self.store.prune_staged_events_in_room(room_id, room_version)
  469. )
  470. self.assertFalse(pruned)
  471. # Assert that we only have a single event left in the queue, and that it
  472. # is the last one.
  473. count = self.get_success(
  474. self.store.db_pool.simple_select_one_onecol(
  475. table="federation_inbound_events_staging",
  476. keyvalues={"room_id": room_id},
  477. retcol="COUNT(*)",
  478. desc="test_prune_inbound_federation_queue",
  479. )
  480. )
  481. self.assertEqual(count, 1)
  482. _, event_id = self.get_success(
  483. self.store.get_next_staged_event_id_for_room(room_id)
  484. )
  485. self.assertEqual(event_id, "$fake_event_id_500")
  486. @attr.s
  487. class FakeEvent:
  488. event_id = attr.ib()
  489. room_id = attr.ib()
  490. auth_events = attr.ib()
  491. type = "foo"
  492. state_key = "foo"
  493. internal_metadata = _EventInternalMetadata({})
  494. def auth_event_ids(self):
  495. return self.auth_events
  496. def is_state(self):
  497. return True