1
0

test_presence.py 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827
  1. # Copyright 2016 OpenMarket Ltd
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. from unittest.mock import Mock, call
  15. from signedjson.key import generate_signing_key
  16. from synapse.api.constants import EventTypes, Membership, PresenceState
  17. from synapse.api.presence import UserPresenceState
  18. from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
  19. from synapse.events.builder import EventBuilder
  20. from synapse.federation.sender import FederationSender
  21. from synapse.handlers.presence import (
  22. EXTERNAL_PROCESS_EXPIRY,
  23. FEDERATION_PING_INTERVAL,
  24. FEDERATION_TIMEOUT,
  25. IDLE_TIMER,
  26. LAST_ACTIVE_GRANULARITY,
  27. SYNC_ONLINE_TIMEOUT,
  28. handle_timeout,
  29. handle_update,
  30. )
  31. from synapse.rest.client.v1 import room
  32. from synapse.types import UserID, get_domain_from_id
  33. from tests import unittest
  34. class PresenceUpdateTestCase(unittest.TestCase):
  35. def test_offline_to_online(self):
  36. wheel_timer = Mock()
  37. user_id = "@foo:bar"
  38. now = 5000000
  39. prev_state = UserPresenceState.default(user_id)
  40. new_state = prev_state.copy_and_replace(
  41. state=PresenceState.ONLINE, last_active_ts=now
  42. )
  43. state, persist_and_notify, federation_ping = handle_update(
  44. prev_state, new_state, is_mine=True, wheel_timer=wheel_timer, now=now
  45. )
  46. self.assertTrue(persist_and_notify)
  47. self.assertTrue(state.currently_active)
  48. self.assertEquals(new_state.state, state.state)
  49. self.assertEquals(new_state.status_msg, state.status_msg)
  50. self.assertEquals(state.last_federation_update_ts, now)
  51. self.assertEquals(wheel_timer.insert.call_count, 3)
  52. wheel_timer.insert.assert_has_calls(
  53. [
  54. call(now=now, obj=user_id, then=new_state.last_active_ts + IDLE_TIMER),
  55. call(
  56. now=now,
  57. obj=user_id,
  58. then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT,
  59. ),
  60. call(
  61. now=now,
  62. obj=user_id,
  63. then=new_state.last_active_ts + LAST_ACTIVE_GRANULARITY,
  64. ),
  65. ],
  66. any_order=True,
  67. )
  68. def test_online_to_online(self):
  69. wheel_timer = Mock()
  70. user_id = "@foo:bar"
  71. now = 5000000
  72. prev_state = UserPresenceState.default(user_id)
  73. prev_state = prev_state.copy_and_replace(
  74. state=PresenceState.ONLINE, last_active_ts=now, currently_active=True
  75. )
  76. new_state = prev_state.copy_and_replace(
  77. state=PresenceState.ONLINE, last_active_ts=now
  78. )
  79. state, persist_and_notify, federation_ping = handle_update(
  80. prev_state, new_state, is_mine=True, wheel_timer=wheel_timer, now=now
  81. )
  82. self.assertFalse(persist_and_notify)
  83. self.assertTrue(federation_ping)
  84. self.assertTrue(state.currently_active)
  85. self.assertEquals(new_state.state, state.state)
  86. self.assertEquals(new_state.status_msg, state.status_msg)
  87. self.assertEquals(state.last_federation_update_ts, now)
  88. self.assertEquals(wheel_timer.insert.call_count, 3)
  89. wheel_timer.insert.assert_has_calls(
  90. [
  91. call(now=now, obj=user_id, then=new_state.last_active_ts + IDLE_TIMER),
  92. call(
  93. now=now,
  94. obj=user_id,
  95. then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT,
  96. ),
  97. call(
  98. now=now,
  99. obj=user_id,
  100. then=new_state.last_active_ts + LAST_ACTIVE_GRANULARITY,
  101. ),
  102. ],
  103. any_order=True,
  104. )
  105. def test_online_to_online_last_active_noop(self):
  106. wheel_timer = Mock()
  107. user_id = "@foo:bar"
  108. now = 5000000
  109. prev_state = UserPresenceState.default(user_id)
  110. prev_state = prev_state.copy_and_replace(
  111. state=PresenceState.ONLINE,
  112. last_active_ts=now - LAST_ACTIVE_GRANULARITY - 10,
  113. currently_active=True,
  114. )
  115. new_state = prev_state.copy_and_replace(
  116. state=PresenceState.ONLINE, last_active_ts=now
  117. )
  118. state, persist_and_notify, federation_ping = handle_update(
  119. prev_state, new_state, is_mine=True, wheel_timer=wheel_timer, now=now
  120. )
  121. self.assertFalse(persist_and_notify)
  122. self.assertTrue(federation_ping)
  123. self.assertTrue(state.currently_active)
  124. self.assertEquals(new_state.state, state.state)
  125. self.assertEquals(new_state.status_msg, state.status_msg)
  126. self.assertEquals(state.last_federation_update_ts, now)
  127. self.assertEquals(wheel_timer.insert.call_count, 3)
  128. wheel_timer.insert.assert_has_calls(
  129. [
  130. call(now=now, obj=user_id, then=new_state.last_active_ts + IDLE_TIMER),
  131. call(
  132. now=now,
  133. obj=user_id,
  134. then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT,
  135. ),
  136. call(
  137. now=now,
  138. obj=user_id,
  139. then=new_state.last_active_ts + LAST_ACTIVE_GRANULARITY,
  140. ),
  141. ],
  142. any_order=True,
  143. )
  144. def test_online_to_online_last_active(self):
  145. wheel_timer = Mock()
  146. user_id = "@foo:bar"
  147. now = 5000000
  148. prev_state = UserPresenceState.default(user_id)
  149. prev_state = prev_state.copy_and_replace(
  150. state=PresenceState.ONLINE,
  151. last_active_ts=now - LAST_ACTIVE_GRANULARITY - 1,
  152. currently_active=True,
  153. )
  154. new_state = prev_state.copy_and_replace(state=PresenceState.ONLINE)
  155. state, persist_and_notify, federation_ping = handle_update(
  156. prev_state, new_state, is_mine=True, wheel_timer=wheel_timer, now=now
  157. )
  158. self.assertTrue(persist_and_notify)
  159. self.assertFalse(state.currently_active)
  160. self.assertEquals(new_state.state, state.state)
  161. self.assertEquals(new_state.status_msg, state.status_msg)
  162. self.assertEquals(state.last_federation_update_ts, now)
  163. self.assertEquals(wheel_timer.insert.call_count, 2)
  164. wheel_timer.insert.assert_has_calls(
  165. [
  166. call(now=now, obj=user_id, then=new_state.last_active_ts + IDLE_TIMER),
  167. call(
  168. now=now,
  169. obj=user_id,
  170. then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT,
  171. ),
  172. ],
  173. any_order=True,
  174. )
  175. def test_remote_ping_timer(self):
  176. wheel_timer = Mock()
  177. user_id = "@foo:bar"
  178. now = 5000000
  179. prev_state = UserPresenceState.default(user_id)
  180. prev_state = prev_state.copy_and_replace(
  181. state=PresenceState.ONLINE, last_active_ts=now
  182. )
  183. new_state = prev_state.copy_and_replace(state=PresenceState.ONLINE)
  184. state, persist_and_notify, federation_ping = handle_update(
  185. prev_state, new_state, is_mine=False, wheel_timer=wheel_timer, now=now
  186. )
  187. self.assertFalse(persist_and_notify)
  188. self.assertFalse(federation_ping)
  189. self.assertFalse(state.currently_active)
  190. self.assertEquals(new_state.state, state.state)
  191. self.assertEquals(new_state.status_msg, state.status_msg)
  192. self.assertEquals(wheel_timer.insert.call_count, 1)
  193. wheel_timer.insert.assert_has_calls(
  194. [
  195. call(
  196. now=now,
  197. obj=user_id,
  198. then=new_state.last_federation_update_ts + FEDERATION_TIMEOUT,
  199. )
  200. ],
  201. any_order=True,
  202. )
  203. def test_online_to_offline(self):
  204. wheel_timer = Mock()
  205. user_id = "@foo:bar"
  206. now = 5000000
  207. prev_state = UserPresenceState.default(user_id)
  208. prev_state = prev_state.copy_and_replace(
  209. state=PresenceState.ONLINE, last_active_ts=now, currently_active=True
  210. )
  211. new_state = prev_state.copy_and_replace(state=PresenceState.OFFLINE)
  212. state, persist_and_notify, federation_ping = handle_update(
  213. prev_state, new_state, is_mine=True, wheel_timer=wheel_timer, now=now
  214. )
  215. self.assertTrue(persist_and_notify)
  216. self.assertEquals(new_state.state, state.state)
  217. self.assertEquals(state.last_federation_update_ts, now)
  218. self.assertEquals(wheel_timer.insert.call_count, 0)
  219. def test_online_to_idle(self):
  220. wheel_timer = Mock()
  221. user_id = "@foo:bar"
  222. now = 5000000
  223. prev_state = UserPresenceState.default(user_id)
  224. prev_state = prev_state.copy_and_replace(
  225. state=PresenceState.ONLINE, last_active_ts=now, currently_active=True
  226. )
  227. new_state = prev_state.copy_and_replace(state=PresenceState.UNAVAILABLE)
  228. state, persist_and_notify, federation_ping = handle_update(
  229. prev_state, new_state, is_mine=True, wheel_timer=wheel_timer, now=now
  230. )
  231. self.assertTrue(persist_and_notify)
  232. self.assertEquals(new_state.state, state.state)
  233. self.assertEquals(state.last_federation_update_ts, now)
  234. self.assertEquals(new_state.state, state.state)
  235. self.assertEquals(new_state.status_msg, state.status_msg)
  236. self.assertEquals(wheel_timer.insert.call_count, 1)
  237. wheel_timer.insert.assert_has_calls(
  238. [
  239. call(
  240. now=now,
  241. obj=user_id,
  242. then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT,
  243. )
  244. ],
  245. any_order=True,
  246. )
  247. class PresenceTimeoutTestCase(unittest.TestCase):
  248. def test_idle_timer(self):
  249. user_id = "@foo:bar"
  250. now = 5000000
  251. state = UserPresenceState.default(user_id)
  252. state = state.copy_and_replace(
  253. state=PresenceState.ONLINE,
  254. last_active_ts=now - IDLE_TIMER - 1,
  255. last_user_sync_ts=now,
  256. )
  257. new_state = handle_timeout(state, is_mine=True, syncing_user_ids=set(), now=now)
  258. self.assertIsNotNone(new_state)
  259. self.assertEquals(new_state.state, PresenceState.UNAVAILABLE)
  260. def test_busy_no_idle(self):
  261. """
  262. Tests that a user setting their presence to busy but idling doesn't turn their
  263. presence state into unavailable.
  264. """
  265. user_id = "@foo:bar"
  266. now = 5000000
  267. state = UserPresenceState.default(user_id)
  268. state = state.copy_and_replace(
  269. state=PresenceState.BUSY,
  270. last_active_ts=now - IDLE_TIMER - 1,
  271. last_user_sync_ts=now,
  272. )
  273. new_state = handle_timeout(state, is_mine=True, syncing_user_ids=set(), now=now)
  274. self.assertIsNotNone(new_state)
  275. self.assertEquals(new_state.state, PresenceState.BUSY)
  276. def test_sync_timeout(self):
  277. user_id = "@foo:bar"
  278. now = 5000000
  279. state = UserPresenceState.default(user_id)
  280. state = state.copy_and_replace(
  281. state=PresenceState.ONLINE,
  282. last_active_ts=0,
  283. last_user_sync_ts=now - SYNC_ONLINE_TIMEOUT - 1,
  284. )
  285. new_state = handle_timeout(state, is_mine=True, syncing_user_ids=set(), now=now)
  286. self.assertIsNotNone(new_state)
  287. self.assertEquals(new_state.state, PresenceState.OFFLINE)
  288. def test_sync_online(self):
  289. user_id = "@foo:bar"
  290. now = 5000000
  291. state = UserPresenceState.default(user_id)
  292. state = state.copy_and_replace(
  293. state=PresenceState.ONLINE,
  294. last_active_ts=now - SYNC_ONLINE_TIMEOUT - 1,
  295. last_user_sync_ts=now - SYNC_ONLINE_TIMEOUT - 1,
  296. )
  297. new_state = handle_timeout(
  298. state, is_mine=True, syncing_user_ids={user_id}, now=now
  299. )
  300. self.assertIsNotNone(new_state)
  301. self.assertEquals(new_state.state, PresenceState.ONLINE)
  302. def test_federation_ping(self):
  303. user_id = "@foo:bar"
  304. now = 5000000
  305. state = UserPresenceState.default(user_id)
  306. state = state.copy_and_replace(
  307. state=PresenceState.ONLINE,
  308. last_active_ts=now,
  309. last_user_sync_ts=now,
  310. last_federation_update_ts=now - FEDERATION_PING_INTERVAL - 1,
  311. )
  312. new_state = handle_timeout(state, is_mine=True, syncing_user_ids=set(), now=now)
  313. self.assertIsNotNone(new_state)
  314. self.assertEquals(new_state, new_state)
  315. def test_no_timeout(self):
  316. user_id = "@foo:bar"
  317. now = 5000000
  318. state = UserPresenceState.default(user_id)
  319. state = state.copy_and_replace(
  320. state=PresenceState.ONLINE,
  321. last_active_ts=now,
  322. last_user_sync_ts=now,
  323. last_federation_update_ts=now,
  324. )
  325. new_state = handle_timeout(state, is_mine=True, syncing_user_ids=set(), now=now)
  326. self.assertIsNone(new_state)
  327. def test_federation_timeout(self):
  328. user_id = "@foo:bar"
  329. now = 5000000
  330. state = UserPresenceState.default(user_id)
  331. state = state.copy_and_replace(
  332. state=PresenceState.ONLINE,
  333. last_active_ts=now,
  334. last_user_sync_ts=now,
  335. last_federation_update_ts=now - FEDERATION_TIMEOUT - 1,
  336. )
  337. new_state = handle_timeout(
  338. state, is_mine=False, syncing_user_ids=set(), now=now
  339. )
  340. self.assertIsNotNone(new_state)
  341. self.assertEquals(new_state.state, PresenceState.OFFLINE)
  342. def test_last_active(self):
  343. user_id = "@foo:bar"
  344. now = 5000000
  345. state = UserPresenceState.default(user_id)
  346. state = state.copy_and_replace(
  347. state=PresenceState.ONLINE,
  348. last_active_ts=now - LAST_ACTIVE_GRANULARITY - 1,
  349. last_user_sync_ts=now,
  350. last_federation_update_ts=now,
  351. )
  352. new_state = handle_timeout(state, is_mine=True, syncing_user_ids=set(), now=now)
  353. self.assertIsNotNone(new_state)
  354. self.assertEquals(state, new_state)
  355. class PresenceHandlerTestCase(unittest.HomeserverTestCase):
  356. def prepare(self, reactor, clock, hs):
  357. self.presence_handler = hs.get_presence_handler()
  358. self.clock = hs.get_clock()
  359. def test_external_process_timeout(self):
  360. """Test that if an external process doesn't update the records for a while
  361. we time out their syncing users presence.
  362. """
  363. process_id = 1
  364. user_id = "@test:server"
  365. # Notify handler that a user is now syncing.
  366. self.get_success(
  367. self.presence_handler.update_external_syncs_row(
  368. process_id, user_id, True, self.clock.time_msec()
  369. )
  370. )
  371. # Check that if we wait a while without telling the handler the user has
  372. # stopped syncing that their presence state doesn't get timed out.
  373. self.reactor.advance(EXTERNAL_PROCESS_EXPIRY / 2)
  374. state = self.get_success(
  375. self.presence_handler.get_state(UserID.from_string(user_id))
  376. )
  377. self.assertEqual(state.state, PresenceState.ONLINE)
  378. # Check that if the external process timeout fires, then the syncing
  379. # user gets timed out
  380. self.reactor.advance(EXTERNAL_PROCESS_EXPIRY)
  381. state = self.get_success(
  382. self.presence_handler.get_state(UserID.from_string(user_id))
  383. )
  384. self.assertEqual(state.state, PresenceState.OFFLINE)
  385. class PresenceFederationQueueTestCase(unittest.HomeserverTestCase):
  386. def prepare(self, reactor, clock, hs):
  387. self.presence_handler = hs.get_presence_handler()
  388. self.clock = hs.get_clock()
  389. self.instance_name = hs.get_instance_name()
  390. self.queue = self.presence_handler.get_federation_queue()
  391. def test_send_and_get(self):
  392. state1 = UserPresenceState.default("@user1:test")
  393. state2 = UserPresenceState.default("@user2:test")
  394. state3 = UserPresenceState.default("@user3:test")
  395. prev_token = self.queue.get_current_token(self.instance_name)
  396. self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2"))
  397. self.queue.send_presence_to_destinations((state3,), ("dest3",))
  398. now_token = self.queue.get_current_token(self.instance_name)
  399. rows, upto_token, limited = self.get_success(
  400. self.queue.get_replication_rows("master", prev_token, now_token, 10)
  401. )
  402. self.assertEqual(upto_token, now_token)
  403. self.assertFalse(limited)
  404. expected_rows = [
  405. (1, ("dest1", "@user1:test")),
  406. (1, ("dest2", "@user1:test")),
  407. (1, ("dest1", "@user2:test")),
  408. (1, ("dest2", "@user2:test")),
  409. (2, ("dest3", "@user3:test")),
  410. ]
  411. self.assertCountEqual(rows, expected_rows)
  412. now_token = self.queue.get_current_token(self.instance_name)
  413. rows, upto_token, limited = self.get_success(
  414. self.queue.get_replication_rows("master", upto_token, now_token, 10)
  415. )
  416. self.assertEqual(upto_token, now_token)
  417. self.assertFalse(limited)
  418. self.assertCountEqual(rows, [])
  419. def test_send_and_get_split(self):
  420. state1 = UserPresenceState.default("@user1:test")
  421. state2 = UserPresenceState.default("@user2:test")
  422. state3 = UserPresenceState.default("@user3:test")
  423. prev_token = self.queue.get_current_token(self.instance_name)
  424. self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2"))
  425. now_token = self.queue.get_current_token(self.instance_name)
  426. self.queue.send_presence_to_destinations((state3,), ("dest3",))
  427. rows, upto_token, limited = self.get_success(
  428. self.queue.get_replication_rows("master", prev_token, now_token, 10)
  429. )
  430. self.assertEqual(upto_token, now_token)
  431. self.assertFalse(limited)
  432. expected_rows = [
  433. (1, ("dest1", "@user1:test")),
  434. (1, ("dest2", "@user1:test")),
  435. (1, ("dest1", "@user2:test")),
  436. (1, ("dest2", "@user2:test")),
  437. ]
  438. self.assertCountEqual(rows, expected_rows)
  439. now_token = self.queue.get_current_token(self.instance_name)
  440. rows, upto_token, limited = self.get_success(
  441. self.queue.get_replication_rows("master", upto_token, now_token, 10)
  442. )
  443. self.assertEqual(upto_token, now_token)
  444. self.assertFalse(limited)
  445. expected_rows = [
  446. (2, ("dest3", "@user3:test")),
  447. ]
  448. self.assertCountEqual(rows, expected_rows)
  449. def test_clear_queue_all(self):
  450. state1 = UserPresenceState.default("@user1:test")
  451. state2 = UserPresenceState.default("@user2:test")
  452. state3 = UserPresenceState.default("@user3:test")
  453. prev_token = self.queue.get_current_token(self.instance_name)
  454. self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2"))
  455. self.queue.send_presence_to_destinations((state3,), ("dest3",))
  456. self.reactor.advance(10 * 60 * 1000)
  457. now_token = self.queue.get_current_token(self.instance_name)
  458. rows, upto_token, limited = self.get_success(
  459. self.queue.get_replication_rows("master", prev_token, now_token, 10)
  460. )
  461. self.assertEqual(upto_token, now_token)
  462. self.assertFalse(limited)
  463. self.assertCountEqual(rows, [])
  464. prev_token = self.queue.get_current_token(self.instance_name)
  465. self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2"))
  466. self.queue.send_presence_to_destinations((state3,), ("dest3",))
  467. now_token = self.queue.get_current_token(self.instance_name)
  468. rows, upto_token, limited = self.get_success(
  469. self.queue.get_replication_rows("master", prev_token, now_token, 10)
  470. )
  471. self.assertEqual(upto_token, now_token)
  472. self.assertFalse(limited)
  473. expected_rows = [
  474. (3, ("dest1", "@user1:test")),
  475. (3, ("dest2", "@user1:test")),
  476. (3, ("dest1", "@user2:test")),
  477. (3, ("dest2", "@user2:test")),
  478. (4, ("dest3", "@user3:test")),
  479. ]
  480. self.assertCountEqual(rows, expected_rows)
  481. def test_partially_clear_queue(self):
  482. state1 = UserPresenceState.default("@user1:test")
  483. state2 = UserPresenceState.default("@user2:test")
  484. state3 = UserPresenceState.default("@user3:test")
  485. prev_token = self.queue.get_current_token(self.instance_name)
  486. self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2"))
  487. self.reactor.advance(2 * 60 * 1000)
  488. self.queue.send_presence_to_destinations((state3,), ("dest3",))
  489. self.reactor.advance(4 * 60 * 1000)
  490. now_token = self.queue.get_current_token(self.instance_name)
  491. rows, upto_token, limited = self.get_success(
  492. self.queue.get_replication_rows("master", prev_token, now_token, 10)
  493. )
  494. self.assertEqual(upto_token, now_token)
  495. self.assertFalse(limited)
  496. expected_rows = [
  497. (2, ("dest3", "@user3:test")),
  498. ]
  499. self.assertCountEqual(rows, [])
  500. prev_token = self.queue.get_current_token(self.instance_name)
  501. self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2"))
  502. self.queue.send_presence_to_destinations((state3,), ("dest3",))
  503. now_token = self.queue.get_current_token(self.instance_name)
  504. rows, upto_token, limited = self.get_success(
  505. self.queue.get_replication_rows("master", prev_token, now_token, 10)
  506. )
  507. self.assertEqual(upto_token, now_token)
  508. self.assertFalse(limited)
  509. expected_rows = [
  510. (3, ("dest1", "@user1:test")),
  511. (3, ("dest2", "@user1:test")),
  512. (3, ("dest1", "@user2:test")),
  513. (3, ("dest2", "@user2:test")),
  514. (4, ("dest3", "@user3:test")),
  515. ]
  516. self.assertCountEqual(rows, expected_rows)
  517. class PresenceJoinTestCase(unittest.HomeserverTestCase):
  518. """Tests remote servers get told about presence of users in the room when
  519. they join and when new local users join.
  520. """
  521. user_id = "@test:server"
  522. servlets = [room.register_servlets]
  523. def make_homeserver(self, reactor, clock):
  524. hs = self.setup_test_homeserver(
  525. "server",
  526. federation_http_client=None,
  527. federation_sender=Mock(spec=FederationSender),
  528. )
  529. return hs
  530. def default_config(self):
  531. config = super().default_config()
  532. config["send_federation"] = True
  533. return config
  534. def prepare(self, reactor, clock, hs):
  535. self.federation_sender = hs.get_federation_sender()
  536. self.event_builder_factory = hs.get_event_builder_factory()
  537. self.federation_handler = hs.get_federation_handler()
  538. self.presence_handler = hs.get_presence_handler()
  539. # self.event_builder_for_2 = EventBuilderFactory(hs)
  540. # self.event_builder_for_2.hostname = "test2"
  541. self.store = hs.get_datastore()
  542. self.state = hs.get_state_handler()
  543. self.auth = hs.get_auth()
  544. # We don't actually check signatures in tests, so lets just create a
  545. # random key to use.
  546. self.random_signing_key = generate_signing_key("ver")
  547. def test_remote_joins(self):
  548. # We advance time to something that isn't 0, as we use 0 as a special
  549. # value.
  550. self.reactor.advance(1000000000000)
  551. # Create a room with two local users
  552. room_id = self.helper.create_room_as(self.user_id)
  553. self.helper.join(room_id, "@test2:server")
  554. # Mark test2 as online, test will be offline with a last_active of 0
  555. self.get_success(
  556. self.presence_handler.set_state(
  557. UserID.from_string("@test2:server"), {"presence": PresenceState.ONLINE}
  558. )
  559. )
  560. self.reactor.pump([0]) # Wait for presence updates to be handled
  561. #
  562. # Test that a new server gets told about existing presence
  563. #
  564. self.federation_sender.reset_mock()
  565. # Add a new remote server to the room
  566. self._add_new_user(room_id, "@alice:server2")
  567. # When new server is joined we send it the local users presence states.
  568. # We expect to only see user @test2:server, as @test:server is offline
  569. # and has a zero last_active_ts
  570. expected_state = self.get_success(
  571. self.presence_handler.current_state_for_user("@test2:server")
  572. )
  573. self.assertEqual(expected_state.state, PresenceState.ONLINE)
  574. self.federation_sender.send_presence_to_destinations.assert_called_once_with(
  575. destinations={"server2"}, states=[expected_state]
  576. )
  577. #
  578. # Test that only the new server gets sent presence and not existing servers
  579. #
  580. self.federation_sender.reset_mock()
  581. self._add_new_user(room_id, "@bob:server3")
  582. self.federation_sender.send_presence_to_destinations.assert_called_once_with(
  583. destinations={"server3"}, states=[expected_state]
  584. )
  585. def test_remote_gets_presence_when_local_user_joins(self):
  586. # We advance time to something that isn't 0, as we use 0 as a special
  587. # value.
  588. self.reactor.advance(1000000000000)
  589. # Create a room with one local users
  590. room_id = self.helper.create_room_as(self.user_id)
  591. # Mark test as online
  592. self.get_success(
  593. self.presence_handler.set_state(
  594. UserID.from_string("@test:server"), {"presence": PresenceState.ONLINE}
  595. )
  596. )
  597. # Mark test2 as online, test will be offline with a last_active of 0.
  598. # Note we don't join them to the room yet
  599. self.get_success(
  600. self.presence_handler.set_state(
  601. UserID.from_string("@test2:server"), {"presence": PresenceState.ONLINE}
  602. )
  603. )
  604. # Add servers to the room
  605. self._add_new_user(room_id, "@alice:server2")
  606. self._add_new_user(room_id, "@bob:server3")
  607. self.reactor.pump([0]) # Wait for presence updates to be handled
  608. #
  609. # Test that when a local join happens remote servers get told about it
  610. #
  611. self.federation_sender.reset_mock()
  612. # Join local user to room
  613. self.helper.join(room_id, "@test2:server")
  614. self.reactor.pump([0]) # Wait for presence updates to be handled
  615. # We expect to only send test2 presence to server2 and server3
  616. expected_state = self.get_success(
  617. self.presence_handler.current_state_for_user("@test2:server")
  618. )
  619. self.assertEqual(expected_state.state, PresenceState.ONLINE)
  620. self.federation_sender.send_presence_to_destinations.assert_called_once_with(
  621. destinations={"server2", "server3"}, states=[expected_state]
  622. )
  623. def _add_new_user(self, room_id, user_id):
  624. """Add new user to the room by creating an event and poking the federation API."""
  625. hostname = get_domain_from_id(user_id)
  626. room_version = self.get_success(self.store.get_room_version_id(room_id))
  627. builder = EventBuilder(
  628. state=self.state,
  629. auth=self.auth,
  630. store=self.store,
  631. clock=self.clock,
  632. hostname=hostname,
  633. signing_key=self.random_signing_key,
  634. room_version=KNOWN_ROOM_VERSIONS[room_version],
  635. room_id=room_id,
  636. type=EventTypes.Member,
  637. sender=user_id,
  638. state_key=user_id,
  639. content={"membership": Membership.JOIN},
  640. )
  641. prev_event_ids = self.get_success(
  642. self.store.get_latest_event_ids_in_room(room_id)
  643. )
  644. event = self.get_success(builder.build(prev_event_ids, None))
  645. self.get_success(self.federation_handler.on_receive_pdu(hostname, event))
  646. # Check that it was successfully persisted.
  647. self.get_success(self.store.get_event(event.event_id))
  648. self.get_success(self.store.get_event(event.event_id))