test_presence.py 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874
  1. # Copyright 2016 OpenMarket Ltd
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. from unittest.mock import Mock, call
  15. from signedjson.key import generate_signing_key
  16. from synapse.api.constants import EventTypes, Membership, PresenceState
  17. from synapse.api.presence import UserPresenceState
  18. from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
  19. from synapse.events.builder import EventBuilder
  20. from synapse.federation.sender import FederationSender
  21. from synapse.handlers.presence import (
  22. EXTERNAL_PROCESS_EXPIRY,
  23. FEDERATION_PING_INTERVAL,
  24. FEDERATION_TIMEOUT,
  25. IDLE_TIMER,
  26. LAST_ACTIVE_GRANULARITY,
  27. SYNC_ONLINE_TIMEOUT,
  28. handle_timeout,
  29. handle_update,
  30. )
  31. from synapse.rest import admin
  32. from synapse.rest.client.v1 import room
  33. from synapse.types import UserID, get_domain_from_id
  34. from tests import unittest
  35. class PresenceUpdateTestCase(unittest.HomeserverTestCase):
  36. servlets = [admin.register_servlets]
  37. def prepare(self, reactor, clock, homeserver):
  38. self.store = homeserver.get_datastore()
  39. def test_offline_to_online(self):
  40. wheel_timer = Mock()
  41. user_id = "@foo:bar"
  42. now = 5000000
  43. prev_state = UserPresenceState.default(user_id)
  44. new_state = prev_state.copy_and_replace(
  45. state=PresenceState.ONLINE, last_active_ts=now
  46. )
  47. state, persist_and_notify, federation_ping = handle_update(
  48. prev_state, new_state, is_mine=True, wheel_timer=wheel_timer, now=now
  49. )
  50. self.assertTrue(persist_and_notify)
  51. self.assertTrue(state.currently_active)
  52. self.assertEquals(new_state.state, state.state)
  53. self.assertEquals(new_state.status_msg, state.status_msg)
  54. self.assertEquals(state.last_federation_update_ts, now)
  55. self.assertEquals(wheel_timer.insert.call_count, 3)
  56. wheel_timer.insert.assert_has_calls(
  57. [
  58. call(now=now, obj=user_id, then=new_state.last_active_ts + IDLE_TIMER),
  59. call(
  60. now=now,
  61. obj=user_id,
  62. then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT,
  63. ),
  64. call(
  65. now=now,
  66. obj=user_id,
  67. then=new_state.last_active_ts + LAST_ACTIVE_GRANULARITY,
  68. ),
  69. ],
  70. any_order=True,
  71. )
  72. def test_online_to_online(self):
  73. wheel_timer = Mock()
  74. user_id = "@foo:bar"
  75. now = 5000000
  76. prev_state = UserPresenceState.default(user_id)
  77. prev_state = prev_state.copy_and_replace(
  78. state=PresenceState.ONLINE, last_active_ts=now, currently_active=True
  79. )
  80. new_state = prev_state.copy_and_replace(
  81. state=PresenceState.ONLINE, last_active_ts=now
  82. )
  83. state, persist_and_notify, federation_ping = handle_update(
  84. prev_state, new_state, is_mine=True, wheel_timer=wheel_timer, now=now
  85. )
  86. self.assertFalse(persist_and_notify)
  87. self.assertTrue(federation_ping)
  88. self.assertTrue(state.currently_active)
  89. self.assertEquals(new_state.state, state.state)
  90. self.assertEquals(new_state.status_msg, state.status_msg)
  91. self.assertEquals(state.last_federation_update_ts, now)
  92. self.assertEquals(wheel_timer.insert.call_count, 3)
  93. wheel_timer.insert.assert_has_calls(
  94. [
  95. call(now=now, obj=user_id, then=new_state.last_active_ts + IDLE_TIMER),
  96. call(
  97. now=now,
  98. obj=user_id,
  99. then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT,
  100. ),
  101. call(
  102. now=now,
  103. obj=user_id,
  104. then=new_state.last_active_ts + LAST_ACTIVE_GRANULARITY,
  105. ),
  106. ],
  107. any_order=True,
  108. )
  109. def test_online_to_online_last_active_noop(self):
  110. wheel_timer = Mock()
  111. user_id = "@foo:bar"
  112. now = 5000000
  113. prev_state = UserPresenceState.default(user_id)
  114. prev_state = prev_state.copy_and_replace(
  115. state=PresenceState.ONLINE,
  116. last_active_ts=now - LAST_ACTIVE_GRANULARITY - 10,
  117. currently_active=True,
  118. )
  119. new_state = prev_state.copy_and_replace(
  120. state=PresenceState.ONLINE, last_active_ts=now
  121. )
  122. state, persist_and_notify, federation_ping = handle_update(
  123. prev_state, new_state, is_mine=True, wheel_timer=wheel_timer, now=now
  124. )
  125. self.assertFalse(persist_and_notify)
  126. self.assertTrue(federation_ping)
  127. self.assertTrue(state.currently_active)
  128. self.assertEquals(new_state.state, state.state)
  129. self.assertEquals(new_state.status_msg, state.status_msg)
  130. self.assertEquals(state.last_federation_update_ts, now)
  131. self.assertEquals(wheel_timer.insert.call_count, 3)
  132. wheel_timer.insert.assert_has_calls(
  133. [
  134. call(now=now, obj=user_id, then=new_state.last_active_ts + IDLE_TIMER),
  135. call(
  136. now=now,
  137. obj=user_id,
  138. then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT,
  139. ),
  140. call(
  141. now=now,
  142. obj=user_id,
  143. then=new_state.last_active_ts + LAST_ACTIVE_GRANULARITY,
  144. ),
  145. ],
  146. any_order=True,
  147. )
  148. def test_online_to_online_last_active(self):
  149. wheel_timer = Mock()
  150. user_id = "@foo:bar"
  151. now = 5000000
  152. prev_state = UserPresenceState.default(user_id)
  153. prev_state = prev_state.copy_and_replace(
  154. state=PresenceState.ONLINE,
  155. last_active_ts=now - LAST_ACTIVE_GRANULARITY - 1,
  156. currently_active=True,
  157. )
  158. new_state = prev_state.copy_and_replace(state=PresenceState.ONLINE)
  159. state, persist_and_notify, federation_ping = handle_update(
  160. prev_state, new_state, is_mine=True, wheel_timer=wheel_timer, now=now
  161. )
  162. self.assertTrue(persist_and_notify)
  163. self.assertFalse(state.currently_active)
  164. self.assertEquals(new_state.state, state.state)
  165. self.assertEquals(new_state.status_msg, state.status_msg)
  166. self.assertEquals(state.last_federation_update_ts, now)
  167. self.assertEquals(wheel_timer.insert.call_count, 2)
  168. wheel_timer.insert.assert_has_calls(
  169. [
  170. call(now=now, obj=user_id, then=new_state.last_active_ts + IDLE_TIMER),
  171. call(
  172. now=now,
  173. obj=user_id,
  174. then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT,
  175. ),
  176. ],
  177. any_order=True,
  178. )
  179. def test_remote_ping_timer(self):
  180. wheel_timer = Mock()
  181. user_id = "@foo:bar"
  182. now = 5000000
  183. prev_state = UserPresenceState.default(user_id)
  184. prev_state = prev_state.copy_and_replace(
  185. state=PresenceState.ONLINE, last_active_ts=now
  186. )
  187. new_state = prev_state.copy_and_replace(state=PresenceState.ONLINE)
  188. state, persist_and_notify, federation_ping = handle_update(
  189. prev_state, new_state, is_mine=False, wheel_timer=wheel_timer, now=now
  190. )
  191. self.assertFalse(persist_and_notify)
  192. self.assertFalse(federation_ping)
  193. self.assertFalse(state.currently_active)
  194. self.assertEquals(new_state.state, state.state)
  195. self.assertEquals(new_state.status_msg, state.status_msg)
  196. self.assertEquals(wheel_timer.insert.call_count, 1)
  197. wheel_timer.insert.assert_has_calls(
  198. [
  199. call(
  200. now=now,
  201. obj=user_id,
  202. then=new_state.last_federation_update_ts + FEDERATION_TIMEOUT,
  203. )
  204. ],
  205. any_order=True,
  206. )
  207. def test_online_to_offline(self):
  208. wheel_timer = Mock()
  209. user_id = "@foo:bar"
  210. now = 5000000
  211. prev_state = UserPresenceState.default(user_id)
  212. prev_state = prev_state.copy_and_replace(
  213. state=PresenceState.ONLINE, last_active_ts=now, currently_active=True
  214. )
  215. new_state = prev_state.copy_and_replace(state=PresenceState.OFFLINE)
  216. state, persist_and_notify, federation_ping = handle_update(
  217. prev_state, new_state, is_mine=True, wheel_timer=wheel_timer, now=now
  218. )
  219. self.assertTrue(persist_and_notify)
  220. self.assertEquals(new_state.state, state.state)
  221. self.assertEquals(state.last_federation_update_ts, now)
  222. self.assertEquals(wheel_timer.insert.call_count, 0)
  223. def test_online_to_idle(self):
  224. wheel_timer = Mock()
  225. user_id = "@foo:bar"
  226. now = 5000000
  227. prev_state = UserPresenceState.default(user_id)
  228. prev_state = prev_state.copy_and_replace(
  229. state=PresenceState.ONLINE, last_active_ts=now, currently_active=True
  230. )
  231. new_state = prev_state.copy_and_replace(state=PresenceState.UNAVAILABLE)
  232. state, persist_and_notify, federation_ping = handle_update(
  233. prev_state, new_state, is_mine=True, wheel_timer=wheel_timer, now=now
  234. )
  235. self.assertTrue(persist_and_notify)
  236. self.assertEquals(new_state.state, state.state)
  237. self.assertEquals(state.last_federation_update_ts, now)
  238. self.assertEquals(new_state.state, state.state)
  239. self.assertEquals(new_state.status_msg, state.status_msg)
  240. self.assertEquals(wheel_timer.insert.call_count, 1)
  241. wheel_timer.insert.assert_has_calls(
  242. [
  243. call(
  244. now=now,
  245. obj=user_id,
  246. then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT,
  247. )
  248. ],
  249. any_order=True,
  250. )
  251. def test_persisting_presence_updates(self):
  252. """Tests that the latest presence state for each user is persisted correctly"""
  253. # Create some test users and presence states for them
  254. presence_states = []
  255. for i in range(5):
  256. user_id = self.register_user(f"user_{i}", "password")
  257. presence_state = UserPresenceState(
  258. user_id=user_id,
  259. state="online",
  260. last_active_ts=1,
  261. last_federation_update_ts=1,
  262. last_user_sync_ts=1,
  263. status_msg="I'm online!",
  264. currently_active=True,
  265. )
  266. presence_states.append(presence_state)
  267. # Persist these presence updates to the database
  268. self.get_success(self.store.update_presence(presence_states))
  269. # Check that each update is present in the database
  270. db_presence_states = self.get_success(
  271. self.store.get_all_presence_updates(
  272. instance_name="master",
  273. last_id=0,
  274. current_id=len(presence_states) + 1,
  275. limit=len(presence_states),
  276. )
  277. )
  278. # Extract presence update user ID and state information into lists of tuples
  279. db_presence_states = [(ps[0], ps[1]) for _, ps in db_presence_states[0]]
  280. presence_states = [(ps.user_id, ps.state) for ps in presence_states]
  281. # Compare what we put into the storage with what we got out.
  282. # They should be identical.
  283. self.assertEqual(presence_states, db_presence_states)
  284. class PresenceTimeoutTestCase(unittest.TestCase):
  285. def test_idle_timer(self):
  286. user_id = "@foo:bar"
  287. now = 5000000
  288. state = UserPresenceState.default(user_id)
  289. state = state.copy_and_replace(
  290. state=PresenceState.ONLINE,
  291. last_active_ts=now - IDLE_TIMER - 1,
  292. last_user_sync_ts=now,
  293. )
  294. new_state = handle_timeout(state, is_mine=True, syncing_user_ids=set(), now=now)
  295. self.assertIsNotNone(new_state)
  296. self.assertEquals(new_state.state, PresenceState.UNAVAILABLE)
  297. def test_busy_no_idle(self):
  298. """
  299. Tests that a user setting their presence to busy but idling doesn't turn their
  300. presence state into unavailable.
  301. """
  302. user_id = "@foo:bar"
  303. now = 5000000
  304. state = UserPresenceState.default(user_id)
  305. state = state.copy_and_replace(
  306. state=PresenceState.BUSY,
  307. last_active_ts=now - IDLE_TIMER - 1,
  308. last_user_sync_ts=now,
  309. )
  310. new_state = handle_timeout(state, is_mine=True, syncing_user_ids=set(), now=now)
  311. self.assertIsNotNone(new_state)
  312. self.assertEquals(new_state.state, PresenceState.BUSY)
  313. def test_sync_timeout(self):
  314. user_id = "@foo:bar"
  315. now = 5000000
  316. state = UserPresenceState.default(user_id)
  317. state = state.copy_and_replace(
  318. state=PresenceState.ONLINE,
  319. last_active_ts=0,
  320. last_user_sync_ts=now - SYNC_ONLINE_TIMEOUT - 1,
  321. )
  322. new_state = handle_timeout(state, is_mine=True, syncing_user_ids=set(), now=now)
  323. self.assertIsNotNone(new_state)
  324. self.assertEquals(new_state.state, PresenceState.OFFLINE)
  325. def test_sync_online(self):
  326. user_id = "@foo:bar"
  327. now = 5000000
  328. state = UserPresenceState.default(user_id)
  329. state = state.copy_and_replace(
  330. state=PresenceState.ONLINE,
  331. last_active_ts=now - SYNC_ONLINE_TIMEOUT - 1,
  332. last_user_sync_ts=now - SYNC_ONLINE_TIMEOUT - 1,
  333. )
  334. new_state = handle_timeout(
  335. state, is_mine=True, syncing_user_ids={user_id}, now=now
  336. )
  337. self.assertIsNotNone(new_state)
  338. self.assertEquals(new_state.state, PresenceState.ONLINE)
  339. def test_federation_ping(self):
  340. user_id = "@foo:bar"
  341. now = 5000000
  342. state = UserPresenceState.default(user_id)
  343. state = state.copy_and_replace(
  344. state=PresenceState.ONLINE,
  345. last_active_ts=now,
  346. last_user_sync_ts=now,
  347. last_federation_update_ts=now - FEDERATION_PING_INTERVAL - 1,
  348. )
  349. new_state = handle_timeout(state, is_mine=True, syncing_user_ids=set(), now=now)
  350. self.assertIsNotNone(new_state)
  351. self.assertEquals(new_state, new_state)
  352. def test_no_timeout(self):
  353. user_id = "@foo:bar"
  354. now = 5000000
  355. state = UserPresenceState.default(user_id)
  356. state = state.copy_and_replace(
  357. state=PresenceState.ONLINE,
  358. last_active_ts=now,
  359. last_user_sync_ts=now,
  360. last_federation_update_ts=now,
  361. )
  362. new_state = handle_timeout(state, is_mine=True, syncing_user_ids=set(), now=now)
  363. self.assertIsNone(new_state)
  364. def test_federation_timeout(self):
  365. user_id = "@foo:bar"
  366. now = 5000000
  367. state = UserPresenceState.default(user_id)
  368. state = state.copy_and_replace(
  369. state=PresenceState.ONLINE,
  370. last_active_ts=now,
  371. last_user_sync_ts=now,
  372. last_federation_update_ts=now - FEDERATION_TIMEOUT - 1,
  373. )
  374. new_state = handle_timeout(
  375. state, is_mine=False, syncing_user_ids=set(), now=now
  376. )
  377. self.assertIsNotNone(new_state)
  378. self.assertEquals(new_state.state, PresenceState.OFFLINE)
  379. def test_last_active(self):
  380. user_id = "@foo:bar"
  381. now = 5000000
  382. state = UserPresenceState.default(user_id)
  383. state = state.copy_and_replace(
  384. state=PresenceState.ONLINE,
  385. last_active_ts=now - LAST_ACTIVE_GRANULARITY - 1,
  386. last_user_sync_ts=now,
  387. last_federation_update_ts=now,
  388. )
  389. new_state = handle_timeout(state, is_mine=True, syncing_user_ids=set(), now=now)
  390. self.assertIsNotNone(new_state)
  391. self.assertEquals(state, new_state)
  392. class PresenceHandlerTestCase(unittest.HomeserverTestCase):
  393. def prepare(self, reactor, clock, hs):
  394. self.presence_handler = hs.get_presence_handler()
  395. self.clock = hs.get_clock()
  396. def test_external_process_timeout(self):
  397. """Test that if an external process doesn't update the records for a while
  398. we time out their syncing users presence.
  399. """
  400. process_id = 1
  401. user_id = "@test:server"
  402. # Notify handler that a user is now syncing.
  403. self.get_success(
  404. self.presence_handler.update_external_syncs_row(
  405. process_id, user_id, True, self.clock.time_msec()
  406. )
  407. )
  408. # Check that if we wait a while without telling the handler the user has
  409. # stopped syncing that their presence state doesn't get timed out.
  410. self.reactor.advance(EXTERNAL_PROCESS_EXPIRY / 2)
  411. state = self.get_success(
  412. self.presence_handler.get_state(UserID.from_string(user_id))
  413. )
  414. self.assertEqual(state.state, PresenceState.ONLINE)
  415. # Check that if the external process timeout fires, then the syncing
  416. # user gets timed out
  417. self.reactor.advance(EXTERNAL_PROCESS_EXPIRY)
  418. state = self.get_success(
  419. self.presence_handler.get_state(UserID.from_string(user_id))
  420. )
  421. self.assertEqual(state.state, PresenceState.OFFLINE)
  422. class PresenceFederationQueueTestCase(unittest.HomeserverTestCase):
  423. def prepare(self, reactor, clock, hs):
  424. self.presence_handler = hs.get_presence_handler()
  425. self.clock = hs.get_clock()
  426. self.instance_name = hs.get_instance_name()
  427. self.queue = self.presence_handler.get_federation_queue()
  428. def test_send_and_get(self):
  429. state1 = UserPresenceState.default("@user1:test")
  430. state2 = UserPresenceState.default("@user2:test")
  431. state3 = UserPresenceState.default("@user3:test")
  432. prev_token = self.queue.get_current_token(self.instance_name)
  433. self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2"))
  434. self.queue.send_presence_to_destinations((state3,), ("dest3",))
  435. now_token = self.queue.get_current_token(self.instance_name)
  436. rows, upto_token, limited = self.get_success(
  437. self.queue.get_replication_rows("master", prev_token, now_token, 10)
  438. )
  439. self.assertEqual(upto_token, now_token)
  440. self.assertFalse(limited)
  441. expected_rows = [
  442. (1, ("dest1", "@user1:test")),
  443. (1, ("dest2", "@user1:test")),
  444. (1, ("dest1", "@user2:test")),
  445. (1, ("dest2", "@user2:test")),
  446. (2, ("dest3", "@user3:test")),
  447. ]
  448. self.assertCountEqual(rows, expected_rows)
  449. now_token = self.queue.get_current_token(self.instance_name)
  450. rows, upto_token, limited = self.get_success(
  451. self.queue.get_replication_rows("master", upto_token, now_token, 10)
  452. )
  453. self.assertEqual(upto_token, now_token)
  454. self.assertFalse(limited)
  455. self.assertCountEqual(rows, [])
  456. def test_send_and_get_split(self):
  457. state1 = UserPresenceState.default("@user1:test")
  458. state2 = UserPresenceState.default("@user2:test")
  459. state3 = UserPresenceState.default("@user3:test")
  460. prev_token = self.queue.get_current_token(self.instance_name)
  461. self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2"))
  462. now_token = self.queue.get_current_token(self.instance_name)
  463. self.queue.send_presence_to_destinations((state3,), ("dest3",))
  464. rows, upto_token, limited = self.get_success(
  465. self.queue.get_replication_rows("master", prev_token, now_token, 10)
  466. )
  467. self.assertEqual(upto_token, now_token)
  468. self.assertFalse(limited)
  469. expected_rows = [
  470. (1, ("dest1", "@user1:test")),
  471. (1, ("dest2", "@user1:test")),
  472. (1, ("dest1", "@user2:test")),
  473. (1, ("dest2", "@user2:test")),
  474. ]
  475. self.assertCountEqual(rows, expected_rows)
  476. now_token = self.queue.get_current_token(self.instance_name)
  477. rows, upto_token, limited = self.get_success(
  478. self.queue.get_replication_rows("master", upto_token, now_token, 10)
  479. )
  480. self.assertEqual(upto_token, now_token)
  481. self.assertFalse(limited)
  482. expected_rows = [
  483. (2, ("dest3", "@user3:test")),
  484. ]
  485. self.assertCountEqual(rows, expected_rows)
  486. def test_clear_queue_all(self):
  487. state1 = UserPresenceState.default("@user1:test")
  488. state2 = UserPresenceState.default("@user2:test")
  489. state3 = UserPresenceState.default("@user3:test")
  490. prev_token = self.queue.get_current_token(self.instance_name)
  491. self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2"))
  492. self.queue.send_presence_to_destinations((state3,), ("dest3",))
  493. self.reactor.advance(10 * 60 * 1000)
  494. now_token = self.queue.get_current_token(self.instance_name)
  495. rows, upto_token, limited = self.get_success(
  496. self.queue.get_replication_rows("master", prev_token, now_token, 10)
  497. )
  498. self.assertEqual(upto_token, now_token)
  499. self.assertFalse(limited)
  500. self.assertCountEqual(rows, [])
  501. prev_token = self.queue.get_current_token(self.instance_name)
  502. self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2"))
  503. self.queue.send_presence_to_destinations((state3,), ("dest3",))
  504. now_token = self.queue.get_current_token(self.instance_name)
  505. rows, upto_token, limited = self.get_success(
  506. self.queue.get_replication_rows("master", prev_token, now_token, 10)
  507. )
  508. self.assertEqual(upto_token, now_token)
  509. self.assertFalse(limited)
  510. expected_rows = [
  511. (3, ("dest1", "@user1:test")),
  512. (3, ("dest2", "@user1:test")),
  513. (3, ("dest1", "@user2:test")),
  514. (3, ("dest2", "@user2:test")),
  515. (4, ("dest3", "@user3:test")),
  516. ]
  517. self.assertCountEqual(rows, expected_rows)
  518. def test_partially_clear_queue(self):
  519. state1 = UserPresenceState.default("@user1:test")
  520. state2 = UserPresenceState.default("@user2:test")
  521. state3 = UserPresenceState.default("@user3:test")
  522. prev_token = self.queue.get_current_token(self.instance_name)
  523. self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2"))
  524. self.reactor.advance(2 * 60 * 1000)
  525. self.queue.send_presence_to_destinations((state3,), ("dest3",))
  526. self.reactor.advance(4 * 60 * 1000)
  527. now_token = self.queue.get_current_token(self.instance_name)
  528. rows, upto_token, limited = self.get_success(
  529. self.queue.get_replication_rows("master", prev_token, now_token, 10)
  530. )
  531. self.assertEqual(upto_token, now_token)
  532. self.assertFalse(limited)
  533. expected_rows = [
  534. (2, ("dest3", "@user3:test")),
  535. ]
  536. self.assertCountEqual(rows, [])
  537. prev_token = self.queue.get_current_token(self.instance_name)
  538. self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2"))
  539. self.queue.send_presence_to_destinations((state3,), ("dest3",))
  540. now_token = self.queue.get_current_token(self.instance_name)
  541. rows, upto_token, limited = self.get_success(
  542. self.queue.get_replication_rows("master", prev_token, now_token, 10)
  543. )
  544. self.assertEqual(upto_token, now_token)
  545. self.assertFalse(limited)
  546. expected_rows = [
  547. (3, ("dest1", "@user1:test")),
  548. (3, ("dest2", "@user1:test")),
  549. (3, ("dest1", "@user2:test")),
  550. (3, ("dest2", "@user2:test")),
  551. (4, ("dest3", "@user3:test")),
  552. ]
  553. self.assertCountEqual(rows, expected_rows)
  554. class PresenceJoinTestCase(unittest.HomeserverTestCase):
  555. """Tests remote servers get told about presence of users in the room when
  556. they join and when new local users join.
  557. """
  558. user_id = "@test:server"
  559. servlets = [room.register_servlets]
  560. def make_homeserver(self, reactor, clock):
  561. hs = self.setup_test_homeserver(
  562. "server",
  563. federation_http_client=None,
  564. federation_sender=Mock(spec=FederationSender),
  565. )
  566. return hs
  567. def default_config(self):
  568. config = super().default_config()
  569. config["send_federation"] = True
  570. return config
  571. def prepare(self, reactor, clock, hs):
  572. self.federation_sender = hs.get_federation_sender()
  573. self.event_builder_factory = hs.get_event_builder_factory()
  574. self.federation_handler = hs.get_federation_handler()
  575. self.presence_handler = hs.get_presence_handler()
  576. # self.event_builder_for_2 = EventBuilderFactory(hs)
  577. # self.event_builder_for_2.hostname = "test2"
  578. self.store = hs.get_datastore()
  579. self.state = hs.get_state_handler()
  580. self.auth = hs.get_auth()
  581. # We don't actually check signatures in tests, so lets just create a
  582. # random key to use.
  583. self.random_signing_key = generate_signing_key("ver")
  584. def test_remote_joins(self):
  585. # We advance time to something that isn't 0, as we use 0 as a special
  586. # value.
  587. self.reactor.advance(1000000000000)
  588. # Create a room with two local users
  589. room_id = self.helper.create_room_as(self.user_id)
  590. self.helper.join(room_id, "@test2:server")
  591. # Mark test2 as online, test will be offline with a last_active of 0
  592. self.get_success(
  593. self.presence_handler.set_state(
  594. UserID.from_string("@test2:server"), {"presence": PresenceState.ONLINE}
  595. )
  596. )
  597. self.reactor.pump([0]) # Wait for presence updates to be handled
  598. #
  599. # Test that a new server gets told about existing presence
  600. #
  601. self.federation_sender.reset_mock()
  602. # Add a new remote server to the room
  603. self._add_new_user(room_id, "@alice:server2")
  604. # When new server is joined we send it the local users presence states.
  605. # We expect to only see user @test2:server, as @test:server is offline
  606. # and has a zero last_active_ts
  607. expected_state = self.get_success(
  608. self.presence_handler.current_state_for_user("@test2:server")
  609. )
  610. self.assertEqual(expected_state.state, PresenceState.ONLINE)
  611. self.federation_sender.send_presence_to_destinations.assert_called_once_with(
  612. destinations={"server2"}, states=[expected_state]
  613. )
  614. #
  615. # Test that only the new server gets sent presence and not existing servers
  616. #
  617. self.federation_sender.reset_mock()
  618. self._add_new_user(room_id, "@bob:server3")
  619. self.federation_sender.send_presence_to_destinations.assert_called_once_with(
  620. destinations={"server3"}, states=[expected_state]
  621. )
  622. def test_remote_gets_presence_when_local_user_joins(self):
  623. # We advance time to something that isn't 0, as we use 0 as a special
  624. # value.
  625. self.reactor.advance(1000000000000)
  626. # Create a room with one local users
  627. room_id = self.helper.create_room_as(self.user_id)
  628. # Mark test as online
  629. self.get_success(
  630. self.presence_handler.set_state(
  631. UserID.from_string("@test:server"), {"presence": PresenceState.ONLINE}
  632. )
  633. )
  634. # Mark test2 as online, test will be offline with a last_active of 0.
  635. # Note we don't join them to the room yet
  636. self.get_success(
  637. self.presence_handler.set_state(
  638. UserID.from_string("@test2:server"), {"presence": PresenceState.ONLINE}
  639. )
  640. )
  641. # Add servers to the room
  642. self._add_new_user(room_id, "@alice:server2")
  643. self._add_new_user(room_id, "@bob:server3")
  644. self.reactor.pump([0]) # Wait for presence updates to be handled
  645. #
  646. # Test that when a local join happens remote servers get told about it
  647. #
  648. self.federation_sender.reset_mock()
  649. # Join local user to room
  650. self.helper.join(room_id, "@test2:server")
  651. self.reactor.pump([0]) # Wait for presence updates to be handled
  652. # We expect to only send test2 presence to server2 and server3
  653. expected_state = self.get_success(
  654. self.presence_handler.current_state_for_user("@test2:server")
  655. )
  656. self.assertEqual(expected_state.state, PresenceState.ONLINE)
  657. self.federation_sender.send_presence_to_destinations.assert_called_once_with(
  658. destinations={"server2", "server3"}, states=[expected_state]
  659. )
  660. def _add_new_user(self, room_id, user_id):
  661. """Add new user to the room by creating an event and poking the federation API."""
  662. hostname = get_domain_from_id(user_id)
  663. room_version = self.get_success(self.store.get_room_version_id(room_id))
  664. builder = EventBuilder(
  665. state=self.state,
  666. auth=self.auth,
  667. store=self.store,
  668. clock=self.clock,
  669. hostname=hostname,
  670. signing_key=self.random_signing_key,
  671. room_version=KNOWN_ROOM_VERSIONS[room_version],
  672. room_id=room_id,
  673. type=EventTypes.Member,
  674. sender=user_id,
  675. state_key=user_id,
  676. content={"membership": Membership.JOIN},
  677. )
  678. prev_event_ids = self.get_success(
  679. self.store.get_latest_event_ids_in_room(room_id)
  680. )
  681. event = self.get_success(
  682. builder.build(prev_event_ids=prev_event_ids, auth_event_ids=None)
  683. )
  684. self.get_success(self.federation_handler.on_receive_pdu(hostname, event))
  685. # Check that it was successfully persisted.
  686. self.get_success(self.store.get_event(event.event_id))
  687. self.get_success(self.store.get_event(event.event_id))