# test_federation_sender.py
  1. # Copyright 2019 New Vector Ltd
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. from typing import Optional
  15. from unittest.mock import Mock
  16. from signedjson import key, sign
  17. from signedjson.types import BaseKey, SigningKey
  18. from twisted.internet import defer
  19. from synapse.api.constants import EduTypes, RoomEncryptionAlgorithms
  20. from synapse.rest import admin
  21. from synapse.rest.client import login
  22. from synapse.types import JsonDict, ReadReceipt
  23. from tests.test_utils import make_awaitable
  24. from tests.unittest import HomeserverTestCase, override_config
  25. class FederationSenderReceiptsTestCases(HomeserverTestCase):
  26. def make_homeserver(self, reactor, clock):
  27. mock_state_handler = Mock(spec=["get_current_hosts_in_room"])
  28. # Ensure a new Awaitable is created for each call.
  29. mock_state_handler.get_current_hosts_in_room.return_value = make_awaitable(
  30. ["test", "host2"]
  31. )
  32. return self.setup_test_homeserver(
  33. state_handler=mock_state_handler,
  34. federation_transport_client=Mock(spec=["send_transaction"]),
  35. )
  36. @override_config({"send_federation": True})
  37. def test_send_receipts(self):
  38. mock_send_transaction = (
  39. self.hs.get_federation_transport_client().send_transaction
  40. )
  41. mock_send_transaction.return_value = make_awaitable({})
  42. sender = self.hs.get_federation_sender()
  43. receipt = ReadReceipt(
  44. "room_id", "m.read", "user_id", ["event_id"], {"ts": 1234}
  45. )
  46. self.successResultOf(defer.ensureDeferred(sender.send_read_receipt(receipt)))
  47. self.pump()
  48. # expect a call to send_transaction
  49. mock_send_transaction.assert_called_once()
  50. json_cb = mock_send_transaction.call_args[0][1]
  51. data = json_cb()
  52. self.assertEqual(
  53. data["edus"],
  54. [
  55. {
  56. "edu_type": EduTypes.RECEIPT,
  57. "content": {
  58. "room_id": {
  59. "m.read": {
  60. "user_id": {
  61. "event_ids": ["event_id"],
  62. "data": {"ts": 1234},
  63. }
  64. }
  65. }
  66. },
  67. }
  68. ],
  69. )
  70. @override_config({"send_federation": True})
  71. def test_send_receipts_with_backoff(self):
  72. """Send two receipts in quick succession; the second should be flushed, but
  73. only after 20ms"""
  74. mock_send_transaction = (
  75. self.hs.get_federation_transport_client().send_transaction
  76. )
  77. mock_send_transaction.return_value = make_awaitable({})
  78. sender = self.hs.get_federation_sender()
  79. receipt = ReadReceipt(
  80. "room_id", "m.read", "user_id", ["event_id"], {"ts": 1234}
  81. )
  82. self.successResultOf(defer.ensureDeferred(sender.send_read_receipt(receipt)))
  83. self.pump()
  84. # expect a call to send_transaction
  85. mock_send_transaction.assert_called_once()
  86. json_cb = mock_send_transaction.call_args[0][1]
  87. data = json_cb()
  88. self.assertEqual(
  89. data["edus"],
  90. [
  91. {
  92. "edu_type": EduTypes.RECEIPT,
  93. "content": {
  94. "room_id": {
  95. "m.read": {
  96. "user_id": {
  97. "event_ids": ["event_id"],
  98. "data": {"ts": 1234},
  99. }
  100. }
  101. }
  102. },
  103. }
  104. ],
  105. )
  106. mock_send_transaction.reset_mock()
  107. # send the second RR
  108. receipt = ReadReceipt(
  109. "room_id", "m.read", "user_id", ["other_id"], {"ts": 1234}
  110. )
  111. self.successResultOf(defer.ensureDeferred(sender.send_read_receipt(receipt)))
  112. self.pump()
  113. mock_send_transaction.assert_not_called()
  114. self.reactor.advance(19)
  115. mock_send_transaction.assert_not_called()
  116. self.reactor.advance(10)
  117. mock_send_transaction.assert_called_once()
  118. json_cb = mock_send_transaction.call_args[0][1]
  119. data = json_cb()
  120. self.assertEqual(
  121. data["edus"],
  122. [
  123. {
  124. "edu_type": EduTypes.RECEIPT,
  125. "content": {
  126. "room_id": {
  127. "m.read": {
  128. "user_id": {
  129. "event_ids": ["other_id"],
  130. "data": {"ts": 1234},
  131. }
  132. }
  133. }
  134. },
  135. }
  136. ],
  137. )
  138. class FederationSenderDevicesTestCases(HomeserverTestCase):
  139. servlets = [
  140. admin.register_servlets,
  141. login.register_servlets,
  142. ]
  143. def make_homeserver(self, reactor, clock):
  144. return self.setup_test_homeserver(
  145. federation_transport_client=Mock(
  146. spec=["send_transaction", "query_user_devices"]
  147. ),
  148. )
  149. def default_config(self):
  150. c = super().default_config()
  151. c["send_federation"] = True
  152. return c
  153. def prepare(self, reactor, clock, hs):
  154. # stub out `get_rooms_for_user` and `get_users_in_room` so that the
  155. # server thinks the user shares a room with `@user2:host2`
  156. def get_rooms_for_user(user_id):
  157. return defer.succeed({"!room:host1"})
  158. hs.get_datastores().main.get_rooms_for_user = get_rooms_for_user
  159. def get_users_in_room(room_id):
  160. return defer.succeed({"@user2:host2"})
  161. hs.get_datastores().main.get_users_in_room = get_users_in_room
  162. # whenever send_transaction is called, record the edu data
  163. self.edus = []
  164. self.hs.get_federation_transport_client().send_transaction.side_effect = (
  165. self.record_transaction
  166. )
  167. def record_transaction(self, txn, json_cb):
  168. data = json_cb()
  169. self.edus.extend(data["edus"])
  170. return defer.succeed({})
  171. def test_send_device_updates(self):
  172. """Basic case: each device update should result in an EDU"""
  173. # create a device
  174. u1 = self.register_user("user", "pass")
  175. self.login(u1, "pass", device_id="D1")
  176. # expect one edu
  177. self.assertEqual(len(self.edus), 1)
  178. stream_id = self.check_device_update_edu(self.edus.pop(0), u1, "D1", None)
  179. # We queue up device list updates to be sent over federation, so we
  180. # advance to clear the queue.
  181. self.reactor.advance(1)
  182. # a second call should produce no new device EDUs
  183. self.hs.get_federation_sender().send_device_messages("host2")
  184. self.assertEqual(self.edus, [])
  185. # a second device
  186. self.login("user", "pass", device_id="D2")
  187. self.assertEqual(len(self.edus), 1)
  188. self.check_device_update_edu(self.edus.pop(0), u1, "D2", stream_id)
  189. def test_dont_send_device_updates_for_remote_users(self):
  190. """Check that we don't send device updates for remote users"""
  191. # Send the server a device list EDU for the other user, this will cause
  192. # it to try and resync the device lists.
  193. self.hs.get_federation_transport_client().query_user_devices.return_value = (
  194. make_awaitable(
  195. {
  196. "stream_id": "1",
  197. "user_id": "@user2:host2",
  198. "devices": [{"device_id": "D1"}],
  199. }
  200. )
  201. )
  202. self.get_success(
  203. self.hs.get_device_handler().device_list_updater.incoming_device_list_update(
  204. "host2",
  205. {
  206. "user_id": "@user2:host2",
  207. "device_id": "D1",
  208. "stream_id": "1",
  209. "prev_ids": [],
  210. },
  211. )
  212. )
  213. self.reactor.advance(1)
  214. # We shouldn't see an EDU for that update
  215. self.assertEqual(self.edus, [])
  216. # Check that we did successfully process the inbound EDU (otherwise this
  217. # test would pass if we failed to process the EDU)
  218. devices = self.get_success(
  219. self.hs.get_datastores().main.get_cached_devices_for_user("@user2:host2")
  220. )
  221. self.assertIn("D1", devices)
  222. def test_upload_signatures(self):
  223. """Uploading signatures on some devices should produce updates for that user"""
  224. e2e_handler = self.hs.get_e2e_keys_handler()
  225. # register two devices
  226. u1 = self.register_user("user", "pass")
  227. self.login(u1, "pass", device_id="D1")
  228. self.login(u1, "pass", device_id="D2")
  229. # expect two edus
  230. self.assertEqual(len(self.edus), 2)
  231. stream_id = None
  232. stream_id = self.check_device_update_edu(self.edus.pop(0), u1, "D1", stream_id)
  233. stream_id = self.check_device_update_edu(self.edus.pop(0), u1, "D2", stream_id)
  234. # upload signing keys for each device
  235. device1_signing_key = self.generate_and_upload_device_signing_key(u1, "D1")
  236. device2_signing_key = self.generate_and_upload_device_signing_key(u1, "D2")
  237. # We queue up device list updates to be sent over federation, so we
  238. # advance to clear the queue.
  239. self.reactor.advance(1)
  240. # expect two more edus
  241. self.assertEqual(len(self.edus), 2)
  242. stream_id = self.check_device_update_edu(self.edus.pop(0), u1, "D1", stream_id)
  243. stream_id = self.check_device_update_edu(self.edus.pop(0), u1, "D2", stream_id)
  244. # upload master key and self-signing key
  245. master_signing_key = generate_self_id_key()
  246. master_key = {
  247. "user_id": u1,
  248. "usage": ["master"],
  249. "keys": {key_id(master_signing_key): encode_pubkey(master_signing_key)},
  250. }
  251. # private key: HvQBbU+hc2Zr+JP1sE0XwBe1pfZZEYtJNPJLZJtS+F8
  252. selfsigning_signing_key = generate_self_id_key()
  253. selfsigning_key = {
  254. "user_id": u1,
  255. "usage": ["self_signing"],
  256. "keys": {
  257. key_id(selfsigning_signing_key): encode_pubkey(selfsigning_signing_key)
  258. },
  259. }
  260. sign.sign_json(selfsigning_key, u1, master_signing_key)
  261. cross_signing_keys = {
  262. "master_key": master_key,
  263. "self_signing_key": selfsigning_key,
  264. }
  265. self.get_success(
  266. e2e_handler.upload_signing_keys_for_user(u1, cross_signing_keys)
  267. )
  268. # We queue up device list updates to be sent over federation, so we
  269. # advance to clear the queue.
  270. self.reactor.advance(1)
  271. # expect signing key update edu
  272. self.assertEqual(len(self.edus), 2)
  273. self.assertEqual(self.edus.pop(0)["edu_type"], EduTypes.SIGNING_KEY_UPDATE)
  274. self.assertEqual(
  275. self.edus.pop(0)["edu_type"], EduTypes.UNSTABLE_SIGNING_KEY_UPDATE
  276. )
  277. # sign the devices
  278. d1_json = build_device_dict(u1, "D1", device1_signing_key)
  279. sign.sign_json(d1_json, u1, selfsigning_signing_key)
  280. d2_json = build_device_dict(u1, "D2", device2_signing_key)
  281. sign.sign_json(d2_json, u1, selfsigning_signing_key)
  282. ret = self.get_success(
  283. e2e_handler.upload_signatures_for_device_keys(
  284. u1,
  285. {u1: {"D1": d1_json, "D2": d2_json}},
  286. )
  287. )
  288. self.assertEqual(ret["failures"], {})
  289. # We queue up device list updates to be sent over federation, so we
  290. # advance to clear the queue.
  291. self.reactor.advance(1)
  292. # expect two edus, in one or two transactions. We don't know what order the
  293. # devices will be updated.
  294. self.assertEqual(len(self.edus), 2)
  295. stream_id = None # FIXME: there is a discontinuity in the stream IDs: see #7142
  296. for edu in self.edus:
  297. self.assertEqual(edu["edu_type"], EduTypes.DEVICE_LIST_UPDATE)
  298. c = edu["content"]
  299. if stream_id is not None:
  300. self.assertEqual(c["prev_id"], [stream_id])
  301. self.assertGreaterEqual(c["stream_id"], stream_id)
  302. stream_id = c["stream_id"]
  303. devices = {edu["content"]["device_id"] for edu in self.edus}
  304. self.assertEqual({"D1", "D2"}, devices)
  305. def test_delete_devices(self):
  306. """If devices are deleted, that should result in EDUs too"""
  307. # create devices
  308. u1 = self.register_user("user", "pass")
  309. self.login("user", "pass", device_id="D1")
  310. self.login("user", "pass", device_id="D2")
  311. self.login("user", "pass", device_id="D3")
  312. # We queue up device list updates to be sent over federation, so we
  313. # advance to clear the queue.
  314. self.reactor.advance(1)
  315. # expect three edus
  316. self.assertEqual(len(self.edus), 3)
  317. stream_id = self.check_device_update_edu(self.edus.pop(0), u1, "D1", None)
  318. stream_id = self.check_device_update_edu(self.edus.pop(0), u1, "D2", stream_id)
  319. stream_id = self.check_device_update_edu(self.edus.pop(0), u1, "D3", stream_id)
  320. # delete them again
  321. self.get_success(
  322. self.hs.get_device_handler().delete_devices(u1, ["D1", "D2", "D3"])
  323. )
  324. # We queue up device list updates to be sent over federation, so we
  325. # advance to clear the queue.
  326. self.reactor.advance(1)
  327. # expect three edus, in an unknown order
  328. self.assertEqual(len(self.edus), 3)
  329. for edu in self.edus:
  330. self.assertEqual(edu["edu_type"], EduTypes.DEVICE_LIST_UPDATE)
  331. c = edu["content"]
  332. self.assertGreaterEqual(
  333. c.items(),
  334. {"user_id": u1, "prev_id": [stream_id], "deleted": True}.items(),
  335. )
  336. self.assertGreaterEqual(c["stream_id"], stream_id)
  337. stream_id = c["stream_id"]
  338. devices = {edu["content"]["device_id"] for edu in self.edus}
  339. self.assertEqual({"D1", "D2", "D3"}, devices)
  340. def test_unreachable_server(self):
  341. """If the destination server is unreachable, all the updates should get sent on
  342. recovery
  343. """
  344. mock_send_txn = self.hs.get_federation_transport_client().send_transaction
  345. mock_send_txn.side_effect = lambda t, cb: defer.fail(AssertionError("fail"))
  346. # create devices
  347. u1 = self.register_user("user", "pass")
  348. self.login("user", "pass", device_id="D1")
  349. self.login("user", "pass", device_id="D2")
  350. self.login("user", "pass", device_id="D3")
  351. # delete them again
  352. self.get_success(
  353. self.hs.get_device_handler().delete_devices(u1, ["D1", "D2", "D3"])
  354. )
  355. # We queue up device list updates to be sent over federation, so we
  356. # advance to clear the queue.
  357. self.reactor.advance(1)
  358. self.assertGreaterEqual(mock_send_txn.call_count, 4)
  359. # recover the server
  360. mock_send_txn.side_effect = self.record_transaction
  361. self.hs.get_federation_sender().send_device_messages("host2")
  362. # We queue up device list updates to be sent over federation, so we
  363. # advance to clear the queue.
  364. self.reactor.advance(1)
  365. # for each device, there should be a single update
  366. self.assertEqual(len(self.edus), 3)
  367. stream_id = None
  368. for edu in self.edus:
  369. self.assertEqual(edu["edu_type"], EduTypes.DEVICE_LIST_UPDATE)
  370. c = edu["content"]
  371. self.assertEqual(c["prev_id"], [stream_id] if stream_id is not None else [])
  372. if stream_id is not None:
  373. self.assertGreaterEqual(c["stream_id"], stream_id)
  374. stream_id = c["stream_id"]
  375. devices = {edu["content"]["device_id"] for edu in self.edus}
  376. self.assertEqual({"D1", "D2", "D3"}, devices)
  377. def test_prune_outbound_device_pokes1(self):
  378. """If a destination is unreachable, and the updates are pruned, we should get
  379. a single update.
  380. This case tests the behaviour when the server has never been reachable.
  381. """
  382. mock_send_txn = self.hs.get_federation_transport_client().send_transaction
  383. mock_send_txn.side_effect = lambda t, cb: defer.fail(AssertionError("fail"))
  384. # create devices
  385. u1 = self.register_user("user", "pass")
  386. self.login("user", "pass", device_id="D1")
  387. self.login("user", "pass", device_id="D2")
  388. self.login("user", "pass", device_id="D3")
  389. # delete them again
  390. self.get_success(
  391. self.hs.get_device_handler().delete_devices(u1, ["D1", "D2", "D3"])
  392. )
  393. # We queue up device list updates to be sent over federation, so we
  394. # advance to clear the queue.
  395. self.reactor.advance(1)
  396. self.assertGreaterEqual(mock_send_txn.call_count, 4)
  397. # run the prune job
  398. self.reactor.advance(10)
  399. self.get_success(
  400. self.hs.get_datastores().main._prune_old_outbound_device_pokes(prune_age=1)
  401. )
  402. # recover the server
  403. mock_send_txn.side_effect = self.record_transaction
  404. self.hs.get_federation_sender().send_device_messages("host2")
  405. # We queue up device list updates to be sent over federation, so we
  406. # advance to clear the queue.
  407. self.reactor.advance(1)
  408. # there should be a single update for this user.
  409. self.assertEqual(len(self.edus), 1)
  410. edu = self.edus.pop(0)
  411. self.assertEqual(edu["edu_type"], EduTypes.DEVICE_LIST_UPDATE)
  412. c = edu["content"]
  413. # synapse uses an empty prev_id list to indicate "needs a full resync".
  414. self.assertEqual(c["prev_id"], [])
  415. def test_prune_outbound_device_pokes2(self):
  416. """If a destination is unreachable, and the updates are pruned, we should get
  417. a single update.
  418. This case tests the behaviour when the server was reachable, but then goes
  419. offline.
  420. """
  421. # create first device
  422. u1 = self.register_user("user", "pass")
  423. self.login("user", "pass", device_id="D1")
  424. # expect the update EDU
  425. self.assertEqual(len(self.edus), 1)
  426. self.check_device_update_edu(self.edus.pop(0), u1, "D1", None)
  427. # now the server goes offline
  428. mock_send_txn = self.hs.get_federation_transport_client().send_transaction
  429. mock_send_txn.side_effect = lambda t, cb: defer.fail(AssertionError("fail"))
  430. self.login("user", "pass", device_id="D2")
  431. self.login("user", "pass", device_id="D3")
  432. # We queue up device list updates to be sent over federation, so we
  433. # advance to clear the queue.
  434. self.reactor.advance(1)
  435. # delete them again
  436. self.get_success(
  437. self.hs.get_device_handler().delete_devices(u1, ["D1", "D2", "D3"])
  438. )
  439. self.assertGreaterEqual(mock_send_txn.call_count, 3)
  440. # run the prune job
  441. self.reactor.advance(10)
  442. self.get_success(
  443. self.hs.get_datastores().main._prune_old_outbound_device_pokes(prune_age=1)
  444. )
  445. # recover the server
  446. mock_send_txn.side_effect = self.record_transaction
  447. self.hs.get_federation_sender().send_device_messages("host2")
  448. # We queue up device list updates to be sent over federation, so we
  449. # advance to clear the queue.
  450. self.reactor.advance(1)
  451. # ... and we should get a single update for this user.
  452. self.assertEqual(len(self.edus), 1)
  453. edu = self.edus.pop(0)
  454. self.assertEqual(edu["edu_type"], EduTypes.DEVICE_LIST_UPDATE)
  455. c = edu["content"]
  456. # synapse uses an empty prev_id list to indicate "needs a full resync".
  457. self.assertEqual(c["prev_id"], [])
  458. def check_device_update_edu(
  459. self,
  460. edu: JsonDict,
  461. user_id: str,
  462. device_id: str,
  463. prev_stream_id: Optional[int],
  464. ) -> int:
  465. """Check that the given EDU is an update for the given device
  466. Returns the stream_id.
  467. """
  468. self.assertEqual(edu["edu_type"], EduTypes.DEVICE_LIST_UPDATE)
  469. content = edu["content"]
  470. expected = {
  471. "user_id": user_id,
  472. "device_id": device_id,
  473. "prev_id": [prev_stream_id] if prev_stream_id is not None else [],
  474. }
  475. self.assertLessEqual(expected.items(), content.items())
  476. if prev_stream_id is not None:
  477. self.assertGreaterEqual(content["stream_id"], prev_stream_id)
  478. return content["stream_id"]
  479. def check_signing_key_update_txn(
  480. self,
  481. txn: JsonDict,
  482. ) -> None:
  483. """Check that the txn has an EDU with a signing key update."""
  484. edus = txn["edus"]
  485. self.assertEqual(len(edus), 2)
  486. def generate_and_upload_device_signing_key(
  487. self, user_id: str, device_id: str
  488. ) -> SigningKey:
  489. """Generate a signing keypair for the given device, and upload it"""
  490. sk = key.generate_signing_key(device_id)
  491. device_dict = build_device_dict(user_id, device_id, sk)
  492. self.get_success(
  493. self.hs.get_e2e_keys_handler().upload_keys_for_user(
  494. user_id,
  495. device_id,
  496. {"device_keys": device_dict},
  497. )
  498. )
  499. return sk
  500. def generate_self_id_key() -> SigningKey:
  501. """generate a signing key whose version is its public key
  502. ... as used by the cross-signing-keys.
  503. """
  504. k = key.generate_signing_key("x")
  505. k.version = encode_pubkey(k)
  506. return k
  507. def key_id(k: BaseKey) -> str:
  508. return "%s:%s" % (k.alg, k.version)
  509. def encode_pubkey(sk: SigningKey) -> str:
  510. """Encode the public key corresponding to the given signing key as base64"""
  511. return key.encode_verify_key_base64(key.get_verify_key(sk))
  512. def build_device_dict(user_id: str, device_id: str, sk: SigningKey):
  513. """Build a dict representing the given device"""
  514. return {
  515. "user_id": user_id,
  516. "device_id": device_id,
  517. "algorithms": [
  518. "m.olm.curve25519-aes-sha2",
  519. RoomEncryptionAlgorithms.MEGOLM_V1_AES_SHA2,
  520. ],
  521. "keys": {
  522. "curve25519:" + device_id: "curve25519+key",
  523. key_id(sk): encode_pubkey(sk),
  524. },
  525. }