test_appservice.py 31 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798
  1. # Copyright 2015-2021 The Matrix.org Foundation C.I.C.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. from typing import Dict, Iterable, List, Optional
  15. from unittest.mock import Mock
  16. from twisted.internet import defer
  17. from twisted.test.proto_helpers import MemoryReactor
  18. import synapse.rest.admin
  19. import synapse.storage
  20. from synapse.appservice import (
  21. ApplicationService,
  22. TransactionOneTimeKeyCounts,
  23. TransactionUnusedFallbackKeys,
  24. )
  25. from synapse.handlers.appservice import ApplicationServicesHandler
  26. from synapse.rest.client import login, receipts, register, room, sendtodevice
  27. from synapse.server import HomeServer
  28. from synapse.types import RoomStreamToken
  29. from synapse.util import Clock
  30. from synapse.util.stringutils import random_string
  31. from tests import unittest
  32. from tests.test_utils import make_awaitable, simple_async_mock
  33. from tests.unittest import override_config
  34. from tests.utils import MockClock
  35. class AppServiceHandlerTestCase(unittest.TestCase):
  36. """Tests the ApplicationServicesHandler."""
  37. def setUp(self):
  38. self.mock_store = Mock()
  39. self.mock_as_api = Mock()
  40. self.mock_scheduler = Mock()
  41. hs = Mock()
  42. hs.get_datastores.return_value = Mock(main=self.mock_store)
  43. self.mock_store.get_received_ts.return_value = make_awaitable(0)
  44. self.mock_store.set_appservice_last_pos.return_value = make_awaitable(None)
  45. self.mock_store.set_appservice_stream_type_pos.return_value = make_awaitable(
  46. None
  47. )
  48. hs.get_application_service_api.return_value = self.mock_as_api
  49. hs.get_application_service_scheduler.return_value = self.mock_scheduler
  50. hs.get_clock.return_value = MockClock()
  51. self.handler = ApplicationServicesHandler(hs)
  52. self.event_source = hs.get_event_sources()
  53. def test_notify_interested_services(self):
  54. interested_service = self._mkservice(is_interested_in_event=True)
  55. services = [
  56. self._mkservice(is_interested_in_event=False),
  57. interested_service,
  58. self._mkservice(is_interested_in_event=False),
  59. ]
  60. self.mock_as_api.query_user.return_value = make_awaitable(True)
  61. self.mock_store.get_app_services.return_value = services
  62. self.mock_store.get_user_by_id.return_value = make_awaitable([])
  63. event = Mock(
  64. sender="@someone:anywhere", type="m.room.message", room_id="!foo:bar"
  65. )
  66. self.mock_store.get_new_events_for_appservice.side_effect = [
  67. make_awaitable((0, [])),
  68. make_awaitable((1, [event])),
  69. ]
  70. self.handler.notify_interested_services(RoomStreamToken(None, 1))
  71. self.mock_scheduler.enqueue_for_appservice.assert_called_once_with(
  72. interested_service, events=[event]
  73. )
  74. def test_query_user_exists_unknown_user(self):
  75. user_id = "@someone:anywhere"
  76. services = [self._mkservice(is_interested_in_event=True)]
  77. services[0].is_interested_in_user.return_value = True
  78. self.mock_store.get_app_services.return_value = services
  79. self.mock_store.get_user_by_id.return_value = make_awaitable(None)
  80. event = Mock(sender=user_id, type="m.room.message", room_id="!foo:bar")
  81. self.mock_as_api.query_user.return_value = make_awaitable(True)
  82. self.mock_store.get_new_events_for_appservice.side_effect = [
  83. make_awaitable((0, [event])),
  84. ]
  85. self.handler.notify_interested_services(RoomStreamToken(None, 0))
  86. self.mock_as_api.query_user.assert_called_once_with(services[0], user_id)
  87. def test_query_user_exists_known_user(self):
  88. user_id = "@someone:anywhere"
  89. services = [self._mkservice(is_interested_in_event=True)]
  90. services[0].is_interested_in_user.return_value = True
  91. self.mock_store.get_app_services.return_value = services
  92. self.mock_store.get_user_by_id.return_value = make_awaitable({"name": user_id})
  93. event = Mock(sender=user_id, type="m.room.message", room_id="!foo:bar")
  94. self.mock_as_api.query_user.return_value = make_awaitable(True)
  95. self.mock_store.get_new_events_for_appservice.side_effect = [
  96. make_awaitable((0, [event])),
  97. ]
  98. self.handler.notify_interested_services(RoomStreamToken(None, 0))
  99. self.assertFalse(
  100. self.mock_as_api.query_user.called,
  101. "query_user called when it shouldn't have been.",
  102. )
  103. def test_query_room_alias_exists(self):
  104. room_alias_str = "#foo:bar"
  105. room_alias = Mock()
  106. room_alias.to_string.return_value = room_alias_str
  107. room_id = "!alpha:bet"
  108. servers = ["aperture"]
  109. interested_service = self._mkservice_alias(is_room_alias_in_namespace=True)
  110. services = [
  111. self._mkservice_alias(is_room_alias_in_namespace=False),
  112. interested_service,
  113. self._mkservice_alias(is_room_alias_in_namespace=False),
  114. ]
  115. self.mock_as_api.query_alias.return_value = make_awaitable(True)
  116. self.mock_store.get_app_services.return_value = services
  117. self.mock_store.get_association_from_room_alias.return_value = make_awaitable(
  118. Mock(room_id=room_id, servers=servers)
  119. )
  120. result = self.successResultOf(
  121. defer.ensureDeferred(self.handler.query_room_alias_exists(room_alias))
  122. )
  123. self.mock_as_api.query_alias.assert_called_once_with(
  124. interested_service, room_alias_str
  125. )
  126. self.assertEqual(result.room_id, room_id)
  127. self.assertEqual(result.servers, servers)
  128. def test_get_3pe_protocols_no_appservices(self):
  129. self.mock_store.get_app_services.return_value = []
  130. response = self.successResultOf(
  131. defer.ensureDeferred(self.handler.get_3pe_protocols("my-protocol"))
  132. )
  133. self.mock_as_api.get_3pe_protocol.assert_not_called()
  134. self.assertEqual(response, {})
  135. def test_get_3pe_protocols_no_protocols(self):
  136. service = self._mkservice(False, [])
  137. self.mock_store.get_app_services.return_value = [service]
  138. response = self.successResultOf(
  139. defer.ensureDeferred(self.handler.get_3pe_protocols())
  140. )
  141. self.mock_as_api.get_3pe_protocol.assert_not_called()
  142. self.assertEqual(response, {})
  143. def test_get_3pe_protocols_protocol_no_response(self):
  144. service = self._mkservice(False, ["my-protocol"])
  145. self.mock_store.get_app_services.return_value = [service]
  146. self.mock_as_api.get_3pe_protocol.return_value = make_awaitable(None)
  147. response = self.successResultOf(
  148. defer.ensureDeferred(self.handler.get_3pe_protocols())
  149. )
  150. self.mock_as_api.get_3pe_protocol.assert_called_once_with(
  151. service, "my-protocol"
  152. )
  153. self.assertEqual(response, {})
  154. def test_get_3pe_protocols_select_one_protocol(self):
  155. service = self._mkservice(False, ["my-protocol"])
  156. self.mock_store.get_app_services.return_value = [service]
  157. self.mock_as_api.get_3pe_protocol.return_value = make_awaitable(
  158. {"x-protocol-data": 42, "instances": []}
  159. )
  160. response = self.successResultOf(
  161. defer.ensureDeferred(self.handler.get_3pe_protocols("my-protocol"))
  162. )
  163. self.mock_as_api.get_3pe_protocol.assert_called_once_with(
  164. service, "my-protocol"
  165. )
  166. self.assertEqual(
  167. response, {"my-protocol": {"x-protocol-data": 42, "instances": []}}
  168. )
  169. def test_get_3pe_protocols_one_protocol(self):
  170. service = self._mkservice(False, ["my-protocol"])
  171. self.mock_store.get_app_services.return_value = [service]
  172. self.mock_as_api.get_3pe_protocol.return_value = make_awaitable(
  173. {"x-protocol-data": 42, "instances": []}
  174. )
  175. response = self.successResultOf(
  176. defer.ensureDeferred(self.handler.get_3pe_protocols())
  177. )
  178. self.mock_as_api.get_3pe_protocol.assert_called_once_with(
  179. service, "my-protocol"
  180. )
  181. self.assertEqual(
  182. response, {"my-protocol": {"x-protocol-data": 42, "instances": []}}
  183. )
  184. def test_get_3pe_protocols_multiple_protocol(self):
  185. service_one = self._mkservice(False, ["my-protocol"])
  186. service_two = self._mkservice(False, ["other-protocol"])
  187. self.mock_store.get_app_services.return_value = [service_one, service_two]
  188. self.mock_as_api.get_3pe_protocol.return_value = make_awaitable(
  189. {"x-protocol-data": 42, "instances": []}
  190. )
  191. response = self.successResultOf(
  192. defer.ensureDeferred(self.handler.get_3pe_protocols())
  193. )
  194. self.mock_as_api.get_3pe_protocol.assert_called()
  195. self.assertEqual(
  196. response,
  197. {
  198. "my-protocol": {"x-protocol-data": 42, "instances": []},
  199. "other-protocol": {"x-protocol-data": 42, "instances": []},
  200. },
  201. )
  202. def test_get_3pe_protocols_multiple_info(self):
  203. service_one = self._mkservice(False, ["my-protocol"])
  204. service_two = self._mkservice(False, ["my-protocol"])
  205. async def get_3pe_protocol(service, unusedProtocol):
  206. if service == service_one:
  207. return {
  208. "x-protocol-data": 42,
  209. "instances": [{"desc": "Alice's service"}],
  210. }
  211. if service == service_two:
  212. return {
  213. "x-protocol-data": 36,
  214. "x-not-used": 45,
  215. "instances": [{"desc": "Bob's service"}],
  216. }
  217. raise Exception("Unexpected service")
  218. self.mock_store.get_app_services.return_value = [service_one, service_two]
  219. self.mock_as_api.get_3pe_protocol = get_3pe_protocol
  220. response = self.successResultOf(
  221. defer.ensureDeferred(self.handler.get_3pe_protocols())
  222. )
  223. # It's expected that the second service's data doesn't appear in the response
  224. self.assertEqual(
  225. response,
  226. {
  227. "my-protocol": {
  228. "x-protocol-data": 42,
  229. "instances": [
  230. {
  231. "desc": "Alice's service",
  232. },
  233. {"desc": "Bob's service"},
  234. ],
  235. },
  236. },
  237. )
  238. def test_notify_interested_services_ephemeral(self):
  239. """
  240. Test sending ephemeral events to the appservice handler are scheduled
  241. to be pushed out to interested appservices, and that the stream ID is
  242. updated accordingly.
  243. """
  244. interested_service = self._mkservice(is_interested_in_event=True)
  245. services = [interested_service]
  246. self.mock_store.get_app_services.return_value = services
  247. self.mock_store.get_type_stream_id_for_appservice.return_value = make_awaitable(
  248. 579
  249. )
  250. event = Mock(event_id="event_1")
  251. self.event_source.sources.receipt.get_new_events_as.return_value = (
  252. make_awaitable(([event], None))
  253. )
  254. self.handler.notify_interested_services_ephemeral(
  255. "receipt_key", 580, ["@fakerecipient:example.com"]
  256. )
  257. self.mock_scheduler.enqueue_for_appservice.assert_called_once_with(
  258. interested_service, ephemeral=[event]
  259. )
  260. self.mock_store.set_appservice_stream_type_pos.assert_called_once_with(
  261. interested_service,
  262. "read_receipt",
  263. 580,
  264. )
  265. def test_notify_interested_services_ephemeral_out_of_order(self):
  266. """
  267. Test sending out of order ephemeral events to the appservice handler
  268. are ignored.
  269. """
  270. interested_service = self._mkservice(is_interested_in_event=True)
  271. services = [interested_service]
  272. self.mock_store.get_app_services.return_value = services
  273. self.mock_store.get_type_stream_id_for_appservice.return_value = make_awaitable(
  274. 580
  275. )
  276. event = Mock(event_id="event_1")
  277. self.event_source.sources.receipt.get_new_events_as.return_value = (
  278. make_awaitable(([event], None))
  279. )
  280. self.handler.notify_interested_services_ephemeral(
  281. "receipt_key", 580, ["@fakerecipient:example.com"]
  282. )
  283. # This method will be called, but with an empty list of events
  284. self.mock_scheduler.enqueue_for_appservice.assert_called_once_with(
  285. interested_service, ephemeral=[]
  286. )
  287. def _mkservice(
  288. self, is_interested_in_event: bool, protocols: Optional[Iterable] = None
  289. ) -> Mock:
  290. """
  291. Create a new mock representing an ApplicationService.
  292. Args:
  293. is_interested_in_event: Whether this application service will be considered
  294. interested in all events.
  295. protocols: The third-party protocols that this application service claims to
  296. support.
  297. Returns:
  298. A mock representing the ApplicationService.
  299. """
  300. service = Mock()
  301. service.is_interested_in_event.return_value = make_awaitable(
  302. is_interested_in_event
  303. )
  304. service.token = "mock_service_token"
  305. service.url = "mock_service_url"
  306. service.protocols = protocols
  307. return service
  308. def _mkservice_alias(self, is_room_alias_in_namespace: bool) -> Mock:
  309. """
  310. Create a new mock representing an ApplicationService that is or is not interested
  311. any given room aliase.
  312. Args:
  313. is_room_alias_in_namespace: If true, the application service will be interested
  314. in all room aliases that are queried against it. If false, the application
  315. service will not be interested in any room aliases.
  316. Returns:
  317. A mock representing the ApplicationService.
  318. """
  319. service = Mock()
  320. service.is_room_alias_in_namespace.return_value = is_room_alias_in_namespace
  321. service.token = "mock_service_token"
  322. service.url = "mock_service_url"
  323. return service
  324. class ApplicationServicesHandlerSendEventsTestCase(unittest.HomeserverTestCase):
  325. """
  326. Tests that the ApplicationServicesHandler sends events to application
  327. services correctly.
  328. """
  329. servlets = [
  330. synapse.rest.admin.register_servlets_for_client_rest_resource,
  331. login.register_servlets,
  332. room.register_servlets,
  333. sendtodevice.register_servlets,
  334. receipts.register_servlets,
  335. ]
  336. def prepare(self, reactor, clock, hs):
  337. # Mock the ApplicationServiceScheduler's _TransactionController's send method so that
  338. # we can track any outgoing ephemeral events
  339. self.send_mock = simple_async_mock()
  340. hs.get_application_service_handler().scheduler.txn_ctrl.send = self.send_mock
  341. # Mock out application services, and allow defining our own in tests
  342. self._services: List[ApplicationService] = []
  343. self.hs.get_datastores().main.get_app_services = Mock(
  344. return_value=self._services
  345. )
  346. # A user on the homeserver.
  347. self.local_user_device_id = "local_device"
  348. self.local_user = self.register_user("local_user", "password")
  349. self.local_user_token = self.login(
  350. "local_user", "password", self.local_user_device_id
  351. )
  352. # A user on the homeserver which lies within an appservice's exclusive user namespace.
  353. self.exclusive_as_user_device_id = "exclusive_as_device"
  354. self.exclusive_as_user = self.register_user("exclusive_as_user", "password")
  355. self.exclusive_as_user_token = self.login(
  356. "exclusive_as_user", "password", self.exclusive_as_user_device_id
  357. )
  358. @unittest.override_config(
  359. {"experimental_features": {"msc2409_to_device_messages_enabled": True}}
  360. )
  361. def test_application_services_receive_local_to_device(self):
  362. """
  363. Test that when a user sends a to-device message to another user
  364. that is an application service's user namespace, the
  365. application service will receive it.
  366. """
  367. interested_appservice = self._register_application_service(
  368. namespaces={
  369. ApplicationService.NS_USERS: [
  370. {
  371. "regex": "@exclusive_as_user:.+",
  372. "exclusive": True,
  373. }
  374. ],
  375. },
  376. )
  377. # Have local_user send a to-device message to exclusive_as_user
  378. message_content = {"some_key": "some really interesting value"}
  379. chan = self.make_request(
  380. "PUT",
  381. "/_matrix/client/r0/sendToDevice/m.room_key_request/3",
  382. content={
  383. "messages": {
  384. self.exclusive_as_user: {
  385. self.exclusive_as_user_device_id: message_content
  386. }
  387. }
  388. },
  389. access_token=self.local_user_token,
  390. )
  391. self.assertEqual(chan.code, 200, chan.result)
  392. # Have exclusive_as_user send a to-device message to local_user
  393. chan = self.make_request(
  394. "PUT",
  395. "/_matrix/client/r0/sendToDevice/m.room_key_request/4",
  396. content={
  397. "messages": {
  398. self.local_user: {self.local_user_device_id: message_content}
  399. }
  400. },
  401. access_token=self.exclusive_as_user_token,
  402. )
  403. self.assertEqual(chan.code, 200, chan.result)
  404. # Check if our application service - that is interested in exclusive_as_user - received
  405. # the to-device message as part of an AS transaction.
  406. # Only the local_user -> exclusive_as_user to-device message should have been forwarded to the AS.
  407. #
  408. # The uninterested application service should not have been notified at all.
  409. self.send_mock.assert_called_once()
  410. (
  411. service,
  412. _events,
  413. _ephemeral,
  414. to_device_messages,
  415. _otks,
  416. _fbks,
  417. ) = self.send_mock.call_args[0]
  418. # Assert that this was the same to-device message that local_user sent
  419. self.assertEqual(service, interested_appservice)
  420. self.assertEqual(to_device_messages[0]["type"], "m.room_key_request")
  421. self.assertEqual(to_device_messages[0]["sender"], self.local_user)
  422. # Additional fields 'to_user_id' and 'to_device_id' specifically for
  423. # to-device messages via the AS API
  424. self.assertEqual(to_device_messages[0]["to_user_id"], self.exclusive_as_user)
  425. self.assertEqual(
  426. to_device_messages[0]["to_device_id"], self.exclusive_as_user_device_id
  427. )
  428. self.assertEqual(to_device_messages[0]["content"], message_content)
  429. @unittest.override_config(
  430. {"experimental_features": {"msc2409_to_device_messages_enabled": True}}
  431. )
  432. def test_application_services_receive_bursts_of_to_device(self):
  433. """
  434. Test that when a user sends >100 to-device messages at once, any
  435. interested AS's will receive them in separate transactions.
  436. Also tests that uninterested application services do not receive messages.
  437. """
  438. # Register two application services with exclusive interest in a user
  439. interested_appservices = []
  440. for _ in range(2):
  441. appservice = self._register_application_service(
  442. namespaces={
  443. ApplicationService.NS_USERS: [
  444. {
  445. "regex": "@exclusive_as_user:.+",
  446. "exclusive": True,
  447. }
  448. ],
  449. },
  450. )
  451. interested_appservices.append(appservice)
  452. # ...and an application service which does not have any user interest.
  453. self._register_application_service()
  454. to_device_message_content = {
  455. "some key": "some interesting value",
  456. }
  457. # We need to send a large burst of to-device messages. We also would like to
  458. # include them all in the same application service transaction so that we can
  459. # test large transactions.
  460. #
  461. # To do this, we can send a single to-device message to many user devices at
  462. # once.
  463. #
  464. # We insert number_of_messages - 1 messages into the database directly. We'll then
  465. # send a final to-device message to the real device, which will also kick off
  466. # an AS transaction (as just inserting messages into the DB won't).
  467. number_of_messages = 150
  468. fake_device_ids = [f"device_{num}" for num in range(number_of_messages - 1)]
  469. messages = {
  470. self.exclusive_as_user: {
  471. device_id: to_device_message_content for device_id in fake_device_ids
  472. }
  473. }
  474. # Create a fake device per message. We can't send to-device messages to
  475. # a device that doesn't exist.
  476. self.get_success(
  477. self.hs.get_datastores().main.db_pool.simple_insert_many(
  478. desc="test_application_services_receive_burst_of_to_device",
  479. table="devices",
  480. keys=("user_id", "device_id"),
  481. values=[
  482. (
  483. self.exclusive_as_user,
  484. device_id,
  485. )
  486. for device_id in fake_device_ids
  487. ],
  488. )
  489. )
  490. # Seed the device_inbox table with our fake messages
  491. self.get_success(
  492. self.hs.get_datastores().main.add_messages_to_device_inbox(messages, {})
  493. )
  494. # Now have local_user send a final to-device message to exclusive_as_user. All unsent
  495. # to-device messages should be sent to any application services
  496. # interested in exclusive_as_user.
  497. chan = self.make_request(
  498. "PUT",
  499. "/_matrix/client/r0/sendToDevice/m.room_key_request/4",
  500. content={
  501. "messages": {
  502. self.exclusive_as_user: {
  503. self.exclusive_as_user_device_id: to_device_message_content
  504. }
  505. }
  506. },
  507. access_token=self.local_user_token,
  508. )
  509. self.assertEqual(chan.code, 200, chan.result)
  510. self.send_mock.assert_called()
  511. # Count the total number of to-device messages that were sent out per-service.
  512. # Ensure that we only sent to-device messages to interested services, and that
  513. # each interested service received the full count of to-device messages.
  514. service_id_to_message_count: Dict[str, int] = {}
  515. for call in self.send_mock.call_args_list:
  516. service, _events, _ephemeral, to_device_messages, _otks, _fbks = call[0]
  517. # Check that this was made to an interested service
  518. self.assertIn(service, interested_appservices)
  519. # Add to the count of messages for this application service
  520. service_id_to_message_count.setdefault(service.id, 0)
  521. service_id_to_message_count[service.id] += len(to_device_messages)
  522. # Assert that each interested service received the full count of messages
  523. for count in service_id_to_message_count.values():
  524. self.assertEqual(count, number_of_messages)
  525. def _register_application_service(
  526. self,
  527. namespaces: Optional[Dict[str, Iterable[Dict]]] = None,
  528. ) -> ApplicationService:
  529. """
  530. Register a new application service, with the given namespaces of interest.
  531. Args:
  532. namespaces: A dictionary containing any user, room or alias namespaces that
  533. the application service is interested in.
  534. Returns:
  535. The registered application service.
  536. """
  537. # Create an application service
  538. appservice = ApplicationService(
  539. token=random_string(10),
  540. hostname="example.com",
  541. id=random_string(10),
  542. sender="@as:example.com",
  543. rate_limited=False,
  544. namespaces=namespaces,
  545. supports_ephemeral=True,
  546. )
  547. # Register the application service
  548. self._services.append(appservice)
  549. return appservice
  550. class ApplicationServicesHandlerOtkCountsTestCase(unittest.HomeserverTestCase):
  551. # Argument indices for pulling out arguments from a `send_mock`.
  552. ARG_OTK_COUNTS = 4
  553. ARG_FALLBACK_KEYS = 5
  554. servlets = [
  555. synapse.rest.admin.register_servlets_for_client_rest_resource,
  556. login.register_servlets,
  557. register.register_servlets,
  558. room.register_servlets,
  559. sendtodevice.register_servlets,
  560. receipts.register_servlets,
  561. ]
  562. def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
  563. # Mock the ApplicationServiceScheduler's _TransactionController's send method so that
  564. # we can track what's going out
  565. self.send_mock = simple_async_mock()
  566. hs.get_application_service_handler().scheduler.txn_ctrl.send = self.send_mock # type: ignore[assignment] # We assign to a method.
  567. # Define an application service for the tests
  568. self._service_token = "VERYSECRET"
  569. self._service = ApplicationService(
  570. self._service_token,
  571. "as1.invalid",
  572. "as1",
  573. "@as.sender:test",
  574. namespaces={
  575. "users": [
  576. {"regex": "@_as_.*:test", "exclusive": True},
  577. {"regex": "@as.sender:test", "exclusive": True},
  578. ]
  579. },
  580. msc3202_transaction_extensions=True,
  581. )
  582. self.hs.get_datastores().main.services_cache = [self._service]
  583. # Register some appservice users
  584. self._sender_user, self._sender_device = self.register_appservice_user(
  585. "as.sender", self._service_token
  586. )
  587. self._namespaced_user, self._namespaced_device = self.register_appservice_user(
  588. "_as_user1", self._service_token
  589. )
  590. # Register a real user as well.
  591. self._real_user = self.register_user("real.user", "meow")
  592. self._real_user_token = self.login("real.user", "meow")
  593. async def _add_otks_for_device(
  594. self, user_id: str, device_id: str, otk_count: int
  595. ) -> None:
  596. """
  597. Add some dummy keys. It doesn't matter if they're not a real algorithm;
  598. that should be opaque to the server anyway.
  599. """
  600. await self.hs.get_datastores().main.add_e2e_one_time_keys(
  601. user_id,
  602. device_id,
  603. self.clock.time_msec(),
  604. [("algo", f"k{i}", "{}") for i in range(otk_count)],
  605. )
  606. async def _add_fallback_key_for_device(
  607. self, user_id: str, device_id: str, used: bool
  608. ) -> None:
  609. """
  610. Adds a fake fallback key to a device, optionally marking it as used
  611. right away.
  612. """
  613. store = self.hs.get_datastores().main
  614. await store.set_e2e_fallback_keys(user_id, device_id, {"algo:fk": "fall back!"})
  615. if used is True:
  616. # Mark the key as used
  617. await store.db_pool.simple_update_one(
  618. table="e2e_fallback_keys_json",
  619. keyvalues={
  620. "user_id": user_id,
  621. "device_id": device_id,
  622. "algorithm": "algo",
  623. "key_id": "fk",
  624. },
  625. updatevalues={"used": True},
  626. desc="_get_fallback_key_set_used",
  627. )
  628. def _set_up_devices_and_a_room(self) -> str:
  629. """
  630. Helper to set up devices for all the users
  631. and a room for the users to talk in.
  632. """
  633. async def preparation():
  634. await self._add_otks_for_device(self._sender_user, self._sender_device, 42)
  635. await self._add_fallback_key_for_device(
  636. self._sender_user, self._sender_device, used=True
  637. )
  638. await self._add_otks_for_device(
  639. self._namespaced_user, self._namespaced_device, 36
  640. )
  641. await self._add_fallback_key_for_device(
  642. self._namespaced_user, self._namespaced_device, used=False
  643. )
  644. # Register a device for the real user, too, so that we can later ensure
  645. # that we don't leak information to the AS about the non-AS user.
  646. await self.hs.get_datastores().main.store_device(
  647. self._real_user, "REALDEV", "UltraMatrix 3000"
  648. )
  649. await self._add_otks_for_device(self._real_user, "REALDEV", 50)
  650. self.get_success(preparation())
  651. room_id = self.helper.create_room_as(
  652. self._real_user, is_public=True, tok=self._real_user_token
  653. )
  654. self.helper.join(
  655. room_id,
  656. self._namespaced_user,
  657. tok=self._service_token,
  658. appservice_user_id=self._namespaced_user,
  659. )
  660. # Check it was called for sanity. (This was to send the join event to the AS.)
  661. self.send_mock.assert_called()
  662. self.send_mock.reset_mock()
  663. return room_id
  664. @override_config(
  665. {"experimental_features": {"msc3202_transaction_extensions": True}}
  666. )
  667. def test_application_services_receive_otk_counts_and_fallback_key_usages_with_pdus(
  668. self,
  669. ) -> None:
  670. """
  671. Tests that:
  672. - the AS receives one-time key counts and unused fallback keys for:
  673. - the specified sender; and
  674. - any user who is in receipt of the PDUs
  675. """
  676. room_id = self._set_up_devices_and_a_room()
  677. # Send a message into the AS's room
  678. self.helper.send(room_id, "woof woof", tok=self._real_user_token)
  679. # Capture what was sent as an AS transaction.
  680. self.send_mock.assert_called()
  681. last_args, _last_kwargs = self.send_mock.call_args
  682. otks: Optional[TransactionOneTimeKeyCounts] = last_args[self.ARG_OTK_COUNTS]
  683. unused_fallbacks: Optional[TransactionUnusedFallbackKeys] = last_args[
  684. self.ARG_FALLBACK_KEYS
  685. ]
  686. self.assertEqual(
  687. otks,
  688. {
  689. "@as.sender:test": {self._sender_device: {"algo": 42}},
  690. "@_as_user1:test": {self._namespaced_device: {"algo": 36}},
  691. },
  692. )
  693. self.assertEqual(
  694. unused_fallbacks,
  695. {
  696. "@as.sender:test": {self._sender_device: []},
  697. "@_as_user1:test": {self._namespaced_device: ["algo"]},
  698. },
  699. )