test_presence.py 44 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215
  1. # Copyright 2016 OpenMarket Ltd
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. from typing import Optional, cast
  15. from unittest.mock import Mock, call
  16. from parameterized import parameterized
  17. from signedjson.key import generate_signing_key
  18. from twisted.test.proto_helpers import MemoryReactor
  19. from synapse.api.constants import EventTypes, Membership, PresenceState
  20. from synapse.api.presence import UserPresenceState
  21. from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
  22. from synapse.events.builder import EventBuilder
  23. from synapse.federation.sender import FederationSender
  24. from synapse.handlers.presence import (
  25. EXTERNAL_PROCESS_EXPIRY,
  26. FEDERATION_PING_INTERVAL,
  27. FEDERATION_TIMEOUT,
  28. IDLE_TIMER,
  29. LAST_ACTIVE_GRANULARITY,
  30. SYNC_ONLINE_TIMEOUT,
  31. handle_timeout,
  32. handle_update,
  33. )
  34. from synapse.rest import admin
  35. from synapse.rest.client import room
  36. from synapse.server import HomeServer
  37. from synapse.storage.database import LoggingDatabaseConnection
  38. from synapse.types import JsonDict, UserID, get_domain_from_id
  39. from synapse.util import Clock
  40. from tests import unittest
  41. from tests.replication._base import BaseMultiWorkerStreamTestCase
  class PresenceUpdateTestCase(unittest.HomeserverTestCase):
      """Tests for `handle_update` transitions and for persisting presence.

      The `handle_update` tests pass a previous and a new `UserPresenceState`,
      together with a mocked wheel timer, to `handle_update` and then check the
      returned `(state, persist_and_notify, federation_ping)` tuple as well as
      the deadlines that were inserted into the timer.
      """

      servlets = [admin.register_servlets]

      def prepare(
          self, reactor: MemoryReactor, clock: Clock, homeserver: HomeServer
      ) -> None:
          """Keep a reference to the main datastore for the persistence test."""
          self.store = homeserver.get_datastores().main

      def _assert_timers(
          self, wheel_timer: Mock, now: int, user_id: str, *deadlines: int
      ) -> None:
          """Assert that exactly the given deadlines were inserted into the timer.

          Args:
              wheel_timer: The mock that was handed to `handle_update`.
              now: The `now` value each `insert` call is expected to carry.
              user_id: The `obj` value each `insert` call is expected to carry.
              deadlines: The expected `then` values, in any order.
          """
          self.assertEqual(wheel_timer.insert.call_count, len(deadlines))
          wheel_timer.insert.assert_has_calls(
              [call(now=now, obj=user_id, then=then) for then in deadlines],
              any_order=True,
          )

      def test_offline_to_online(self) -> None:
          """A default state moving to ONLINE is persisted and arms three timers."""
          wheel_timer = Mock()
          user_id = "@foo:bar"
          now = 5000000

          prev_state = UserPresenceState.default(user_id)
          new_state = prev_state.copy_and_replace(
              state=PresenceState.ONLINE, last_active_ts=now
          )

          state, persist_and_notify, _ = handle_update(
              prev_state, new_state, is_mine=True, wheel_timer=wheel_timer, now=now
          )

          self.assertTrue(persist_and_notify)
          self.assertTrue(state.currently_active)
          self.assertEqual(new_state.state, state.state)
          self.assertEqual(new_state.status_msg, state.status_msg)
          self.assertEqual(state.last_federation_update_ts, now)

          self._assert_timers(
              wheel_timer,
              now,
              user_id,
              new_state.last_active_ts + IDLE_TIMER,
              new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT,
              new_state.last_active_ts + LAST_ACTIVE_GRANULARITY,
          )

      def test_online_to_online(self) -> None:
          """ONLINE -> ONLINE with an unchanged `last_active_ts` is not persisted,
          but a federation ping is requested and three timers are armed.
          """
          wheel_timer = Mock()
          user_id = "@foo:bar"
          now = 5000000

          prev_state = UserPresenceState.default(user_id).copy_and_replace(
              state=PresenceState.ONLINE, last_active_ts=now, currently_active=True
          )
          new_state = prev_state.copy_and_replace(
              state=PresenceState.ONLINE, last_active_ts=now
          )

          state, persist_and_notify, federation_ping = handle_update(
              prev_state, new_state, is_mine=True, wheel_timer=wheel_timer, now=now
          )

          self.assertFalse(persist_and_notify)
          self.assertTrue(federation_ping)
          self.assertTrue(state.currently_active)
          self.assertEqual(new_state.state, state.state)
          self.assertEqual(new_state.status_msg, state.status_msg)
          self.assertEqual(state.last_federation_update_ts, now)

          self._assert_timers(
              wheel_timer,
              now,
              user_id,
              new_state.last_active_ts + IDLE_TIMER,
              new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT,
              new_state.last_active_ts + LAST_ACTIVE_GRANULARITY,
          )

      def test_online_to_online_last_active_noop(self) -> None:
          """ONLINE -> ONLINE where the previous `last_active_ts` is older than
          `LAST_ACTIVE_GRANULARITY` and the new one is `now`: the update is not
          persisted, but a federation ping is requested.
          """
          wheel_timer = Mock()
          user_id = "@foo:bar"
          now = 5000000

          prev_state = UserPresenceState.default(user_id).copy_and_replace(
              state=PresenceState.ONLINE,
              last_active_ts=now - LAST_ACTIVE_GRANULARITY - 10,
              currently_active=True,
          )
          new_state = prev_state.copy_and_replace(
              state=PresenceState.ONLINE, last_active_ts=now
          )

          state, persist_and_notify, federation_ping = handle_update(
              prev_state, new_state, is_mine=True, wheel_timer=wheel_timer, now=now
          )

          self.assertFalse(persist_and_notify)
          self.assertTrue(federation_ping)
          self.assertTrue(state.currently_active)
          self.assertEqual(new_state.state, state.state)
          self.assertEqual(new_state.status_msg, state.status_msg)
          self.assertEqual(state.last_federation_update_ts, now)

          self._assert_timers(
              wheel_timer,
              now,
              user_id,
              new_state.last_active_ts + IDLE_TIMER,
              new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT,
              new_state.last_active_ts + LAST_ACTIVE_GRANULARITY,
          )

      def test_online_to_online_last_active(self) -> None:
          """ONLINE -> ONLINE where `last_active_ts` stays older than
          `LAST_ACTIVE_GRANULARITY`: the result is persisted, is no longer
          `currently_active`, and only two timers are armed.
          """
          wheel_timer = Mock()
          user_id = "@foo:bar"
          now = 5000000

          prev_state = UserPresenceState.default(user_id).copy_and_replace(
              state=PresenceState.ONLINE,
              last_active_ts=now - LAST_ACTIVE_GRANULARITY - 1,
              currently_active=True,
          )
          new_state = prev_state.copy_and_replace(state=PresenceState.ONLINE)

          state, persist_and_notify, _ = handle_update(
              prev_state, new_state, is_mine=True, wheel_timer=wheel_timer, now=now
          )

          self.assertTrue(persist_and_notify)
          self.assertFalse(state.currently_active)
          self.assertEqual(new_state.state, state.state)
          self.assertEqual(new_state.status_msg, state.status_msg)
          self.assertEqual(state.last_federation_update_ts, now)

          self._assert_timers(
              wheel_timer,
              now,
              user_id,
              new_state.last_active_ts + IDLE_TIMER,
              new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT,
          )

      def test_remote_ping_timer(self) -> None:
          """With `is_mine=False` nothing is persisted or pinged and the only
          timer armed is the federation timeout.
          """
          wheel_timer = Mock()
          user_id = "@foo:bar"
          now = 5000000

          prev_state = UserPresenceState.default(user_id).copy_and_replace(
              state=PresenceState.ONLINE, last_active_ts=now
          )
          new_state = prev_state.copy_and_replace(state=PresenceState.ONLINE)

          state, persist_and_notify, federation_ping = handle_update(
              prev_state, new_state, is_mine=False, wheel_timer=wheel_timer, now=now
          )

          self.assertFalse(persist_and_notify)
          self.assertFalse(federation_ping)
          self.assertFalse(state.currently_active)
          self.assertEqual(new_state.state, state.state)
          self.assertEqual(new_state.status_msg, state.status_msg)

          self._assert_timers(
              wheel_timer,
              now,
              user_id,
              new_state.last_federation_update_ts + FEDERATION_TIMEOUT,
          )

      def test_online_to_offline(self) -> None:
          """ONLINE -> OFFLINE is persisted and arms no timers at all."""
          wheel_timer = Mock()
          user_id = "@foo:bar"
          now = 5000000

          prev_state = UserPresenceState.default(user_id).copy_and_replace(
              state=PresenceState.ONLINE, last_active_ts=now, currently_active=True
          )
          new_state = prev_state.copy_and_replace(state=PresenceState.OFFLINE)

          state, persist_and_notify, _ = handle_update(
              prev_state, new_state, is_mine=True, wheel_timer=wheel_timer, now=now
          )

          self.assertTrue(persist_and_notify)
          self.assertEqual(new_state.state, state.state)
          self.assertEqual(state.last_federation_update_ts, now)

          self.assertEqual(wheel_timer.insert.call_count, 0)

      def test_online_to_idle(self) -> None:
          """ONLINE -> UNAVAILABLE is persisted and arms only the sync timeout."""
          wheel_timer = Mock()
          user_id = "@foo:bar"
          now = 5000000

          prev_state = UserPresenceState.default(user_id).copy_and_replace(
              state=PresenceState.ONLINE, last_active_ts=now, currently_active=True
          )
          new_state = prev_state.copy_and_replace(state=PresenceState.UNAVAILABLE)

          state, persist_and_notify, _ = handle_update(
              prev_state, new_state, is_mine=True, wheel_timer=wheel_timer, now=now
          )

          self.assertTrue(persist_and_notify)
          self.assertEqual(new_state.state, state.state)
          self.assertEqual(state.last_federation_update_ts, now)
          self.assertEqual(new_state.status_msg, state.status_msg)

          self._assert_timers(
              wheel_timer,
              now,
              user_id,
              new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT,
          )

      def test_persisting_presence_updates(self) -> None:
          """Tests that the latest presence state for each user is persisted correctly"""
          # Register five users and give each of them an "online" presence state.
          presence_states = [
              UserPresenceState(
                  user_id=self.register_user(f"user_{i}", "password"),
                  state="online",
                  last_active_ts=1,
                  last_federation_update_ts=1,
                  last_user_sync_ts=1,
                  status_msg="I'm online!",
                  currently_active=True,
              )
              for i in range(5)
          ]

          # Persist these presence updates to the database.
          self.get_success(self.store.update_presence(presence_states))

          # Read the updates back out of the database.
          db_presence_states_raw = self.get_success(
              self.store.get_all_presence_updates(
                  instance_name="master",
                  last_id=0,
                  current_id=len(presence_states) + 1,
                  limit=len(presence_states),
              )
          )

          # Reduce both sides to (user ID, state) tuples; what went into storage
          # should be identical to what came back out.
          db_presence_states = [(ps[0], ps[1]) for _, ps in db_presence_states_raw[0]]
          presence_states_compare = [(ps.user_id, ps.state) for ps in presence_states]
          self.assertEqual(presence_states_compare, db_presence_states)
  class PresenceTimeoutTestCase(unittest.TestCase):
      """Checks `handle_timeout` for each timer, and that a timeout never
      alters the user's `status_msg`.
      """

      def test_idle_timer(self) -> None:
          """An ONLINE user inactive for longer than IDLE_TIMER becomes UNAVAILABLE."""
          user_id = "@foo:bar"
          status_msg = "I'm here!"
          now = 5000000

          initial = UserPresenceState.default(user_id).copy_and_replace(
              state=PresenceState.ONLINE,
              last_active_ts=now - IDLE_TIMER - 1,
              last_user_sync_ts=now,
              status_msg=status_msg,
          )

          result = handle_timeout(
              initial, is_mine=True, syncing_user_ids=set(), now=now
          )

          self.assertIsNotNone(result)
          assert result is not None
          self.assertEqual(result.state, PresenceState.UNAVAILABLE)
          self.assertEqual(result.status_msg, status_msg)

      def test_busy_no_idle(self) -> None:
          """
          Tests that a user setting their presence to busy but idling doesn't turn their
          presence state into unavailable.
          """
          user_id = "@foo:bar"
          status_msg = "I'm here!"
          now = 5000000

          initial = UserPresenceState.default(user_id).copy_and_replace(
              state=PresenceState.BUSY,
              last_active_ts=now - IDLE_TIMER - 1,
              last_user_sync_ts=now,
              status_msg=status_msg,
          )

          result = handle_timeout(
              initial, is_mine=True, syncing_user_ids=set(), now=now
          )

          self.assertIsNotNone(result)
          assert result is not None
          self.assertEqual(result.state, PresenceState.BUSY)
          self.assertEqual(result.status_msg, status_msg)

      def test_sync_timeout(self) -> None:
          """An ONLINE user whose last sync is older than SYNC_ONLINE_TIMEOUT goes OFFLINE."""
          user_id = "@foo:bar"
          status_msg = "I'm here!"
          now = 5000000

          initial = UserPresenceState.default(user_id).copy_and_replace(
              state=PresenceState.ONLINE,
              last_active_ts=0,
              last_user_sync_ts=now - SYNC_ONLINE_TIMEOUT - 1,
              status_msg=status_msg,
          )

          result = handle_timeout(
              initial, is_mine=True, syncing_user_ids=set(), now=now
          )

          self.assertIsNotNone(result)
          assert result is not None
          self.assertEqual(result.state, PresenceState.OFFLINE)
          self.assertEqual(result.status_msg, status_msg)

      def test_sync_online(self) -> None:
          """A user listed in `syncing_user_ids` stays ONLINE despite stale timestamps."""
          user_id = "@foo:bar"
          status_msg = "I'm here!"
          now = 5000000

          initial = UserPresenceState.default(user_id).copy_and_replace(
              state=PresenceState.ONLINE,
              last_active_ts=now - SYNC_ONLINE_TIMEOUT - 1,
              last_user_sync_ts=now - SYNC_ONLINE_TIMEOUT - 1,
              status_msg=status_msg,
          )

          result = handle_timeout(
              initial, is_mine=True, syncing_user_ids={user_id}, now=now
          )

          self.assertIsNotNone(result)
          assert result is not None
          self.assertEqual(result.state, PresenceState.ONLINE)
          self.assertEqual(result.status_msg, status_msg)

      def test_federation_ping(self) -> None:
          """When only the federation ping interval has elapsed, a state equal to
          the input is returned rather than `None`.
          """
          user_id = "@foo:bar"
          status_msg = "I'm here!"
          now = 5000000

          initial = UserPresenceState.default(user_id).copy_and_replace(
              state=PresenceState.ONLINE,
              last_active_ts=now,
              last_user_sync_ts=now,
              last_federation_update_ts=now - FEDERATION_PING_INTERVAL - 1,
              status_msg=status_msg,
          )

          result = handle_timeout(
              initial, is_mine=True, syncing_user_ids=set(), now=now
          )

          self.assertIsNotNone(result)
          self.assertEqual(initial, result)

      def test_no_timeout(self) -> None:
          """With every timestamp equal to `now`, `handle_timeout` returns `None`."""
          user_id = "@foo:bar"
          now = 5000000

          initial = UserPresenceState.default(user_id).copy_and_replace(
              state=PresenceState.ONLINE,
              last_active_ts=now,
              last_user_sync_ts=now,
              last_federation_update_ts=now,
          )

          result = handle_timeout(
              initial, is_mine=True, syncing_user_ids=set(), now=now
          )

          self.assertIsNone(result)

      def test_federation_timeout(self) -> None:
          """With `is_mine=False`, a federation update older than FEDERATION_TIMEOUT
          takes the user OFFLINE.
          """
          user_id = "@foo:bar"
          status_msg = "I'm here!"
          now = 5000000

          initial = UserPresenceState.default(user_id).copy_and_replace(
              state=PresenceState.ONLINE,
              last_active_ts=now,
              last_user_sync_ts=now,
              last_federation_update_ts=now - FEDERATION_TIMEOUT - 1,
              status_msg=status_msg,
          )

          result = handle_timeout(
              initial, is_mine=False, syncing_user_ids=set(), now=now
          )

          self.assertIsNotNone(result)
          assert result is not None
          self.assertEqual(result.state, PresenceState.OFFLINE)
          self.assertEqual(result.status_msg, status_msg)

      def test_last_active(self) -> None:
          """When only LAST_ACTIVE_GRANULARITY has elapsed, a state equal to the
          input is returned rather than `None`.
          """
          user_id = "@foo:bar"
          status_msg = "I'm here!"
          now = 5000000

          initial = UserPresenceState.default(user_id).copy_and_replace(
              state=PresenceState.ONLINE,
              last_active_ts=now - LAST_ACTIVE_GRANULARITY - 1,
              last_user_sync_ts=now,
              last_federation_update_ts=now,
              status_msg=status_msg,
          )

          result = handle_timeout(
              initial, is_mine=True, syncing_user_ids=set(), now=now
          )

          self.assertIsNotNone(result)
          self.assertEqual(initial, result)
  class PresenceHandlerInitTestCase(unittest.HomeserverTestCase):
      """Tests how presence preloaded from the database behaves once the
      presence handler is first requested.
      """

      def default_config(self) -> JsonDict:
          """Return the default config with background tasks moved elsewhere."""
          cfg = super().default_config()
          # Disable background tasks on this worker so that the PresenceHandler isn't
          # loaded until we request it.
          cfg["run_background_tasks_on"] = "other"
          return cfg

      def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
          """Store an ONLINE presence row for the test user and reload the
          store's preloaded presence from the database.
          """
          self.user_id = f"@test:{self.hs.config.server.server_name}"

          # Move the reactor to the initial time.
          self.reactor.advance(1000)
          now = self.clock.time_msec()

          main_store = hs.get_datastores().main
          stored_state = UserPresenceState(
              user_id=self.user_id,
              state=PresenceState.ONLINE,
              last_active_ts=now,
              last_federation_update_ts=now,
              last_user_sync_ts=now,
              status_msg=None,
              currently_active=True,
          )
          self.get_success(main_store.update_presence([stored_state]))

          # Regenerate the preloaded presence information on PresenceStore.
          def refill_presence(db_conn: LoggingDatabaseConnection) -> None:
              main_store._presence_on_startup = main_store._get_active_presence(db_conn)

          self.get_success(main_store.db_pool.runWithConnection(refill_presence))

      def test_restored_presence_idles(self) -> None:
          """The presence state restored from the database should not persist forever."""
          # Get the handler (which kicks off a bunch of timers).
          presence_handler = self.hs.get_presence_handler()
          user = UserID.from_string(self.user_id)

          # The restored state starts out online.
          restored = self.get_success(presence_handler.get_state(user))
          self.assertEqual(restored.state, PresenceState.ONLINE)

          # Advance such that the user should timeout.
          self.reactor.advance(SYNC_ONLINE_TIMEOUT / 1000)
          self.reactor.pump([5])

          # The user should now have gone offline.
          timed_out = self.get_success(presence_handler.get_state(user))
          self.assertEqual(timed_out.state, PresenceState.OFFLINE)

      @parameterized.expand(
          [
              (PresenceState.BUSY, PresenceState.BUSY),
              (PresenceState.ONLINE, PresenceState.ONLINE),
              (PresenceState.UNAVAILABLE, PresenceState.UNAVAILABLE),
              # Offline syncs don't update the state.
              (PresenceState.OFFLINE, PresenceState.ONLINE),
          ]
      )
      @unittest.override_config({"experimental_features": {"msc3026_enabled": True}})
      def test_restored_presence_online_after_sync(
          self, sync_state: str, expected_state: str
      ) -> None:
          """
          The presence state restored from the database should be overridden with sync after a timeout.

          Args:
              sync_state: The presence state of the new sync.
              expected_state: The expected presence right after the sync.
          """
          # Get the handler (which kicks off a bunch of timers).
          presence_handler = self.hs.get_presence_handler()
          user = UserID.from_string(self.user_id)

          # The restored state starts out online.
          restored = self.get_success(presence_handler.get_state(user))
          self.assertEqual(restored.state, PresenceState.ONLINE)

          # Advance slightly and sync. The sync only affects presence when it is
          # not an offline sync.
          self.reactor.advance(SYNC_ONLINE_TIMEOUT / 1000 / 2)
          affect_presence = sync_state != PresenceState.OFFLINE
          self.get_success(
              presence_handler.user_syncing(self.user_id, affect_presence, sync_state)
          )

          # Right after the sync the user is in the expected state.
          after_sync = self.get_success(presence_handler.get_state(user))
          self.assertEqual(after_sync.state, expected_state)

          # Advance such that the user's preloaded data times out, but not the new sync.
          self.reactor.advance(SYNC_ONLINE_TIMEOUT / 1000 / 2)
          self.reactor.pump([5])

          # Check that the user is in the sync state (as the client is currently syncing still).
          final = self.get_success(presence_handler.get_state(user))
          self.assertEqual(final.state, sync_state)
  class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase):
      """Tests presence handler behaviour for timeouts, manually set states
      and presence set through syncing.
      """

      user_id = "@test:server"
      user_id_obj = UserID.from_string(user_id)

      def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
          """Grab the presence handler and clock from the homeserver."""
          self.presence_handler = hs.get_presence_handler()
          self.clock = hs.get_clock()

      def _current_state(self) -> UserPresenceState:
          """Fetch the test user's current presence state from the handler."""
          return self.get_success(self.presence_handler.get_state(self.user_id_obj))

      def test_external_process_timeout(self) -> None:
          """Test that if an external process doesn't update the records for a while
          we time out their syncing users presence.
          """
          process_id = "1"

          # Notify handler that a user is now syncing.
          self.get_success(
              self.presence_handler.update_external_syncs_row(
                  process_id, self.user_id, True, self.clock.time_msec()
              )
          )

          # Check that if we wait a while without telling the handler the user has
          # stopped syncing that their presence state doesn't get timed out.
          # NOTE(review): the constant is passed to reactor.advance() undivided,
          # whereas elsewhere in this file SYNC_ONLINE_TIMEOUT is divided by 1000
          # before advancing -- confirm the intended units.
          self.reactor.advance(EXTERNAL_PROCESS_EXPIRY / 2)
          self.assertEqual(self._current_state().state, PresenceState.ONLINE)

          # Check that if the external process timeout fires, then the syncing
          # user gets timed out
          self.reactor.advance(EXTERNAL_PROCESS_EXPIRY)
          self.assertEqual(self._current_state().state, PresenceState.OFFLINE)

      def test_user_goes_offline_by_timeout_status_msg_remain(self) -> None:
          """Test that if a user doesn't update the records for a while
          users presence goes `OFFLINE` because of timeout and `status_msg` remains.
          """
          status_msg = "I'm here!"

          # Mark user as online
          self._set_presencestate_with_status_msg(PresenceState.ONLINE, status_msg)

          # Check that if we wait a while without telling the handler the user has
          # stopped syncing that their presence state doesn't get timed out.
          # NOTE(review): the constant is passed to reactor.advance() undivided,
          # whereas elsewhere in this file SYNC_ONLINE_TIMEOUT is divided by 1000
          # before advancing -- confirm the intended units.
          self.reactor.advance(SYNC_ONLINE_TIMEOUT / 2)
          before_timeout = self._current_state()
          self.assertEqual(before_timeout.state, PresenceState.ONLINE)
          self.assertEqual(before_timeout.status_msg, status_msg)

          # Check that if the timeout fires, then the syncing user gets timed out
          self.reactor.advance(SYNC_ONLINE_TIMEOUT)
          after_timeout = self._current_state()
          # status_msg should remain even after going offline
          self.assertEqual(after_timeout.state, PresenceState.OFFLINE)
          self.assertEqual(after_timeout.status_msg, status_msg)

      def test_user_goes_offline_manually_with_no_status_msg(self) -> None:
          """Test that if a user change presence manually to `OFFLINE`
          and no status is set, that `status_msg` is `None`.
          """
          status_msg = "I'm here!"

          # Mark user as online
          self._set_presencestate_with_status_msg(PresenceState.ONLINE, status_msg)

          # Mark user as offline, without supplying any status message
          offline_update = {"presence": PresenceState.OFFLINE}
          self.get_success(
              self.presence_handler.set_state(self.user_id_obj, offline_update)
          )

          result = self._current_state()
          self.assertEqual(result.state, PresenceState.OFFLINE)
          self.assertEqual(result.status_msg, None)

      def test_user_goes_offline_manually_with_status_msg(self) -> None:
          """Test that if a user change presence manually to `OFFLINE`
          and a status is set, that `status_msg` appears.
          """
          status_msg = "I'm here!"

          # Mark user as online
          self._set_presencestate_with_status_msg(PresenceState.ONLINE, status_msg)

          # Mark user as offline; the helper checks the state and message itself
          self._set_presencestate_with_status_msg(PresenceState.OFFLINE, "And now here.")

      def test_user_reset_online_with_no_status(self) -> None:
          """Test that if a user set again the presence manually
          and no status is set, that `status_msg` is `None`.
          """
          status_msg = "I'm here!"

          # Mark user as online
          self._set_presencestate_with_status_msg(PresenceState.ONLINE, status_msg)

          # Mark user as online again, without supplying any status message
          online_update = {"presence": PresenceState.ONLINE}
          self.get_success(
              self.presence_handler.set_state(self.user_id_obj, online_update)
          )

          result = self._current_state()
          # still online, but the earlier status_msg should have been cleared
          self.assertEqual(result.state, PresenceState.ONLINE)
          self.assertEqual(result.status_msg, None)

      def test_set_presence_with_status_msg_none(self) -> None:
          """Test that if a user set again the presence manually
          and status is `None`, that `status_msg` is `None`.
          """
          status_msg = "I'm here!"

          # Mark user as online
          self._set_presencestate_with_status_msg(PresenceState.ONLINE, status_msg)

          # Mark user as online and `status_msg = None`
          self._set_presencestate_with_status_msg(PresenceState.ONLINE, None)

      def test_set_presence_from_syncing_not_set(self) -> None:
          """Test that presence is not set by syncing if affect_presence is false"""
          status_msg = "I'm here!"

          self._set_presencestate_with_status_msg(PresenceState.UNAVAILABLE, status_msg)

          self.get_success(
              self.presence_handler.user_syncing(
                  self.user_id, False, PresenceState.ONLINE
              )
          )

          result = self._current_state()
          # we should still be unavailable
          self.assertEqual(result.state, PresenceState.UNAVAILABLE)
          # and status message should still be the same
          self.assertEqual(result.status_msg, status_msg)

      def test_set_presence_from_syncing_is_set(self) -> None:
          """Test that presence is set by syncing if affect_presence is true"""
          status_msg = "I'm here!"

          self._set_presencestate_with_status_msg(PresenceState.UNAVAILABLE, status_msg)

          self.get_success(
              self.presence_handler.user_syncing(self.user_id, True, PresenceState.ONLINE)
          )

          # we should now be online
          self.assertEqual(self._current_state().state, PresenceState.ONLINE)

      def test_set_presence_from_syncing_keeps_status(self) -> None:
          """Test that presence set by syncing retains status message"""
          status_msg = "I'm here!"

          self._set_presencestate_with_status_msg(PresenceState.UNAVAILABLE, status_msg)

          self.get_success(
              self.presence_handler.user_syncing(self.user_id, True, PresenceState.ONLINE)
          )

          # our status message should be the same as it was before
          self.assertEqual(self._current_state().status_msg, status_msg)

      @parameterized.expand([(False,), (True,)])
      @unittest.override_config({"experimental_features": {"msc3026_enabled": True}})
      def test_set_presence_from_syncing_keeps_busy(
          self, test_with_workers: bool
      ) -> None:
          """Test that presence set by syncing doesn't affect busy status

          Args:
              test_with_workers: If True, check the presence state of the user by calling
                  /sync against a worker, rather than the main process.
          """
          status_msg = "I'm busy!"

          # By default, we call /sync against the main process. When testing with
          # workers, create a worker and use it to handle /sync traffic instead;
          # this tests that presence changes get replicated from workers to the
          # main process correctly.
          worker_to_sync_against = (
              self.make_worker_hs(
                  "synapse.app.generic_worker", {"worker_name": "presence_writer"}
              )
              if test_with_workers
              else self.hs
          )

          # Set presence to BUSY
          self._set_presencestate_with_status_msg(PresenceState.BUSY, status_msg)

          # Perform a sync with a presence state other than busy. This should NOT change
          # our presence status; we only change from busy if we explicitly set it via
          # /presence/*.
          sync_handler = worker_to_sync_against.get_presence_handler()
          self.get_success(
              sync_handler.user_syncing(self.user_id, True, PresenceState.ONLINE)
          )

          # Check against the main process that the user's presence did not change:
          # we should still be busy
          self.assertEqual(self._current_state().state, PresenceState.BUSY)

      def _set_presencestate_with_status_msg(
          self, state: str, status_msg: Optional[str]
      ) -> None:
          """Set a PresenceState and status_msg and check the result.

          Args:
              state: The new PresenceState.
              status_msg: Status message that is to be set.
          """
          update = {"presence": state, "status_msg": status_msg}
          self.get_success(self.presence_handler.set_state(self.user_id_obj, update))

          # Read the state back and make sure both fields took effect.
          new_state = self._current_state()
          self.assertEqual(new_state.state, state)
          self.assertEqual(new_state.status_msg, status_msg)
  703. class PresenceFederationQueueTestCase(unittest.HomeserverTestCase):
  704. def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
  705. self.presence_handler = hs.get_presence_handler()
  706. self.clock = hs.get_clock()
  707. self.instance_name = hs.get_instance_name()
  708. self.queue = self.presence_handler.get_federation_queue()
  709. def test_send_and_get(self) -> None:
  710. state1 = UserPresenceState.default("@user1:test")
  711. state2 = UserPresenceState.default("@user2:test")
  712. state3 = UserPresenceState.default("@user3:test")
  713. prev_token = self.queue.get_current_token(self.instance_name)
  714. self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2"))
  715. self.queue.send_presence_to_destinations((state3,), ("dest3",))
  716. now_token = self.queue.get_current_token(self.instance_name)
  717. rows, upto_token, limited = self.get_success(
  718. self.queue.get_replication_rows("master", prev_token, now_token, 10)
  719. )
  720. self.assertEqual(upto_token, now_token)
  721. self.assertFalse(limited)
  722. expected_rows = [
  723. (1, ("dest1", "@user1:test")),
  724. (1, ("dest2", "@user1:test")),
  725. (1, ("dest1", "@user2:test")),
  726. (1, ("dest2", "@user2:test")),
  727. (2, ("dest3", "@user3:test")),
  728. ]
  729. self.assertCountEqual(rows, expected_rows)
  730. now_token = self.queue.get_current_token(self.instance_name)
  731. rows, upto_token, limited = self.get_success(
  732. self.queue.get_replication_rows("master", upto_token, now_token, 10)
  733. )
  734. self.assertEqual(upto_token, now_token)
  735. self.assertFalse(limited)
  736. self.assertCountEqual(rows, [])
  737. def test_send_and_get_split(self) -> None:
  738. state1 = UserPresenceState.default("@user1:test")
  739. state2 = UserPresenceState.default("@user2:test")
  740. state3 = UserPresenceState.default("@user3:test")
  741. prev_token = self.queue.get_current_token(self.instance_name)
  742. self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2"))
  743. now_token = self.queue.get_current_token(self.instance_name)
  744. self.queue.send_presence_to_destinations((state3,), ("dest3",))
  745. rows, upto_token, limited = self.get_success(
  746. self.queue.get_replication_rows("master", prev_token, now_token, 10)
  747. )
  748. self.assertEqual(upto_token, now_token)
  749. self.assertFalse(limited)
  750. expected_rows = [
  751. (1, ("dest1", "@user1:test")),
  752. (1, ("dest2", "@user1:test")),
  753. (1, ("dest1", "@user2:test")),
  754. (1, ("dest2", "@user2:test")),
  755. ]
  756. self.assertCountEqual(rows, expected_rows)
  757. now_token = self.queue.get_current_token(self.instance_name)
  758. rows, upto_token, limited = self.get_success(
  759. self.queue.get_replication_rows("master", upto_token, now_token, 10)
  760. )
  761. self.assertEqual(upto_token, now_token)
  762. self.assertFalse(limited)
  763. expected_rows = [
  764. (2, ("dest3", "@user3:test")),
  765. ]
  766. self.assertCountEqual(rows, expected_rows)
  767. def test_clear_queue_all(self) -> None:
  768. state1 = UserPresenceState.default("@user1:test")
  769. state2 = UserPresenceState.default("@user2:test")
  770. state3 = UserPresenceState.default("@user3:test")
  771. prev_token = self.queue.get_current_token(self.instance_name)
  772. self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2"))
  773. self.queue.send_presence_to_destinations((state3,), ("dest3",))
  774. self.reactor.advance(10 * 60 * 1000)
  775. now_token = self.queue.get_current_token(self.instance_name)
  776. rows, upto_token, limited = self.get_success(
  777. self.queue.get_replication_rows("master", prev_token, now_token, 10)
  778. )
  779. self.assertEqual(upto_token, now_token)
  780. self.assertFalse(limited)
  781. self.assertCountEqual(rows, [])
  782. prev_token = self.queue.get_current_token(self.instance_name)
  783. self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2"))
  784. self.queue.send_presence_to_destinations((state3,), ("dest3",))
  785. now_token = self.queue.get_current_token(self.instance_name)
  786. rows, upto_token, limited = self.get_success(
  787. self.queue.get_replication_rows("master", prev_token, now_token, 10)
  788. )
  789. self.assertEqual(upto_token, now_token)
  790. self.assertFalse(limited)
  791. expected_rows = [
  792. (3, ("dest1", "@user1:test")),
  793. (3, ("dest2", "@user1:test")),
  794. (3, ("dest1", "@user2:test")),
  795. (3, ("dest2", "@user2:test")),
  796. (4, ("dest3", "@user3:test")),
  797. ]
  798. self.assertCountEqual(rows, expected_rows)
  799. def test_partially_clear_queue(self) -> None:
  800. state1 = UserPresenceState.default("@user1:test")
  801. state2 = UserPresenceState.default("@user2:test")
  802. state3 = UserPresenceState.default("@user3:test")
  803. prev_token = self.queue.get_current_token(self.instance_name)
  804. self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2"))
  805. self.reactor.advance(2 * 60 * 1000)
  806. self.queue.send_presence_to_destinations((state3,), ("dest3",))
  807. self.reactor.advance(4 * 60 * 1000)
  808. now_token = self.queue.get_current_token(self.instance_name)
  809. rows, upto_token, limited = self.get_success(
  810. self.queue.get_replication_rows("master", prev_token, now_token, 10)
  811. )
  812. self.assertEqual(upto_token, now_token)
  813. self.assertFalse(limited)
  814. self.assertCountEqual(rows, [])
  815. prev_token = self.queue.get_current_token(self.instance_name)
  816. self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2"))
  817. self.queue.send_presence_to_destinations((state3,), ("dest3",))
  818. now_token = self.queue.get_current_token(self.instance_name)
  819. rows, upto_token, limited = self.get_success(
  820. self.queue.get_replication_rows("master", prev_token, now_token, 10)
  821. )
  822. self.assertEqual(upto_token, now_token)
  823. self.assertFalse(limited)
  824. expected_rows = [
  825. (3, ("dest1", "@user1:test")),
  826. (3, ("dest2", "@user1:test")),
  827. (3, ("dest1", "@user2:test")),
  828. (3, ("dest2", "@user2:test")),
  829. (4, ("dest3", "@user3:test")),
  830. ]
  831. self.assertCountEqual(rows, expected_rows)
  832. class PresenceJoinTestCase(unittest.HomeserverTestCase):
  833. """Tests remote servers get told about presence of users in the room when
  834. they join and when new local users join.
  835. """
  836. user_id = "@test:server"
  837. servlets = [room.register_servlets]
  838. def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
  839. hs = self.setup_test_homeserver(
  840. "server",
  841. federation_sender=Mock(spec=FederationSender),
  842. )
  843. return hs
  844. def default_config(self) -> JsonDict:
  845. config = super().default_config()
  846. # Enable federation sending on the main process.
  847. config["federation_sender_instances"] = None
  848. return config
  849. def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
  850. self.federation_sender = cast(Mock, hs.get_federation_sender())
  851. self.event_builder_factory = hs.get_event_builder_factory()
  852. self.federation_event_handler = hs.get_federation_event_handler()
  853. self.presence_handler = hs.get_presence_handler()
  854. # self.event_builder_for_2 = EventBuilderFactory(hs)
  855. # self.event_builder_for_2.hostname = "test2"
  856. self.store = hs.get_datastores().main
  857. self.state = hs.get_state_handler()
  858. self._event_auth_handler = hs.get_event_auth_handler()
  859. # We don't actually check signatures in tests, so lets just create a
  860. # random key to use.
  861. self.random_signing_key = generate_signing_key("ver")
  862. def test_remote_joins(self) -> None:
  863. # We advance time to something that isn't 0, as we use 0 as a special
  864. # value.
  865. self.reactor.advance(1000000000000)
  866. # Create a room with two local users
  867. room_id = self.helper.create_room_as(self.user_id)
  868. self.helper.join(room_id, "@test2:server")
  869. # Mark test2 as online, test will be offline with a last_active of 0
  870. self.get_success(
  871. self.presence_handler.set_state(
  872. UserID.from_string("@test2:server"), {"presence": PresenceState.ONLINE}
  873. )
  874. )
  875. self.reactor.pump([0]) # Wait for presence updates to be handled
  876. #
  877. # Test that a new server gets told about existing presence
  878. #
  879. self.federation_sender.reset_mock()
  880. # Add a new remote server to the room
  881. self._add_new_user(room_id, "@alice:server2")
  882. # When new server is joined we send it the local users presence states.
  883. # We expect to only see user @test2:server, as @test:server is offline
  884. # and has a zero last_active_ts
  885. expected_state = self.get_success(
  886. self.presence_handler.current_state_for_user("@test2:server")
  887. )
  888. self.assertEqual(expected_state.state, PresenceState.ONLINE)
  889. self.federation_sender.send_presence_to_destinations.assert_called_once_with(
  890. destinations={"server2"}, states=[expected_state]
  891. )
  892. #
  893. # Test that only the new server gets sent presence and not existing servers
  894. #
  895. self.federation_sender.reset_mock()
  896. self._add_new_user(room_id, "@bob:server3")
  897. self.federation_sender.send_presence_to_destinations.assert_called_once_with(
  898. destinations={"server3"}, states=[expected_state]
  899. )
  900. def test_remote_gets_presence_when_local_user_joins(self) -> None:
  901. # We advance time to something that isn't 0, as we use 0 as a special
  902. # value.
  903. self.reactor.advance(1000000000000)
  904. # Create a room with one local users
  905. room_id = self.helper.create_room_as(self.user_id)
  906. # Mark test as online
  907. self.get_success(
  908. self.presence_handler.set_state(
  909. UserID.from_string("@test:server"), {"presence": PresenceState.ONLINE}
  910. )
  911. )
  912. # Mark test2 as online, test will be offline with a last_active of 0.
  913. # Note we don't join them to the room yet
  914. self.get_success(
  915. self.presence_handler.set_state(
  916. UserID.from_string("@test2:server"), {"presence": PresenceState.ONLINE}
  917. )
  918. )
  919. # Add servers to the room
  920. self._add_new_user(room_id, "@alice:server2")
  921. self._add_new_user(room_id, "@bob:server3")
  922. self.reactor.pump([0]) # Wait for presence updates to be handled
  923. #
  924. # Test that when a local join happens remote servers get told about it
  925. #
  926. self.federation_sender.reset_mock()
  927. # Join local user to room
  928. self.helper.join(room_id, "@test2:server")
  929. self.reactor.pump([0]) # Wait for presence updates to be handled
  930. # We expect to only send test2 presence to server2 and server3
  931. expected_state = self.get_success(
  932. self.presence_handler.current_state_for_user("@test2:server")
  933. )
  934. self.assertEqual(expected_state.state, PresenceState.ONLINE)
  935. self.federation_sender.send_presence_to_destinations.assert_called_once_with(
  936. destinations={"server2", "server3"}, states=[expected_state]
  937. )
  938. def _add_new_user(self, room_id: str, user_id: str) -> None:
  939. """Add new user to the room by creating an event and poking the federation API."""
  940. hostname = get_domain_from_id(user_id)
  941. room_version = self.get_success(self.store.get_room_version_id(room_id))
  942. builder = EventBuilder(
  943. state=self.state,
  944. event_auth_handler=self._event_auth_handler,
  945. store=self.store,
  946. clock=self.clock,
  947. hostname=hostname,
  948. signing_key=self.random_signing_key,
  949. room_version=KNOWN_ROOM_VERSIONS[room_version],
  950. room_id=room_id,
  951. type=EventTypes.Member,
  952. sender=user_id,
  953. state_key=user_id,
  954. content={"membership": Membership.JOIN},
  955. )
  956. prev_event_ids = self.get_success(
  957. self.store.get_latest_event_ids_in_room(room_id)
  958. )
  959. event = self.get_success(
  960. builder.build(prev_event_ids=prev_event_ids, auth_event_ids=None)
  961. )
  962. self.get_success(self.federation_event_handler.on_receive_pdu(hostname, event))
  963. # Check that it was successfully persisted.
  964. self.get_success(self.store.get_event(event.event_id))
  965. self.get_success(self.store.get_event(event.event_id))