test_appservice.py 49 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291
  1. # Copyright 2015-2021 The Matrix.org Foundation C.I.C.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. from typing import Dict, Iterable, List, Optional
  15. from unittest.mock import AsyncMock, Mock
  16. from parameterized import parameterized
  17. from twisted.internet import defer
  18. from twisted.test.proto_helpers import MemoryReactor
  19. import synapse.rest.admin
  20. import synapse.storage
  21. from synapse.api.constants import EduTypes, EventTypes
  22. from synapse.appservice import (
  23. ApplicationService,
  24. TransactionOneTimeKeysCount,
  25. TransactionUnusedFallbackKeys,
  26. )
  27. from synapse.handlers.appservice import ApplicationServicesHandler
  28. from synapse.rest.client import login, receipts, register, room, sendtodevice
  29. from synapse.server import HomeServer
  30. from synapse.types import (
  31. JsonDict,
  32. MultiWriterStreamToken,
  33. RoomStreamToken,
  34. StreamKeyType,
  35. )
  36. from synapse.util import Clock
  37. from synapse.util.stringutils import random_string
  38. from tests import unittest
  39. from tests.test_utils import event_injection
  40. from tests.unittest import override_config
  41. from tests.utils import MockClock
  42. class AppServiceHandlerTestCase(unittest.TestCase):
  43. """Tests the ApplicationServicesHandler."""
  44. def setUp(self) -> None:
  45. self.mock_store = Mock()
  46. self.mock_as_api = AsyncMock()
  47. self.mock_scheduler = Mock()
  48. hs = Mock()
  49. hs.get_datastores.return_value = Mock(main=self.mock_store)
  50. self.mock_store.get_appservice_last_pos = AsyncMock(return_value=None)
  51. self.mock_store.set_appservice_last_pos = AsyncMock(return_value=None)
  52. self.mock_store.set_appservice_stream_type_pos = AsyncMock(return_value=None)
  53. hs.get_application_service_api.return_value = self.mock_as_api
  54. hs.get_application_service_scheduler.return_value = self.mock_scheduler
  55. hs.get_clock.return_value = MockClock()
  56. self.handler = ApplicationServicesHandler(hs)
  57. self.event_source = hs.get_event_sources()
  58. def test_notify_interested_services(self) -> None:
  59. interested_service = self._mkservice(is_interested_in_event=True)
  60. services = [
  61. self._mkservice(is_interested_in_event=False),
  62. interested_service,
  63. self._mkservice(is_interested_in_event=False),
  64. ]
  65. self.mock_as_api.query_user.return_value = True
  66. self.mock_store.get_app_services.return_value = services
  67. self.mock_store.get_user_by_id = AsyncMock(return_value=[])
  68. event = Mock(
  69. sender="@someone:anywhere", type="m.room.message", room_id="!foo:bar"
  70. )
  71. self.mock_store.get_all_new_event_ids_stream = AsyncMock(
  72. side_effect=[
  73. (0, {}),
  74. (1, {event.event_id: 0}),
  75. ]
  76. )
  77. self.mock_store.get_events_as_list = AsyncMock(
  78. side_effect=[
  79. [],
  80. [event],
  81. ]
  82. )
  83. self.handler.notify_interested_services(RoomStreamToken(stream=1))
  84. self.mock_scheduler.enqueue_for_appservice.assert_called_once_with(
  85. interested_service, events=[event]
  86. )
  87. def test_query_user_exists_unknown_user(self) -> None:
  88. user_id = "@someone:anywhere"
  89. services = [self._mkservice(is_interested_in_event=True)]
  90. services[0].is_interested_in_user.return_value = True
  91. self.mock_store.get_app_services.return_value = services
  92. self.mock_store.get_user_by_id = AsyncMock(return_value=None)
  93. event = Mock(sender=user_id, type="m.room.message", room_id="!foo:bar")
  94. self.mock_as_api.query_user.return_value = True
  95. self.mock_store.get_all_new_event_ids_stream = AsyncMock(
  96. side_effect=[
  97. (0, {event.event_id: 0}),
  98. ]
  99. )
  100. self.mock_store.get_events_as_list = AsyncMock(side_effect=[[event]])
  101. self.handler.notify_interested_services(RoomStreamToken(stream=0))
  102. self.mock_as_api.query_user.assert_called_once_with(services[0], user_id)
  103. def test_query_user_exists_known_user(self) -> None:
  104. user_id = "@someone:anywhere"
  105. services = [self._mkservice(is_interested_in_event=True)]
  106. services[0].is_interested_in_user.return_value = True
  107. self.mock_store.get_app_services.return_value = services
  108. self.mock_store.get_user_by_id = AsyncMock(return_value={"name": user_id})
  109. event = Mock(sender=user_id, type="m.room.message", room_id="!foo:bar")
  110. self.mock_as_api.query_user.return_value = True
  111. self.mock_store.get_all_new_event_ids_stream = AsyncMock(
  112. side_effect=[
  113. (0, [event], {event.event_id: 0}),
  114. ]
  115. )
  116. self.handler.notify_interested_services(RoomStreamToken(stream=0))
  117. self.assertFalse(
  118. self.mock_as_api.query_user.called,
  119. "query_user called when it shouldn't have been.",
  120. )
  121. def test_query_room_alias_exists(self) -> None:
  122. room_alias_str = "#foo:bar"
  123. room_alias = Mock()
  124. room_alias.to_string.return_value = room_alias_str
  125. room_id = "!alpha:bet"
  126. servers = ["aperture"]
  127. interested_service = self._mkservice_alias(is_room_alias_in_namespace=True)
  128. services = [
  129. self._mkservice_alias(is_room_alias_in_namespace=False),
  130. interested_service,
  131. self._mkservice_alias(is_room_alias_in_namespace=False),
  132. ]
  133. self.mock_as_api.query_alias = AsyncMock(return_value=True)
  134. self.mock_store.get_app_services.return_value = services
  135. self.mock_store.get_association_from_room_alias = AsyncMock(
  136. return_value=Mock(room_id=room_id, servers=servers)
  137. )
  138. result = self.successResultOf(
  139. defer.ensureDeferred(self.handler.query_room_alias_exists(room_alias))
  140. )
  141. assert result is not None
  142. self.mock_as_api.query_alias.assert_called_once_with(
  143. interested_service, room_alias_str
  144. )
  145. self.assertEqual(result.room_id, room_id)
  146. self.assertEqual(result.servers, servers)
  147. def test_get_3pe_protocols_no_appservices(self) -> None:
  148. self.mock_store.get_app_services.return_value = []
  149. response = self.successResultOf(
  150. defer.ensureDeferred(self.handler.get_3pe_protocols("my-protocol"))
  151. )
  152. self.mock_as_api.get_3pe_protocol.assert_not_called()
  153. self.assertEqual(response, {})
  154. def test_get_3pe_protocols_no_protocols(self) -> None:
  155. service = self._mkservice(False, [])
  156. self.mock_store.get_app_services.return_value = [service]
  157. response = self.successResultOf(
  158. defer.ensureDeferred(self.handler.get_3pe_protocols())
  159. )
  160. self.mock_as_api.get_3pe_protocol.assert_not_called()
  161. self.assertEqual(response, {})
  162. def test_get_3pe_protocols_protocol_no_response(self) -> None:
  163. service = self._mkservice(False, ["my-protocol"])
  164. self.mock_store.get_app_services.return_value = [service]
  165. self.mock_as_api.get_3pe_protocol.return_value = None
  166. response = self.successResultOf(
  167. defer.ensureDeferred(self.handler.get_3pe_protocols())
  168. )
  169. self.mock_as_api.get_3pe_protocol.assert_called_once_with(
  170. service, "my-protocol"
  171. )
  172. self.assertEqual(response, {})
  173. def test_get_3pe_protocols_select_one_protocol(self) -> None:
  174. service = self._mkservice(False, ["my-protocol"])
  175. self.mock_store.get_app_services.return_value = [service]
  176. self.mock_as_api.get_3pe_protocol.return_value = {
  177. "x-protocol-data": 42,
  178. "instances": [],
  179. }
  180. response = self.successResultOf(
  181. defer.ensureDeferred(self.handler.get_3pe_protocols("my-protocol"))
  182. )
  183. self.mock_as_api.get_3pe_protocol.assert_called_once_with(
  184. service, "my-protocol"
  185. )
  186. self.assertEqual(
  187. response, {"my-protocol": {"x-protocol-data": 42, "instances": []}}
  188. )
  189. def test_get_3pe_protocols_one_protocol(self) -> None:
  190. service = self._mkservice(False, ["my-protocol"])
  191. self.mock_store.get_app_services.return_value = [service]
  192. self.mock_as_api.get_3pe_protocol.return_value = {
  193. "x-protocol-data": 42,
  194. "instances": [],
  195. }
  196. response = self.successResultOf(
  197. defer.ensureDeferred(self.handler.get_3pe_protocols())
  198. )
  199. self.mock_as_api.get_3pe_protocol.assert_called_once_with(
  200. service, "my-protocol"
  201. )
  202. self.assertEqual(
  203. response, {"my-protocol": {"x-protocol-data": 42, "instances": []}}
  204. )
  205. def test_get_3pe_protocols_multiple_protocol(self) -> None:
  206. service_one = self._mkservice(False, ["my-protocol"])
  207. service_two = self._mkservice(False, ["other-protocol"])
  208. self.mock_store.get_app_services.return_value = [service_one, service_two]
  209. self.mock_as_api.get_3pe_protocol.return_value = {
  210. "x-protocol-data": 42,
  211. "instances": [],
  212. }
  213. response = self.successResultOf(
  214. defer.ensureDeferred(self.handler.get_3pe_protocols())
  215. )
  216. self.mock_as_api.get_3pe_protocol.assert_called()
  217. self.assertEqual(
  218. response,
  219. {
  220. "my-protocol": {"x-protocol-data": 42, "instances": []},
  221. "other-protocol": {"x-protocol-data": 42, "instances": []},
  222. },
  223. )
  224. def test_get_3pe_protocols_multiple_info(self) -> None:
  225. service_one = self._mkservice(False, ["my-protocol"])
  226. service_two = self._mkservice(False, ["my-protocol"])
  227. async def get_3pe_protocol(
  228. service: ApplicationService, protocol: str
  229. ) -> Optional[JsonDict]:
  230. if service == service_one:
  231. return {
  232. "x-protocol-data": 42,
  233. "instances": [{"desc": "Alice's service"}],
  234. }
  235. if service == service_two:
  236. return {
  237. "x-protocol-data": 36,
  238. "x-not-used": 45,
  239. "instances": [{"desc": "Bob's service"}],
  240. }
  241. raise Exception("Unexpected service")
  242. self.mock_store.get_app_services.return_value = [service_one, service_two]
  243. self.mock_as_api.get_3pe_protocol = get_3pe_protocol
  244. response = self.successResultOf(
  245. defer.ensureDeferred(self.handler.get_3pe_protocols())
  246. )
  247. # It's expected that the second service's data doesn't appear in the response
  248. self.assertEqual(
  249. response,
  250. {
  251. "my-protocol": {
  252. "x-protocol-data": 42,
  253. "instances": [
  254. {
  255. "desc": "Alice's service",
  256. },
  257. {"desc": "Bob's service"},
  258. ],
  259. },
  260. },
  261. )
  262. def test_notify_interested_services_ephemeral(self) -> None:
  263. """
  264. Test sending ephemeral events to the appservice handler are scheduled
  265. to be pushed out to interested appservices, and that the stream ID is
  266. updated accordingly.
  267. """
  268. interested_service = self._mkservice(is_interested_in_event=True)
  269. services = [interested_service]
  270. self.mock_store.get_app_services.return_value = services
  271. self.mock_store.get_type_stream_id_for_appservice = AsyncMock(return_value=579)
  272. event = Mock(event_id="event_1")
  273. self.event_source.sources.receipt.get_new_events_as = AsyncMock(
  274. return_value=([event], None)
  275. )
  276. self.handler.notify_interested_services_ephemeral(
  277. StreamKeyType.RECEIPT,
  278. MultiWriterStreamToken(stream=580),
  279. ["@fakerecipient:example.com"],
  280. )
  281. self.mock_scheduler.enqueue_for_appservice.assert_called_once_with(
  282. interested_service, ephemeral=[event]
  283. )
  284. self.mock_store.set_appservice_stream_type_pos.assert_called_once_with(
  285. interested_service,
  286. "read_receipt",
  287. 580,
  288. )
  289. def test_notify_interested_services_ephemeral_out_of_order(self) -> None:
  290. """
  291. Test sending out of order ephemeral events to the appservice handler
  292. are ignored.
  293. """
  294. interested_service = self._mkservice(is_interested_in_event=True)
  295. services = [interested_service]
  296. self.mock_store.get_app_services.return_value = services
  297. self.mock_store.get_type_stream_id_for_appservice = AsyncMock(return_value=580)
  298. event = Mock(event_id="event_1")
  299. self.event_source.sources.receipt.get_new_events_as = AsyncMock(
  300. return_value=([event], None)
  301. )
  302. self.handler.notify_interested_services_ephemeral(
  303. StreamKeyType.RECEIPT,
  304. MultiWriterStreamToken(stream=580),
  305. ["@fakerecipient:example.com"],
  306. )
  307. # This method will be called, but with an empty list of events
  308. self.mock_scheduler.enqueue_for_appservice.assert_called_once_with(
  309. interested_service, ephemeral=[]
  310. )
  311. def _mkservice(
  312. self, is_interested_in_event: bool, protocols: Optional[Iterable] = None
  313. ) -> Mock:
  314. """
  315. Create a new mock representing an ApplicationService.
  316. Args:
  317. is_interested_in_event: Whether this application service will be considered
  318. interested in all events.
  319. protocols: The third-party protocols that this application service claims to
  320. support.
  321. Returns:
  322. A mock representing the ApplicationService.
  323. """
  324. service = Mock()
  325. service.is_interested_in_event = AsyncMock(return_value=is_interested_in_event)
  326. service.token = "mock_service_token"
  327. service.url = "mock_service_url"
  328. service.protocols = protocols
  329. return service
  330. def _mkservice_alias(self, is_room_alias_in_namespace: bool) -> Mock:
  331. """
  332. Create a new mock representing an ApplicationService that is or is not interested
  333. any given room aliase.
  334. Args:
  335. is_room_alias_in_namespace: If true, the application service will be interested
  336. in all room aliases that are queried against it. If false, the application
  337. service will not be interested in any room aliases.
  338. Returns:
  339. A mock representing the ApplicationService.
  340. """
  341. service = Mock()
  342. service.is_room_alias_in_namespace.return_value = is_room_alias_in_namespace
  343. service.token = "mock_service_token"
  344. service.url = "mock_service_url"
  345. return service
  346. class ApplicationServicesHandlerSendEventsTestCase(unittest.HomeserverTestCase):
  347. """
  348. Tests that the ApplicationServicesHandler sends events to application
  349. services correctly.
  350. """
  351. servlets = [
  352. synapse.rest.admin.register_servlets_for_client_rest_resource,
  353. login.register_servlets,
  354. room.register_servlets,
  355. sendtodevice.register_servlets,
  356. receipts.register_servlets,
  357. ]
  358. def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
  359. self.hs = hs
  360. # Mock the ApplicationServiceScheduler's _TransactionController's send method so that
  361. # we can track any outgoing ephemeral events
  362. self.send_mock = AsyncMock()
  363. hs.get_application_service_handler().scheduler.txn_ctrl.send = self.send_mock # type: ignore[method-assign]
  364. # Mock out application services, and allow defining our own in tests
  365. self._services: List[ApplicationService] = []
  366. self.hs.get_datastores().main.get_app_services = Mock( # type: ignore[method-assign]
  367. return_value=self._services
  368. )
  369. # A user on the homeserver.
  370. self.local_user_device_id = "local_device"
  371. self.local_user = self.register_user("local_user", "password")
  372. self.local_user_token = self.login(
  373. "local_user", "password", self.local_user_device_id
  374. )
  375. # A user on the homeserver which lies within an appservice's exclusive user namespace.
  376. self.exclusive_as_user_device_id = "exclusive_as_device"
  377. self.exclusive_as_user = self.register_user("exclusive_as_user", "password")
  378. self.exclusive_as_user_token = self.login(
  379. "exclusive_as_user", "password", self.exclusive_as_user_device_id
  380. )
  381. self.exclusive_as_user_2_device_id = "exclusive_as_device_2"
  382. self.exclusive_as_user_2 = self.register_user("exclusive_as_user_2", "password")
  383. self.exclusive_as_user_2_token = self.login(
  384. "exclusive_as_user_2", "password", self.exclusive_as_user_2_device_id
  385. )
  386. self.exclusive_as_user_3_device_id = "exclusive_as_device_3"
  387. self.exclusive_as_user_3 = self.register_user("exclusive_as_user_3", "password")
  388. self.exclusive_as_user_3_token = self.login(
  389. "exclusive_as_user_3", "password", self.exclusive_as_user_3_device_id
  390. )
  391. def _notify_interested_services(self) -> None:
  392. # This is normally set in `notify_interested_services` but we need to call the
  393. # internal async version so the reactor gets pushed to completion.
  394. self.hs.get_application_service_handler().current_max += 1
  395. self.get_success(
  396. self.hs.get_application_service_handler()._notify_interested_services(
  397. RoomStreamToken(
  398. stream=self.hs.get_application_service_handler().current_max
  399. )
  400. )
  401. )
  402. @parameterized.expand(
  403. [
  404. ("@local_as_user:test", True),
  405. # Defining remote users in an application service user namespace regex is a
  406. # footgun since the appservice might assume that it'll receive all events
  407. # sent by that remote user, but it will only receive events in rooms that
  408. # are shared with a local user. So we just remove this footgun possibility
  409. # entirely and we won't notify the application service based on remote
  410. # users.
  411. ("@remote_as_user:remote", False),
  412. ]
  413. )
  414. def test_match_interesting_room_members(
  415. self, interesting_user: str, should_notify: bool
  416. ) -> None:
  417. """
  418. Test to make sure that a interesting user (local or remote) in the room is
  419. notified as expected when someone else in the room sends a message.
  420. """
  421. # Register an application service that's interested in the `interesting_user`
  422. interested_appservice = self._register_application_service(
  423. namespaces={
  424. ApplicationService.NS_USERS: [
  425. {
  426. "regex": interesting_user,
  427. "exclusive": False,
  428. },
  429. ],
  430. },
  431. )
  432. # Create a room
  433. alice = self.register_user("alice", "pass")
  434. alice_access_token = self.login("alice", "pass")
  435. room_id = self.helper.create_room_as(room_creator=alice, tok=alice_access_token)
  436. # Join the interesting user to the room
  437. self.get_success(
  438. event_injection.inject_member_event(
  439. self.hs, room_id, interesting_user, "join"
  440. )
  441. )
  442. # Kick the appservice into checking this membership event to get the event out
  443. # of the way
  444. self._notify_interested_services()
  445. # We don't care about the interesting user join event (this test is making sure
  446. # the next thing works)
  447. self.send_mock.reset_mock()
  448. # Send a message from an uninteresting user
  449. self.helper.send_event(
  450. room_id,
  451. type=EventTypes.Message,
  452. content={
  453. "msgtype": "m.text",
  454. "body": "message from uninteresting user",
  455. },
  456. tok=alice_access_token,
  457. )
  458. # Kick the appservice into checking this new event
  459. self._notify_interested_services()
  460. if should_notify:
  461. self.send_mock.assert_called_once()
  462. (
  463. service,
  464. events,
  465. _ephemeral,
  466. _to_device_messages,
  467. _otks,
  468. _fbks,
  469. _device_list_summary,
  470. ) = self.send_mock.call_args[0]
  471. # Even though the message came from an uninteresting user, it should still
  472. # notify us because the interesting user is joined to the room where the
  473. # message was sent.
  474. self.assertEqual(service, interested_appservice)
  475. self.assertEqual(events[0]["type"], "m.room.message")
  476. self.assertEqual(events[0]["sender"], alice)
  477. else:
  478. self.send_mock.assert_not_called()
  479. def test_application_services_receive_events_sent_by_interesting_local_user(
  480. self,
  481. ) -> None:
  482. """
  483. Test to make sure that a messages sent from a local user can be interesting and
  484. picked up by the appservice.
  485. """
  486. # Register an application service that's interested in all local users
  487. interested_appservice = self._register_application_service(
  488. namespaces={
  489. ApplicationService.NS_USERS: [
  490. {
  491. "regex": ".*",
  492. "exclusive": False,
  493. },
  494. ],
  495. },
  496. )
  497. # Create a room
  498. alice = self.register_user("alice", "pass")
  499. alice_access_token = self.login("alice", "pass")
  500. room_id = self.helper.create_room_as(room_creator=alice, tok=alice_access_token)
  501. # We don't care about interesting events before this (this test is making sure
  502. # the next thing works)
  503. self.send_mock.reset_mock()
  504. # Send a message from the interesting local user
  505. self.helper.send_event(
  506. room_id,
  507. type=EventTypes.Message,
  508. content={
  509. "msgtype": "m.text",
  510. "body": "message from interesting local user",
  511. },
  512. tok=alice_access_token,
  513. )
  514. # Kick the appservice into checking this new event
  515. self._notify_interested_services()
  516. self.send_mock.assert_called_once()
  517. (
  518. service,
  519. events,
  520. _ephemeral,
  521. _to_device_messages,
  522. _otks,
  523. _fbks,
  524. _device_list_summary,
  525. ) = self.send_mock.call_args[0]
  526. # Events sent from an interesting local user should also be picked up as
  527. # interesting to the appservice.
  528. self.assertEqual(service, interested_appservice)
  529. self.assertEqual(events[0]["type"], "m.room.message")
  530. self.assertEqual(events[0]["sender"], alice)
  531. def test_sending_read_receipt_batches_to_application_services(self) -> None:
  532. """Tests that a large batch of read receipts are sent correctly to
  533. interested application services.
  534. """
  535. # Register an application service that's interested in a certain user
  536. # and room prefix
  537. interested_appservice = self._register_application_service(
  538. namespaces={
  539. ApplicationService.NS_USERS: [
  540. {
  541. "regex": "@exclusive_as_user:.+",
  542. "exclusive": True,
  543. }
  544. ],
  545. ApplicationService.NS_ROOMS: [
  546. {
  547. "regex": "!fakeroom_.*",
  548. "exclusive": True,
  549. }
  550. ],
  551. },
  552. )
  553. # Now, pretend that we receive a large burst of read receipts (300 total) that
  554. # all come in at once.
  555. for i in range(300):
  556. self.get_success(
  557. # Insert a fake read receipt into the database
  558. self.hs.get_datastores().main.insert_receipt(
  559. # We have to use unique room ID + user ID combinations here, as the db query
  560. # is an upsert.
  561. room_id=f"!fakeroom_{i}:test",
  562. receipt_type="m.read",
  563. user_id=self.local_user,
  564. event_ids=[f"$eventid_{i}"],
  565. thread_id=None,
  566. data={},
  567. )
  568. )
  569. # Now notify the appservice handler that 300 read receipts have all arrived
  570. # at once. What will it do!
  571. # note: stream tokens start at 2
  572. for stream_token in range(2, 303):
  573. self.get_success(
  574. self.hs.get_application_service_handler()._notify_interested_services_ephemeral(
  575. services=[interested_appservice],
  576. stream_key=StreamKeyType.RECEIPT,
  577. new_token=MultiWriterStreamToken(stream=stream_token),
  578. users=[self.exclusive_as_user],
  579. )
  580. )
  581. # Using our txn send mock, we can see what the AS received. After iterating over every
  582. # transaction, we'd like to see all 300 read receipts accounted for.
  583. # No more, no less.
  584. all_ephemeral_events = []
  585. for call in self.send_mock.call_args_list:
  586. ephemeral_events = call[0][2]
  587. all_ephemeral_events += ephemeral_events
  588. # Ensure that no duplicate events were sent
  589. self.assertEqual(len(all_ephemeral_events), 300)
  590. # Check that the ephemeral event is a read receipt with the expected structure
  591. latest_read_receipt = all_ephemeral_events[-1]
  592. self.assertEqual(latest_read_receipt["type"], EduTypes.RECEIPT)
  593. event_id = list(latest_read_receipt["content"].keys())[0]
  594. self.assertEqual(
  595. latest_read_receipt["content"][event_id]["m.read"], {self.local_user: {}}
  596. )
  597. @unittest.override_config(
  598. {"experimental_features": {"msc2409_to_device_messages_enabled": True}}
  599. )
  600. def test_application_services_receive_local_to_device(self) -> None:
  601. """
  602. Test that when a user sends a to-device message to another user
  603. that is an application service's user namespace, the
  604. application service will receive it.
  605. """
  606. interested_appservice = self._register_application_service(
  607. namespaces={
  608. ApplicationService.NS_USERS: [
  609. {
  610. "regex": "@exclusive_as_user:.+",
  611. "exclusive": True,
  612. }
  613. ],
  614. },
  615. )
  616. # Have local_user send a to-device message to exclusive_as_user
  617. message_content = {"some_key": "some really interesting value"}
  618. chan = self.make_request(
  619. "PUT",
  620. "/_matrix/client/r0/sendToDevice/m.room_key_request/3",
  621. content={
  622. "messages": {
  623. self.exclusive_as_user: {
  624. self.exclusive_as_user_device_id: message_content
  625. }
  626. }
  627. },
  628. access_token=self.local_user_token,
  629. )
  630. self.assertEqual(chan.code, 200, chan.result)
  631. # Have exclusive_as_user send a to-device message to local_user
  632. chan = self.make_request(
  633. "PUT",
  634. "/_matrix/client/r0/sendToDevice/m.room_key_request/4",
  635. content={
  636. "messages": {
  637. self.local_user: {self.local_user_device_id: message_content}
  638. }
  639. },
  640. access_token=self.exclusive_as_user_token,
  641. )
  642. self.assertEqual(chan.code, 200, chan.result)
  643. # Check if our application service - that is interested in exclusive_as_user - received
  644. # the to-device message as part of an AS transaction.
  645. # Only the local_user -> exclusive_as_user to-device message should have been forwarded to the AS.
  646. #
  647. # The uninterested application service should not have been notified at all.
  648. self.send_mock.assert_called_once()
  649. (
  650. service,
  651. _events,
  652. _ephemeral,
  653. to_device_messages,
  654. _otks,
  655. _fbks,
  656. _device_list_summary,
  657. ) = self.send_mock.call_args[0]
  658. # Assert that this was the same to-device message that local_user sent
  659. self.assertEqual(service, interested_appservice)
  660. self.assertEqual(to_device_messages[0]["type"], "m.room_key_request")
  661. self.assertEqual(to_device_messages[0]["sender"], self.local_user)
  662. # Additional fields 'to_user_id' and 'to_device_id' specifically for
  663. # to-device messages via the AS API
  664. self.assertEqual(to_device_messages[0]["to_user_id"], self.exclusive_as_user)
  665. self.assertEqual(
  666. to_device_messages[0]["to_device_id"], self.exclusive_as_user_device_id
  667. )
  668. self.assertEqual(to_device_messages[0]["content"], message_content)
  669. @unittest.override_config(
  670. {"experimental_features": {"msc2409_to_device_messages_enabled": True}}
  671. )
  672. def test_application_services_receive_bursts_of_to_device(self) -> None:
  673. """
  674. Test that when a user sends >100 to-device messages at once, any
  675. interested AS's will receive them in separate transactions.
  676. Also tests that uninterested application services do not receive messages.
  677. """
  678. # Register two application services with exclusive interest in a user
  679. interested_appservices = []
  680. for _ in range(2):
  681. appservice = self._register_application_service(
  682. namespaces={
  683. ApplicationService.NS_USERS: [
  684. {
  685. "regex": "@exclusive_as_user:.+",
  686. "exclusive": True,
  687. }
  688. ],
  689. },
  690. )
  691. interested_appservices.append(appservice)
  692. # ...and an application service which does not have any user interest.
  693. self._register_application_service()
  694. to_device_message_content = {
  695. "some key": "some interesting value",
  696. }
  697. # We need to send a large burst of to-device messages. We also would like to
  698. # include them all in the same application service transaction so that we can
  699. # test large transactions.
  700. #
  701. # To do this, we can send a single to-device message to many user devices at
  702. # once.
  703. #
  704. # We insert number_of_messages - 1 messages into the database directly. We'll then
  705. # send a final to-device message to the real device, which will also kick off
  706. # an AS transaction (as just inserting messages into the DB won't).
  707. number_of_messages = 150
  708. fake_device_ids = [f"device_{num}" for num in range(number_of_messages - 1)]
  709. messages = {
  710. self.exclusive_as_user: {
  711. device_id: {
  712. "type": "test_to_device_message",
  713. "sender": "@some:sender",
  714. "content": to_device_message_content,
  715. }
  716. for device_id in fake_device_ids
  717. }
  718. }
  719. # Create a fake device per message. We can't send to-device messages to
  720. # a device that doesn't exist.
  721. self.get_success(
  722. self.hs.get_datastores().main.db_pool.simple_insert_many(
  723. desc="test_application_services_receive_burst_of_to_device",
  724. table="devices",
  725. keys=("user_id", "device_id"),
  726. values=[
  727. (
  728. self.exclusive_as_user,
  729. device_id,
  730. )
  731. for device_id in fake_device_ids
  732. ],
  733. )
  734. )
  735. # Seed the device_inbox table with our fake messages
  736. self.get_success(
  737. self.hs.get_datastores().main.add_messages_to_device_inbox(messages, {})
  738. )
  739. # Now have local_user send a final to-device message to exclusive_as_user. All unsent
  740. # to-device messages should be sent to any application services
  741. # interested in exclusive_as_user.
  742. chan = self.make_request(
  743. "PUT",
  744. "/_matrix/client/r0/sendToDevice/m.room_key_request/4",
  745. content={
  746. "messages": {
  747. self.exclusive_as_user: {
  748. self.exclusive_as_user_device_id: to_device_message_content
  749. }
  750. }
  751. },
  752. access_token=self.local_user_token,
  753. )
  754. self.assertEqual(chan.code, 200, chan.result)
  755. self.send_mock.assert_called()
  756. # Count the total number of to-device messages that were sent out per-service.
  757. # Ensure that we only sent to-device messages to interested services, and that
  758. # each interested service received the full count of to-device messages.
  759. service_id_to_message_count: Dict[str, int] = {}
  760. for call in self.send_mock.call_args_list:
  761. (
  762. service,
  763. _events,
  764. _ephemeral,
  765. to_device_messages,
  766. _otks,
  767. _fbks,
  768. _device_list_summary,
  769. ) = call[0]
  770. # Check that this was made to an interested service
  771. self.assertIn(service, interested_appservices)
  772. # Add to the count of messages for this application service
  773. service_id_to_message_count.setdefault(service.id, 0)
  774. service_id_to_message_count[service.id] += len(to_device_messages)
  775. # Assert that each interested service received the full count of messages
  776. for count in service_id_to_message_count.values():
  777. self.assertEqual(count, number_of_messages)
  778. @unittest.override_config(
  779. {"experimental_features": {"msc2409_to_device_messages_enabled": True}}
  780. )
  781. def test_application_services_receive_local_to_device_for_many_users(self) -> None:
  782. """
  783. Test that when a user sends a to-device message to many users
  784. in an application service's user namespace, the
  785. application service will receive all of them.
  786. """
  787. interested_appservice = self._register_application_service(
  788. namespaces={
  789. ApplicationService.NS_USERS: [
  790. {
  791. "regex": "@exclusive_as_user:.+",
  792. "exclusive": True,
  793. },
  794. {
  795. "regex": "@exclusive_as_user_2:.+",
  796. "exclusive": True,
  797. },
  798. {
  799. "regex": "@exclusive_as_user_3:.+",
  800. "exclusive": True,
  801. },
  802. ],
  803. },
  804. )
  805. # Have local_user send a to-device message to exclusive_as_users
  806. message_content = {"some_key": "some really interesting value"}
  807. chan = self.make_request(
  808. "PUT",
  809. "/_matrix/client/r0/sendToDevice/m.room_key_request/3",
  810. content={
  811. "messages": {
  812. self.exclusive_as_user: {
  813. self.exclusive_as_user_device_id: message_content
  814. },
  815. self.exclusive_as_user_2: {
  816. self.exclusive_as_user_2_device_id: message_content
  817. },
  818. self.exclusive_as_user_3: {
  819. self.exclusive_as_user_3_device_id: message_content
  820. },
  821. }
  822. },
  823. access_token=self.local_user_token,
  824. )
  825. self.assertEqual(chan.code, 200, chan.result)
  826. # Have exclusive_as_user send a to-device message to local_user
  827. for user_token in [
  828. self.exclusive_as_user_token,
  829. self.exclusive_as_user_2_token,
  830. self.exclusive_as_user_3_token,
  831. ]:
  832. chan = self.make_request(
  833. "PUT",
  834. "/_matrix/client/r0/sendToDevice/m.room_key_request/4",
  835. content={
  836. "messages": {
  837. self.local_user: {self.local_user_device_id: message_content}
  838. }
  839. },
  840. access_token=user_token,
  841. )
  842. self.assertEqual(chan.code, 200, chan.result)
  843. # Check if our application service - that is interested in exclusive_as_user - received
  844. # the to-device message as part of an AS transaction.
  845. # Only the local_user -> exclusive_as_user to-device message should have been forwarded to the AS.
  846. #
  847. # The uninterested application service should not have been notified at all.
  848. self.send_mock.assert_called_once()
  849. (
  850. service,
  851. _events,
  852. _ephemeral,
  853. to_device_messages,
  854. _otks,
  855. _fbks,
  856. _device_list_summary,
  857. ) = self.send_mock.call_args[0]
  858. # Assert that this was the same to-device message that local_user sent
  859. self.assertEqual(service, interested_appservice)
  860. # Assert expected number of messages
  861. self.assertEqual(len(to_device_messages), 3)
  862. for device_msg in to_device_messages:
  863. self.assertEqual(device_msg["type"], "m.room_key_request")
  864. self.assertEqual(device_msg["sender"], self.local_user)
  865. self.assertEqual(device_msg["content"], message_content)
  866. self.assertEqual(to_device_messages[0]["to_user_id"], self.exclusive_as_user)
  867. self.assertEqual(
  868. to_device_messages[0]["to_device_id"],
  869. self.exclusive_as_user_device_id,
  870. )
  871. self.assertEqual(to_device_messages[1]["to_user_id"], self.exclusive_as_user_2)
  872. self.assertEqual(
  873. to_device_messages[1]["to_device_id"],
  874. self.exclusive_as_user_2_device_id,
  875. )
  876. self.assertEqual(to_device_messages[2]["to_user_id"], self.exclusive_as_user_3)
  877. self.assertEqual(
  878. to_device_messages[2]["to_device_id"],
  879. self.exclusive_as_user_3_device_id,
  880. )
  881. def _register_application_service(
  882. self,
  883. namespaces: Optional[Dict[str, Iterable[Dict]]] = None,
  884. ) -> ApplicationService:
  885. """
  886. Register a new application service, with the given namespaces of interest.
  887. Args:
  888. namespaces: A dictionary containing any user, room or alias namespaces that
  889. the application service is interested in.
  890. Returns:
  891. The registered application service.
  892. """
  893. # Create an application service
  894. appservice = ApplicationService(
  895. token=random_string(10),
  896. id=random_string(10),
  897. sender="@as:example.com",
  898. rate_limited=False,
  899. namespaces=namespaces,
  900. supports_ephemeral=True,
  901. )
  902. # Register the application service
  903. self._services.append(appservice)
  904. return appservice
  905. class ApplicationServicesHandlerDeviceListsTestCase(unittest.HomeserverTestCase):
  906. """
  907. Tests that the ApplicationServicesHandler sends device list updates to application
  908. services correctly.
  909. """
  910. servlets = [
  911. synapse.rest.admin.register_servlets_for_client_rest_resource,
  912. login.register_servlets,
  913. room.register_servlets,
  914. ]
  915. def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
  916. # Allow us to modify cached feature flags mid-test
  917. self.as_handler = hs.get_application_service_handler()
  918. # Mock ApplicationServiceApi's put_json, so we can verify the raw JSON that
  919. # will be sent over the wire
  920. self.put_json = AsyncMock()
  921. hs.get_application_service_api().put_json = self.put_json # type: ignore[method-assign]
  922. # Mock out application services, and allow defining our own in tests
  923. self._services: List[ApplicationService] = []
  924. self.hs.get_datastores().main.get_app_services = Mock( # type: ignore[method-assign]
  925. return_value=self._services
  926. )
  927. # Test across a variety of configuration values
  928. @parameterized.expand(
  929. [
  930. (True, True, True),
  931. (True, False, False),
  932. (False, True, False),
  933. (False, False, False),
  934. ]
  935. )
  936. def test_application_service_receives_device_list_updates(
  937. self,
  938. experimental_feature_enabled: bool,
  939. as_supports_txn_extensions: bool,
  940. as_should_receive_device_list_updates: bool,
  941. ) -> None:
  942. """
  943. Tests that an application service receives notice of changed device
  944. lists for a user, when a user changes their device lists.
  945. Arguments above are populated by parameterized.
  946. Args:
  947. as_should_receive_device_list_updates: Whether we expect the AS to receive the
  948. device list changes.
  949. experimental_feature_enabled: Whether the "msc3202_transaction_extensions" experimental
  950. feature is enabled. This feature must be enabled for device lists to ASs to work.
  951. as_supports_txn_extensions: Whether the application service has explicitly registered
  952. to receive information defined by MSC3202 - which includes device list changes.
  953. """
  954. # Change whether the experimental feature is enabled or disabled before making
  955. # device list changes
  956. self.as_handler._msc3202_transaction_extensions_enabled = (
  957. experimental_feature_enabled
  958. )
  959. # Create an appservice that is interested in "local_user"
  960. appservice = ApplicationService(
  961. token=random_string(10),
  962. id=random_string(10),
  963. sender="@as:example.com",
  964. rate_limited=False,
  965. namespaces={
  966. ApplicationService.NS_USERS: [
  967. {
  968. "regex": "@local_user:.+",
  969. "exclusive": False,
  970. }
  971. ],
  972. },
  973. supports_ephemeral=True,
  974. msc3202_transaction_extensions=as_supports_txn_extensions,
  975. # Must be set for Synapse to try pushing data to the AS
  976. hs_token="abcde",
  977. url="some_url",
  978. )
  979. # Register the application service
  980. self._services.append(appservice)
  981. # Register a user on the homeserver
  982. self.local_user = self.register_user("local_user", "password")
  983. self.local_user_token = self.login("local_user", "password")
  984. if as_should_receive_device_list_updates:
  985. # Ensure that the resulting JSON uses the unstable prefix and contains the
  986. # expected users
  987. self.put_json.assert_called_once()
  988. json_body = self.put_json.call_args[1]["json_body"]
  989. # Our application service should have received a device list update with
  990. # "local_user" in the "changed" list
  991. device_list_dict = json_body.get("org.matrix.msc3202.device_lists", {})
  992. self.assertEqual([], device_list_dict["left"])
  993. self.assertEqual([self.local_user], device_list_dict["changed"])
  994. else:
  995. # No device list changes should have been sent out
  996. self.put_json.assert_not_called()
class ApplicationServicesHandlerOtkCountsTestCase(unittest.HomeserverTestCase):
    """
    Tests that transactions sent to an application service carry one-time key
    counts and unused fallback key information.
    """

    # Argument indices for pulling out arguments from a `send_mock`.
    # These are positions in the positional arguments of the mocked `send` call.
    ARG_OTK_COUNTS = 4
    ARG_FALLBACK_KEYS = 5

    # REST servlets registered for this test case.
    servlets = [
        synapse.rest.admin.register_servlets_for_client_rest_resource,
        login.register_servlets,
        register.register_servlets,
        room.register_servlets,
        sendtodevice.register_servlets,
        receipts.register_servlets,
    ]
  1009. def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
  1010. # Mock the ApplicationServiceScheduler's _TransactionController's send method so that
  1011. # we can track what's going out
  1012. self.send_mock = AsyncMock()
  1013. hs.get_application_service_handler().scheduler.txn_ctrl.send = self.send_mock # type: ignore[method-assign] # We assign to a method.
  1014. # Define an application service for the tests
  1015. self._service_token = "VERYSECRET"
  1016. self._service = ApplicationService(
  1017. self._service_token,
  1018. "as1",
  1019. "@as.sender:test",
  1020. namespaces={
  1021. "users": [
  1022. {"regex": "@_as_.*:test", "exclusive": True},
  1023. {"regex": "@as.sender:test", "exclusive": True},
  1024. ]
  1025. },
  1026. msc3202_transaction_extensions=True,
  1027. )
  1028. self.hs.get_datastores().main.services_cache = [self._service]
  1029. # Register some appservice users
  1030. self._sender_user, self._sender_device = self.register_appservice_user(
  1031. "as.sender", self._service_token
  1032. )
  1033. self._namespaced_user, self._namespaced_device = self.register_appservice_user(
  1034. "_as_user1", self._service_token
  1035. )
  1036. # Register a real user as well.
  1037. self._real_user = self.register_user("real.user", "meow")
  1038. self._real_user_token = self.login("real.user", "meow")
  1039. async def _add_otks_for_device(
  1040. self, user_id: str, device_id: str, otk_count: int
  1041. ) -> None:
  1042. """
  1043. Add some dummy keys. It doesn't matter if they're not a real algorithm;
  1044. that should be opaque to the server anyway.
  1045. """
  1046. await self.hs.get_datastores().main.add_e2e_one_time_keys(
  1047. user_id,
  1048. device_id,
  1049. self.clock.time_msec(),
  1050. [("algo", f"k{i}", "{}") for i in range(otk_count)],
  1051. )
  1052. async def _add_fallback_key_for_device(
  1053. self, user_id: str, device_id: str, used: bool
  1054. ) -> None:
  1055. """
  1056. Adds a fake fallback key to a device, optionally marking it as used
  1057. right away.
  1058. """
  1059. store = self.hs.get_datastores().main
  1060. await store.set_e2e_fallback_keys(user_id, device_id, {"algo:fk": "fall back!"})
  1061. if used is True:
  1062. # Mark the key as used
  1063. await store.db_pool.simple_update_one(
  1064. table="e2e_fallback_keys_json",
  1065. keyvalues={
  1066. "user_id": user_id,
  1067. "device_id": device_id,
  1068. "algorithm": "algo",
  1069. "key_id": "fk",
  1070. },
  1071. updatevalues={"used": True},
  1072. desc="_get_fallback_key_set_used",
  1073. )
  1074. def _set_up_devices_and_a_room(self) -> str:
  1075. """
  1076. Helper to set up devices for all the users
  1077. and a room for the users to talk in.
  1078. """
  1079. async def preparation() -> None:
  1080. await self._add_otks_for_device(self._sender_user, self._sender_device, 42)
  1081. await self._add_fallback_key_for_device(
  1082. self._sender_user, self._sender_device, used=True
  1083. )
  1084. await self._add_otks_for_device(
  1085. self._namespaced_user, self._namespaced_device, 36
  1086. )
  1087. await self._add_fallback_key_for_device(
  1088. self._namespaced_user, self._namespaced_device, used=False
  1089. )
  1090. # Register a device for the real user, too, so that we can later ensure
  1091. # that we don't leak information to the AS about the non-AS user.
  1092. await self.hs.get_datastores().main.store_device(
  1093. self._real_user, "REALDEV", "UltraMatrix 3000"
  1094. )
  1095. await self._add_otks_for_device(self._real_user, "REALDEV", 50)
  1096. self.get_success(preparation())
  1097. room_id = self.helper.create_room_as(
  1098. self._real_user, is_public=True, tok=self._real_user_token
  1099. )
  1100. self.helper.join(
  1101. room_id,
  1102. self._namespaced_user,
  1103. tok=self._service_token,
  1104. appservice_user_id=self._namespaced_user,
  1105. )
  1106. # Check it was called for sanity. (This was to send the join event to the AS.)
  1107. self.send_mock.assert_called()
  1108. self.send_mock.reset_mock()
  1109. return room_id
  1110. @override_config(
  1111. {"experimental_features": {"msc3202_transaction_extensions": True}}
  1112. )
  1113. def test_application_services_receive_otk_counts_and_fallback_key_usages_with_pdus(
  1114. self,
  1115. ) -> None:
  1116. """
  1117. Tests that:
  1118. - the AS receives one-time key counts and unused fallback keys for:
  1119. - the specified sender; and
  1120. - any user who is in receipt of the PDUs
  1121. """
  1122. room_id = self._set_up_devices_and_a_room()
  1123. # Send a message into the AS's room
  1124. self.helper.send(room_id, "woof woof", tok=self._real_user_token)
  1125. # Capture what was sent as an AS transaction.
  1126. self.send_mock.assert_called()
  1127. last_args, _last_kwargs = self.send_mock.call_args
  1128. otks: Optional[TransactionOneTimeKeysCount] = last_args[self.ARG_OTK_COUNTS]
  1129. unused_fallbacks: Optional[TransactionUnusedFallbackKeys] = last_args[
  1130. self.ARG_FALLBACK_KEYS
  1131. ]
  1132. self.assertEqual(
  1133. otks,
  1134. {
  1135. "@as.sender:test": {self._sender_device: {"algo": 42}},
  1136. "@_as_user1:test": {self._namespaced_device: {"algo": 36}},
  1137. },
  1138. )
  1139. self.assertEqual(
  1140. unused_fallbacks,
  1141. {
  1142. "@as.sender:test": {self._sender_device: []},
  1143. "@_as_user1:test": {self._namespaced_device: ["algo"]},
  1144. },
  1145. )