test_federation_catch_up.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473
  1. from typing import List, Tuple
  2. from unittest.mock import Mock
  3. from synapse.api.constants import EventTypes
  4. from synapse.events import EventBase
  5. from synapse.federation.sender import PerDestinationQueue, TransactionManager
  6. from synapse.federation.units import Edu
  7. from synapse.rest import admin
  8. from synapse.rest.client import login, room
  9. from synapse.util.retryutils import NotRetryingDestination
  10. from tests.test_utils import event_injection, make_awaitable
  11. from tests.unittest import FederatingHomeserverTestCase, override_config
  12. class FederationCatchUpTestCases(FederatingHomeserverTestCase):
  # Registration hooks for the admin, room and login REST modules imported
  # at the top of this file; the tests register users, log in and create
  # rooms through the test-case helpers (presumably served by these).
  servlets = [
      admin.register_servlets,
      room.register_servlets,
      login.register_servlets,
  ]
  def make_homeserver(self, reactor, clock):
      """Build the homeserver under test with a mocked federation transport.

      The transport client is a ``Mock`` whose spec only allows
      ``send_transaction``, which lets the tests intercept outgoing
      transactions instead of sending them anywhere.
      """
      fake_transport_client = Mock(spec=["send_transaction"])
      return self.setup_test_homeserver(
          federation_transport_client=fake_transport_client,
      )
  def prepare(self, reactor, clock, hs):
      """Install the per-test stubs and the bookkeeping lists used by the tests."""
      # Bookkeeping read and written by record_transaction.
      self.is_online = True
      self.pdus = []
      self.failed_pdus = []

      # Stub out get_current_hosts_in_room.
      # This mock is crucial for destination_rooms to be populated.
      hosts_in_room = make_awaitable(["test", "host2"])
      hs.get_state_handler().get_current_hosts_in_room = Mock(
          return_value=hosts_in_room
      )

      # Whenever send_transaction is called, hand it to record_transaction
      # so that the PDU data gets recorded.
      transport_client = self.hs.get_federation_transport_client()
      transport_client.send_transaction.side_effect = self.record_transaction
  async def record_transaction(self, txn, json_cb):
      """Stand-in for ``send_transaction`` that records the PDUs it is given.

      Args:
          txn: the transaction being sent; only ``txn.destination`` is read.
          json_cb: callable returning the transaction's JSON dict, which
              must contain a "pdus" entry.

      Returns:
          An empty dict when ``self.is_online`` is true; the PDUs are
          appended to ``self.pdus`` first.

      Raises:
          NotRetryingDestination: when ``self.is_online`` is false, after the
              PDUs have been appended to ``self.failed_pdus``.
      """
      payload = json_cb()
      pdus = payload["pdus"]

      if not self.is_online:
          # Simulated outage: remember what we failed to deliver, then fail.
          self.failed_pdus.extend(pdus)
          raise NotRetryingDestination(0, 24 * 60 * 60 * 1000, txn.destination)

      self.pdus.extend(pdus)
      return {}
  def get_destination_room(self, room: str, destination: str = "host2") -> dict:
      """
      Looks up the destination_rooms entry for a (destination, room_id) pair.

      Args:
          room: room ID
          destination: what destination, default is "host2"

      Returns:
          Dictionary of { event_id: str, stream_ordering: int }
      """
      sql = """
          SELECT event_id, stream_ordering
              FROM destination_rooms dr
              JOIN events USING (stream_ordering)
              WHERE dr.destination = ? AND dr.room_id = ?
      """
      db_pool = self.hs.get_datastores().main.db_pool
      rows = self.get_success(
          db_pool.execute(
              "test:get_destination_rooms",
              None,
              sql,
              destination,
              room,
          )
      )

      # Only the first returned row is used.
      event_id, stream_ordering = rows[0]
      return {"event_id": event_id, "stream_ordering": stream_ordering}
  @override_config({"send_federation": True})
  def test_catch_up_destination_rooms_tracking(self):
      """
      Tests that the `destination_rooms` table is populated as events are
      sent into a room that a remote host has joined.
      """
      self.register_user("u1", "you the one")
      access_token = self.login("u1", "you the one")
      room_id = self.helper.create_room_as("u1", tok=access_token)

      # a user from host2 joins the room
      remote_join = event_injection.inject_member_event(
          self.hs, room_id, "@user:host2", "join"
      )
      self.get_success(remote_join)

      # read the destination_rooms row back after each send
      first_event_id = self.helper.send(room_id, "wombats!", tok=access_token)[
          "event_id"
      ]
      first_row = self.get_destination_room(room_id)

      second_event_id = self.helper.send(room_id, "rabbits!", tok=access_token)[
          "event_id"
      ]
      second_row = self.get_destination_room(room_id)

      # check: events correctly registered in order
      self.assertEqual(first_row["event_id"], first_event_id)
      self.assertEqual(second_row["event_id"], second_event_id)
      self.assertEqual(
          first_row["stream_ordering"], second_row["stream_ordering"] - 1
      )
  @override_config({"send_federation": True})
  def test_catch_up_last_successful_stream_ordering_tracking(self):
      """
      Tests that a destination's last successful stream ordering is tracked:
      it stays unset while every send to the destination fails, and is set
      to the stream ordering of the latest event once a send succeeds.
      """
      store = self.hs.get_datastores().main

      self.register_user("u1", "you the one")
      u1_token = self.login("u1", "you the one")
      room_id = self.helper.create_room_as("u1", tok=u1_token)

      # take the remote offline
      self.is_online = False

      self.get_success(
          event_injection.inject_member_event(
              self.hs, room_id, "@user:host2", "join"
          )
      )

      self.helper.send(room_id, "wombats!", tok=u1_token)
      self.pump()

      lsso_1 = self.get_success(
          store.get_destination_last_successful_stream_ordering("host2")
      )

      self.assertIsNone(
          lsso_1,
          "There should be no last successful stream ordering for an always-offline destination",
      )

      # bring the remote online
      self.is_online = True

      event_id_2 = self.helper.send(room_id, "rabbits!", tok=u1_token)["event_id"]

      lsso_2 = self.get_success(
          store.get_destination_last_successful_stream_ordering("host2")
      )
      row_2 = self.get_destination_room(room_id)

      # Sanity checks on the test itself: the remote received the new PDU
      # and destination_rooms points at it.
      self.assertEqual(
          self.pdus[0]["content"]["body"],
          "rabbits!",
          "Test fault: didn't receive the right PDU",
      )
      self.assertEqual(
          row_2["event_id"],
          event_id_2,
          "Test fault: destination_rooms not updated correctly",
      )

      # The actual check: the successful send was recorded.
      self.assertEqual(
          lsso_2,
          row_2["stream_ordering"],
          "Send succeeded but not marked as last_successful_stream_ordering",
      )
  @override_config({"send_federation": True})  # critical to federate
  def test_catch_up_from_blank_state(self):
      """
      Runs an overall test of federation catch-up from scratch.

      Further tests focus on narrower aspects and edge-cases; this one is
      meant to give an end-to-end view of the behaviour.
      """
      store = self.hs.get_datastores().main

      # bring the other server online
      self.is_online = True

      # let's make some events for the other server to receive
      self.register_user("u1", "you the one")
      token = self.login("u1", "you the one")
      first_room = self.helper.create_room_as("u1", tok=token)
      second_room = self.helper.create_room_as("u1", tok=token)

      # also critical to federate: the remote user joins both rooms
      for joined_room in (first_room, second_room):
          self.get_success(
              event_injection.inject_member_event(
                  self.hs, joined_room, "@user:host2", "join"
              )
          )

      self.helper.send_state(
          first_room,
          event_type="m.room.topic",
          body={"topic": "wombat"},
          tok=token,
      )

      # check: PDU received for topic event
      self.assertEqual(len(self.pdus), 1)
      self.assertEqual(self.pdus[0]["type"], "m.room.topic")

      # take the remote offline
      self.is_online = False

      # send another event
      self.helper.send(first_room, "hi user!", tok=token)

      # check: things didn't go well since the remote is down
      self.assertEqual(len(self.failed_pdus), 1)
      self.assertEqual(self.failed_pdus[0]["content"]["body"], "hi user!")

      # let's delete the federation transmission queue
      # (this pretends we are starting up fresh.)
      destination_queues = self.hs.get_federation_sender()._per_destination_queues
      self.assertFalse(destination_queues["host2"].transmission_loop_running)
      del destination_queues["host2"]

      # let's also clear any backoffs
      self.get_success(store.set_destination_retry_timings("host2", None, 0, 0))

      # bring the remote online and clear the received pdu list
      self.is_online = True
      self.pdus = []

      # now we need to initiate a federation transaction somehow…
      # to do that, let's send another event (because it's simple to do)
      # (do it to another room, otherwise the catch-up logic decides it
      # doesn't need to catch up the first room)
      self.helper.send(second_room, "wombats!", tok=token)

      # we should now have received both PDUs
      self.assertEqual(len(self.pdus), 2)
      self.assertEqual(self.pdus[0]["content"]["body"], "hi user!")
      self.assertEqual(self.pdus[1]["content"]["body"], "wombats!")
  def make_fake_destination_queue(
      self, destination: str = "host2"
  ) -> Tuple[PerDestinationQueue, List[EventBase]]:
      """
      Makes a fake per-destination queue.

      The queue's transaction manager has ``send_new_transaction`` replaced
      by a coroutine that records the PDUs it is handed and reports success.

      Args:
          destination: the destination the queue is for, default is "host2"

      Returns:
          A tuple of the queue and the list that collects every PDU passed
          to the fake ``send_new_transaction``.
      """
      sent_pdus: List[EventBase] = []

      async def fake_send(
          destination_tm: str,
          pending_pdus: List[EventBase],
          _pending_edus: List[Edu],
      ) -> bool:
          # The queue should only ever send to its own destination.
          assert destination == destination_tm
          sent_pdus.extend(pending_pdus)
          return True  # success!

      manager = TransactionManager(self.hs)
      queue = PerDestinationQueue(self.hs, manager, destination)
      manager.send_new_transaction = fake_send

      return queue, sent_pdus
  @override_config({"send_federation": True})
  def test_catch_up_loop(self):
      """
      Tests the behaviour of _catch_up_transmission_loop.
      """
      # ARRANGE:
      #  - a local user (u1)
      #  - 3 rooms which u1 is joined to (and remote user @user:host2 is
      #    joined to)
      #  - some events (1 to 5) in those rooms
      #      we have 'already sent' events 1 and 2 to host2
      queue, delivered = self.make_fake_destination_queue()
      store = self.hs.get_datastores().main

      self.register_user("u1", "you the one")
      token = self.login("u1", "you the one")
      room_a = self.helper.create_room_as("u1", tok=token)
      room_b = self.helper.create_room_as("u1", tok=token)
      room_c = self.helper.create_room_as("u1", tok=token)

      for joined_room in (room_a, room_b, room_c):
          self.get_success(
              event_injection.inject_member_event(
                  self.hs, joined_room, "@user:host2", "join"
              )
          )

      # create some events (numbered 1 to 5 in send order)
      self.helper.send(room_a, "you hear me!!", tok=token)
      second_id = self.helper.send(room_b, "wombats!", tok=token)["event_id"]
      self.helper.send(room_c, "Matrix!", tok=token)
      fourth_id = self.helper.send(room_b, "rabbits!", tok=token)["event_id"]
      fifth_id = self.helper.send(room_c, "Synapse!", tok=token)["event_id"]

      # destination_rooms should already be populated, but let us pretend
      # that we already sent (successfully) up to and including event 2
      second_event = self.get_success(store.get_event(second_id))

      # also fetch event 5 so we know its stream ordering later
      fifth_event = self.get_success(store.get_event(fifth_id))

      self.get_success(
          store.set_destination_last_successful_stream_ordering(
              "host2", second_event.internal_metadata.stream_ordering
          )
      )

      # ACT
      self.get_success(queue._catch_up_transmission_loop())

      # ASSERT, noticing in particular:
      #  - event 3 not sent out, because event 5 replaces it
      #  - order is least recent first, so event 5 comes after event 4
      #  - catch-up is completed
      self.assertEqual(len(delivered), 2)
      self.assertEqual(delivered[0].event_id, fourth_id)
      self.assertEqual(delivered[1].event_id, fifth_id)
      self.assertFalse(queue._catching_up)
      self.assertEqual(
          queue._last_successful_stream_ordering,
          fifth_event.internal_metadata.stream_ordering,
      )
  @override_config({"send_federation": True})
  def test_catch_up_on_synapse_startup(self):
      """
      Tests the behaviour of get_catch_up_outstanding_destinations and
      _wake_destinations_needing_catchup.
      """
      store = self.hs.get_datastores().main

      # list of sorted server names (note that there are more servers than
      # the batch size used in get_catch_up_outstanding_destinations).
      server_names = ["server%02d" % index for index in range(42)]
      server_names.append("zzzerver")

      # ARRANGE:
      #  - a local user (u1)
      #  - a room which u1 is joined to (and remote users @user:serverXX are
      #    joined to)

      # mark the remotes as online
      self.is_online = True

      self.register_user("u1", "you the one")
      token = self.login("u1", "you the one")
      room_id = self.helper.create_room_as("u1", tok=token)

      for name in server_names:
          remote_user = "@user:%s" % name
          self.get_success(
              event_injection.inject_member_event(
                  self.hs, room_id, remote_user, "join"
              )
          )

      # create an event
      self.helper.send(room_id, "deary me!", tok=token)

      # ASSERT:
      #  - All servers are up to date so none should have outstanding catch-up
      outstanding_initially = self.get_success(
          store.get_catch_up_outstanding_destinations(None)
      )
      self.assertEqual(outstanding_initially, [])

      # ACT:
      #  - Make the remote servers unreachable
      self.is_online = False

      #  - Mark zzzerver as being backed-off from
      now = self.clock.time_msec()
      one_day_ms = 24 * 60 * 60 * 1000  # retry in 1 day
      self.get_success(
          store.set_destination_retry_timings("zzzerver", now, now, one_day_ms)
      )

      #  - Send an event
      self.helper.send(room_id, "can anyone hear me?", tok=token)

      # ASSERT (get_catch_up_outstanding_destinations):
      #  - all remotes are outstanding
      #  - they are returned in batches of 25, in order
      first_batch = self.get_success(
          store.get_catch_up_outstanding_destinations(None)
      )
      self.assertEqual(len(first_batch), 25)
      self.assertEqual(first_batch, server_names[0:25])

      second_batch = self.get_success(
          store.get_catch_up_outstanding_destinations(first_batch[-1])
      )
      self.assertNotIn("zzzerver", second_batch)
      self.assertEqual(len(second_batch), 17)
      self.assertEqual(second_batch, server_names[25:-1])

      # ACT: call _wake_destinations_needing_catchup

      # patch wake_destination to just record the destinations instead
      woken = []

      def wake_destination_track(destination):
          woken.append(destination)

      federation_sender = self.hs.get_federation_sender()
      federation_sender.wake_destination = wake_destination_track

      # cancel the pre-existing timer for _wake_destinations_needing_catchup
      # this is because we are calling it manually rather than waiting for it
      # to be called automatically
      federation_sender._catchup_after_startup_timer.cancel()

      self.get_success(
          federation_sender._wake_destinations_needing_catchup(), by=5.0
      )

      # ASSERT (_wake_destinations_needing_catchup):
      #  - all remotes are woken up, save for zzzerver
      self.assertNotIn("zzzerver", woken)
      #  - all destinations are woken exactly once; they appear once in woken.
      self.assertCountEqual(woken, server_names[:-1])
  @override_config({"send_federation": True})
  def test_not_latest_event(self):
      """Test that we send the latest event in the room even if it's not ours."""
      queue, delivered = self.make_fake_destination_queue()

      # Make a room with a local user, and two servers. One will go offline
      # and one will send some events.
      self.register_user("u1", "you the one")
      token = self.login("u1", "you the one")
      room_id = self.helper.create_room_as("u1", tok=token)

      self.get_success(
          event_injection.inject_member_event(
              self.hs, room_id, "@user:host2", "join"
          )
      )
      host3_join = self.get_success(
          event_injection.inject_member_event(
              self.hs, room_id, "@user:host3", "join"
          )
      )

      # First we send something from the local server, so that we notice the
      # remote is down and go into catchup mode.
      self.helper.send(room_id, "you hear me!!", tok=token)

      # Now simulate us receiving an event from the still online remote.
      remote_message = self.get_success(
          event_injection.inject_event(
              self.hs,
              type=EventTypes.Message,
              sender="@user:host3",
              room_id=room_id,
              content={"msgtype": "m.text", "body": "Hello"},
          )
      )

      # Pretend host2 has successfully received everything up to and
      # including host3's join event.
      store = self.hs.get_datastores().main
      self.get_success(
          store.set_destination_last_successful_stream_ordering(
              "host2", host3_join.internal_metadata.stream_ordering
          )
      )

      self.get_success(queue._catch_up_transmission_loop())

      # We expect only the last message from the remote to have been sent,
      # rather than the last *local* event that was sent.
      self.assertEqual(len(delivered), 1)
      self.assertEqual(delivered[0].event_id, remote_message.event_id)

      self.assertFalse(queue._catching_up)