test_appservice.py 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588
  1. # Copyright 2015, 2016 OpenMarket Ltd
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import json
  15. import os
  16. import tempfile
  17. from typing import List, Optional, cast
  18. from unittest.mock import Mock
  19. import yaml
  20. from twisted.internet import defer
  21. from twisted.test.proto_helpers import MemoryReactor
  22. from synapse.appservice import ApplicationService, ApplicationServiceState
  23. from synapse.config._base import ConfigError
  24. from synapse.events import EventBase
  25. from synapse.server import HomeServer
  26. from synapse.storage.database import DatabasePool, make_conn
  27. from synapse.storage.databases.main.appservice import (
  28. ApplicationServiceStore,
  29. ApplicationServiceTransactionStore,
  30. )
  31. from synapse.util import Clock
  32. from tests import unittest
  33. from tests.test_utils import make_awaitable
  34. class ApplicationServiceStoreTestCase(unittest.HomeserverTestCase):
  35. def setUp(self):
  36. super(ApplicationServiceStoreTestCase, self).setUp()
  37. self.as_yaml_files: List[str] = []
  38. self.hs.config.appservice.app_service_config_files = self.as_yaml_files
  39. self.hs.config.caches.event_cache_size = 1
  40. self.as_token = "token1"
  41. self.as_url = "some_url"
  42. self.as_id = "as1"
  43. self._add_appservice(
  44. self.as_token, self.as_id, self.as_url, "some_hs_token", "bob"
  45. )
  46. self._add_appservice("token2", "as2", "some_url", "some_hs_token", "bob")
  47. self._add_appservice("token3", "as3", "some_url", "some_hs_token", "bob")
  48. # must be done after inserts
  49. database = self.hs.get_datastores().databases[0]
  50. self.store = ApplicationServiceStore(
  51. database,
  52. make_conn(database._database_config, database.engine, "test"),
  53. self.hs,
  54. )
  55. def tearDown(self) -> None:
  56. # TODO: suboptimal that we need to create files for tests!
  57. for f in self.as_yaml_files:
  58. try:
  59. os.remove(f)
  60. except Exception:
  61. pass
  62. super(ApplicationServiceStoreTestCase, self).tearDown()
  63. def _add_appservice(self, as_token, id, url, hs_token, sender) -> None:
  64. as_yaml = {
  65. "url": url,
  66. "as_token": as_token,
  67. "hs_token": hs_token,
  68. "id": id,
  69. "sender_localpart": sender,
  70. "namespaces": {},
  71. }
  72. # use the token as the filename
  73. with open(as_token, "w") as outfile:
  74. outfile.write(yaml.dump(as_yaml))
  75. self.as_yaml_files.append(as_token)
  76. def test_retrieve_unknown_service_token(self) -> None:
  77. service = self.store.get_app_service_by_token("invalid_token")
  78. self.assertEquals(service, None)
  79. def test_retrieval_of_service(self) -> None:
  80. stored_service = self.store.get_app_service_by_token(self.as_token)
  81. assert stored_service is not None
  82. self.assertEquals(stored_service.token, self.as_token)
  83. self.assertEquals(stored_service.id, self.as_id)
  84. self.assertEquals(stored_service.url, self.as_url)
  85. self.assertEquals(stored_service.namespaces[ApplicationService.NS_ALIASES], [])
  86. self.assertEquals(stored_service.namespaces[ApplicationService.NS_ROOMS], [])
  87. self.assertEquals(stored_service.namespaces[ApplicationService.NS_USERS], [])
  88. def test_retrieval_of_all_services(self) -> None:
  89. services = self.store.get_app_services()
  90. self.assertEquals(len(services), 3)
  91. class ApplicationServiceTransactionStoreTestCase(unittest.HomeserverTestCase):
  92. def setUp(self) -> None:
  93. super(ApplicationServiceTransactionStoreTestCase, self).setUp()
  94. self.as_yaml_files: List[str] = []
  95. self.hs.config.appservice.app_service_config_files = self.as_yaml_files
  96. self.hs.config.caches.event_cache_size = 1
  97. self.as_list = [
  98. {"token": "token1", "url": "https://matrix-as.org", "id": "id_1"},
  99. {"token": "alpha_tok", "url": "https://alpha.com", "id": "id_alpha"},
  100. {"token": "beta_tok", "url": "https://beta.com", "id": "id_beta"},
  101. {"token": "gamma_tok", "url": "https://gamma.com", "id": "id_gamma"},
  102. ]
  103. for s in self.as_list:
  104. self._add_service(s["url"], s["token"], s["id"])
  105. self.as_yaml_files = []
  106. # We assume there is only one database in these tests
  107. database = self.hs.get_datastores().databases[0]
  108. self.db_pool = database._db_pool
  109. self.engine = database.engine
  110. db_config = self.hs.config.database.get_single_database()
  111. self.store = TestTransactionStore(
  112. database, make_conn(db_config, self.engine, "test"), self.hs
  113. )
  114. def _add_service(self, url, as_token, id) -> None:
  115. as_yaml = {
  116. "url": url,
  117. "as_token": as_token,
  118. "hs_token": "something",
  119. "id": id,
  120. "sender_localpart": "a_sender",
  121. "namespaces": {},
  122. }
  123. # use the token as the filename
  124. with open(as_token, "w") as outfile:
  125. outfile.write(yaml.dump(as_yaml))
  126. self.as_yaml_files.append(as_token)
  127. def _set_state(
  128. self, id: str, state: ApplicationServiceState, txn: Optional[int] = None
  129. ):
  130. return self.db_pool.runOperation(
  131. self.engine.convert_param_style(
  132. "INSERT INTO application_services_state(as_id, state, last_txn) "
  133. "VALUES(?,?,?)"
  134. ),
  135. (id, state.value, txn),
  136. )
  137. def _insert_txn(self, as_id, txn_id, events):
  138. return self.db_pool.runOperation(
  139. self.engine.convert_param_style(
  140. "INSERT INTO application_services_txns(as_id, txn_id, event_ids) "
  141. "VALUES(?,?,?)"
  142. ),
  143. (as_id, txn_id, json.dumps([e.event_id for e in events])),
  144. )
  145. def _set_last_txn(self, as_id, txn_id):
  146. return self.db_pool.runOperation(
  147. self.engine.convert_param_style(
  148. "INSERT INTO application_services_state(as_id, last_txn, state) "
  149. "VALUES(?,?,?)"
  150. ),
  151. (as_id, txn_id, ApplicationServiceState.UP.value),
  152. )
  153. def test_get_appservice_state_none(
  154. self,
  155. ) -> None:
  156. service = Mock(id="999")
  157. state = self.get_success(self.store.get_appservice_state(service))
  158. self.assertEquals(None, state)
  159. def test_get_appservice_state_up(
  160. self,
  161. ) -> None:
  162. self.get_success(
  163. self._set_state(self.as_list[0]["id"], ApplicationServiceState.UP)
  164. )
  165. service = Mock(id=self.as_list[0]["id"])
  166. state = self.get_success(
  167. defer.ensureDeferred(self.store.get_appservice_state(service))
  168. )
  169. self.assertEquals(ApplicationServiceState.UP, state)
  170. def test_get_appservice_state_down(
  171. self,
  172. ) -> None:
  173. self.get_success(
  174. self._set_state(self.as_list[0]["id"], ApplicationServiceState.UP)
  175. )
  176. self.get_success(
  177. self._set_state(self.as_list[1]["id"], ApplicationServiceState.DOWN)
  178. )
  179. self.get_success(
  180. self._set_state(self.as_list[2]["id"], ApplicationServiceState.DOWN)
  181. )
  182. service = Mock(id=self.as_list[1]["id"])
  183. state = self.get_success(self.store.get_appservice_state(service))
  184. self.assertEquals(ApplicationServiceState.DOWN, state)
  185. def test_get_appservices_by_state_none(
  186. self,
  187. ) -> None:
  188. services = self.get_success(
  189. self.store.get_appservices_by_state(ApplicationServiceState.DOWN)
  190. )
  191. self.assertEquals(0, len(services))
  192. def test_set_appservices_state_down(
  193. self,
  194. ) -> None:
  195. service = Mock(id=self.as_list[1]["id"])
  196. self.get_success(
  197. self.store.set_appservice_state(service, ApplicationServiceState.DOWN)
  198. )
  199. rows = self.get_success(
  200. self.db_pool.runQuery(
  201. self.engine.convert_param_style(
  202. "SELECT as_id FROM application_services_state WHERE state=?"
  203. ),
  204. (ApplicationServiceState.DOWN.value,),
  205. )
  206. )
  207. self.assertEquals(service.id, rows[0][0])
  208. def test_set_appservices_state_multiple_up(
  209. self,
  210. ) -> None:
  211. service = Mock(id=self.as_list[1]["id"])
  212. self.get_success(
  213. self.store.set_appservice_state(service, ApplicationServiceState.UP)
  214. )
  215. self.get_success(
  216. self.store.set_appservice_state(service, ApplicationServiceState.DOWN)
  217. )
  218. self.get_success(
  219. self.store.set_appservice_state(service, ApplicationServiceState.UP)
  220. )
  221. rows = self.get_success(
  222. self.db_pool.runQuery(
  223. self.engine.convert_param_style(
  224. "SELECT as_id FROM application_services_state WHERE state=?"
  225. ),
  226. (ApplicationServiceState.UP.value,),
  227. )
  228. )
  229. self.assertEquals(service.id, rows[0][0])
  230. def test_create_appservice_txn_first(
  231. self,
  232. ) -> None:
  233. service = Mock(id=self.as_list[0]["id"])
  234. events = cast(List[EventBase], [Mock(event_id="e1"), Mock(event_id="e2")])
  235. txn = self.get_success(
  236. defer.ensureDeferred(self.store.create_appservice_txn(service, events, []))
  237. )
  238. self.assertEquals(txn.id, 1)
  239. self.assertEquals(txn.events, events)
  240. self.assertEquals(txn.service, service)
  241. def test_create_appservice_txn_older_last_txn(
  242. self,
  243. ) -> None:
  244. service = Mock(id=self.as_list[0]["id"])
  245. events = cast(List[EventBase], [Mock(event_id="e1"), Mock(event_id="e2")])
  246. self.get_success(self._set_last_txn(service.id, 9643)) # AS is falling behind
  247. self.get_success(self._insert_txn(service.id, 9644, events))
  248. self.get_success(self._insert_txn(service.id, 9645, events))
  249. txn = self.get_success(self.store.create_appservice_txn(service, events, []))
  250. self.assertEquals(txn.id, 9646)
  251. self.assertEquals(txn.events, events)
  252. self.assertEquals(txn.service, service)
  253. def test_create_appservice_txn_up_to_date_last_txn(
  254. self,
  255. ) -> None:
  256. service = Mock(id=self.as_list[0]["id"])
  257. events = cast(List[EventBase], [Mock(event_id="e1"), Mock(event_id="e2")])
  258. self.get_success(self._set_last_txn(service.id, 9643))
  259. txn = self.get_success(self.store.create_appservice_txn(service, events, []))
  260. self.assertEquals(txn.id, 9644)
  261. self.assertEquals(txn.events, events)
  262. self.assertEquals(txn.service, service)
  263. def test_create_appservice_txn_up_fuzzing(
  264. self,
  265. ) -> None:
  266. service = Mock(id=self.as_list[0]["id"])
  267. events = cast(List[EventBase], [Mock(event_id="e1"), Mock(event_id="e2")])
  268. self.get_success(self._set_last_txn(service.id, 9643))
  269. # dump in rows with higher IDs to make sure the queries aren't wrong.
  270. self.get_success(self._set_last_txn(self.as_list[1]["id"], 119643))
  271. self.get_success(self._set_last_txn(self.as_list[2]["id"], 9))
  272. self.get_success(self._set_last_txn(self.as_list[3]["id"], 9643))
  273. self.get_success(self._insert_txn(self.as_list[1]["id"], 119644, events))
  274. self.get_success(self._insert_txn(self.as_list[1]["id"], 119645, events))
  275. self.get_success(self._insert_txn(self.as_list[1]["id"], 119646, events))
  276. self.get_success(self._insert_txn(self.as_list[2]["id"], 10, events))
  277. self.get_success(self._insert_txn(self.as_list[3]["id"], 9643, events))
  278. txn = self.get_success(self.store.create_appservice_txn(service, events, []))
  279. self.assertEquals(txn.id, 9644)
  280. self.assertEquals(txn.events, events)
  281. self.assertEquals(txn.service, service)
  282. def test_complete_appservice_txn_first_txn(
  283. self,
  284. ) -> None:
  285. service = Mock(id=self.as_list[0]["id"])
  286. events = [Mock(event_id="e1"), Mock(event_id="e2")]
  287. txn_id = 1
  288. self.get_success(self._insert_txn(service.id, txn_id, events))
  289. self.get_success(
  290. self.store.complete_appservice_txn(txn_id=txn_id, service=service)
  291. )
  292. res = self.get_success(
  293. self.db_pool.runQuery(
  294. self.engine.convert_param_style(
  295. "SELECT last_txn FROM application_services_state WHERE as_id=?"
  296. ),
  297. (service.id,),
  298. )
  299. )
  300. self.assertEquals(1, len(res))
  301. self.assertEquals(txn_id, res[0][0])
  302. res = self.get_success(
  303. self.db_pool.runQuery(
  304. self.engine.convert_param_style(
  305. "SELECT * FROM application_services_txns WHERE txn_id=?"
  306. ),
  307. (txn_id,),
  308. )
  309. )
  310. self.assertEquals(0, len(res))
  311. def test_complete_appservice_txn_existing_in_state_table(
  312. self,
  313. ) -> None:
  314. service = Mock(id=self.as_list[0]["id"])
  315. events = [Mock(event_id="e1"), Mock(event_id="e2")]
  316. txn_id = 5
  317. self.get_success(self._set_last_txn(service.id, 4))
  318. self.get_success(self._insert_txn(service.id, txn_id, events))
  319. self.get_success(
  320. self.store.complete_appservice_txn(txn_id=txn_id, service=service)
  321. )
  322. res = self.get_success(
  323. self.db_pool.runQuery(
  324. self.engine.convert_param_style(
  325. "SELECT last_txn, state FROM application_services_state WHERE as_id=?"
  326. ),
  327. (service.id,),
  328. )
  329. )
  330. self.assertEquals(1, len(res))
  331. self.assertEquals(txn_id, res[0][0])
  332. self.assertEquals(ApplicationServiceState.UP.value, res[0][1])
  333. res = self.get_success(
  334. self.db_pool.runQuery(
  335. self.engine.convert_param_style(
  336. "SELECT * FROM application_services_txns WHERE txn_id=?"
  337. ),
  338. (txn_id,),
  339. )
  340. )
  341. self.assertEquals(0, len(res))
  342. def test_get_oldest_unsent_txn_none(
  343. self,
  344. ) -> None:
  345. service = Mock(id=self.as_list[0]["id"])
  346. txn = self.get_success(self.store.get_oldest_unsent_txn(service))
  347. self.assertEquals(None, txn)
  348. def test_get_oldest_unsent_txn(self) -> None:
  349. service = Mock(id=self.as_list[0]["id"])
  350. events = [Mock(event_id="e1"), Mock(event_id="e2")]
  351. other_events = [Mock(event_id="e5"), Mock(event_id="e6")]
  352. # we aren't testing store._base stuff here, so mock this out
  353. # (ignore needed because Mypy won't allow us to assign to a method otherwise)
  354. self.store.get_events_as_list = Mock(return_value=make_awaitable(events)) # type: ignore[assignment]
  355. self.get_success(self._insert_txn(self.as_list[1]["id"], 9, other_events))
  356. self.get_success(self._insert_txn(service.id, 10, events))
  357. self.get_success(self._insert_txn(service.id, 11, other_events))
  358. self.get_success(self._insert_txn(service.id, 12, other_events))
  359. txn = self.get_success(self.store.get_oldest_unsent_txn(service))
  360. self.assertEquals(service, txn.service)
  361. self.assertEquals(10, txn.id)
  362. self.assertEquals(events, txn.events)
  363. def test_get_appservices_by_state_single(
  364. self,
  365. ) -> None:
  366. self.get_success(
  367. self._set_state(self.as_list[0]["id"], ApplicationServiceState.DOWN)
  368. )
  369. self.get_success(
  370. self._set_state(self.as_list[1]["id"], ApplicationServiceState.UP)
  371. )
  372. services = self.get_success(
  373. self.store.get_appservices_by_state(ApplicationServiceState.DOWN)
  374. )
  375. self.assertEquals(1, len(services))
  376. self.assertEquals(self.as_list[0]["id"], services[0].id)
  377. def test_get_appservices_by_state_multiple(
  378. self,
  379. ) -> None:
  380. self.get_success(
  381. self._set_state(self.as_list[0]["id"], ApplicationServiceState.DOWN)
  382. )
  383. self.get_success(
  384. self._set_state(self.as_list[1]["id"], ApplicationServiceState.UP)
  385. )
  386. self.get_success(
  387. self._set_state(self.as_list[2]["id"], ApplicationServiceState.DOWN)
  388. )
  389. self.get_success(
  390. self._set_state(self.as_list[3]["id"], ApplicationServiceState.UP)
  391. )
  392. services = self.get_success(
  393. self.store.get_appservices_by_state(ApplicationServiceState.DOWN)
  394. )
  395. self.assertEquals(2, len(services))
  396. self.assertEquals(
  397. {self.as_list[2]["id"], self.as_list[0]["id"]},
  398. {services[0].id, services[1].id},
  399. )
  400. class ApplicationServiceStoreTypeStreamIds(unittest.HomeserverTestCase):
  401. def prepare(
  402. self, reactor: MemoryReactor, clock: Clock, homeserver: HomeServer
  403. ) -> None:
  404. self.service = Mock(id="foo")
  405. self.store = self.hs.get_datastore()
  406. self.get_success(
  407. self.store.set_appservice_state(self.service, ApplicationServiceState.UP)
  408. )
  409. def test_get_type_stream_id_for_appservice_no_value(self) -> None:
  410. value = self.get_success(
  411. self.store.get_type_stream_id_for_appservice(self.service, "read_receipt")
  412. )
  413. self.assertEquals(value, 0)
  414. value = self.get_success(
  415. self.store.get_type_stream_id_for_appservice(self.service, "presence")
  416. )
  417. self.assertEquals(value, 0)
  418. def test_get_type_stream_id_for_appservice_invalid_type(self) -> None:
  419. self.get_failure(
  420. self.store.get_type_stream_id_for_appservice(self.service, "foobar"),
  421. ValueError,
  422. )
  423. def test_set_type_stream_id_for_appservice(self) -> None:
  424. read_receipt_value = 1024
  425. self.get_success(
  426. self.store.set_type_stream_id_for_appservice(
  427. self.service, "read_receipt", read_receipt_value
  428. )
  429. )
  430. result = self.get_success(
  431. self.store.get_type_stream_id_for_appservice(self.service, "read_receipt")
  432. )
  433. self.assertEqual(result, read_receipt_value)
  434. self.get_success(
  435. self.store.set_type_stream_id_for_appservice(
  436. self.service, "presence", read_receipt_value
  437. )
  438. )
  439. result = self.get_success(
  440. self.store.get_type_stream_id_for_appservice(self.service, "presence")
  441. )
  442. self.assertEqual(result, read_receipt_value)
  443. def test_set_type_stream_id_for_appservice_invalid_type(self) -> None:
  444. self.get_failure(
  445. self.store.set_type_stream_id_for_appservice(self.service, "foobar", 1024),
  446. ValueError,
  447. )
# required for ApplicationServiceTransactionStoreTestCase tests
class TestTransactionStore(ApplicationServiceTransactionStore, ApplicationServiceStore):
    """Store that combines the transaction store with the base appservice store.

    The constructor adds nothing of its own; it simply forwards its arguments
    to the parent classes.
    """

    def __init__(self, database: DatabasePool, db_conn, hs) -> None:
        super().__init__(database, db_conn, hs)
  452. class ApplicationServiceStoreConfigTestCase(unittest.HomeserverTestCase):
  453. def _write_config(self, suffix, **kwargs) -> str:
  454. vals = {
  455. "id": "id" + suffix,
  456. "url": "url" + suffix,
  457. "as_token": "as_token" + suffix,
  458. "hs_token": "hs_token" + suffix,
  459. "sender_localpart": "sender_localpart" + suffix,
  460. "namespaces": {},
  461. }
  462. vals.update(kwargs)
  463. _, path = tempfile.mkstemp(prefix="as_config")
  464. with open(path, "w") as f:
  465. f.write(yaml.dump(vals))
  466. return path
  467. def test_unique_works(self) -> None:
  468. f1 = self._write_config(suffix="1")
  469. f2 = self._write_config(suffix="2")
  470. self.hs.config.appservice.app_service_config_files = [f1, f2]
  471. self.hs.config.caches.event_cache_size = 1
  472. database = self.hs.get_datastores().databases[0]
  473. ApplicationServiceStore(
  474. database,
  475. make_conn(database._database_config, database.engine, "test"),
  476. self.hs,
  477. )
  478. def test_duplicate_ids(self) -> None:
  479. f1 = self._write_config(id="id", suffix="1")
  480. f2 = self._write_config(id="id", suffix="2")
  481. self.hs.config.appservice.app_service_config_files = [f1, f2]
  482. self.hs.config.caches.event_cache_size = 1
  483. with self.assertRaises(ConfigError) as cm:
  484. database = self.hs.get_datastores().databases[0]
  485. ApplicationServiceStore(
  486. database,
  487. make_conn(database._database_config, database.engine, "test"),
  488. self.hs,
  489. )
  490. e = cm.exception
  491. self.assertIn(f1, str(e))
  492. self.assertIn(f2, str(e))
  493. self.assertIn("id", str(e))
  494. def test_duplicate_as_tokens(self) -> None:
  495. f1 = self._write_config(as_token="as_token", suffix="1")
  496. f2 = self._write_config(as_token="as_token", suffix="2")
  497. self.hs.config.appservice.app_service_config_files = [f1, f2]
  498. self.hs.config.caches.event_cache_size = 1
  499. with self.assertRaises(ConfigError) as cm:
  500. database = self.hs.get_datastores().databases[0]
  501. ApplicationServiceStore(
  502. database,
  503. make_conn(database._database_config, database.engine, "test"),
  504. self.hs,
  505. )
  506. e = cm.exception
  507. self.assertIn(f1, str(e))
  508. self.assertIn(f2, str(e))
  509. self.assertIn("as_token", str(e))