test_presence.py 36 KB


  1. # Copyright 2016 OpenMarket Ltd
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. from typing import Optional
  15. from unittest.mock import Mock, call
  16. from signedjson.key import generate_signing_key
  17. from synapse.api.constants import EventTypes, Membership, PresenceState
  18. from synapse.api.presence import UserPresenceState
  19. from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
  20. from synapse.events.builder import EventBuilder
  21. from synapse.federation.sender import FederationSender
  22. from synapse.handlers.presence import (
  23. EXTERNAL_PROCESS_EXPIRY,
  24. FEDERATION_PING_INTERVAL,
  25. FEDERATION_TIMEOUT,
  26. IDLE_TIMER,
  27. LAST_ACTIVE_GRANULARITY,
  28. SYNC_ONLINE_TIMEOUT,
  29. handle_timeout,
  30. handle_update,
  31. )
  32. from synapse.rest import admin
  33. from synapse.rest.client import room
  34. from synapse.types import UserID, get_domain_from_id
  35. from tests import unittest
  36. class PresenceUpdateTestCase(unittest.HomeserverTestCase):
  37. servlets = [admin.register_servlets]
  38. def prepare(self, reactor, clock, homeserver):
  39. self.store = homeserver.get_datastores().main
  40. def test_offline_to_online(self):
  41. wheel_timer = Mock()
  42. user_id = "@foo:bar"
  43. now = 5000000
  44. prev_state = UserPresenceState.default(user_id)
  45. new_state = prev_state.copy_and_replace(
  46. state=PresenceState.ONLINE, last_active_ts=now
  47. )
  48. state, persist_and_notify, federation_ping = handle_update(
  49. prev_state, new_state, is_mine=True, wheel_timer=wheel_timer, now=now
  50. )
  51. self.assertTrue(persist_and_notify)
  52. self.assertTrue(state.currently_active)
  53. self.assertEqual(new_state.state, state.state)
  54. self.assertEqual(new_state.status_msg, state.status_msg)
  55. self.assertEqual(state.last_federation_update_ts, now)
  56. self.assertEqual(wheel_timer.insert.call_count, 3)
  57. wheel_timer.insert.assert_has_calls(
  58. [
  59. call(now=now, obj=user_id, then=new_state.last_active_ts + IDLE_TIMER),
  60. call(
  61. now=now,
  62. obj=user_id,
  63. then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT,
  64. ),
  65. call(
  66. now=now,
  67. obj=user_id,
  68. then=new_state.last_active_ts + LAST_ACTIVE_GRANULARITY,
  69. ),
  70. ],
  71. any_order=True,
  72. )
  73. def test_online_to_online(self):
  74. wheel_timer = Mock()
  75. user_id = "@foo:bar"
  76. now = 5000000
  77. prev_state = UserPresenceState.default(user_id)
  78. prev_state = prev_state.copy_and_replace(
  79. state=PresenceState.ONLINE, last_active_ts=now, currently_active=True
  80. )
  81. new_state = prev_state.copy_and_replace(
  82. state=PresenceState.ONLINE, last_active_ts=now
  83. )
  84. state, persist_and_notify, federation_ping = handle_update(
  85. prev_state, new_state, is_mine=True, wheel_timer=wheel_timer, now=now
  86. )
  87. self.assertFalse(persist_and_notify)
  88. self.assertTrue(federation_ping)
  89. self.assertTrue(state.currently_active)
  90. self.assertEqual(new_state.state, state.state)
  91. self.assertEqual(new_state.status_msg, state.status_msg)
  92. self.assertEqual(state.last_federation_update_ts, now)
  93. self.assertEqual(wheel_timer.insert.call_count, 3)
  94. wheel_timer.insert.assert_has_calls(
  95. [
  96. call(now=now, obj=user_id, then=new_state.last_active_ts + IDLE_TIMER),
  97. call(
  98. now=now,
  99. obj=user_id,
  100. then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT,
  101. ),
  102. call(
  103. now=now,
  104. obj=user_id,
  105. then=new_state.last_active_ts + LAST_ACTIVE_GRANULARITY,
  106. ),
  107. ],
  108. any_order=True,
  109. )
  110. def test_online_to_online_last_active_noop(self):
  111. wheel_timer = Mock()
  112. user_id = "@foo:bar"
  113. now = 5000000
  114. prev_state = UserPresenceState.default(user_id)
  115. prev_state = prev_state.copy_and_replace(
  116. state=PresenceState.ONLINE,
  117. last_active_ts=now - LAST_ACTIVE_GRANULARITY - 10,
  118. currently_active=True,
  119. )
  120. new_state = prev_state.copy_and_replace(
  121. state=PresenceState.ONLINE, last_active_ts=now
  122. )
  123. state, persist_and_notify, federation_ping = handle_update(
  124. prev_state, new_state, is_mine=True, wheel_timer=wheel_timer, now=now
  125. )
  126. self.assertFalse(persist_and_notify)
  127. self.assertTrue(federation_ping)
  128. self.assertTrue(state.currently_active)
  129. self.assertEqual(new_state.state, state.state)
  130. self.assertEqual(new_state.status_msg, state.status_msg)
  131. self.assertEqual(state.last_federation_update_ts, now)
  132. self.assertEqual(wheel_timer.insert.call_count, 3)
  133. wheel_timer.insert.assert_has_calls(
  134. [
  135. call(now=now, obj=user_id, then=new_state.last_active_ts + IDLE_TIMER),
  136. call(
  137. now=now,
  138. obj=user_id,
  139. then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT,
  140. ),
  141. call(
  142. now=now,
  143. obj=user_id,
  144. then=new_state.last_active_ts + LAST_ACTIVE_GRANULARITY,
  145. ),
  146. ],
  147. any_order=True,
  148. )
  149. def test_online_to_online_last_active(self):
  150. wheel_timer = Mock()
  151. user_id = "@foo:bar"
  152. now = 5000000
  153. prev_state = UserPresenceState.default(user_id)
  154. prev_state = prev_state.copy_and_replace(
  155. state=PresenceState.ONLINE,
  156. last_active_ts=now - LAST_ACTIVE_GRANULARITY - 1,
  157. currently_active=True,
  158. )
  159. new_state = prev_state.copy_and_replace(state=PresenceState.ONLINE)
  160. state, persist_and_notify, federation_ping = handle_update(
  161. prev_state, new_state, is_mine=True, wheel_timer=wheel_timer, now=now
  162. )
  163. self.assertTrue(persist_and_notify)
  164. self.assertFalse(state.currently_active)
  165. self.assertEqual(new_state.state, state.state)
  166. self.assertEqual(new_state.status_msg, state.status_msg)
  167. self.assertEqual(state.last_federation_update_ts, now)
  168. self.assertEqual(wheel_timer.insert.call_count, 2)
  169. wheel_timer.insert.assert_has_calls(
  170. [
  171. call(now=now, obj=user_id, then=new_state.last_active_ts + IDLE_TIMER),
  172. call(
  173. now=now,
  174. obj=user_id,
  175. then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT,
  176. ),
  177. ],
  178. any_order=True,
  179. )
  180. def test_remote_ping_timer(self):
  181. wheel_timer = Mock()
  182. user_id = "@foo:bar"
  183. now = 5000000
  184. prev_state = UserPresenceState.default(user_id)
  185. prev_state = prev_state.copy_and_replace(
  186. state=PresenceState.ONLINE, last_active_ts=now
  187. )
  188. new_state = prev_state.copy_and_replace(state=PresenceState.ONLINE)
  189. state, persist_and_notify, federation_ping = handle_update(
  190. prev_state, new_state, is_mine=False, wheel_timer=wheel_timer, now=now
  191. )
  192. self.assertFalse(persist_and_notify)
  193. self.assertFalse(federation_ping)
  194. self.assertFalse(state.currently_active)
  195. self.assertEqual(new_state.state, state.state)
  196. self.assertEqual(new_state.status_msg, state.status_msg)
  197. self.assertEqual(wheel_timer.insert.call_count, 1)
  198. wheel_timer.insert.assert_has_calls(
  199. [
  200. call(
  201. now=now,
  202. obj=user_id,
  203. then=new_state.last_federation_update_ts + FEDERATION_TIMEOUT,
  204. )
  205. ],
  206. any_order=True,
  207. )
  208. def test_online_to_offline(self):
  209. wheel_timer = Mock()
  210. user_id = "@foo:bar"
  211. now = 5000000
  212. prev_state = UserPresenceState.default(user_id)
  213. prev_state = prev_state.copy_and_replace(
  214. state=PresenceState.ONLINE, last_active_ts=now, currently_active=True
  215. )
  216. new_state = prev_state.copy_and_replace(state=PresenceState.OFFLINE)
  217. state, persist_and_notify, federation_ping = handle_update(
  218. prev_state, new_state, is_mine=True, wheel_timer=wheel_timer, now=now
  219. )
  220. self.assertTrue(persist_and_notify)
  221. self.assertEqual(new_state.state, state.state)
  222. self.assertEqual(state.last_federation_update_ts, now)
  223. self.assertEqual(wheel_timer.insert.call_count, 0)
  224. def test_online_to_idle(self):
  225. wheel_timer = Mock()
  226. user_id = "@foo:bar"
  227. now = 5000000
  228. prev_state = UserPresenceState.default(user_id)
  229. prev_state = prev_state.copy_and_replace(
  230. state=PresenceState.ONLINE, last_active_ts=now, currently_active=True
  231. )
  232. new_state = prev_state.copy_and_replace(state=PresenceState.UNAVAILABLE)
  233. state, persist_and_notify, federation_ping = handle_update(
  234. prev_state, new_state, is_mine=True, wheel_timer=wheel_timer, now=now
  235. )
  236. self.assertTrue(persist_and_notify)
  237. self.assertEqual(new_state.state, state.state)
  238. self.assertEqual(state.last_federation_update_ts, now)
  239. self.assertEqual(new_state.state, state.state)
  240. self.assertEqual(new_state.status_msg, state.status_msg)
  241. self.assertEqual(wheel_timer.insert.call_count, 1)
  242. wheel_timer.insert.assert_has_calls(
  243. [
  244. call(
  245. now=now,
  246. obj=user_id,
  247. then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT,
  248. )
  249. ],
  250. any_order=True,
  251. )
  252. def test_persisting_presence_updates(self):
  253. """Tests that the latest presence state for each user is persisted correctly"""
  254. # Create some test users and presence states for them
  255. presence_states = []
  256. for i in range(5):
  257. user_id = self.register_user(f"user_{i}", "password")
  258. presence_state = UserPresenceState(
  259. user_id=user_id,
  260. state="online",
  261. last_active_ts=1,
  262. last_federation_update_ts=1,
  263. last_user_sync_ts=1,
  264. status_msg="I'm online!",
  265. currently_active=True,
  266. )
  267. presence_states.append(presence_state)
  268. # Persist these presence updates to the database
  269. self.get_success(self.store.update_presence(presence_states))
  270. # Check that each update is present in the database
  271. db_presence_states = self.get_success(
  272. self.store.get_all_presence_updates(
  273. instance_name="master",
  274. last_id=0,
  275. current_id=len(presence_states) + 1,
  276. limit=len(presence_states),
  277. )
  278. )
  279. # Extract presence update user ID and state information into lists of tuples
  280. db_presence_states = [(ps[0], ps[1]) for _, ps in db_presence_states[0]]
  281. presence_states_compare = [(ps.user_id, ps.state) for ps in presence_states]
  282. # Compare what we put into the storage with what we got out.
  283. # They should be identical.
  284. self.assertEqual(presence_states_compare, db_presence_states)
  285. class PresenceTimeoutTestCase(unittest.TestCase):
  286. """Tests different timers and that the timer does not change `status_msg` of user."""
  287. def test_idle_timer(self):
  288. user_id = "@foo:bar"
  289. status_msg = "I'm here!"
  290. now = 5000000
  291. state = UserPresenceState.default(user_id)
  292. state = state.copy_and_replace(
  293. state=PresenceState.ONLINE,
  294. last_active_ts=now - IDLE_TIMER - 1,
  295. last_user_sync_ts=now,
  296. status_msg=status_msg,
  297. )
  298. new_state = handle_timeout(state, is_mine=True, syncing_user_ids=set(), now=now)
  299. self.assertIsNotNone(new_state)
  300. assert new_state is not None
  301. self.assertEqual(new_state.state, PresenceState.UNAVAILABLE)
  302. self.assertEqual(new_state.status_msg, status_msg)
  303. def test_busy_no_idle(self):
  304. """
  305. Tests that a user setting their presence to busy but idling doesn't turn their
  306. presence state into unavailable.
  307. """
  308. user_id = "@foo:bar"
  309. status_msg = "I'm here!"
  310. now = 5000000
  311. state = UserPresenceState.default(user_id)
  312. state = state.copy_and_replace(
  313. state=PresenceState.BUSY,
  314. last_active_ts=now - IDLE_TIMER - 1,
  315. last_user_sync_ts=now,
  316. status_msg=status_msg,
  317. )
  318. new_state = handle_timeout(state, is_mine=True, syncing_user_ids=set(), now=now)
  319. self.assertIsNotNone(new_state)
  320. assert new_state is not None
  321. self.assertEqual(new_state.state, PresenceState.BUSY)
  322. self.assertEqual(new_state.status_msg, status_msg)
  323. def test_sync_timeout(self):
  324. user_id = "@foo:bar"
  325. status_msg = "I'm here!"
  326. now = 5000000
  327. state = UserPresenceState.default(user_id)
  328. state = state.copy_and_replace(
  329. state=PresenceState.ONLINE,
  330. last_active_ts=0,
  331. last_user_sync_ts=now - SYNC_ONLINE_TIMEOUT - 1,
  332. status_msg=status_msg,
  333. )
  334. new_state = handle_timeout(state, is_mine=True, syncing_user_ids=set(), now=now)
  335. self.assertIsNotNone(new_state)
  336. assert new_state is not None
  337. self.assertEqual(new_state.state, PresenceState.OFFLINE)
  338. self.assertEqual(new_state.status_msg, status_msg)
  339. def test_sync_online(self):
  340. user_id = "@foo:bar"
  341. status_msg = "I'm here!"
  342. now = 5000000
  343. state = UserPresenceState.default(user_id)
  344. state = state.copy_and_replace(
  345. state=PresenceState.ONLINE,
  346. last_active_ts=now - SYNC_ONLINE_TIMEOUT - 1,
  347. last_user_sync_ts=now - SYNC_ONLINE_TIMEOUT - 1,
  348. status_msg=status_msg,
  349. )
  350. new_state = handle_timeout(
  351. state, is_mine=True, syncing_user_ids={user_id}, now=now
  352. )
  353. self.assertIsNotNone(new_state)
  354. assert new_state is not None
  355. self.assertEqual(new_state.state, PresenceState.ONLINE)
  356. self.assertEqual(new_state.status_msg, status_msg)
  357. def test_federation_ping(self):
  358. user_id = "@foo:bar"
  359. status_msg = "I'm here!"
  360. now = 5000000
  361. state = UserPresenceState.default(user_id)
  362. state = state.copy_and_replace(
  363. state=PresenceState.ONLINE,
  364. last_active_ts=now,
  365. last_user_sync_ts=now,
  366. last_federation_update_ts=now - FEDERATION_PING_INTERVAL - 1,
  367. status_msg=status_msg,
  368. )
  369. new_state = handle_timeout(state, is_mine=True, syncing_user_ids=set(), now=now)
  370. self.assertIsNotNone(new_state)
  371. self.assertEqual(state, new_state)
  372. def test_no_timeout(self):
  373. user_id = "@foo:bar"
  374. now = 5000000
  375. state = UserPresenceState.default(user_id)
  376. state = state.copy_and_replace(
  377. state=PresenceState.ONLINE,
  378. last_active_ts=now,
  379. last_user_sync_ts=now,
  380. last_federation_update_ts=now,
  381. )
  382. new_state = handle_timeout(state, is_mine=True, syncing_user_ids=set(), now=now)
  383. self.assertIsNone(new_state)
  384. def test_federation_timeout(self):
  385. user_id = "@foo:bar"
  386. status_msg = "I'm here!"
  387. now = 5000000
  388. state = UserPresenceState.default(user_id)
  389. state = state.copy_and_replace(
  390. state=PresenceState.ONLINE,
  391. last_active_ts=now,
  392. last_user_sync_ts=now,
  393. last_federation_update_ts=now - FEDERATION_TIMEOUT - 1,
  394. status_msg=status_msg,
  395. )
  396. new_state = handle_timeout(
  397. state, is_mine=False, syncing_user_ids=set(), now=now
  398. )
  399. self.assertIsNotNone(new_state)
  400. assert new_state is not None
  401. self.assertEqual(new_state.state, PresenceState.OFFLINE)
  402. self.assertEqual(new_state.status_msg, status_msg)
  403. def test_last_active(self):
  404. user_id = "@foo:bar"
  405. status_msg = "I'm here!"
  406. now = 5000000
  407. state = UserPresenceState.default(user_id)
  408. state = state.copy_and_replace(
  409. state=PresenceState.ONLINE,
  410. last_active_ts=now - LAST_ACTIVE_GRANULARITY - 1,
  411. last_user_sync_ts=now,
  412. last_federation_update_ts=now,
  413. status_msg=status_msg,
  414. )
  415. new_state = handle_timeout(state, is_mine=True, syncing_user_ids=set(), now=now)
  416. self.assertIsNotNone(new_state)
  417. self.assertEqual(state, new_state)
  418. class PresenceHandlerTestCase(unittest.HomeserverTestCase):
  419. def prepare(self, reactor, clock, hs):
  420. self.presence_handler = hs.get_presence_handler()
  421. self.clock = hs.get_clock()
  422. def test_external_process_timeout(self):
  423. """Test that if an external process doesn't update the records for a while
  424. we time out their syncing users presence.
  425. """
  426. process_id = 1
  427. user_id = "@test:server"
  428. # Notify handler that a user is now syncing.
  429. self.get_success(
  430. self.presence_handler.update_external_syncs_row(
  431. process_id, user_id, True, self.clock.time_msec()
  432. )
  433. )
  434. # Check that if we wait a while without telling the handler the user has
  435. # stopped syncing that their presence state doesn't get timed out.
  436. self.reactor.advance(EXTERNAL_PROCESS_EXPIRY / 2)
  437. state = self.get_success(
  438. self.presence_handler.get_state(UserID.from_string(user_id))
  439. )
  440. self.assertEqual(state.state, PresenceState.ONLINE)
  441. # Check that if the external process timeout fires, then the syncing
  442. # user gets timed out
  443. self.reactor.advance(EXTERNAL_PROCESS_EXPIRY)
  444. state = self.get_success(
  445. self.presence_handler.get_state(UserID.from_string(user_id))
  446. )
  447. self.assertEqual(state.state, PresenceState.OFFLINE)
  448. def test_user_goes_offline_by_timeout_status_msg_remain(self):
  449. """Test that if a user doesn't update the records for a while
  450. users presence goes `OFFLINE` because of timeout and `status_msg` remains.
  451. """
  452. user_id = "@test:server"
  453. status_msg = "I'm here!"
  454. # Mark user as online
  455. self._set_presencestate_with_status_msg(
  456. user_id, PresenceState.ONLINE, status_msg
  457. )
  458. # Check that if we wait a while without telling the handler the user has
  459. # stopped syncing that their presence state doesn't get timed out.
  460. self.reactor.advance(SYNC_ONLINE_TIMEOUT / 2)
  461. state = self.get_success(
  462. self.presence_handler.get_state(UserID.from_string(user_id))
  463. )
  464. self.assertEqual(state.state, PresenceState.ONLINE)
  465. self.assertEqual(state.status_msg, status_msg)
  466. # Check that if the timeout fires, then the syncing user gets timed out
  467. self.reactor.advance(SYNC_ONLINE_TIMEOUT)
  468. state = self.get_success(
  469. self.presence_handler.get_state(UserID.from_string(user_id))
  470. )
  471. # status_msg should remain even after going offline
  472. self.assertEqual(state.state, PresenceState.OFFLINE)
  473. self.assertEqual(state.status_msg, status_msg)
  474. def test_user_goes_offline_manually_with_no_status_msg(self):
  475. """Test that if a user change presence manually to `OFFLINE`
  476. and no status is set, that `status_msg` is `None`.
  477. """
  478. user_id = "@test:server"
  479. status_msg = "I'm here!"
  480. # Mark user as online
  481. self._set_presencestate_with_status_msg(
  482. user_id, PresenceState.ONLINE, status_msg
  483. )
  484. # Mark user as offline
  485. self.get_success(
  486. self.presence_handler.set_state(
  487. UserID.from_string(user_id), {"presence": PresenceState.OFFLINE}
  488. )
  489. )
  490. state = self.get_success(
  491. self.presence_handler.get_state(UserID.from_string(user_id))
  492. )
  493. self.assertEqual(state.state, PresenceState.OFFLINE)
  494. self.assertEqual(state.status_msg, None)
  495. def test_user_goes_offline_manually_with_status_msg(self):
  496. """Test that if a user change presence manually to `OFFLINE`
  497. and a status is set, that `status_msg` appears.
  498. """
  499. user_id = "@test:server"
  500. status_msg = "I'm here!"
  501. # Mark user as online
  502. self._set_presencestate_with_status_msg(
  503. user_id, PresenceState.ONLINE, status_msg
  504. )
  505. # Mark user as offline
  506. self._set_presencestate_with_status_msg(
  507. user_id, PresenceState.OFFLINE, "And now here."
  508. )
  509. def test_user_reset_online_with_no_status(self):
  510. """Test that if a user set again the presence manually
  511. and no status is set, that `status_msg` is `None`.
  512. """
  513. user_id = "@test:server"
  514. status_msg = "I'm here!"
  515. # Mark user as online
  516. self._set_presencestate_with_status_msg(
  517. user_id, PresenceState.ONLINE, status_msg
  518. )
  519. # Mark user as online again
  520. self.get_success(
  521. self.presence_handler.set_state(
  522. UserID.from_string(user_id), {"presence": PresenceState.ONLINE}
  523. )
  524. )
  525. state = self.get_success(
  526. self.presence_handler.get_state(UserID.from_string(user_id))
  527. )
  528. # status_msg should remain even after going offline
  529. self.assertEqual(state.state, PresenceState.ONLINE)
  530. self.assertEqual(state.status_msg, None)
  531. def test_set_presence_with_status_msg_none(self):
  532. """Test that if a user set again the presence manually
  533. and status is `None`, that `status_msg` is `None`.
  534. """
  535. user_id = "@test:server"
  536. status_msg = "I'm here!"
  537. # Mark user as online
  538. self._set_presencestate_with_status_msg(
  539. user_id, PresenceState.ONLINE, status_msg
  540. )
  541. # Mark user as online and `status_msg = None`
  542. self._set_presencestate_with_status_msg(user_id, PresenceState.ONLINE, None)
  543. def _set_presencestate_with_status_msg(
  544. self, user_id: str, state: str, status_msg: Optional[str]
  545. ):
  546. """Set a PresenceState and status_msg and check the result.
  547. Args:
  548. user_id: User for that the status is to be set.
  549. state: The new PresenceState.
  550. status_msg: Status message that is to be set.
  551. """
  552. self.get_success(
  553. self.presence_handler.set_state(
  554. UserID.from_string(user_id),
  555. {"presence": state, "status_msg": status_msg},
  556. )
  557. )
  558. new_state = self.get_success(
  559. self.presence_handler.get_state(UserID.from_string(user_id))
  560. )
  561. self.assertEqual(new_state.state, state)
  562. self.assertEqual(new_state.status_msg, status_msg)
  563. class PresenceFederationQueueTestCase(unittest.HomeserverTestCase):
  564. def prepare(self, reactor, clock, hs):
  565. self.presence_handler = hs.get_presence_handler()
  566. self.clock = hs.get_clock()
  567. self.instance_name = hs.get_instance_name()
  568. self.queue = self.presence_handler.get_federation_queue()
  569. def test_send_and_get(self):
  570. state1 = UserPresenceState.default("@user1:test")
  571. state2 = UserPresenceState.default("@user2:test")
  572. state3 = UserPresenceState.default("@user3:test")
  573. prev_token = self.queue.get_current_token(self.instance_name)
  574. self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2"))
  575. self.queue.send_presence_to_destinations((state3,), ("dest3",))
  576. now_token = self.queue.get_current_token(self.instance_name)
  577. rows, upto_token, limited = self.get_success(
  578. self.queue.get_replication_rows("master", prev_token, now_token, 10)
  579. )
  580. self.assertEqual(upto_token, now_token)
  581. self.assertFalse(limited)
  582. expected_rows = [
  583. (1, ("dest1", "@user1:test")),
  584. (1, ("dest2", "@user1:test")),
  585. (1, ("dest1", "@user2:test")),
  586. (1, ("dest2", "@user2:test")),
  587. (2, ("dest3", "@user3:test")),
  588. ]
  589. self.assertCountEqual(rows, expected_rows)
  590. now_token = self.queue.get_current_token(self.instance_name)
  591. rows, upto_token, limited = self.get_success(
  592. self.queue.get_replication_rows("master", upto_token, now_token, 10)
  593. )
  594. self.assertEqual(upto_token, now_token)
  595. self.assertFalse(limited)
  596. self.assertCountEqual(rows, [])
  597. def test_send_and_get_split(self):
  598. state1 = UserPresenceState.default("@user1:test")
  599. state2 = UserPresenceState.default("@user2:test")
  600. state3 = UserPresenceState.default("@user3:test")
  601. prev_token = self.queue.get_current_token(self.instance_name)
  602. self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2"))
  603. now_token = self.queue.get_current_token(self.instance_name)
  604. self.queue.send_presence_to_destinations((state3,), ("dest3",))
  605. rows, upto_token, limited = self.get_success(
  606. self.queue.get_replication_rows("master", prev_token, now_token, 10)
  607. )
  608. self.assertEqual(upto_token, now_token)
  609. self.assertFalse(limited)
  610. expected_rows = [
  611. (1, ("dest1", "@user1:test")),
  612. (1, ("dest2", "@user1:test")),
  613. (1, ("dest1", "@user2:test")),
  614. (1, ("dest2", "@user2:test")),
  615. ]
  616. self.assertCountEqual(rows, expected_rows)
  617. now_token = self.queue.get_current_token(self.instance_name)
  618. rows, upto_token, limited = self.get_success(
  619. self.queue.get_replication_rows("master", upto_token, now_token, 10)
  620. )
  621. self.assertEqual(upto_token, now_token)
  622. self.assertFalse(limited)
  623. expected_rows = [
  624. (2, ("dest3", "@user3:test")),
  625. ]
  626. self.assertCountEqual(rows, expected_rows)
  627. def test_clear_queue_all(self):
  628. state1 = UserPresenceState.default("@user1:test")
  629. state2 = UserPresenceState.default("@user2:test")
  630. state3 = UserPresenceState.default("@user3:test")
  631. prev_token = self.queue.get_current_token(self.instance_name)
  632. self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2"))
  633. self.queue.send_presence_to_destinations((state3,), ("dest3",))
  634. self.reactor.advance(10 * 60 * 1000)
  635. now_token = self.queue.get_current_token(self.instance_name)
  636. rows, upto_token, limited = self.get_success(
  637. self.queue.get_replication_rows("master", prev_token, now_token, 10)
  638. )
  639. self.assertEqual(upto_token, now_token)
  640. self.assertFalse(limited)
  641. self.assertCountEqual(rows, [])
  642. prev_token = self.queue.get_current_token(self.instance_name)
  643. self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2"))
  644. self.queue.send_presence_to_destinations((state3,), ("dest3",))
  645. now_token = self.queue.get_current_token(self.instance_name)
  646. rows, upto_token, limited = self.get_success(
  647. self.queue.get_replication_rows("master", prev_token, now_token, 10)
  648. )
  649. self.assertEqual(upto_token, now_token)
  650. self.assertFalse(limited)
  651. expected_rows = [
  652. (3, ("dest1", "@user1:test")),
  653. (3, ("dest2", "@user1:test")),
  654. (3, ("dest1", "@user2:test")),
  655. (3, ("dest2", "@user2:test")),
  656. (4, ("dest3", "@user3:test")),
  657. ]
  658. self.assertCountEqual(rows, expected_rows)
  659. def test_partially_clear_queue(self):
  660. state1 = UserPresenceState.default("@user1:test")
  661. state2 = UserPresenceState.default("@user2:test")
  662. state3 = UserPresenceState.default("@user3:test")
  663. prev_token = self.queue.get_current_token(self.instance_name)
  664. self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2"))
  665. self.reactor.advance(2 * 60 * 1000)
  666. self.queue.send_presence_to_destinations((state3,), ("dest3",))
  667. self.reactor.advance(4 * 60 * 1000)
  668. now_token = self.queue.get_current_token(self.instance_name)
  669. rows, upto_token, limited = self.get_success(
  670. self.queue.get_replication_rows("master", prev_token, now_token, 10)
  671. )
  672. self.assertEqual(upto_token, now_token)
  673. self.assertFalse(limited)
  674. expected_rows = [
  675. (2, ("dest3", "@user3:test")),
  676. ]
  677. self.assertCountEqual(rows, [])
  678. prev_token = self.queue.get_current_token(self.instance_name)
  679. self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2"))
  680. self.queue.send_presence_to_destinations((state3,), ("dest3",))
  681. now_token = self.queue.get_current_token(self.instance_name)
  682. rows, upto_token, limited = self.get_success(
  683. self.queue.get_replication_rows("master", prev_token, now_token, 10)
  684. )
  685. self.assertEqual(upto_token, now_token)
  686. self.assertFalse(limited)
  687. expected_rows = [
  688. (3, ("dest1", "@user1:test")),
  689. (3, ("dest2", "@user1:test")),
  690. (3, ("dest1", "@user2:test")),
  691. (3, ("dest2", "@user2:test")),
  692. (4, ("dest3", "@user3:test")),
  693. ]
  694. self.assertCountEqual(rows, expected_rows)
  695. class PresenceJoinTestCase(unittest.HomeserverTestCase):
  696. """Tests remote servers get told about presence of users in the room when
  697. they join and when new local users join.
  698. """
  699. user_id = "@test:server"
  700. servlets = [room.register_servlets]
  701. def make_homeserver(self, reactor, clock):
  702. hs = self.setup_test_homeserver(
  703. "server",
  704. federation_http_client=None,
  705. federation_sender=Mock(spec=FederationSender),
  706. )
  707. return hs
  708. def default_config(self):
  709. config = super().default_config()
  710. config["send_federation"] = True
  711. return config
  712. def prepare(self, reactor, clock, hs):
  713. self.federation_sender = hs.get_federation_sender()
  714. self.event_builder_factory = hs.get_event_builder_factory()
  715. self.federation_event_handler = hs.get_federation_event_handler()
  716. self.presence_handler = hs.get_presence_handler()
  717. # self.event_builder_for_2 = EventBuilderFactory(hs)
  718. # self.event_builder_for_2.hostname = "test2"
  719. self.store = hs.get_datastores().main
  720. self.state = hs.get_state_handler()
  721. self._event_auth_handler = hs.get_event_auth_handler()
  722. # We don't actually check signatures in tests, so lets just create a
  723. # random key to use.
  724. self.random_signing_key = generate_signing_key("ver")
  725. def test_remote_joins(self):
  726. # We advance time to something that isn't 0, as we use 0 as a special
  727. # value.
  728. self.reactor.advance(1000000000000)
  729. # Create a room with two local users
  730. room_id = self.helper.create_room_as(self.user_id)
  731. self.helper.join(room_id, "@test2:server")
  732. # Mark test2 as online, test will be offline with a last_active of 0
  733. self.get_success(
  734. self.presence_handler.set_state(
  735. UserID.from_string("@test2:server"), {"presence": PresenceState.ONLINE}
  736. )
  737. )
  738. self.reactor.pump([0]) # Wait for presence updates to be handled
  739. #
  740. # Test that a new server gets told about existing presence
  741. #
  742. self.federation_sender.reset_mock()
  743. # Add a new remote server to the room
  744. self._add_new_user(room_id, "@alice:server2")
  745. # When new server is joined we send it the local users presence states.
  746. # We expect to only see user @test2:server, as @test:server is offline
  747. # and has a zero last_active_ts
  748. expected_state = self.get_success(
  749. self.presence_handler.current_state_for_user("@test2:server")
  750. )
  751. self.assertEqual(expected_state.state, PresenceState.ONLINE)
  752. self.federation_sender.send_presence_to_destinations.assert_called_once_with(
  753. destinations={"server2"}, states=[expected_state]
  754. )
  755. #
  756. # Test that only the new server gets sent presence and not existing servers
  757. #
  758. self.federation_sender.reset_mock()
  759. self._add_new_user(room_id, "@bob:server3")
  760. self.federation_sender.send_presence_to_destinations.assert_called_once_with(
  761. destinations={"server3"}, states=[expected_state]
  762. )
  763. def test_remote_gets_presence_when_local_user_joins(self):
  764. # We advance time to something that isn't 0, as we use 0 as a special
  765. # value.
  766. self.reactor.advance(1000000000000)
  767. # Create a room with one local users
  768. room_id = self.helper.create_room_as(self.user_id)
  769. # Mark test as online
  770. self.get_success(
  771. self.presence_handler.set_state(
  772. UserID.from_string("@test:server"), {"presence": PresenceState.ONLINE}
  773. )
  774. )
  775. # Mark test2 as online, test will be offline with a last_active of 0.
  776. # Note we don't join them to the room yet
  777. self.get_success(
  778. self.presence_handler.set_state(
  779. UserID.from_string("@test2:server"), {"presence": PresenceState.ONLINE}
  780. )
  781. )
  782. # Add servers to the room
  783. self._add_new_user(room_id, "@alice:server2")
  784. self._add_new_user(room_id, "@bob:server3")
  785. self.reactor.pump([0]) # Wait for presence updates to be handled
  786. #
  787. # Test that when a local join happens remote servers get told about it
  788. #
  789. self.federation_sender.reset_mock()
  790. # Join local user to room
  791. self.helper.join(room_id, "@test2:server")
  792. self.reactor.pump([0]) # Wait for presence updates to be handled
  793. # We expect to only send test2 presence to server2 and server3
  794. expected_state = self.get_success(
  795. self.presence_handler.current_state_for_user("@test2:server")
  796. )
  797. self.assertEqual(expected_state.state, PresenceState.ONLINE)
  798. self.federation_sender.send_presence_to_destinations.assert_called_once_with(
  799. destinations={"server2", "server3"}, states=[expected_state]
  800. )
  801. def _add_new_user(self, room_id, user_id):
  802. """Add new user to the room by creating an event and poking the federation API."""
  803. hostname = get_domain_from_id(user_id)
  804. room_version = self.get_success(self.store.get_room_version_id(room_id))
  805. builder = EventBuilder(
  806. state=self.state,
  807. event_auth_handler=self._event_auth_handler,
  808. store=self.store,
  809. clock=self.clock,
  810. hostname=hostname,
  811. signing_key=self.random_signing_key,
  812. room_version=KNOWN_ROOM_VERSIONS[room_version],
  813. room_id=room_id,
  814. type=EventTypes.Member,
  815. sender=user_id,
  816. state_key=user_id,
  817. content={"membership": Membership.JOIN},
  818. )
  819. prev_event_ids = self.get_success(
  820. self.store.get_latest_event_ids_in_room(room_id)
  821. )
  822. event = self.get_success(
  823. builder.build(prev_event_ids=prev_event_ids, auth_event_ids=None)
  824. )
  825. self.get_success(self.federation_event_handler.on_receive_pdu(hostname, event))
  826. # Check that it was successfully persisted.
  827. self.get_success(self.store.get_event(event.event_id))
  828. self.get_success(self.store.get_event(event.event_id))