test_presence.py 46 KB


  1. # Copyright 2016 OpenMarket Ltd
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. from typing import Optional, cast
  15. from unittest.mock import Mock, call
  16. from parameterized import parameterized
  17. from signedjson.key import generate_signing_key
  18. from twisted.test.proto_helpers import MemoryReactor
  19. from synapse.api.constants import EventTypes, Membership, PresenceState
  20. from synapse.api.presence import UserPresenceState
  21. from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
  22. from synapse.events.builder import EventBuilder
  23. from synapse.federation.sender import FederationSender
  24. from synapse.handlers.presence import (
  25. EXTERNAL_PROCESS_EXPIRY,
  26. FEDERATION_PING_INTERVAL,
  27. FEDERATION_TIMEOUT,
  28. IDLE_TIMER,
  29. LAST_ACTIVE_GRANULARITY,
  30. SYNC_ONLINE_TIMEOUT,
  31. handle_timeout,
  32. handle_update,
  33. )
  34. from synapse.rest import admin
  35. from synapse.rest.client import room
  36. from synapse.server import HomeServer
  37. from synapse.storage.database import LoggingDatabaseConnection
  38. from synapse.types import JsonDict, UserID, get_domain_from_id
  39. from synapse.util import Clock
  40. from tests import unittest
  41. from tests.replication._base import BaseMultiWorkerStreamTestCase
  class PresenceUpdateTestCase(unittest.HomeserverTestCase):
      """Tests for `handle_update` state transitions and for persisting presence.

      The transition tests call `handle_update` directly with a mocked wheel
      timer and check the returned state, the returned flags, and which timers
      were inserted into the wheel timer.
      """

      servlets = [admin.register_servlets]

      def prepare(
          self, reactor: MemoryReactor, clock: Clock, homeserver: HomeServer
      ) -> None:
          """Keep a reference to the main datastore for the persistence test."""
          self.store = homeserver.get_datastores().main

      def test_offline_to_online(self) -> None:
          """A local user moving from the default state to online is persisted,
          is marked as currently active and gets three timers scheduled.
          """
          wheel_timer = Mock()
          user_id = "@foo:bar"
          now = 5000000

          prev_state = UserPresenceState.default(user_id)
          new_state = prev_state.copy_and_replace(
              state=PresenceState.ONLINE, last_active_ts=now
          )

          # The federation ping flag is not checked by this test.
          state, persist_and_notify, _ = handle_update(
              prev_state, new_state, is_mine=True, wheel_timer=wheel_timer, now=now
          )

          self.assertTrue(persist_and_notify)
          self.assertTrue(state.currently_active)
          self.assertEqual(new_state.state, state.state)
          self.assertEqual(new_state.status_msg, state.status_msg)
          self.assertEqual(state.last_federation_update_ts, now)

          self.assertEqual(wheel_timer.insert.call_count, 3)
          wheel_timer.insert.assert_has_calls(
              [
                  call(now=now, obj=user_id, then=new_state.last_active_ts + IDLE_TIMER),
                  call(
                      now=now,
                      obj=user_id,
                      then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT,
                  ),
                  call(
                      now=now,
                      obj=user_id,
                      then=new_state.last_active_ts + LAST_ACTIVE_GRANULARITY,
                  ),
              ],
              any_order=True,
          )

      def test_online_to_online(self) -> None:
          """An online, currently active local user staying online is not
          persisted again but does request a federation ping.
          """
          wheel_timer = Mock()
          user_id = "@foo:bar"
          now = 5000000

          prev_state = UserPresenceState.default(user_id)
          prev_state = prev_state.copy_and_replace(
              state=PresenceState.ONLINE, last_active_ts=now, currently_active=True
          )

          new_state = prev_state.copy_and_replace(
              state=PresenceState.ONLINE, last_active_ts=now
          )

          state, persist_and_notify, federation_ping = handle_update(
              prev_state, new_state, is_mine=True, wheel_timer=wheel_timer, now=now
          )

          self.assertFalse(persist_and_notify)
          self.assertTrue(federation_ping)
          self.assertTrue(state.currently_active)
          self.assertEqual(new_state.state, state.state)
          self.assertEqual(new_state.status_msg, state.status_msg)
          self.assertEqual(state.last_federation_update_ts, now)

          self.assertEqual(wheel_timer.insert.call_count, 3)
          wheel_timer.insert.assert_has_calls(
              [
                  call(now=now, obj=user_id, then=new_state.last_active_ts + IDLE_TIMER),
                  call(
                      now=now,
                      obj=user_id,
                      then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT,
                  ),
                  call(
                      now=now,
                      obj=user_id,
                      then=new_state.last_active_ts + LAST_ACTIVE_GRANULARITY,
                  ),
              ],
              any_order=True,
          )

      def test_online_to_online_last_active_noop(self) -> None:
          """Refreshing `last_active_ts` of a user who was already flagged as
          currently active is not persisted, but requests a federation ping.
          """
          wheel_timer = Mock()
          user_id = "@foo:bar"
          now = 5000000

          prev_state = UserPresenceState.default(user_id)
          prev_state = prev_state.copy_and_replace(
              state=PresenceState.ONLINE,
              last_active_ts=now - LAST_ACTIVE_GRANULARITY - 10,
              currently_active=True,
          )

          new_state = prev_state.copy_and_replace(
              state=PresenceState.ONLINE, last_active_ts=now
          )

          state, persist_and_notify, federation_ping = handle_update(
              prev_state, new_state, is_mine=True, wheel_timer=wheel_timer, now=now
          )

          self.assertFalse(persist_and_notify)
          self.assertTrue(federation_ping)
          self.assertTrue(state.currently_active)
          self.assertEqual(new_state.state, state.state)
          self.assertEqual(new_state.status_msg, state.status_msg)
          self.assertEqual(state.last_federation_update_ts, now)

          self.assertEqual(wheel_timer.insert.call_count, 3)
          wheel_timer.insert.assert_has_calls(
              [
                  call(now=now, obj=user_id, then=new_state.last_active_ts + IDLE_TIMER),
                  call(
                      now=now,
                      obj=user_id,
                      then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT,
                  ),
                  call(
                      now=now,
                      obj=user_id,
                      then=new_state.last_active_ts + LAST_ACTIVE_GRANULARITY,
                  ),
              ],
              any_order=True,
          )

      def test_online_to_online_last_active(self) -> None:
          """An online user whose last activity is older than
          `LAST_ACTIVE_GRANULARITY` stops being currently active, which is
          persisted; only the idle and sync timers are scheduled.
          """
          wheel_timer = Mock()
          user_id = "@foo:bar"
          now = 5000000

          prev_state = UserPresenceState.default(user_id)
          prev_state = prev_state.copy_and_replace(
              state=PresenceState.ONLINE,
              last_active_ts=now - LAST_ACTIVE_GRANULARITY - 1,
              currently_active=True,
          )

          new_state = prev_state.copy_and_replace(state=PresenceState.ONLINE)

          # The federation ping flag is not checked by this test.
          state, persist_and_notify, _ = handle_update(
              prev_state, new_state, is_mine=True, wheel_timer=wheel_timer, now=now
          )

          self.assertTrue(persist_and_notify)
          self.assertFalse(state.currently_active)
          self.assertEqual(new_state.state, state.state)
          self.assertEqual(new_state.status_msg, state.status_msg)
          self.assertEqual(state.last_federation_update_ts, now)

          self.assertEqual(wheel_timer.insert.call_count, 2)
          wheel_timer.insert.assert_has_calls(
              [
                  call(now=now, obj=user_id, then=new_state.last_active_ts + IDLE_TIMER),
                  call(
                      now=now,
                      obj=user_id,
                      then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT,
                  ),
              ],
              any_order=True,
          )

      def test_remote_ping_timer(self) -> None:
          """For a user that is not ours (`is_mine=False`) only the federation
          timeout timer is scheduled, and nothing is persisted or pinged.
          """
          wheel_timer = Mock()
          user_id = "@foo:bar"
          now = 5000000

          prev_state = UserPresenceState.default(user_id)
          prev_state = prev_state.copy_and_replace(
              state=PresenceState.ONLINE, last_active_ts=now
          )

          new_state = prev_state.copy_and_replace(state=PresenceState.ONLINE)

          state, persist_and_notify, federation_ping = handle_update(
              prev_state, new_state, is_mine=False, wheel_timer=wheel_timer, now=now
          )

          self.assertFalse(persist_and_notify)
          self.assertFalse(federation_ping)
          self.assertFalse(state.currently_active)
          self.assertEqual(new_state.state, state.state)
          self.assertEqual(new_state.status_msg, state.status_msg)

          self.assertEqual(wheel_timer.insert.call_count, 1)
          wheel_timer.insert.assert_has_calls(
              [
                  call(
                      now=now,
                      obj=user_id,
                      then=new_state.last_federation_update_ts + FEDERATION_TIMEOUT,
                  )
              ],
              any_order=True,
          )

      def test_online_to_offline(self) -> None:
          """A local user going offline is persisted and schedules no timers."""
          wheel_timer = Mock()
          user_id = "@foo:bar"
          now = 5000000

          prev_state = UserPresenceState.default(user_id)
          prev_state = prev_state.copy_and_replace(
              state=PresenceState.ONLINE, last_active_ts=now, currently_active=True
          )

          new_state = prev_state.copy_and_replace(state=PresenceState.OFFLINE)

          # The federation ping flag is not checked by this test.
          state, persist_and_notify, _ = handle_update(
              prev_state, new_state, is_mine=True, wheel_timer=wheel_timer, now=now
          )

          self.assertTrue(persist_and_notify)
          self.assertEqual(new_state.state, state.state)
          self.assertEqual(state.last_federation_update_ts, now)

          self.assertEqual(wheel_timer.insert.call_count, 0)

      def test_online_to_idle(self) -> None:
          """A local user becoming unavailable is persisted and only the sync
          timeout timer is scheduled.
          """
          wheel_timer = Mock()
          user_id = "@foo:bar"
          now = 5000000

          prev_state = UserPresenceState.default(user_id)
          prev_state = prev_state.copy_and_replace(
              state=PresenceState.ONLINE, last_active_ts=now, currently_active=True
          )

          new_state = prev_state.copy_and_replace(state=PresenceState.UNAVAILABLE)

          # The federation ping flag is not checked by this test.
          state, persist_and_notify, _ = handle_update(
              prev_state, new_state, is_mine=True, wheel_timer=wheel_timer, now=now
          )

          self.assertTrue(persist_and_notify)
          self.assertEqual(new_state.state, state.state)
          self.assertEqual(new_state.status_msg, state.status_msg)
          self.assertEqual(state.last_federation_update_ts, now)

          self.assertEqual(wheel_timer.insert.call_count, 1)
          wheel_timer.insert.assert_has_calls(
              [
                  call(
                      now=now,
                      obj=user_id,
                      then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT,
                  )
              ],
              any_order=True,
          )

      def test_persisting_presence_updates(self) -> None:
          """Tests that the latest presence state for each user is persisted correctly"""
          # Create some test users and presence states for them
          presence_states = []
          for i in range(5):
              user_id = self.register_user(f"user_{i}", "password")

              presence_state = UserPresenceState(
                  user_id=user_id,
                  state="online",
                  last_active_ts=1,
                  last_federation_update_ts=1,
                  last_user_sync_ts=1,
                  status_msg="I'm online!",
                  currently_active=True,
              )
              presence_states.append(presence_state)

          # Persist these presence updates to the database
          self.get_success(self.store.update_presence(presence_states))

          # Check that each update is present in the database
          db_presence_states_raw = self.get_success(
              self.store.get_all_presence_updates(
                  instance_name="master",
                  last_id=0,
                  current_id=len(presence_states) + 1,
                  limit=len(presence_states),
              )
          )

          # Extract presence update user ID and state information into lists of tuples
          db_presence_states = [(ps[0], ps[1]) for _, ps in db_presence_states_raw[0]]
          presence_states_compare = [(ps.user_id, ps.state) for ps in presence_states]

          # Compare what we put into the storage with what we got out.
          # They should be identical.
          self.assertEqual(presence_states_compare, db_presence_states)
  class PresenceTimeoutTestCase(unittest.TestCase):
      """Exercises `handle_timeout` for each timer and checks that a timeout
      leaves the user's `status_msg` untouched.
      """

      def test_idle_timer(self) -> None:
          """An online user idle for longer than `IDLE_TIMER` becomes unavailable."""
          now = 5000000
          message = "I'm here!"
          before = UserPresenceState.default("@foo:bar").copy_and_replace(
              state=PresenceState.ONLINE,
              last_active_ts=now - IDLE_TIMER - 1,
              last_user_sync_ts=now,
              status_msg=message,
          )

          after = handle_timeout(before, is_mine=True, syncing_user_ids=set(), now=now)

          self.assertIsNotNone(after)
          assert after is not None
          self.assertEqual(after.state, PresenceState.UNAVAILABLE)
          self.assertEqual(after.status_msg, message)

      def test_busy_no_idle(self) -> None:
          """
          Tests that a user setting their presence to busy but idling doesn't turn their
          presence state into unavailable.
          """
          now = 5000000
          message = "I'm here!"
          before = UserPresenceState.default("@foo:bar").copy_and_replace(
              state=PresenceState.BUSY,
              last_active_ts=now - IDLE_TIMER - 1,
              last_user_sync_ts=now,
              status_msg=message,
          )

          after = handle_timeout(before, is_mine=True, syncing_user_ids=set(), now=now)

          self.assertIsNotNone(after)
          assert after is not None
          self.assertEqual(after.state, PresenceState.BUSY)
          self.assertEqual(after.status_msg, message)

      def test_sync_timeout(self) -> None:
          """An online user whose last sync is older than `SYNC_ONLINE_TIMEOUT`
          goes offline.
          """
          now = 5000000
          message = "I'm here!"
          before = UserPresenceState.default("@foo:bar").copy_and_replace(
              state=PresenceState.ONLINE,
              last_active_ts=0,
              last_user_sync_ts=now - SYNC_ONLINE_TIMEOUT - 1,
              status_msg=message,
          )

          after = handle_timeout(before, is_mine=True, syncing_user_ids=set(), now=now)

          self.assertIsNotNone(after)
          assert after is not None
          self.assertEqual(after.state, PresenceState.OFFLINE)
          self.assertEqual(after.status_msg, message)

      def test_sync_online(self) -> None:
          """A user listed in `syncing_user_ids` stays online even though their
          stored sync timestamp is older than `SYNC_ONLINE_TIMEOUT`.
          """
          syncing_user = "@foo:bar"
          now = 5000000
          message = "I'm here!"
          stale_ts = now - SYNC_ONLINE_TIMEOUT - 1
          before = UserPresenceState.default(syncing_user).copy_and_replace(
              state=PresenceState.ONLINE,
              last_active_ts=stale_ts,
              last_user_sync_ts=stale_ts,
              status_msg=message,
          )

          after = handle_timeout(
              before, is_mine=True, syncing_user_ids={syncing_user}, now=now
          )

          self.assertIsNotNone(after)
          assert after is not None
          self.assertEqual(after.state, PresenceState.ONLINE)
          self.assertEqual(after.status_msg, message)

      def test_federation_ping(self) -> None:
          """When the last federation update is older than
          `FEDERATION_PING_INTERVAL`, a state equal to the input is returned.
          """
          now = 5000000
          before = UserPresenceState.default("@foo:bar").copy_and_replace(
              state=PresenceState.ONLINE,
              last_active_ts=now,
              last_user_sync_ts=now,
              last_federation_update_ts=now - FEDERATION_PING_INTERVAL - 1,
              status_msg="I'm here!",
          )

          after = handle_timeout(before, is_mine=True, syncing_user_ids=set(), now=now)

          self.assertIsNotNone(after)
          self.assertEqual(before, after)

      def test_no_timeout(self) -> None:
          """With every timestamp equal to `now`, `handle_timeout` returns `None`."""
          now = 5000000
          before = UserPresenceState.default("@foo:bar").copy_and_replace(
              state=PresenceState.ONLINE,
              last_active_ts=now,
              last_user_sync_ts=now,
              last_federation_update_ts=now,
          )

          after = handle_timeout(before, is_mine=True, syncing_user_ids=set(), now=now)

          self.assertIsNone(after)

      def test_federation_timeout(self) -> None:
          """A user that is not ours goes offline once their last federation
          update is older than `FEDERATION_TIMEOUT`.
          """
          now = 5000000
          message = "I'm here!"
          before = UserPresenceState.default("@foo:bar").copy_and_replace(
              state=PresenceState.ONLINE,
              last_active_ts=now,
              last_user_sync_ts=now,
              last_federation_update_ts=now - FEDERATION_TIMEOUT - 1,
              status_msg=message,
          )

          after = handle_timeout(
              before, is_mine=False, syncing_user_ids=set(), now=now
          )

          self.assertIsNotNone(after)
          assert after is not None
          self.assertEqual(after.state, PresenceState.OFFLINE)
          self.assertEqual(after.status_msg, message)

      def test_last_active(self) -> None:
          """When the last activity is older than `LAST_ACTIVE_GRANULARITY`, a
          state equal to the input is returned.
          """
          now = 5000000
          before = UserPresenceState.default("@foo:bar").copy_and_replace(
              state=PresenceState.ONLINE,
              last_active_ts=now - LAST_ACTIVE_GRANULARITY - 1,
              last_user_sync_ts=now,
              last_federation_update_ts=now,
              status_msg="I'm here!",
          )

          after = handle_timeout(before, is_mine=True, syncing_user_ids=set(), now=now)

          self.assertIsNotNone(after)
          self.assertEqual(before, after)
  class PresenceHandlerInitTestCase(unittest.HomeserverTestCase):
      """Tests how presence restored from the database behaves once the
      presence handler is first requested.
      """

      def default_config(self) -> JsonDict:
          """Return the default config with background tasks moved to another worker."""
          conf = super().default_config()

          # Disable background tasks on this worker so that the PresenceHandler isn't
          # loaded until we request it.
          conf["run_background_tasks_on"] = "other"

          return conf

      def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
          """Store an online presence row for the test user and reload the
          store's preloaded presence from the database.
          """
          self.user_id = f"@test:{self.hs.config.server.server_name}"
          self.device_id = "dev-1"

          # Move the reactor to the initial time.
          self.reactor.advance(1000)
          now = self.clock.time_msec()

          store = hs.get_datastores().main
          restored_state = UserPresenceState(
              user_id=self.user_id,
              state=PresenceState.ONLINE,
              last_active_ts=now,
              last_federation_update_ts=now,
              last_user_sync_ts=now,
              status_msg=None,
              currently_active=True,
          )
          self.get_success(store.update_presence([restored_state]))

          # Regenerate the preloaded presence information on PresenceStore.
          def refill_presence(db_conn: LoggingDatabaseConnection) -> None:
              store._presence_on_startup = store._get_active_presence(db_conn)

          self.get_success(store.db_pool.runWithConnection(refill_presence))

      def test_restored_presence_idles(self) -> None:
          """The presence state restored from the database should not persist forever."""
          target = UserID.from_string(self.user_id)

          # Get the handler (which kicks off a bunch of timers).
          handler = self.hs.get_presence_handler()

          # Assert the user is online.
          current = self.get_success(handler.get_state(target))
          self.assertEqual(current.state, PresenceState.ONLINE)

          # Advance such that the user should timeout.
          self.reactor.advance(SYNC_ONLINE_TIMEOUT / 1000)
          self.reactor.pump([5])

          # Check that the user is now offline.
          current = self.get_success(handler.get_state(target))
          self.assertEqual(current.state, PresenceState.OFFLINE)

      @parameterized.expand(
          [
              (PresenceState.BUSY, PresenceState.BUSY),
              (PresenceState.ONLINE, PresenceState.ONLINE),
              (PresenceState.UNAVAILABLE, PresenceState.UNAVAILABLE),
              # Offline syncs don't update the state.
              (PresenceState.OFFLINE, PresenceState.ONLINE),
          ]
      )
      @unittest.override_config({"experimental_features": {"msc3026_enabled": True}})
      def test_restored_presence_online_after_sync(
          self, sync_state: str, expected_state: str
      ) -> None:
          """
          The presence state restored from the database should be overridden with sync after a timeout.

          Args:
              sync_state: The presence state of the new sync.
              expected_state: The expected presence right after the sync.
          """
          target = UserID.from_string(self.user_id)
          half_sync_timeout = SYNC_ONLINE_TIMEOUT / 1000 / 2

          # Get the handler (which kicks off a bunch of timers).
          handler = self.hs.get_presence_handler()

          # Assert the user is online, as restored.
          current = self.get_success(handler.get_state(target))
          self.assertEqual(current.state, PresenceState.ONLINE)

          # Advance slightly and sync.
          self.reactor.advance(half_sync_timeout)
          affect_presence = sync_state != PresenceState.OFFLINE
          self.get_success(
              handler.user_syncing(
                  self.user_id, self.device_id, affect_presence, sync_state
              )
          )

          # Assert the user is in the expected state.
          current = self.get_success(handler.get_state(target))
          self.assertEqual(current.state, expected_state)

          # Advance such that the user's preloaded data times out, but not the new sync.
          self.reactor.advance(half_sync_timeout)
          self.reactor.pump([5])

          # Check that the user is in the sync state (as the client is currently syncing still).
          current = self.get_success(handler.get_state(target))
          self.assertEqual(current.state, sync_state)
  class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase):
      """Tests of the presence handler on the main process, some of which also
      sync against a worker created with `make_worker_hs`.
      """

      user_id = "@test:server"
      user_id_obj = UserID.from_string(user_id)
      device_id = "dev-1"

      def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
          """Keep references to the main process' presence handler and clock."""
          self.presence_handler = hs.get_presence_handler()
          self.clock = hs.get_clock()

      def test_external_process_timeout(self) -> None:
          """Test that if an external process doesn't update the records for a while
          we time out their syncing users presence.
          """

          # Create a worker and use it to handle /sync traffic instead.
          # This is used to test that presence changes get replicated from workers
          # to the main process correctly.
          worker_to_sync_against = self.make_worker_hs(
              "synapse.app.generic_worker", {"worker_name": "synchrotron"}
          )
          worker_presence_handler = worker_to_sync_against.get_presence_handler()

          self.get_success(
              worker_presence_handler.user_syncing(
                  self.user_id, self.device_id, True, PresenceState.ONLINE
              ),
              by=0.1,
          )

          # Check that if we wait a while without telling the handler the user has
          # stopped syncing that their presence state doesn't get timed out.
          # NOTE(review): other tests in this file divide the timeout constants by
          # 1000 before passing them to `reactor.advance`; this call does not.
          # Confirm the intended units.
          self.reactor.advance(EXTERNAL_PROCESS_EXPIRY / 2)

          state = self.get_success(self.presence_handler.get_state(self.user_id_obj))
          self.assertEqual(state.state, PresenceState.ONLINE)

          # Check that if the external process timeout fires, then the syncing
          # user gets timed out
          self.reactor.advance(EXTERNAL_PROCESS_EXPIRY)

          state = self.get_success(self.presence_handler.get_state(self.user_id_obj))
          self.assertEqual(state.state, PresenceState.OFFLINE)

      def test_user_goes_offline_by_timeout_status_msg_remain(self) -> None:
          """Test that if a user doesn't update the records for a while, the
          user's presence goes `OFFLINE` because of the timeout and `status_msg`
          remains.
          """
          status_msg = "I'm here!"

          # Mark user as online
          self._set_presencestate_with_status_msg(PresenceState.ONLINE, status_msg)

          # Check that if we wait a while without telling the handler the user has
          # stopped syncing that their presence state doesn't get timed out.
          # NOTE(review): other tests in this file divide the timeout constants by
          # 1000 before passing them to `reactor.advance`; this call does not.
          # Confirm the intended units.
          self.reactor.advance(SYNC_ONLINE_TIMEOUT / 2)

          state = self.get_success(self.presence_handler.get_state(self.user_id_obj))
          self.assertEqual(state.state, PresenceState.ONLINE)
          self.assertEqual(state.status_msg, status_msg)

          # Check that if the timeout fires, then the syncing user gets timed out
          self.reactor.advance(SYNC_ONLINE_TIMEOUT)

          state = self.get_success(self.presence_handler.get_state(self.user_id_obj))
          # status_msg should remain even after going offline
          self.assertEqual(state.state, PresenceState.OFFLINE)
          self.assertEqual(state.status_msg, status_msg)

      def test_user_goes_offline_manually_with_no_status_msg(self) -> None:
          """Test that if a user changes presence manually to `OFFLINE`
          and no status is set, `status_msg` is `None`.
          """
          status_msg = "I'm here!"

          # Mark user as online
          self._set_presencestate_with_status_msg(PresenceState.ONLINE, status_msg)

          # Mark user as offline
          self.get_success(
              self.presence_handler.set_state(
                  self.user_id_obj, self.device_id, {"presence": PresenceState.OFFLINE}
              )
          )

          state = self.get_success(self.presence_handler.get_state(self.user_id_obj))
          self.assertEqual(state.state, PresenceState.OFFLINE)
          self.assertIsNone(state.status_msg)

      def test_user_goes_offline_manually_with_status_msg(self) -> None:
          """Test that if a user changes presence manually to `OFFLINE`
          and a status is set, `status_msg` appears.
          """
          status_msg = "I'm here!"

          # Mark user as online
          self._set_presencestate_with_status_msg(PresenceState.ONLINE, status_msg)

          # Mark user as offline
          self._set_presencestate_with_status_msg(PresenceState.OFFLINE, "And now here.")

      def test_user_reset_online_with_no_status(self) -> None:
          """Test that if a user sets the presence manually again
          and no status is set, `status_msg` is `None`.
          """
          status_msg = "I'm here!"

          # Mark user as online
          self._set_presencestate_with_status_msg(PresenceState.ONLINE, status_msg)

          # Mark user as online again
          self.get_success(
              self.presence_handler.set_state(
                  self.user_id_obj, self.device_id, {"presence": PresenceState.ONLINE}
              )
          )

          state = self.get_success(self.presence_handler.get_state(self.user_id_obj))
          # The user is still online, but the previously set status_msg is gone
          # because the second update did not include one.
          self.assertEqual(state.state, PresenceState.ONLINE)
          self.assertIsNone(state.status_msg)

      def test_set_presence_with_status_msg_none(self) -> None:
          """Test that if a user sets the presence manually again
          and status is `None`, `status_msg` is `None`.
          """
          status_msg = "I'm here!"

          # Mark user as online
          self._set_presencestate_with_status_msg(PresenceState.ONLINE, status_msg)

          # Mark user as online and `status_msg = None`
          self._set_presencestate_with_status_msg(PresenceState.ONLINE, None)

      def test_set_presence_from_syncing_not_set(self) -> None:
          """Test that presence is not set by syncing if affect_presence is false"""
          status_msg = "I'm here!"

          self._set_presencestate_with_status_msg(PresenceState.UNAVAILABLE, status_msg)

          self.get_success(
              self.presence_handler.user_syncing(
                  self.user_id, self.device_id, False, PresenceState.ONLINE
              )
          )

          state = self.get_success(self.presence_handler.get_state(self.user_id_obj))
          # we should still be unavailable
          self.assertEqual(state.state, PresenceState.UNAVAILABLE)
          # and status message should still be the same
          self.assertEqual(state.status_msg, status_msg)

      def test_set_presence_from_syncing_is_set(self) -> None:
          """Test that presence is set by syncing if affect_presence is true"""
          status_msg = "I'm here!"

          self._set_presencestate_with_status_msg(PresenceState.UNAVAILABLE, status_msg)

          self.get_success(
              self.presence_handler.user_syncing(
                  self.user_id, self.device_id, True, PresenceState.ONLINE
              )
          )

          state = self.get_success(self.presence_handler.get_state(self.user_id_obj))
          # we should now be online
          self.assertEqual(state.state, PresenceState.ONLINE)

      def test_set_presence_from_syncing_keeps_status(self) -> None:
          """Test that presence set by syncing retains status message"""
          status_msg = "I'm here!"

          self._set_presencestate_with_status_msg(PresenceState.UNAVAILABLE, status_msg)

          self.get_success(
              self.presence_handler.user_syncing(
                  self.user_id, self.device_id, True, PresenceState.ONLINE
              )
          )

          state = self.get_success(self.presence_handler.get_state(self.user_id_obj))
          # our status message should be the same as it was before
          self.assertEqual(state.status_msg, status_msg)

      @parameterized.expand([(False,), (True,)])
      @unittest.override_config({"experimental_features": {"msc3026_enabled": True}})
      def test_set_presence_from_syncing_keeps_busy(
          self, test_with_workers: bool
      ) -> None:
          """Test that presence set by syncing doesn't affect busy status

          Args:
              test_with_workers: If True, check the presence state of the user by calling
                  /sync against a worker, rather than the main process.
          """
          status_msg = "I'm busy!"

          # By default, we call /sync against the main process.
          worker_to_sync_against = self.hs
          if test_with_workers:
              # Create a worker and use it to handle /sync traffic instead.
              # This is used to test that presence changes get replicated from workers
              # to the main process correctly.
              worker_to_sync_against = self.make_worker_hs(
                  "synapse.app.generic_worker", {"worker_name": "synchrotron"}
              )

          # Set presence to BUSY
          self._set_presencestate_with_status_msg(PresenceState.BUSY, status_msg)

          # Perform a sync with a presence state other than busy. This should NOT change
          # our presence status; we only change from busy if we explicitly set it via
          # /presence/*.
          self.get_success(
              worker_to_sync_against.get_presence_handler().user_syncing(
                  self.user_id, self.device_id, True, PresenceState.ONLINE
              ),
              by=0.1,
          )

          # Check against the main process that the user's presence did not change.
          state = self.get_success(self.presence_handler.get_state(self.user_id_obj))
          # we should still be busy
          self.assertEqual(state.state, PresenceState.BUSY)

          # Advance such that the device would be discarded if it was not busy,
          # then pump so that the _handle_timeouts function gets called.
          self.reactor.advance(IDLE_TIMER / 1000)
          self.reactor.pump([5])

          # The account should still be busy.
          state = self.get_success(self.presence_handler.get_state(self.user_id_obj))
          self.assertEqual(state.state, PresenceState.BUSY)

          # Ensure that a /presence call can set the user *off* busy.
          self._set_presencestate_with_status_msg(PresenceState.ONLINE, status_msg)

          state = self.get_success(self.presence_handler.get_state(self.user_id_obj))
          self.assertEqual(state.state, PresenceState.ONLINE)

      def _set_presencestate_with_status_msg(
          self, state: str, status_msg: Optional[str]
      ) -> None:
          """Set a PresenceState and status_msg and check the result.

          Args:
              state: The new PresenceState.
              status_msg: Status message that is to be set.
          """
          self.get_success(
              self.presence_handler.set_state(
                  self.user_id_obj,
                  self.device_id,
                  {"presence": state, "status_msg": status_msg},
              )
          )

          new_state = self.get_success(self.presence_handler.get_state(self.user_id_obj))
          self.assertEqual(new_state.state, state)
          self.assertEqual(new_state.status_msg, status_msg)
  731. class PresenceFederationQueueTestCase(unittest.HomeserverTestCase):
  def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
      """Fetch the presence handler, clock, instance name and the handler's
      federation queue from the homeserver for use by the tests.
      """
      handler = hs.get_presence_handler()
      self.presence_handler = handler
      self.clock = hs.get_clock()
      self.instance_name = hs.get_instance_name()
      self.queue = handler.get_federation_queue()
  def test_send_and_get(self) -> None:
      """Queue two batches of presence and read them back as replication rows.

      The first read must return one row per (destination, user) pair; a
      second read starting from the returned token must return nothing.
      """
      first, second, third = (
          UserPresenceState.default(mxid)
          for mxid in ("@user1:test", "@user2:test", "@user3:test")
      )

      start_token = self.queue.get_current_token(self.instance_name)

      first_batch = self.queue.send_presence_to_destinations(
          (first, second), ("dest1", "dest2")
      )
      self.get_success(first_batch)

      second_batch = self.queue.send_presence_to_destinations((third,), ("dest3",))
      self.get_success(second_batch)

      end_token = self.queue.get_current_token(self.instance_name)

      rows, upto_token, limited = self.get_success(
          self.queue.get_replication_rows("master", start_token, end_token, 10)
      )

      self.assertEqual(upto_token, end_token)
      self.assertFalse(limited)
      self.assertCountEqual(
          rows,
          [
              (1, ("dest1", "@user1:test")),
              (1, ("dest2", "@user1:test")),
              (1, ("dest1", "@user2:test")),
              (1, ("dest2", "@user2:test")),
              (2, ("dest3", "@user3:test")),
          ],
      )

      # Reading again from where we got up to should yield no further rows.
      end_token = self.queue.get_current_token(self.instance_name)
      rows, upto_token, limited = self.get_success(
          self.queue.get_replication_rows("master", upto_token, end_token, 10)
      )

      self.assertEqual(upto_token, end_token)
      self.assertFalse(limited)
      self.assertCountEqual(rows, [])
  771. def test_send_and_get_split(self) -> None:
  772. state1 = UserPresenceState.default("@user1:test")
  773. state2 = UserPresenceState.default("@user2:test")
  774. state3 = UserPresenceState.default("@user3:test")
  775. prev_token = self.queue.get_current_token(self.instance_name)
  776. self.get_success(
  777. self.queue.send_presence_to_destinations(
  778. (state1, state2), ("dest1", "dest2")
  779. )
  780. )
  781. now_token = self.queue.get_current_token(self.instance_name)
  782. self.get_success(
  783. self.queue.send_presence_to_destinations((state3,), ("dest3",))
  784. )
  785. rows, upto_token, limited = self.get_success(
  786. self.queue.get_replication_rows("master", prev_token, now_token, 10)
  787. )
  788. self.assertEqual(upto_token, now_token)
  789. self.assertFalse(limited)
  790. expected_rows = [
  791. (1, ("dest1", "@user1:test")),
  792. (1, ("dest2", "@user1:test")),
  793. (1, ("dest1", "@user2:test")),
  794. (1, ("dest2", "@user2:test")),
  795. ]
  796. self.assertCountEqual(rows, expected_rows)
  797. now_token = self.queue.get_current_token(self.instance_name)
  798. rows, upto_token, limited = self.get_success(
  799. self.queue.get_replication_rows("master", upto_token, now_token, 10)
  800. )
  801. self.assertEqual(upto_token, now_token)
  802. self.assertFalse(limited)
  803. expected_rows = [
  804. (2, ("dest3", "@user3:test")),
  805. ]
  806. self.assertCountEqual(rows, expected_rows)
  807. def test_clear_queue_all(self) -> None:
  808. state1 = UserPresenceState.default("@user1:test")
  809. state2 = UserPresenceState.default("@user2:test")
  810. state3 = UserPresenceState.default("@user3:test")
  811. prev_token = self.queue.get_current_token(self.instance_name)
  812. self.get_success(
  813. self.queue.send_presence_to_destinations(
  814. (state1, state2), ("dest1", "dest2")
  815. )
  816. )
  817. self.get_success(
  818. self.queue.send_presence_to_destinations((state3,), ("dest3",))
  819. )
  820. self.reactor.advance(10 * 60 * 1000)
  821. now_token = self.queue.get_current_token(self.instance_name)
  822. rows, upto_token, limited = self.get_success(
  823. self.queue.get_replication_rows("master", prev_token, now_token, 10)
  824. )
  825. self.assertEqual(upto_token, now_token)
  826. self.assertFalse(limited)
  827. self.assertCountEqual(rows, [])
  828. prev_token = self.queue.get_current_token(self.instance_name)
  829. self.get_success(
  830. self.queue.send_presence_to_destinations(
  831. (state1, state2), ("dest1", "dest2")
  832. )
  833. )
  834. self.get_success(
  835. self.queue.send_presence_to_destinations((state3,), ("dest3",))
  836. )
  837. now_token = self.queue.get_current_token(self.instance_name)
  838. rows, upto_token, limited = self.get_success(
  839. self.queue.get_replication_rows("master", prev_token, now_token, 10)
  840. )
  841. self.assertEqual(upto_token, now_token)
  842. self.assertFalse(limited)
  843. expected_rows = [
  844. (3, ("dest1", "@user1:test")),
  845. (3, ("dest2", "@user1:test")),
  846. (3, ("dest1", "@user2:test")),
  847. (3, ("dest2", "@user2:test")),
  848. (4, ("dest3", "@user3:test")),
  849. ]
  850. self.assertCountEqual(rows, expected_rows)
  851. def test_partially_clear_queue(self) -> None:
  852. state1 = UserPresenceState.default("@user1:test")
  853. state2 = UserPresenceState.default("@user2:test")
  854. state3 = UserPresenceState.default("@user3:test")
  855. prev_token = self.queue.get_current_token(self.instance_name)
  856. self.get_success(
  857. self.queue.send_presence_to_destinations(
  858. (state1, state2), ("dest1", "dest2")
  859. )
  860. )
  861. self.reactor.advance(2 * 60 * 1000)
  862. self.get_success(
  863. self.queue.send_presence_to_destinations((state3,), ("dest3",))
  864. )
  865. self.reactor.advance(4 * 60 * 1000)
  866. now_token = self.queue.get_current_token(self.instance_name)
  867. rows, upto_token, limited = self.get_success(
  868. self.queue.get_replication_rows("master", prev_token, now_token, 10)
  869. )
  870. self.assertEqual(upto_token, now_token)
  871. self.assertFalse(limited)
  872. self.assertCountEqual(rows, [])
  873. prev_token = self.queue.get_current_token(self.instance_name)
  874. self.get_success(
  875. self.queue.send_presence_to_destinations(
  876. (state1, state2), ("dest1", "dest2")
  877. )
  878. )
  879. self.get_success(
  880. self.queue.send_presence_to_destinations((state3,), ("dest3",))
  881. )
  882. now_token = self.queue.get_current_token(self.instance_name)
  883. rows, upto_token, limited = self.get_success(
  884. self.queue.get_replication_rows("master", prev_token, now_token, 10)
  885. )
  886. self.assertEqual(upto_token, now_token)
  887. self.assertFalse(limited)
  888. expected_rows = [
  889. (3, ("dest1", "@user1:test")),
  890. (3, ("dest2", "@user1:test")),
  891. (3, ("dest1", "@user2:test")),
  892. (3, ("dest2", "@user2:test")),
  893. (4, ("dest3", "@user3:test")),
  894. ]
  895. self.assertCountEqual(rows, expected_rows)
  class PresenceJoinTestCase(unittest.HomeserverTestCase):
      """Tests remote servers get told about presence of users in the room when
      they join and when new local users join.
      """

      user_id = "@test:server"

      servlets = [room.register_servlets]

      def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
          # The federation sender is replaced with a mock so the tests can
          # assert on exactly which presence sends were requested.
          hs = self.setup_test_homeserver(
              "server",
              federation_sender=Mock(spec=FederationSender),
          )
          return hs

      def default_config(self) -> JsonDict:
          config = super().default_config()
          # Enable federation sending on the main process.
          config["federation_sender_instances"] = None
          return config

      def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
          self.federation_sender = cast(Mock, hs.get_federation_sender())
          self.event_builder_factory = hs.get_event_builder_factory()
          self.federation_event_handler = hs.get_federation_event_handler()
          self.presence_handler = hs.get_presence_handler()

          self.store = hs.get_datastores().main
          self.state = hs.get_state_handler()
          self._event_auth_handler = hs.get_event_auth_handler()

          # We don't actually check signatures in tests, so lets just create a
          # random key to use.
          self.random_signing_key = generate_signing_key("ver")

      def test_remote_joins(self) -> None:
          """A newly joined remote server is sent the presence of local users
          in the room, and servers that were already present are not re-sent it.
          """
          # We advance time to something that isn't 0, as we use 0 as a special
          # value.
          self.reactor.advance(1000000000000)

          # Create a room with two local users
          room_id = self.helper.create_room_as(self.user_id)
          self.helper.join(room_id, "@test2:server")

          # Mark test2 as online, test will be offline with a last_active of 0
          self.get_success(
              self.presence_handler.set_state(
                  UserID.from_string("@test2:server"),
                  "dev-1",
                  {"presence": PresenceState.ONLINE},
              )
          )
          self.reactor.pump([0])  # Wait for presence updates to be handled

          #
          # Test that a new server gets told about existing presence
          #

          self.federation_sender.reset_mock()

          # Add a new remote server to the room
          self._add_new_user(room_id, "@alice:server2")

          # When new server is joined we send it the local users presence states.
          # We expect to only see user @test2:server, as @test:server is offline
          # and has a zero last_active_ts
          expected_state = self.get_success(
              self.presence_handler.current_state_for_user("@test2:server")
          )
          self.assertEqual(expected_state.state, PresenceState.ONLINE)
          self.federation_sender.send_presence_to_destinations.assert_called_once_with(
              destinations={"server2"}, states=[expected_state]
          )

          #
          # Test that only the new server gets sent presence and not existing servers
          #

          self.federation_sender.reset_mock()
          self._add_new_user(room_id, "@bob:server3")

          self.federation_sender.send_presence_to_destinations.assert_called_once_with(
              destinations={"server3"}, states=[expected_state]
          )

      def test_remote_gets_presence_when_local_user_joins(self) -> None:
          """When a local user joins a room that already has remote servers in
          it, those servers are sent that user's presence.
          """
          # We advance time to something that isn't 0, as we use 0 as a special
          # value.
          self.reactor.advance(1000000000000)

          # Create a room with one local user
          room_id = self.helper.create_room_as(self.user_id)

          # Mark test as online
          self.get_success(
              self.presence_handler.set_state(
                  UserID.from_string("@test:server"),
                  "dev-1",
                  {"presence": PresenceState.ONLINE},
              )
          )

          # Mark test2 as online as well.
          # Note we don't join them to the room yet
          self.get_success(
              self.presence_handler.set_state(
                  UserID.from_string("@test2:server"),
                  "dev-1",
                  {"presence": PresenceState.ONLINE},
              )
          )

          # Add servers to the room
          self._add_new_user(room_id, "@alice:server2")
          self._add_new_user(room_id, "@bob:server3")

          self.reactor.pump([0])  # Wait for presence updates to be handled

          #
          # Test that when a local join happens remote servers get told about it
          #

          self.federation_sender.reset_mock()

          # Join local user to room
          self.helper.join(room_id, "@test2:server")

          self.reactor.pump([0])  # Wait for presence updates to be handled

          # We expect to only send test2 presence to server2 and server3
          expected_state = self.get_success(
              self.presence_handler.current_state_for_user("@test2:server")
          )
          self.assertEqual(expected_state.state, PresenceState.ONLINE)
          self.federation_sender.send_presence_to_destinations.assert_called_once_with(
              destinations={"server2", "server3"}, states=[expected_state]
          )

      def _add_new_user(self, room_id: str, user_id: str) -> None:
          """Add new user to the room by creating an event and poking the federation API.

          Args:
              room_id: The room the user should be joined to.
              user_id: The joining user; the domain part of this ID is used as
                  the hostname the join event is built for and received from.
          """
          hostname = get_domain_from_id(user_id)

          room_version = self.get_success(self.store.get_room_version_id(room_id))

          # Build a membership "join" event as if it came from the user's server.
          builder = EventBuilder(
              state=self.state,
              event_auth_handler=self._event_auth_handler,
              store=self.store,
              clock=self.clock,
              hostname=hostname,
              signing_key=self.random_signing_key,
              room_version=KNOWN_ROOM_VERSIONS[room_version],
              room_id=room_id,
              type=EventTypes.Member,
              sender=user_id,
              state_key=user_id,
              content={"membership": Membership.JOIN},
          )

          prev_event_ids = self.get_success(
              self.store.get_latest_event_ids_in_room(room_id)
          )

          event = self.get_success(
              builder.build(prev_event_ids=prev_event_ids, auth_event_ids=None)
          )

          self.get_success(self.federation_event_handler.on_receive_pdu(hostname, event))

          # Check that it was successfully persisted.
          self.get_success(self.store.get_event(event.event_id))