test_federation_catch_up.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471
  1. from typing import List, Tuple
  2. from unittest.mock import Mock
  3. from synapse.api.constants import EventTypes
  4. from synapse.events import EventBase
  5. from synapse.federation.sender import PerDestinationQueue, TransactionManager
  6. from synapse.federation.units import Edu
  7. from synapse.rest import admin
  8. from synapse.rest.client.v1 import login, room
  9. from synapse.util.retryutils import NotRetryingDestination
  10. from tests.test_utils import event_injection, make_awaitable
  11. from tests.unittest import FederatingHomeserverTestCase, override_config
  12. class FederationCatchUpTestCases(FederatingHomeserverTestCase):
  13. servlets = [
  14. admin.register_servlets,
  15. room.register_servlets,
  16. login.register_servlets,
  17. ]
  18. def make_homeserver(self, reactor, clock):
  19. return self.setup_test_homeserver(
  20. federation_transport_client=Mock(spec=["send_transaction"]),
  21. )
  22. def prepare(self, reactor, clock, hs):
  23. # stub out get_current_hosts_in_room
  24. state_handler = hs.get_state_handler()
  25. # This mock is crucial for destination_rooms to be populated.
  26. state_handler.get_current_hosts_in_room = Mock(
  27. return_value=make_awaitable(["test", "host2"])
  28. )
  29. # whenever send_transaction is called, record the pdu data
  30. self.pdus = []
  31. self.failed_pdus = []
  32. self.is_online = True
  33. self.hs.get_federation_transport_client().send_transaction.side_effect = (
  34. self.record_transaction
  35. )
  36. async def record_transaction(self, txn, json_cb):
  37. if self.is_online:
  38. data = json_cb()
  39. self.pdus.extend(data["pdus"])
  40. return {}
  41. else:
  42. data = json_cb()
  43. self.failed_pdus.extend(data["pdus"])
  44. raise NotRetryingDestination(0, 24 * 60 * 60 * 1000, txn.destination)
  45. def get_destination_room(self, room: str, destination: str = "host2") -> dict:
  46. """
  47. Gets the destination_rooms entry for a (destination, room_id) pair.
  48. Args:
  49. room: room ID
  50. destination: what destination, default is "host2"
  51. Returns:
  52. Dictionary of { event_id: str, stream_ordering: int }
  53. """
  54. event_id, stream_ordering = self.get_success(
  55. self.hs.get_datastore().db_pool.execute(
  56. "test:get_destination_rooms",
  57. None,
  58. """
  59. SELECT event_id, stream_ordering
  60. FROM destination_rooms dr
  61. JOIN events USING (stream_ordering)
  62. WHERE dr.destination = ? AND dr.room_id = ?
  63. """,
  64. destination,
  65. room,
  66. )
  67. )[0]
  68. return {"event_id": event_id, "stream_ordering": stream_ordering}
  69. @override_config({"send_federation": True})
  70. def test_catch_up_destination_rooms_tracking(self):
  71. """
  72. Tests that we populate the `destination_rooms` table as needed.
  73. """
  74. self.register_user("u1", "you the one")
  75. u1_token = self.login("u1", "you the one")
  76. room = self.helper.create_room_as("u1", tok=u1_token)
  77. self.get_success(
  78. event_injection.inject_member_event(self.hs, room, "@user:host2", "join")
  79. )
  80. event_id_1 = self.helper.send(room, "wombats!", tok=u1_token)["event_id"]
  81. row_1 = self.get_destination_room(room)
  82. event_id_2 = self.helper.send(room, "rabbits!", tok=u1_token)["event_id"]
  83. row_2 = self.get_destination_room(room)
  84. # check: events correctly registered in order
  85. self.assertEqual(row_1["event_id"], event_id_1)
  86. self.assertEqual(row_2["event_id"], event_id_2)
  87. self.assertEqual(row_1["stream_ordering"], row_2["stream_ordering"] - 1)
  88. @override_config({"send_federation": True})
  89. def test_catch_up_last_successful_stream_ordering_tracking(self):
  90. """
  91. Tests that we populate the `destination_rooms` table as needed.
  92. """
  93. self.register_user("u1", "you the one")
  94. u1_token = self.login("u1", "you the one")
  95. room = self.helper.create_room_as("u1", tok=u1_token)
  96. # take the remote offline
  97. self.is_online = False
  98. self.get_success(
  99. event_injection.inject_member_event(self.hs, room, "@user:host2", "join")
  100. )
  101. self.helper.send(room, "wombats!", tok=u1_token)
  102. self.pump()
  103. lsso_1 = self.get_success(
  104. self.hs.get_datastore().get_destination_last_successful_stream_ordering(
  105. "host2"
  106. )
  107. )
  108. self.assertIsNone(
  109. lsso_1,
  110. "There should be no last successful stream ordering for an always-offline destination",
  111. )
  112. # bring the remote online
  113. self.is_online = True
  114. event_id_2 = self.helper.send(room, "rabbits!", tok=u1_token)["event_id"]
  115. lsso_2 = self.get_success(
  116. self.hs.get_datastore().get_destination_last_successful_stream_ordering(
  117. "host2"
  118. )
  119. )
  120. row_2 = self.get_destination_room(room)
  121. self.assertEqual(
  122. self.pdus[0]["content"]["body"],
  123. "rabbits!",
  124. "Test fault: didn't receive the right PDU",
  125. )
  126. self.assertEqual(
  127. row_2["event_id"],
  128. event_id_2,
  129. "Test fault: destination_rooms not updated correctly",
  130. )
  131. self.assertEqual(
  132. lsso_2,
  133. row_2["stream_ordering"],
  134. "Send succeeded but not marked as last_successful_stream_ordering",
  135. )
  136. @override_config({"send_federation": True}) # critical to federate
  137. def test_catch_up_from_blank_state(self):
  138. """
  139. Runs an overall test of federation catch-up from scratch.
  140. Further tests will focus on more narrow aspects and edge-cases, but I
  141. hope to provide an overall view with this test.
  142. """
  143. # bring the other server online
  144. self.is_online = True
  145. # let's make some events for the other server to receive
  146. self.register_user("u1", "you the one")
  147. u1_token = self.login("u1", "you the one")
  148. room_1 = self.helper.create_room_as("u1", tok=u1_token)
  149. room_2 = self.helper.create_room_as("u1", tok=u1_token)
  150. # also critical to federate
  151. self.get_success(
  152. event_injection.inject_member_event(self.hs, room_1, "@user:host2", "join")
  153. )
  154. self.get_success(
  155. event_injection.inject_member_event(self.hs, room_2, "@user:host2", "join")
  156. )
  157. self.helper.send_state(
  158. room_1, event_type="m.room.topic", body={"topic": "wombat"}, tok=u1_token
  159. )
  160. # check: PDU received for topic event
  161. self.assertEqual(len(self.pdus), 1)
  162. self.assertEqual(self.pdus[0]["type"], "m.room.topic")
  163. # take the remote offline
  164. self.is_online = False
  165. # send another event
  166. self.helper.send(room_1, "hi user!", tok=u1_token)
  167. # check: things didn't go well since the remote is down
  168. self.assertEqual(len(self.failed_pdus), 1)
  169. self.assertEqual(self.failed_pdus[0]["content"]["body"], "hi user!")
  170. # let's delete the federation transmission queue
  171. # (this pretends we are starting up fresh.)
  172. self.assertFalse(
  173. self.hs.get_federation_sender()
  174. ._per_destination_queues["host2"]
  175. .transmission_loop_running
  176. )
  177. del self.hs.get_federation_sender()._per_destination_queues["host2"]
  178. # let's also clear any backoffs
  179. self.get_success(
  180. self.hs.get_datastore().set_destination_retry_timings("host2", None, 0, 0)
  181. )
  182. # bring the remote online and clear the received pdu list
  183. self.is_online = True
  184. self.pdus = []
  185. # now we need to initiate a federation transaction somehow…
  186. # to do that, let's send another event (because it's simple to do)
  187. # (do it to another room otherwise the catch-up logic decides it doesn't
  188. # need to catch up room_1 — something I overlooked when first writing
  189. # this test)
  190. self.helper.send(room_2, "wombats!", tok=u1_token)
  191. # we should now have received both PDUs
  192. self.assertEqual(len(self.pdus), 2)
  193. self.assertEqual(self.pdus[0]["content"]["body"], "hi user!")
  194. self.assertEqual(self.pdus[1]["content"]["body"], "wombats!")
  195. def make_fake_destination_queue(
  196. self, destination: str = "host2"
  197. ) -> Tuple[PerDestinationQueue, List[EventBase]]:
  198. """
  199. Makes a fake per-destination queue.
  200. """
  201. transaction_manager = TransactionManager(self.hs)
  202. per_dest_queue = PerDestinationQueue(self.hs, transaction_manager, destination)
  203. results_list = []
  204. async def fake_send(
  205. destination_tm: str,
  206. pending_pdus: List[EventBase],
  207. _pending_edus: List[Edu],
  208. ) -> bool:
  209. assert destination == destination_tm
  210. results_list.extend(pending_pdus)
  211. return True # success!
  212. transaction_manager.send_new_transaction = fake_send
  213. return per_dest_queue, results_list
  214. @override_config({"send_federation": True})
  215. def test_catch_up_loop(self):
  216. """
  217. Tests the behaviour of _catch_up_transmission_loop.
  218. """
  219. # ARRANGE:
  220. # - a local user (u1)
  221. # - 3 rooms which u1 is joined to (and remote user @user:host2 is
  222. # joined to)
  223. # - some events (1 to 5) in those rooms
  224. # we have 'already sent' events 1 and 2 to host2
  225. per_dest_queue, sent_pdus = self.make_fake_destination_queue()
  226. self.register_user("u1", "you the one")
  227. u1_token = self.login("u1", "you the one")
  228. room_1 = self.helper.create_room_as("u1", tok=u1_token)
  229. room_2 = self.helper.create_room_as("u1", tok=u1_token)
  230. room_3 = self.helper.create_room_as("u1", tok=u1_token)
  231. self.get_success(
  232. event_injection.inject_member_event(self.hs, room_1, "@user:host2", "join")
  233. )
  234. self.get_success(
  235. event_injection.inject_member_event(self.hs, room_2, "@user:host2", "join")
  236. )
  237. self.get_success(
  238. event_injection.inject_member_event(self.hs, room_3, "@user:host2", "join")
  239. )
  240. # create some events
  241. self.helper.send(room_1, "you hear me!!", tok=u1_token)
  242. event_id_2 = self.helper.send(room_2, "wombats!", tok=u1_token)["event_id"]
  243. self.helper.send(room_3, "Matrix!", tok=u1_token)
  244. event_id_4 = self.helper.send(room_2, "rabbits!", tok=u1_token)["event_id"]
  245. event_id_5 = self.helper.send(room_3, "Synapse!", tok=u1_token)["event_id"]
  246. # destination_rooms should already be populated, but let us pretend that we already
  247. # sent (successfully) up to and including event id 2
  248. event_2 = self.get_success(self.hs.get_datastore().get_event(event_id_2))
  249. # also fetch event 5 so we know its last_successful_stream_ordering later
  250. event_5 = self.get_success(self.hs.get_datastore().get_event(event_id_5))
  251. self.get_success(
  252. self.hs.get_datastore().set_destination_last_successful_stream_ordering(
  253. "host2", event_2.internal_metadata.stream_ordering
  254. )
  255. )
  256. # ACT
  257. self.get_success(per_dest_queue._catch_up_transmission_loop())
  258. # ASSERT, noticing in particular:
  259. # - event 3 not sent out, because event 5 replaces it
  260. # - order is least recent first, so event 5 comes after event 4
  261. # - catch-up is completed
  262. self.assertEqual(len(sent_pdus), 2)
  263. self.assertEqual(sent_pdus[0].event_id, event_id_4)
  264. self.assertEqual(sent_pdus[1].event_id, event_id_5)
  265. self.assertFalse(per_dest_queue._catching_up)
  266. self.assertEqual(
  267. per_dest_queue._last_successful_stream_ordering,
  268. event_5.internal_metadata.stream_ordering,
  269. )
  270. @override_config({"send_federation": True})
  271. def test_catch_up_on_synapse_startup(self):
  272. """
  273. Tests the behaviour of get_catch_up_outstanding_destinations and
  274. _wake_destinations_needing_catchup.
  275. """
  276. # list of sorted server names (note that there are more servers than the batch
  277. # size used in get_catch_up_outstanding_destinations).
  278. server_names = ["server%02d" % number for number in range(42)] + ["zzzerver"]
  279. # ARRANGE:
  280. # - a local user (u1)
  281. # - a room which u1 is joined to (and remote users @user:serverXX are
  282. # joined to)
  283. # mark the remotes as online
  284. self.is_online = True
  285. self.register_user("u1", "you the one")
  286. u1_token = self.login("u1", "you the one")
  287. room_id = self.helper.create_room_as("u1", tok=u1_token)
  288. for server_name in server_names:
  289. self.get_success(
  290. event_injection.inject_member_event(
  291. self.hs, room_id, "@user:%s" % server_name, "join"
  292. )
  293. )
  294. # create an event
  295. self.helper.send(room_id, "deary me!", tok=u1_token)
  296. # ASSERT:
  297. # - All servers are up to date so none should have outstanding catch-up
  298. outstanding_when_successful = self.get_success(
  299. self.hs.get_datastore().get_catch_up_outstanding_destinations(None)
  300. )
  301. self.assertEqual(outstanding_when_successful, [])
  302. # ACT:
  303. # - Make the remote servers unreachable
  304. self.is_online = False
  305. # - Mark zzzerver as being backed-off from
  306. now = self.clock.time_msec()
  307. self.get_success(
  308. self.hs.get_datastore().set_destination_retry_timings(
  309. "zzzerver", now, now, 24 * 60 * 60 * 1000 # retry in 1 day
  310. )
  311. )
  312. # - Send an event
  313. self.helper.send(room_id, "can anyone hear me?", tok=u1_token)
  314. # ASSERT (get_catch_up_outstanding_destinations):
  315. # - all remotes are outstanding
  316. # - they are returned in batches of 25, in order
  317. outstanding_1 = self.get_success(
  318. self.hs.get_datastore().get_catch_up_outstanding_destinations(None)
  319. )
  320. self.assertEqual(len(outstanding_1), 25)
  321. self.assertEqual(outstanding_1, server_names[0:25])
  322. outstanding_2 = self.get_success(
  323. self.hs.get_datastore().get_catch_up_outstanding_destinations(
  324. outstanding_1[-1]
  325. )
  326. )
  327. self.assertNotIn("zzzerver", outstanding_2)
  328. self.assertEqual(len(outstanding_2), 17)
  329. self.assertEqual(outstanding_2, server_names[25:-1])
  330. # ACT: call _wake_destinations_needing_catchup
  331. # patch wake_destination to just count the destinations instead
  332. woken = []
  333. def wake_destination_track(destination):
  334. woken.append(destination)
  335. self.hs.get_federation_sender().wake_destination = wake_destination_track
  336. # cancel the pre-existing timer for _wake_destinations_needing_catchup
  337. # this is because we are calling it manually rather than waiting for it
  338. # to be called automatically
  339. self.hs.get_federation_sender()._catchup_after_startup_timer.cancel()
  340. self.get_success(
  341. self.hs.get_federation_sender()._wake_destinations_needing_catchup(), by=5.0
  342. )
  343. # ASSERT (_wake_destinations_needing_catchup):
  344. # - all remotes are woken up, save for zzzerver
  345. self.assertNotIn("zzzerver", woken)
  346. # - all destinations are woken exactly once; they appear once in woken.
  347. self.assertCountEqual(woken, server_names[:-1])
  348. @override_config({"send_federation": True})
  349. def test_not_latest_event(self):
  350. """Test that we send the latest event in the room even if its not ours."""
  351. per_dest_queue, sent_pdus = self.make_fake_destination_queue()
  352. # Make a room with a local user, and two servers. One will go offline
  353. # and one will send some events.
  354. self.register_user("u1", "you the one")
  355. u1_token = self.login("u1", "you the one")
  356. room_1 = self.helper.create_room_as("u1", tok=u1_token)
  357. self.get_success(
  358. event_injection.inject_member_event(self.hs, room_1, "@user:host2", "join")
  359. )
  360. event_1 = self.get_success(
  361. event_injection.inject_member_event(self.hs, room_1, "@user:host3", "join")
  362. )
  363. # First we send something from the local server, so that we notice the
  364. # remote is down and go into catchup mode.
  365. self.helper.send(room_1, "you hear me!!", tok=u1_token)
  366. # Now simulate us receiving an event from the still online remote.
  367. event_2 = self.get_success(
  368. event_injection.inject_event(
  369. self.hs,
  370. type=EventTypes.Message,
  371. sender="@user:host3",
  372. room_id=room_1,
  373. content={"msgtype": "m.text", "body": "Hello"},
  374. )
  375. )
  376. self.get_success(
  377. self.hs.get_datastore().set_destination_last_successful_stream_ordering(
  378. "host2", event_1.internal_metadata.stream_ordering
  379. )
  380. )
  381. self.get_success(per_dest_queue._catch_up_transmission_loop())
  382. # We expect only the last message from the remote, event_2, to have been
  383. # sent, rather than the last *local* event that was sent.
  384. self.assertEqual(len(sent_pdus), 1)
  385. self.assertEqual(sent_pdus[0].event_id, event_2.event_id)
  386. self.assertFalse(per_dest_queue._catching_up)