test_federation_sender.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2019 New Vector Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. from typing import Optional
  16. from mock import Mock
  17. from signedjson import key, sign
  18. from signedjson.types import BaseKey, SigningKey
  19. from twisted.internet import defer
  20. from synapse.api.constants import RoomEncryptionAlgorithms
  21. from synapse.rest import admin
  22. from synapse.rest.client.v1 import login
  23. from synapse.types import JsonDict, ReadReceipt
  24. from tests.unittest import HomeserverTestCase, override_config
  25. class FederationSenderReceiptsTestCases(HomeserverTestCase):
  26. def make_homeserver(self, reactor, clock):
  27. return self.setup_test_homeserver(
  28. state_handler=Mock(spec=["get_current_hosts_in_room"]),
  29. federation_transport_client=Mock(spec=["send_transaction"]),
  30. )
  31. @override_config({"send_federation": True})
  32. def test_send_receipts(self):
  33. mock_state_handler = self.hs.get_state_handler()
  34. mock_state_handler.get_current_hosts_in_room.return_value = ["test", "host2"]
  35. mock_send_transaction = (
  36. self.hs.get_federation_transport_client().send_transaction
  37. )
  38. mock_send_transaction.return_value = defer.succeed({})
  39. sender = self.hs.get_federation_sender()
  40. receipt = ReadReceipt(
  41. "room_id", "m.read", "user_id", ["event_id"], {"ts": 1234}
  42. )
  43. self.successResultOf(sender.send_read_receipt(receipt))
  44. self.pump()
  45. # expect a call to send_transaction
  46. mock_send_transaction.assert_called_once()
  47. json_cb = mock_send_transaction.call_args[0][1]
  48. data = json_cb()
  49. self.assertEqual(
  50. data["edus"],
  51. [
  52. {
  53. "edu_type": "m.receipt",
  54. "content": {
  55. "room_id": {
  56. "m.read": {
  57. "user_id": {
  58. "event_ids": ["event_id"],
  59. "data": {"ts": 1234},
  60. }
  61. }
  62. }
  63. },
  64. }
  65. ],
  66. )
  67. @override_config({"send_federation": True})
  68. def test_send_receipts_with_backoff(self):
  69. """Send two receipts in quick succession; the second should be flushed, but
  70. only after 20ms"""
  71. mock_state_handler = self.hs.get_state_handler()
  72. mock_state_handler.get_current_hosts_in_room.return_value = ["test", "host2"]
  73. mock_send_transaction = (
  74. self.hs.get_federation_transport_client().send_transaction
  75. )
  76. mock_send_transaction.return_value = defer.succeed({})
  77. sender = self.hs.get_federation_sender()
  78. receipt = ReadReceipt(
  79. "room_id", "m.read", "user_id", ["event_id"], {"ts": 1234}
  80. )
  81. self.successResultOf(sender.send_read_receipt(receipt))
  82. self.pump()
  83. # expect a call to send_transaction
  84. mock_send_transaction.assert_called_once()
  85. json_cb = mock_send_transaction.call_args[0][1]
  86. data = json_cb()
  87. self.assertEqual(
  88. data["edus"],
  89. [
  90. {
  91. "edu_type": "m.receipt",
  92. "content": {
  93. "room_id": {
  94. "m.read": {
  95. "user_id": {
  96. "event_ids": ["event_id"],
  97. "data": {"ts": 1234},
  98. }
  99. }
  100. }
  101. },
  102. }
  103. ],
  104. )
  105. mock_send_transaction.reset_mock()
  106. # send the second RR
  107. receipt = ReadReceipt(
  108. "room_id", "m.read", "user_id", ["other_id"], {"ts": 1234}
  109. )
  110. self.successResultOf(sender.send_read_receipt(receipt))
  111. self.pump()
  112. mock_send_transaction.assert_not_called()
  113. self.reactor.advance(19)
  114. mock_send_transaction.assert_not_called()
  115. self.reactor.advance(10)
  116. mock_send_transaction.assert_called_once()
  117. json_cb = mock_send_transaction.call_args[0][1]
  118. data = json_cb()
  119. self.assertEqual(
  120. data["edus"],
  121. [
  122. {
  123. "edu_type": "m.receipt",
  124. "content": {
  125. "room_id": {
  126. "m.read": {
  127. "user_id": {
  128. "event_ids": ["other_id"],
  129. "data": {"ts": 1234},
  130. }
  131. }
  132. }
  133. },
  134. }
  135. ],
  136. )
  137. class FederationSenderDevicesTestCases(HomeserverTestCase):
  138. servlets = [
  139. admin.register_servlets,
  140. login.register_servlets,
  141. ]
  142. def make_homeserver(self, reactor, clock):
  143. return self.setup_test_homeserver(
  144. state_handler=Mock(spec=["get_current_hosts_in_room"]),
  145. federation_transport_client=Mock(spec=["send_transaction"]),
  146. )
  147. def default_config(self):
  148. c = super().default_config()
  149. c["send_federation"] = True
  150. return c
  151. def prepare(self, reactor, clock, hs):
  152. # stub out get_current_hosts_in_room
  153. mock_state_handler = hs.get_state_handler()
  154. mock_state_handler.get_current_hosts_in_room.return_value = ["test", "host2"]
  155. # stub out get_users_who_share_room_with_user so that it claims that
  156. # `@user2:host2` is in the room
  157. def get_users_who_share_room_with_user(user_id):
  158. return defer.succeed({"@user2:host2"})
  159. hs.get_datastore().get_users_who_share_room_with_user = (
  160. get_users_who_share_room_with_user
  161. )
  162. # whenever send_transaction is called, record the edu data
  163. self.edus = []
  164. self.hs.get_federation_transport_client().send_transaction.side_effect = (
  165. self.record_transaction
  166. )
  167. def record_transaction(self, txn, json_cb):
  168. data = json_cb()
  169. self.edus.extend(data["edus"])
  170. return defer.succeed({})
  171. def test_send_device_updates(self):
  172. """Basic case: each device update should result in an EDU"""
  173. # create a device
  174. u1 = self.register_user("user", "pass")
  175. self.login(u1, "pass", device_id="D1")
  176. # expect one edu
  177. self.assertEqual(len(self.edus), 1)
  178. stream_id = self.check_device_update_edu(self.edus.pop(0), u1, "D1", None)
  179. # a second call should produce no new device EDUs
  180. self.hs.get_federation_sender().send_device_messages("host2")
  181. self.pump()
  182. self.assertEqual(self.edus, [])
  183. # a second device
  184. self.login("user", "pass", device_id="D2")
  185. self.assertEqual(len(self.edus), 1)
  186. self.check_device_update_edu(self.edus.pop(0), u1, "D2", stream_id)
  187. def test_upload_signatures(self):
  188. """Uploading signatures on some devices should produce updates for that user"""
  189. e2e_handler = self.hs.get_e2e_keys_handler()
  190. # register two devices
  191. u1 = self.register_user("user", "pass")
  192. self.login(u1, "pass", device_id="D1")
  193. self.login(u1, "pass", device_id="D2")
  194. # expect two edus
  195. self.assertEqual(len(self.edus), 2)
  196. stream_id = None
  197. stream_id = self.check_device_update_edu(self.edus.pop(0), u1, "D1", stream_id)
  198. stream_id = self.check_device_update_edu(self.edus.pop(0), u1, "D2", stream_id)
  199. # upload signing keys for each device
  200. device1_signing_key = self.generate_and_upload_device_signing_key(u1, "D1")
  201. device2_signing_key = self.generate_and_upload_device_signing_key(u1, "D2")
  202. # expect two more edus
  203. self.assertEqual(len(self.edus), 2)
  204. stream_id = self.check_device_update_edu(self.edus.pop(0), u1, "D1", stream_id)
  205. stream_id = self.check_device_update_edu(self.edus.pop(0), u1, "D2", stream_id)
  206. # upload master key and self-signing key
  207. master_signing_key = generate_self_id_key()
  208. master_key = {
  209. "user_id": u1,
  210. "usage": ["master"],
  211. "keys": {key_id(master_signing_key): encode_pubkey(master_signing_key)},
  212. }
  213. # private key: HvQBbU+hc2Zr+JP1sE0XwBe1pfZZEYtJNPJLZJtS+F8
  214. selfsigning_signing_key = generate_self_id_key()
  215. selfsigning_key = {
  216. "user_id": u1,
  217. "usage": ["self_signing"],
  218. "keys": {
  219. key_id(selfsigning_signing_key): encode_pubkey(selfsigning_signing_key)
  220. },
  221. }
  222. sign.sign_json(selfsigning_key, u1, master_signing_key)
  223. cross_signing_keys = {
  224. "master_key": master_key,
  225. "self_signing_key": selfsigning_key,
  226. }
  227. self.get_success(
  228. e2e_handler.upload_signing_keys_for_user(u1, cross_signing_keys)
  229. )
  230. # expect signing key update edu
  231. self.assertEqual(len(self.edus), 1)
  232. self.assertEqual(self.edus.pop(0)["edu_type"], "org.matrix.signing_key_update")
  233. # sign the devices
  234. d1_json = build_device_dict(u1, "D1", device1_signing_key)
  235. sign.sign_json(d1_json, u1, selfsigning_signing_key)
  236. d2_json = build_device_dict(u1, "D2", device2_signing_key)
  237. sign.sign_json(d2_json, u1, selfsigning_signing_key)
  238. ret = self.get_success(
  239. e2e_handler.upload_signatures_for_device_keys(
  240. u1, {u1: {"D1": d1_json, "D2": d2_json}},
  241. )
  242. )
  243. self.assertEqual(ret["failures"], {})
  244. # expect two edus, in one or two transactions. We don't know what order the
  245. # devices will be updated.
  246. self.assertEqual(len(self.edus), 2)
  247. stream_id = None # FIXME: there is a discontinuity in the stream IDs: see #7142
  248. for edu in self.edus:
  249. self.assertEqual(edu["edu_type"], "m.device_list_update")
  250. c = edu["content"]
  251. if stream_id is not None:
  252. self.assertEqual(c["prev_id"], [stream_id])
  253. self.assertGreaterEqual(c["stream_id"], stream_id)
  254. stream_id = c["stream_id"]
  255. devices = {edu["content"]["device_id"] for edu in self.edus}
  256. self.assertEqual({"D1", "D2"}, devices)
  257. def test_delete_devices(self):
  258. """If devices are deleted, that should result in EDUs too"""
  259. # create devices
  260. u1 = self.register_user("user", "pass")
  261. self.login("user", "pass", device_id="D1")
  262. self.login("user", "pass", device_id="D2")
  263. self.login("user", "pass", device_id="D3")
  264. # expect three edus
  265. self.assertEqual(len(self.edus), 3)
  266. stream_id = self.check_device_update_edu(self.edus.pop(0), u1, "D1", None)
  267. stream_id = self.check_device_update_edu(self.edus.pop(0), u1, "D2", stream_id)
  268. stream_id = self.check_device_update_edu(self.edus.pop(0), u1, "D3", stream_id)
  269. # delete them again
  270. self.get_success(
  271. self.hs.get_device_handler().delete_devices(u1, ["D1", "D2", "D3"])
  272. )
  273. # expect three edus, in an unknown order
  274. self.assertEqual(len(self.edus), 3)
  275. for edu in self.edus:
  276. self.assertEqual(edu["edu_type"], "m.device_list_update")
  277. c = edu["content"]
  278. self.assertGreaterEqual(
  279. c.items(),
  280. {"user_id": u1, "prev_id": [stream_id], "deleted": True}.items(),
  281. )
  282. self.assertGreaterEqual(c["stream_id"], stream_id)
  283. stream_id = c["stream_id"]
  284. devices = {edu["content"]["device_id"] for edu in self.edus}
  285. self.assertEqual({"D1", "D2", "D3"}, devices)
  286. def test_unreachable_server(self):
  287. """If the destination server is unreachable, all the updates should get sent on
  288. recovery
  289. """
  290. mock_send_txn = self.hs.get_federation_transport_client().send_transaction
  291. mock_send_txn.side_effect = lambda t, cb: defer.fail("fail")
  292. # create devices
  293. u1 = self.register_user("user", "pass")
  294. self.login("user", "pass", device_id="D1")
  295. self.login("user", "pass", device_id="D2")
  296. self.login("user", "pass", device_id="D3")
  297. # delete them again
  298. self.get_success(
  299. self.hs.get_device_handler().delete_devices(u1, ["D1", "D2", "D3"])
  300. )
  301. self.assertGreaterEqual(mock_send_txn.call_count, 4)
  302. # recover the server
  303. mock_send_txn.side_effect = self.record_transaction
  304. self.hs.get_federation_sender().send_device_messages("host2")
  305. self.pump()
  306. # for each device, there should be a single update
  307. self.assertEqual(len(self.edus), 3)
  308. stream_id = None
  309. for edu in self.edus:
  310. self.assertEqual(edu["edu_type"], "m.device_list_update")
  311. c = edu["content"]
  312. self.assertEqual(c["prev_id"], [stream_id] if stream_id is not None else [])
  313. if stream_id is not None:
  314. self.assertGreaterEqual(c["stream_id"], stream_id)
  315. stream_id = c["stream_id"]
  316. devices = {edu["content"]["device_id"] for edu in self.edus}
  317. self.assertEqual({"D1", "D2", "D3"}, devices)
  318. def test_prune_outbound_device_pokes1(self):
  319. """If a destination is unreachable, and the updates are pruned, we should get
  320. a single update.
  321. This case tests the behaviour when the server has never been reachable.
  322. """
  323. mock_send_txn = self.hs.get_federation_transport_client().send_transaction
  324. mock_send_txn.side_effect = lambda t, cb: defer.fail("fail")
  325. # create devices
  326. u1 = self.register_user("user", "pass")
  327. self.login("user", "pass", device_id="D1")
  328. self.login("user", "pass", device_id="D2")
  329. self.login("user", "pass", device_id="D3")
  330. # delete them again
  331. self.get_success(
  332. self.hs.get_device_handler().delete_devices(u1, ["D1", "D2", "D3"])
  333. )
  334. self.assertGreaterEqual(mock_send_txn.call_count, 4)
  335. # run the prune job
  336. self.reactor.advance(10)
  337. self.get_success(
  338. self.hs.get_datastore()._prune_old_outbound_device_pokes(prune_age=1)
  339. )
  340. # recover the server
  341. mock_send_txn.side_effect = self.record_transaction
  342. self.hs.get_federation_sender().send_device_messages("host2")
  343. self.pump()
  344. # there should be a single update for this user.
  345. self.assertEqual(len(self.edus), 1)
  346. edu = self.edus.pop(0)
  347. self.assertEqual(edu["edu_type"], "m.device_list_update")
  348. c = edu["content"]
  349. # synapse uses an empty prev_id list to indicate "needs a full resync".
  350. self.assertEqual(c["prev_id"], [])
  351. def test_prune_outbound_device_pokes2(self):
  352. """If a destination is unreachable, and the updates are pruned, we should get
  353. a single update.
  354. This case tests the behaviour when the server was reachable, but then goes
  355. offline.
  356. """
  357. # create first device
  358. u1 = self.register_user("user", "pass")
  359. self.login("user", "pass", device_id="D1")
  360. # expect the update EDU
  361. self.assertEqual(len(self.edus), 1)
  362. self.check_device_update_edu(self.edus.pop(0), u1, "D1", None)
  363. # now the server goes offline
  364. mock_send_txn = self.hs.get_federation_transport_client().send_transaction
  365. mock_send_txn.side_effect = lambda t, cb: defer.fail("fail")
  366. self.login("user", "pass", device_id="D2")
  367. self.login("user", "pass", device_id="D3")
  368. # delete them again
  369. self.get_success(
  370. self.hs.get_device_handler().delete_devices(u1, ["D1", "D2", "D3"])
  371. )
  372. self.assertGreaterEqual(mock_send_txn.call_count, 3)
  373. # run the prune job
  374. self.reactor.advance(10)
  375. self.get_success(
  376. self.hs.get_datastore()._prune_old_outbound_device_pokes(prune_age=1)
  377. )
  378. # recover the server
  379. mock_send_txn.side_effect = self.record_transaction
  380. self.hs.get_federation_sender().send_device_messages("host2")
  381. self.pump()
  382. # ... and we should get a single update for this user.
  383. self.assertEqual(len(self.edus), 1)
  384. edu = self.edus.pop(0)
  385. self.assertEqual(edu["edu_type"], "m.device_list_update")
  386. c = edu["content"]
  387. # synapse uses an empty prev_id list to indicate "needs a full resync".
  388. self.assertEqual(c["prev_id"], [])
  389. def check_device_update_edu(
  390. self,
  391. edu: JsonDict,
  392. user_id: str,
  393. device_id: str,
  394. prev_stream_id: Optional[int],
  395. ) -> int:
  396. """Check that the given EDU is an update for the given device
  397. Returns the stream_id.
  398. """
  399. self.assertEqual(edu["edu_type"], "m.device_list_update")
  400. content = edu["content"]
  401. expected = {
  402. "user_id": user_id,
  403. "device_id": device_id,
  404. "prev_id": [prev_stream_id] if prev_stream_id is not None else [],
  405. }
  406. self.assertLessEqual(expected.items(), content.items())
  407. if prev_stream_id is not None:
  408. self.assertGreaterEqual(content["stream_id"], prev_stream_id)
  409. return content["stream_id"]
  410. def check_signing_key_update_txn(self, txn: JsonDict,) -> None:
  411. """Check that the txn has an EDU with a signing key update.
  412. """
  413. edus = txn["edus"]
  414. self.assertEqual(len(edus), 1)
  415. def generate_and_upload_device_signing_key(
  416. self, user_id: str, device_id: str
  417. ) -> SigningKey:
  418. """Generate a signing keypair for the given device, and upload it"""
  419. sk = key.generate_signing_key(device_id)
  420. device_dict = build_device_dict(user_id, device_id, sk)
  421. self.get_success(
  422. self.hs.get_e2e_keys_handler().upload_keys_for_user(
  423. user_id, device_id, {"device_keys": device_dict},
  424. )
  425. )
  426. return sk
  427. def generate_self_id_key() -> SigningKey:
  428. """generate a signing key whose version is its public key
  429. ... as used by the cross-signing-keys.
  430. """
  431. k = key.generate_signing_key("x")
  432. k.version = encode_pubkey(k)
  433. return k
  434. def key_id(k: BaseKey) -> str:
  435. return "%s:%s" % (k.alg, k.version)
  436. def encode_pubkey(sk: SigningKey) -> str:
  437. """Encode the public key corresponding to the given signing key as base64"""
  438. return key.encode_verify_key_base64(key.get_verify_key(sk))
  439. def build_device_dict(user_id: str, device_id: str, sk: SigningKey):
  440. """Build a dict representing the given device"""
  441. return {
  442. "user_id": user_id,
  443. "device_id": device_id,
  444. "algorithms": [
  445. "m.olm.curve25519-aes-sha2",
  446. RoomEncryptionAlgorithms.MEGOLM_V1_AES_SHA2,
  447. ],
  448. "keys": {
  449. "curve25519:" + device_id: "curve25519+key",
  450. key_id(sk): encode_pubkey(sk),
  451. },
  452. }