12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868 |
- # Copyright 2016 OpenMarket Ltd
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- from typing import Optional, cast
- from unittest.mock import Mock, call
- from parameterized import parameterized
- from signedjson.key import generate_signing_key
- from twisted.test.proto_helpers import MemoryReactor
- from synapse.api.constants import EventTypes, Membership, PresenceState
- from synapse.api.presence import UserDevicePresenceState, UserPresenceState
- from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
- from synapse.events.builder import EventBuilder
- from synapse.federation.sender import FederationSender
- from synapse.handlers.presence import (
- BUSY_ONLINE_TIMEOUT,
- EXTERNAL_PROCESS_EXPIRY,
- FEDERATION_PING_INTERVAL,
- FEDERATION_TIMEOUT,
- IDLE_TIMER,
- LAST_ACTIVE_GRANULARITY,
- SYNC_ONLINE_TIMEOUT,
- handle_timeout,
- handle_update,
- )
- from synapse.rest import admin
- from synapse.rest.client import room
- from synapse.server import HomeServer
- from synapse.storage.database import LoggingDatabaseConnection
- from synapse.types import JsonDict, UserID, get_domain_from_id
- from synapse.util import Clock
- from tests import unittest
- from tests.replication._base import BaseMultiWorkerStreamTestCase
- class PresenceUpdateTestCase(unittest.HomeserverTestCase):
- servlets = [admin.register_servlets]
- def prepare(
- self, reactor: MemoryReactor, clock: Clock, homeserver: HomeServer
- ) -> None:
- self.store = homeserver.get_datastores().main
- def test_offline_to_online(self) -> None:
- wheel_timer = Mock()
- user_id = "@foo:bar"
- now = 5000000
- prev_state = UserPresenceState.default(user_id)
- new_state = prev_state.copy_and_replace(
- state=PresenceState.ONLINE, last_active_ts=now
- )
- state, persist_and_notify, federation_ping = handle_update(
- prev_state, new_state, is_mine=True, wheel_timer=wheel_timer, now=now
- )
- self.assertTrue(persist_and_notify)
- self.assertTrue(state.currently_active)
- self.assertEqual(new_state.state, state.state)
- self.assertEqual(new_state.status_msg, state.status_msg)
- self.assertEqual(state.last_federation_update_ts, now)
- self.assertEqual(wheel_timer.insert.call_count, 3)
- wheel_timer.insert.assert_has_calls(
- [
- call(now=now, obj=user_id, then=new_state.last_active_ts + IDLE_TIMER),
- call(
- now=now,
- obj=user_id,
- then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT,
- ),
- call(
- now=now,
- obj=user_id,
- then=new_state.last_active_ts + LAST_ACTIVE_GRANULARITY,
- ),
- ],
- any_order=True,
- )
- def test_online_to_online(self) -> None:
- wheel_timer = Mock()
- user_id = "@foo:bar"
- now = 5000000
- prev_state = UserPresenceState.default(user_id)
- prev_state = prev_state.copy_and_replace(
- state=PresenceState.ONLINE, last_active_ts=now, currently_active=True
- )
- new_state = prev_state.copy_and_replace(
- state=PresenceState.ONLINE, last_active_ts=now
- )
- state, persist_and_notify, federation_ping = handle_update(
- prev_state, new_state, is_mine=True, wheel_timer=wheel_timer, now=now
- )
- self.assertFalse(persist_and_notify)
- self.assertTrue(federation_ping)
- self.assertTrue(state.currently_active)
- self.assertEqual(new_state.state, state.state)
- self.assertEqual(new_state.status_msg, state.status_msg)
- self.assertEqual(state.last_federation_update_ts, now)
- self.assertEqual(wheel_timer.insert.call_count, 3)
- wheel_timer.insert.assert_has_calls(
- [
- call(now=now, obj=user_id, then=new_state.last_active_ts + IDLE_TIMER),
- call(
- now=now,
- obj=user_id,
- then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT,
- ),
- call(
- now=now,
- obj=user_id,
- then=new_state.last_active_ts + LAST_ACTIVE_GRANULARITY,
- ),
- ],
- any_order=True,
- )
- def test_online_to_online_last_active_noop(self) -> None:
- wheel_timer = Mock()
- user_id = "@foo:bar"
- now = 5000000
- prev_state = UserPresenceState.default(user_id)
- prev_state = prev_state.copy_and_replace(
- state=PresenceState.ONLINE,
- last_active_ts=now - LAST_ACTIVE_GRANULARITY - 10,
- currently_active=True,
- )
- new_state = prev_state.copy_and_replace(
- state=PresenceState.ONLINE, last_active_ts=now
- )
- state, persist_and_notify, federation_ping = handle_update(
- prev_state, new_state, is_mine=True, wheel_timer=wheel_timer, now=now
- )
- self.assertFalse(persist_and_notify)
- self.assertTrue(federation_ping)
- self.assertTrue(state.currently_active)
- self.assertEqual(new_state.state, state.state)
- self.assertEqual(new_state.status_msg, state.status_msg)
- self.assertEqual(state.last_federation_update_ts, now)
- self.assertEqual(wheel_timer.insert.call_count, 3)
- wheel_timer.insert.assert_has_calls(
- [
- call(now=now, obj=user_id, then=new_state.last_active_ts + IDLE_TIMER),
- call(
- now=now,
- obj=user_id,
- then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT,
- ),
- call(
- now=now,
- obj=user_id,
- then=new_state.last_active_ts + LAST_ACTIVE_GRANULARITY,
- ),
- ],
- any_order=True,
- )
- def test_online_to_online_last_active(self) -> None:
- wheel_timer = Mock()
- user_id = "@foo:bar"
- now = 5000000
- prev_state = UserPresenceState.default(user_id)
- prev_state = prev_state.copy_and_replace(
- state=PresenceState.ONLINE,
- last_active_ts=now - LAST_ACTIVE_GRANULARITY - 1,
- currently_active=True,
- )
- new_state = prev_state.copy_and_replace(state=PresenceState.ONLINE)
- state, persist_and_notify, federation_ping = handle_update(
- prev_state, new_state, is_mine=True, wheel_timer=wheel_timer, now=now
- )
- self.assertTrue(persist_and_notify)
- self.assertFalse(state.currently_active)
- self.assertEqual(new_state.state, state.state)
- self.assertEqual(new_state.status_msg, state.status_msg)
- self.assertEqual(state.last_federation_update_ts, now)
- self.assertEqual(wheel_timer.insert.call_count, 2)
- wheel_timer.insert.assert_has_calls(
- [
- call(now=now, obj=user_id, then=new_state.last_active_ts + IDLE_TIMER),
- call(
- now=now,
- obj=user_id,
- then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT,
- ),
- ],
- any_order=True,
- )
- def test_remote_ping_timer(self) -> None:
- wheel_timer = Mock()
- user_id = "@foo:bar"
- now = 5000000
- prev_state = UserPresenceState.default(user_id)
- prev_state = prev_state.copy_and_replace(
- state=PresenceState.ONLINE, last_active_ts=now
- )
- new_state = prev_state.copy_and_replace(state=PresenceState.ONLINE)
- state, persist_and_notify, federation_ping = handle_update(
- prev_state, new_state, is_mine=False, wheel_timer=wheel_timer, now=now
- )
- self.assertFalse(persist_and_notify)
- self.assertFalse(federation_ping)
- self.assertFalse(state.currently_active)
- self.assertEqual(new_state.state, state.state)
- self.assertEqual(new_state.status_msg, state.status_msg)
- self.assertEqual(wheel_timer.insert.call_count, 1)
- wheel_timer.insert.assert_has_calls(
- [
- call(
- now=now,
- obj=user_id,
- then=new_state.last_federation_update_ts + FEDERATION_TIMEOUT,
- )
- ],
- any_order=True,
- )
- def test_online_to_offline(self) -> None:
- wheel_timer = Mock()
- user_id = "@foo:bar"
- now = 5000000
- prev_state = UserPresenceState.default(user_id)
- prev_state = prev_state.copy_and_replace(
- state=PresenceState.ONLINE, last_active_ts=now, currently_active=True
- )
- new_state = prev_state.copy_and_replace(state=PresenceState.OFFLINE)
- state, persist_and_notify, federation_ping = handle_update(
- prev_state, new_state, is_mine=True, wheel_timer=wheel_timer, now=now
- )
- self.assertTrue(persist_and_notify)
- self.assertEqual(new_state.state, state.state)
- self.assertEqual(state.last_federation_update_ts, now)
- self.assertEqual(wheel_timer.insert.call_count, 0)
- def test_online_to_idle(self) -> None:
- wheel_timer = Mock()
- user_id = "@foo:bar"
- now = 5000000
- prev_state = UserPresenceState.default(user_id)
- prev_state = prev_state.copy_and_replace(
- state=PresenceState.ONLINE, last_active_ts=now, currently_active=True
- )
- new_state = prev_state.copy_and_replace(state=PresenceState.UNAVAILABLE)
- state, persist_and_notify, federation_ping = handle_update(
- prev_state, new_state, is_mine=True, wheel_timer=wheel_timer, now=now
- )
- self.assertTrue(persist_and_notify)
- self.assertEqual(new_state.state, state.state)
- self.assertEqual(state.last_federation_update_ts, now)
- self.assertEqual(new_state.state, state.state)
- self.assertEqual(new_state.status_msg, state.status_msg)
- self.assertEqual(wheel_timer.insert.call_count, 1)
- wheel_timer.insert.assert_has_calls(
- [
- call(
- now=now,
- obj=user_id,
- then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT,
- )
- ],
- any_order=True,
- )
- def test_persisting_presence_updates(self) -> None:
- """Tests that the latest presence state for each user is persisted correctly"""
- # Create some test users and presence states for them
- presence_states = []
- for i in range(5):
- user_id = self.register_user(f"user_{i}", "password")
- presence_state = UserPresenceState(
- user_id=user_id,
- state="online",
- last_active_ts=1,
- last_federation_update_ts=1,
- last_user_sync_ts=1,
- status_msg="I'm online!",
- currently_active=True,
- )
- presence_states.append(presence_state)
- # Persist these presence updates to the database
- self.get_success(self.store.update_presence(presence_states))
- # Check that each update is present in the database
- db_presence_states_raw = self.get_success(
- self.store.get_all_presence_updates(
- instance_name="master",
- last_id=0,
- current_id=len(presence_states) + 1,
- limit=len(presence_states),
- )
- )
- # Extract presence update user ID and state information into lists of tuples
- db_presence_states = [(ps[0], ps[1]) for _, ps in db_presence_states_raw[0]]
- presence_states_compare = [(ps.user_id, ps.state) for ps in presence_states]
- # Compare what we put into the storage with what we got out.
- # They should be identical.
- self.assertEqual(presence_states_compare, db_presence_states)
- class PresenceTimeoutTestCase(unittest.TestCase):
- """Tests different timers and that the timer does not change `status_msg` of user."""
- def test_idle_timer(self) -> None:
- user_id = "@foo:bar"
- device_id = "dev-1"
- status_msg = "I'm here!"
- now = 5000000
- state = UserPresenceState.default(user_id)
- state = state.copy_and_replace(
- state=PresenceState.ONLINE,
- last_active_ts=now - IDLE_TIMER - 1,
- last_user_sync_ts=now,
- status_msg=status_msg,
- )
- device_state = UserDevicePresenceState(
- user_id=user_id,
- device_id=device_id,
- state=state.state,
- last_active_ts=state.last_active_ts,
- last_sync_ts=state.last_user_sync_ts,
- )
- new_state = handle_timeout(
- state,
- is_mine=True,
- syncing_device_ids=set(),
- user_devices={device_id: device_state},
- now=now,
- )
- self.assertIsNotNone(new_state)
- assert new_state is not None
- self.assertEqual(new_state.state, PresenceState.UNAVAILABLE)
- self.assertEqual(new_state.status_msg, status_msg)
- def test_busy_no_idle(self) -> None:
- """
- Tests that a user setting their presence to busy but idling doesn't turn their
- presence state into unavailable.
- """
- user_id = "@foo:bar"
- device_id = "dev-1"
- status_msg = "I'm here!"
- now = 5000000
- state = UserPresenceState.default(user_id)
- state = state.copy_and_replace(
- state=PresenceState.BUSY,
- last_active_ts=now - IDLE_TIMER - 1,
- last_user_sync_ts=now,
- status_msg=status_msg,
- )
- device_state = UserDevicePresenceState(
- user_id=user_id,
- device_id=device_id,
- state=state.state,
- last_active_ts=state.last_active_ts,
- last_sync_ts=state.last_user_sync_ts,
- )
- new_state = handle_timeout(
- state,
- is_mine=True,
- syncing_device_ids=set(),
- user_devices={device_id: device_state},
- now=now,
- )
- self.assertIsNotNone(new_state)
- assert new_state is not None
- self.assertEqual(new_state.state, PresenceState.BUSY)
- self.assertEqual(new_state.status_msg, status_msg)
- def test_sync_timeout(self) -> None:
- user_id = "@foo:bar"
- device_id = "dev-1"
- status_msg = "I'm here!"
- now = 5000000
- state = UserPresenceState.default(user_id)
- state = state.copy_and_replace(
- state=PresenceState.ONLINE,
- last_active_ts=0,
- last_user_sync_ts=now - SYNC_ONLINE_TIMEOUT - 1,
- status_msg=status_msg,
- )
- device_state = UserDevicePresenceState(
- user_id=user_id,
- device_id=device_id,
- state=state.state,
- last_active_ts=state.last_active_ts,
- last_sync_ts=state.last_user_sync_ts,
- )
- new_state = handle_timeout(
- state,
- is_mine=True,
- syncing_device_ids=set(),
- user_devices={device_id: device_state},
- now=now,
- )
- self.assertIsNotNone(new_state)
- assert new_state is not None
- self.assertEqual(new_state.state, PresenceState.OFFLINE)
- self.assertEqual(new_state.status_msg, status_msg)
- def test_sync_online(self) -> None:
- user_id = "@foo:bar"
- device_id = "dev-1"
- status_msg = "I'm here!"
- now = 5000000
- state = UserPresenceState.default(user_id)
- state = state.copy_and_replace(
- state=PresenceState.ONLINE,
- last_active_ts=now - SYNC_ONLINE_TIMEOUT - 1,
- last_user_sync_ts=now - SYNC_ONLINE_TIMEOUT - 1,
- status_msg=status_msg,
- )
- device_state = UserDevicePresenceState(
- user_id=user_id,
- device_id=device_id,
- state=state.state,
- last_active_ts=state.last_active_ts,
- last_sync_ts=state.last_user_sync_ts,
- )
- new_state = handle_timeout(
- state,
- is_mine=True,
- syncing_device_ids={(user_id, device_id)},
- user_devices={device_id: device_state},
- now=now,
- )
- self.assertIsNotNone(new_state)
- assert new_state is not None
- self.assertEqual(new_state.state, PresenceState.ONLINE)
- self.assertEqual(new_state.status_msg, status_msg)
- def test_federation_ping(self) -> None:
- user_id = "@foo:bar"
- device_id = "dev-1"
- status_msg = "I'm here!"
- now = 5000000
- state = UserPresenceState.default(user_id)
- state = state.copy_and_replace(
- state=PresenceState.ONLINE,
- last_active_ts=now,
- last_user_sync_ts=now,
- last_federation_update_ts=now - FEDERATION_PING_INTERVAL - 1,
- status_msg=status_msg,
- )
- device_state = UserDevicePresenceState(
- user_id=user_id,
- device_id=device_id,
- state=state.state,
- last_active_ts=state.last_active_ts,
- last_sync_ts=state.last_user_sync_ts,
- )
- new_state = handle_timeout(
- state,
- is_mine=True,
- syncing_device_ids=set(),
- user_devices={device_id: device_state},
- now=now,
- )
- self.assertIsNotNone(new_state)
- self.assertEqual(state, new_state)
- def test_no_timeout(self) -> None:
- user_id = "@foo:bar"
- device_id = "dev-1"
- now = 5000000
- state = UserPresenceState.default(user_id)
- state = state.copy_and_replace(
- state=PresenceState.ONLINE,
- last_active_ts=now,
- last_user_sync_ts=now,
- last_federation_update_ts=now,
- )
- device_state = UserDevicePresenceState(
- user_id=user_id,
- device_id=device_id,
- state=state.state,
- last_active_ts=state.last_active_ts,
- last_sync_ts=state.last_user_sync_ts,
- )
- new_state = handle_timeout(
- state,
- is_mine=True,
- syncing_device_ids=set(),
- user_devices={device_id: device_state},
- now=now,
- )
- self.assertIsNone(new_state)
- def test_federation_timeout(self) -> None:
- user_id = "@foo:bar"
- status_msg = "I'm here!"
- now = 5000000
- state = UserPresenceState.default(user_id)
- state = state.copy_and_replace(
- state=PresenceState.ONLINE,
- last_active_ts=now,
- last_user_sync_ts=now,
- last_federation_update_ts=now - FEDERATION_TIMEOUT - 1,
- status_msg=status_msg,
- )
- # Note that this is a remote user so we do not have their device information.
- new_state = handle_timeout(
- state, is_mine=False, syncing_device_ids=set(), user_devices={}, now=now
- )
- self.assertIsNotNone(new_state)
- assert new_state is not None
- self.assertEqual(new_state.state, PresenceState.OFFLINE)
- self.assertEqual(new_state.status_msg, status_msg)
- def test_last_active(self) -> None:
- user_id = "@foo:bar"
- device_id = "dev-1"
- status_msg = "I'm here!"
- now = 5000000
- state = UserPresenceState.default(user_id)
- state = state.copy_and_replace(
- state=PresenceState.ONLINE,
- last_active_ts=now - LAST_ACTIVE_GRANULARITY - 1,
- last_user_sync_ts=now,
- last_federation_update_ts=now,
- status_msg=status_msg,
- )
- device_state = UserDevicePresenceState(
- user_id=user_id,
- device_id=device_id,
- state=state.state,
- last_active_ts=state.last_active_ts,
- last_sync_ts=state.last_user_sync_ts,
- )
- new_state = handle_timeout(
- state,
- is_mine=True,
- syncing_device_ids=set(),
- user_devices={device_id: device_state},
- now=now,
- )
- self.assertIsNotNone(new_state)
- self.assertEqual(state, new_state)
- class PresenceHandlerInitTestCase(unittest.HomeserverTestCase):
- def default_config(self) -> JsonDict:
- config = super().default_config()
- # Disable background tasks on this worker so that the PresenceHandler isn't
- # loaded until we request it.
- config["run_background_tasks_on"] = "other"
- return config
- def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
- self.user_id = f"@test:{self.hs.config.server.server_name}"
- self.device_id = "dev-1"
- # Move the reactor to the initial time.
- self.reactor.advance(1000)
- now = self.clock.time_msec()
- main_store = hs.get_datastores().main
- self.get_success(
- main_store.update_presence(
- [
- UserPresenceState(
- user_id=self.user_id,
- state=PresenceState.ONLINE,
- last_active_ts=now,
- last_federation_update_ts=now,
- last_user_sync_ts=now,
- status_msg=None,
- currently_active=True,
- )
- ]
- )
- )
- # Regenerate the preloaded presence information on PresenceStore.
- def refill_presence(db_conn: LoggingDatabaseConnection) -> None:
- main_store._presence_on_startup = main_store._get_active_presence(db_conn)
- self.get_success(main_store.db_pool.runWithConnection(refill_presence))
- def test_restored_presence_idles(self) -> None:
- """The presence state restored from the database should not persist forever."""
- # Get the handler (which kicks off a bunch of timers).
- presence_handler = self.hs.get_presence_handler()
- # Assert the user is online.
- state = self.get_success(
- presence_handler.get_state(UserID.from_string(self.user_id))
- )
- self.assertEqual(state.state, PresenceState.ONLINE)
- # Advance such that the user should timeout.
- self.reactor.advance(SYNC_ONLINE_TIMEOUT / 1000)
- self.reactor.pump([5])
- # Check that the user is now offline.
- state = self.get_success(
- presence_handler.get_state(UserID.from_string(self.user_id))
- )
- self.assertEqual(state.state, PresenceState.OFFLINE)
- @parameterized.expand(
- [
- (PresenceState.BUSY, PresenceState.BUSY),
- (PresenceState.ONLINE, PresenceState.ONLINE),
- (PresenceState.UNAVAILABLE, PresenceState.ONLINE),
- # Offline syncs don't update the state.
- (PresenceState.OFFLINE, PresenceState.ONLINE),
- ]
- )
- @unittest.override_config({"experimental_features": {"msc3026_enabled": True}})
- def test_restored_presence_online_after_sync(
- self, sync_state: str, expected_state: str
- ) -> None:
- """
- The presence state restored from the database should be overridden with sync after a timeout.
- Args:
- sync_state: The presence state of the new sync.
- expected_state: The expected presence right after the sync.
- """
- # Get the handler (which kicks off a bunch of timers).
- presence_handler = self.hs.get_presence_handler()
- # Assert the user is online, as restored.
- state = self.get_success(
- presence_handler.get_state(UserID.from_string(self.user_id))
- )
- self.assertEqual(state.state, PresenceState.ONLINE)
- # Advance slightly and sync.
- self.reactor.advance(SYNC_ONLINE_TIMEOUT / 1000 / 2)
- self.get_success(
- presence_handler.user_syncing(
- self.user_id,
- self.device_id,
- sync_state != PresenceState.OFFLINE,
- sync_state,
- )
- )
- # Assert the user is in the expected state.
- state = self.get_success(
- presence_handler.get_state(UserID.from_string(self.user_id))
- )
- self.assertEqual(state.state, expected_state)
- # Advance such that the user's preloaded data times out, but not the new sync.
- self.reactor.advance(SYNC_ONLINE_TIMEOUT / 1000 / 2)
- self.reactor.pump([5])
- # Check that the user is in the sync state (as the client is currently syncing still).
- state = self.get_success(
- presence_handler.get_state(UserID.from_string(self.user_id))
- )
- self.assertEqual(state.state, sync_state)
- class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase):
- user_id = "@test:server"
- user_id_obj = UserID.from_string(user_id)
- device_id = "dev-1"
- def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
- self.presence_handler = hs.get_presence_handler()
- self.clock = hs.get_clock()
- def test_external_process_timeout(self) -> None:
- """Test that if an external process doesn't update the records for a while
- we time out their syncing users presence.
- """
- # Create a worker and use it to handle /sync traffic instead.
- # This is used to test that presence changes get replicated from workers
- # to the main process correctly.
- worker_to_sync_against = self.make_worker_hs(
- "synapse.app.generic_worker", {"worker_name": "synchrotron"}
- )
- worker_presence_handler = worker_to_sync_against.get_presence_handler()
- self.get_success(
- worker_presence_handler.user_syncing(
- self.user_id, self.device_id, True, PresenceState.ONLINE
- ),
- by=0.1,
- )
- # Check that if we wait a while without telling the handler the user has
- # stopped syncing that their presence state doesn't get timed out.
- self.reactor.advance(EXTERNAL_PROCESS_EXPIRY / 2)
- state = self.get_success(self.presence_handler.get_state(self.user_id_obj))
- self.assertEqual(state.state, PresenceState.ONLINE)
- # Check that if the external process timeout fires, then the syncing
- # user gets timed out
- self.reactor.advance(EXTERNAL_PROCESS_EXPIRY)
- state = self.get_success(self.presence_handler.get_state(self.user_id_obj))
- self.assertEqual(state.state, PresenceState.OFFLINE)
- def test_user_goes_offline_by_timeout_status_msg_remain(self) -> None:
- """Test that if a user doesn't update the records for a while
- users presence goes `OFFLINE` because of timeout and `status_msg` remains.
- """
- status_msg = "I'm here!"
- # Mark user as online
- self._set_presencestate_with_status_msg(PresenceState.ONLINE, status_msg)
- # Check that if we wait a while without telling the handler the user has
- # stopped syncing that their presence state doesn't get timed out.
- self.reactor.advance(SYNC_ONLINE_TIMEOUT / 2)
- state = self.get_success(self.presence_handler.get_state(self.user_id_obj))
- self.assertEqual(state.state, PresenceState.ONLINE)
- self.assertEqual(state.status_msg, status_msg)
- # Check that if the timeout fires, then the syncing user gets timed out
- self.reactor.advance(SYNC_ONLINE_TIMEOUT)
- state = self.get_success(self.presence_handler.get_state(self.user_id_obj))
- # status_msg should remain even after going offline
- self.assertEqual(state.state, PresenceState.OFFLINE)
- self.assertEqual(state.status_msg, status_msg)
- def test_user_goes_offline_manually_with_no_status_msg(self) -> None:
- """Test that if a user change presence manually to `OFFLINE`
- and no status is set, that `status_msg` is `None`.
- """
- status_msg = "I'm here!"
- # Mark user as online
- self._set_presencestate_with_status_msg(PresenceState.ONLINE, status_msg)
- # Mark user as offline
- self.get_success(
- self.presence_handler.set_state(
- self.user_id_obj, self.device_id, {"presence": PresenceState.OFFLINE}
- )
- )
- state = self.get_success(self.presence_handler.get_state(self.user_id_obj))
- self.assertEqual(state.state, PresenceState.OFFLINE)
- self.assertEqual(state.status_msg, None)
- def test_user_goes_offline_manually_with_status_msg(self) -> None:
- """Test that if a user change presence manually to `OFFLINE`
- and a status is set, that `status_msg` appears.
- """
- status_msg = "I'm here!"
- # Mark user as online
- self._set_presencestate_with_status_msg(PresenceState.ONLINE, status_msg)
- # Mark user as offline
- self._set_presencestate_with_status_msg(PresenceState.OFFLINE, "And now here.")
- def test_user_reset_online_with_no_status(self) -> None:
- """Test that if a user set again the presence manually
- and no status is set, that `status_msg` is `None`.
- """
- status_msg = "I'm here!"
- # Mark user as online
- self._set_presencestate_with_status_msg(PresenceState.ONLINE, status_msg)
- # Mark user as online again
- self.get_success(
- self.presence_handler.set_state(
- self.user_id_obj, self.device_id, {"presence": PresenceState.ONLINE}
- )
- )
- state = self.get_success(self.presence_handler.get_state(self.user_id_obj))
- # status_msg should remain even after going offline
- self.assertEqual(state.state, PresenceState.ONLINE)
- self.assertEqual(state.status_msg, None)
- def test_set_presence_with_status_msg_none(self) -> None:
- """Test that if a user set again the presence manually
- and status is `None`, that `status_msg` is `None`.
- """
- status_msg = "I'm here!"
- # Mark user as online
- self._set_presencestate_with_status_msg(PresenceState.ONLINE, status_msg)
- # Mark user as online and `status_msg = None`
- self._set_presencestate_with_status_msg(PresenceState.ONLINE, None)
- def test_set_presence_from_syncing_not_set(self) -> None:
- """Test that presence is not set by syncing if affect_presence is false"""
- status_msg = "I'm here!"
- self._set_presencestate_with_status_msg(PresenceState.UNAVAILABLE, status_msg)
- self.get_success(
- self.presence_handler.user_syncing(
- self.user_id, self.device_id, False, PresenceState.ONLINE
- )
- )
- state = self.get_success(self.presence_handler.get_state(self.user_id_obj))
- # we should still be unavailable
- self.assertEqual(state.state, PresenceState.UNAVAILABLE)
- # and status message should still be the same
- self.assertEqual(state.status_msg, status_msg)
- def test_set_presence_from_syncing_is_set(self) -> None:
- """Test that presence is set by syncing if affect_presence is true"""
- status_msg = "I'm here!"
- self._set_presencestate_with_status_msg(PresenceState.UNAVAILABLE, status_msg)
- self.get_success(
- self.presence_handler.user_syncing(
- self.user_id, self.device_id, True, PresenceState.ONLINE
- )
- )
- state = self.get_success(self.presence_handler.get_state(self.user_id_obj))
- # we should now be online
- self.assertEqual(state.state, PresenceState.ONLINE)
- @parameterized.expand(
- # A list of tuples of 4 strings:
- #
- # * The presence state of device 1.
- # * The presence state of device 2.
- # * The expected user presence state after both devices have synced.
- # * The expected user presence state after device 1 has idled.
- # * The expected user presence state after device 2 has idled.
- # * True to use workers, False a monolith.
- [
- (*cases, workers)
- for workers in (False, True)
- for cases in [
- # If both devices have the same state, online should eventually idle.
- # Otherwise, the state doesn't change.
- (
- PresenceState.BUSY,
- PresenceState.BUSY,
- PresenceState.BUSY,
- PresenceState.BUSY,
- PresenceState.BUSY,
- ),
- (
- PresenceState.ONLINE,
- PresenceState.ONLINE,
- PresenceState.ONLINE,
- PresenceState.ONLINE,
- PresenceState.UNAVAILABLE,
- ),
- (
- PresenceState.UNAVAILABLE,
- PresenceState.UNAVAILABLE,
- PresenceState.UNAVAILABLE,
- PresenceState.UNAVAILABLE,
- PresenceState.UNAVAILABLE,
- ),
- (
- PresenceState.OFFLINE,
- PresenceState.OFFLINE,
- PresenceState.OFFLINE,
- PresenceState.OFFLINE,
- PresenceState.OFFLINE,
- ),
- # If the second device has a "lower" state it should fallback to it,
- # except for "busy" which overrides.
- (
- PresenceState.BUSY,
- PresenceState.ONLINE,
- PresenceState.BUSY,
- PresenceState.BUSY,
- PresenceState.BUSY,
- ),
- (
- PresenceState.BUSY,
- PresenceState.UNAVAILABLE,
- PresenceState.BUSY,
- PresenceState.BUSY,
- PresenceState.BUSY,
- ),
- (
- PresenceState.BUSY,
- PresenceState.OFFLINE,
- PresenceState.BUSY,
- PresenceState.BUSY,
- PresenceState.BUSY,
- ),
- (
- PresenceState.ONLINE,
- PresenceState.UNAVAILABLE,
- PresenceState.ONLINE,
- PresenceState.UNAVAILABLE,
- PresenceState.UNAVAILABLE,
- ),
- (
- PresenceState.ONLINE,
- PresenceState.OFFLINE,
- PresenceState.ONLINE,
- PresenceState.UNAVAILABLE,
- PresenceState.UNAVAILABLE,
- ),
- (
- PresenceState.UNAVAILABLE,
- PresenceState.OFFLINE,
- PresenceState.UNAVAILABLE,
- PresenceState.UNAVAILABLE,
- PresenceState.UNAVAILABLE,
- ),
- # If the second device has a "higher" state it should override.
- (
- PresenceState.ONLINE,
- PresenceState.BUSY,
- PresenceState.BUSY,
- PresenceState.BUSY,
- PresenceState.BUSY,
- ),
- (
- PresenceState.UNAVAILABLE,
- PresenceState.BUSY,
- PresenceState.BUSY,
- PresenceState.BUSY,
- PresenceState.BUSY,
- ),
- (
- PresenceState.OFFLINE,
- PresenceState.BUSY,
- PresenceState.BUSY,
- PresenceState.BUSY,
- PresenceState.BUSY,
- ),
- (
- PresenceState.UNAVAILABLE,
- PresenceState.ONLINE,
- PresenceState.ONLINE,
- PresenceState.ONLINE,
- PresenceState.UNAVAILABLE,
- ),
- (
- PresenceState.OFFLINE,
- PresenceState.ONLINE,
- PresenceState.ONLINE,
- PresenceState.ONLINE,
- PresenceState.UNAVAILABLE,
- ),
- (
- PresenceState.OFFLINE,
- PresenceState.UNAVAILABLE,
- PresenceState.UNAVAILABLE,
- PresenceState.UNAVAILABLE,
- PresenceState.UNAVAILABLE,
- ),
- ]
- ],
- name_func=lambda testcase_func, param_num, params: f"{testcase_func.__name__}_{param_num}_{'workers' if params.args[5] else 'monolith'}",
- )
- @unittest.override_config({"experimental_features": {"msc3026_enabled": True}})
- def test_set_presence_from_syncing_multi_device(
- self,
- dev_1_state: str,
- dev_2_state: str,
- expected_state_1: str,
- expected_state_2: str,
- expected_state_3: str,
- test_with_workers: bool,
- ) -> None:
- """
- Test the behaviour of multiple devices syncing at the same time.
- Roughly the user's presence state should be set to the "highest" priority
- of all the devices. When a device then goes offline its state should be
- discarded and the next highest should win.
- Note that these tests use the idle timer (and don't close the syncs), it
- is unlikely that a *single* sync would last this long, but is close enough
- to continually syncing with that current state.
- """
- user_id = f"@test:{self.hs.config.server.server_name}"
- # By default, we call /sync against the main process.
- worker_presence_handler = self.presence_handler
- if test_with_workers:
- # Create a worker and use it to handle /sync traffic instead.
- # This is used to test that presence changes get replicated from workers
- # to the main process correctly.
- worker_to_sync_against = self.make_worker_hs(
- "synapse.app.generic_worker", {"worker_name": "synchrotron"}
- )
- worker_presence_handler = worker_to_sync_against.get_presence_handler()
- # 1. Sync with the first device.
- self.get_success(
- worker_presence_handler.user_syncing(
- user_id,
- "dev-1",
- affect_presence=dev_1_state != PresenceState.OFFLINE,
- presence_state=dev_1_state,
- ),
- by=0.01,
- )
- # 2. Wait half the idle timer.
- self.reactor.advance(IDLE_TIMER / 1000 / 2)
- self.reactor.pump([0.1])
- # 3. Sync with the second device.
- self.get_success(
- worker_presence_handler.user_syncing(
- user_id,
- "dev-2",
- affect_presence=dev_2_state != PresenceState.OFFLINE,
- presence_state=dev_2_state,
- ),
- by=0.01,
- )
- # 4. Assert the expected presence state.
- state = self.get_success(
- self.presence_handler.get_state(UserID.from_string(user_id))
- )
- self.assertEqual(state.state, expected_state_1)
- if test_with_workers:
- state = self.get_success(
- worker_presence_handler.get_state(UserID.from_string(user_id))
- )
- self.assertEqual(state.state, expected_state_1)
- # When testing with workers, make another random sync (with any *different*
- # user) to keep the process information from expiring.
- #
- # This is due to EXTERNAL_PROCESS_EXPIRY being equivalent to IDLE_TIMER.
- if test_with_workers:
- with self.get_success(
- worker_presence_handler.user_syncing(
- f"@other-user:{self.hs.config.server.server_name}",
- "dev-3",
- affect_presence=True,
- presence_state=PresenceState.ONLINE,
- ),
- by=0.01,
- ):
- pass
- # 5. Advance such that the first device should be discarded (the idle timer),
- # then pump so _handle_timeouts function to called.
- self.reactor.advance(IDLE_TIMER / 1000 / 2)
- self.reactor.pump([0.01])
- # 6. Assert the expected presence state.
- state = self.get_success(
- self.presence_handler.get_state(UserID.from_string(user_id))
- )
- self.assertEqual(state.state, expected_state_2)
- if test_with_workers:
- state = self.get_success(
- worker_presence_handler.get_state(UserID.from_string(user_id))
- )
- self.assertEqual(state.state, expected_state_2)
- # 7. Advance such that the second device should be discarded (half the idle timer),
- # then pump so _handle_timeouts function to called.
- self.reactor.advance(IDLE_TIMER / 1000 / 2)
- self.reactor.pump([0.1])
- # 8. The devices are still "syncing" (the sync context managers were never
- # closed), so might idle.
- state = self.get_success(
- self.presence_handler.get_state(UserID.from_string(user_id))
- )
- self.assertEqual(state.state, expected_state_3)
- if test_with_workers:
- state = self.get_success(
- worker_presence_handler.get_state(UserID.from_string(user_id))
- )
- self.assertEqual(state.state, expected_state_3)
- @parameterized.expand(
- # A list of tuples of 4 strings:
- #
- # * The presence state of device 1.
- # * The presence state of device 2.
- # * The expected user presence state after both devices have synced.
- # * The expected user presence state after device 1 has stopped syncing.
- # * True to use workers, False a monolith.
- [
- (*cases, workers)
- for workers in (False, True)
- for cases in [
- # If both devices have the same state, nothing exciting should happen.
- (
- PresenceState.BUSY,
- PresenceState.BUSY,
- PresenceState.BUSY,
- PresenceState.BUSY,
- ),
- (
- PresenceState.ONLINE,
- PresenceState.ONLINE,
- PresenceState.ONLINE,
- PresenceState.ONLINE,
- ),
- (
- PresenceState.UNAVAILABLE,
- PresenceState.UNAVAILABLE,
- PresenceState.UNAVAILABLE,
- PresenceState.UNAVAILABLE,
- ),
- (
- PresenceState.OFFLINE,
- PresenceState.OFFLINE,
- PresenceState.OFFLINE,
- PresenceState.OFFLINE,
- ),
- # If the second device has a "lower" state it should fallback to it,
- # except for "busy" which overrides.
- (
- PresenceState.BUSY,
- PresenceState.ONLINE,
- PresenceState.BUSY,
- PresenceState.BUSY,
- ),
- (
- PresenceState.BUSY,
- PresenceState.UNAVAILABLE,
- PresenceState.BUSY,
- PresenceState.BUSY,
- ),
- (
- PresenceState.BUSY,
- PresenceState.OFFLINE,
- PresenceState.BUSY,
- PresenceState.BUSY,
- ),
- (
- PresenceState.ONLINE,
- PresenceState.UNAVAILABLE,
- PresenceState.ONLINE,
- PresenceState.UNAVAILABLE,
- ),
- (
- PresenceState.ONLINE,
- PresenceState.OFFLINE,
- PresenceState.ONLINE,
- PresenceState.OFFLINE,
- ),
- (
- PresenceState.UNAVAILABLE,
- PresenceState.OFFLINE,
- PresenceState.UNAVAILABLE,
- PresenceState.OFFLINE,
- ),
- # If the second device has a "higher" state it should override.
- (
- PresenceState.ONLINE,
- PresenceState.BUSY,
- PresenceState.BUSY,
- PresenceState.BUSY,
- ),
- (
- PresenceState.UNAVAILABLE,
- PresenceState.BUSY,
- PresenceState.BUSY,
- PresenceState.BUSY,
- ),
- (
- PresenceState.OFFLINE,
- PresenceState.BUSY,
- PresenceState.BUSY,
- PresenceState.BUSY,
- ),
- (
- PresenceState.UNAVAILABLE,
- PresenceState.ONLINE,
- PresenceState.ONLINE,
- PresenceState.ONLINE,
- ),
- (
- PresenceState.OFFLINE,
- PresenceState.ONLINE,
- PresenceState.ONLINE,
- PresenceState.ONLINE,
- ),
- (
- PresenceState.OFFLINE,
- PresenceState.UNAVAILABLE,
- PresenceState.UNAVAILABLE,
- PresenceState.UNAVAILABLE,
- ),
- ]
- ],
- name_func=lambda testcase_func, param_num, params: f"{testcase_func.__name__}_{param_num}_{'workers' if params.args[4] else 'monolith'}",
- )
- @unittest.override_config({"experimental_features": {"msc3026_enabled": True}})
- def test_set_presence_from_non_syncing_multi_device(
- self,
- dev_1_state: str,
- dev_2_state: str,
- expected_state_1: str,
- expected_state_2: str,
- test_with_workers: bool,
- ) -> None:
- """
- Test the behaviour of multiple devices syncing at the same time.
- Roughly the user's presence state should be set to the "highest" priority
- of all the devices. When a device then goes offline its state should be
- discarded and the next highest should win.
- Note that these tests use the idle timer (and don't close the syncs), it
- is unlikely that a *single* sync would last this long, but is close enough
- to continually syncing with that current state.
- """
- user_id = f"@test:{self.hs.config.server.server_name}"
- # By default, we call /sync against the main process.
- worker_presence_handler = self.presence_handler
- if test_with_workers:
- # Create a worker and use it to handle /sync traffic instead.
- # This is used to test that presence changes get replicated from workers
- # to the main process correctly.
- worker_to_sync_against = self.make_worker_hs(
- "synapse.app.generic_worker", {"worker_name": "synchrotron"}
- )
- worker_presence_handler = worker_to_sync_against.get_presence_handler()
- # 1. Sync with the first device.
- sync_1 = self.get_success(
- worker_presence_handler.user_syncing(
- user_id,
- "dev-1",
- affect_presence=dev_1_state != PresenceState.OFFLINE,
- presence_state=dev_1_state,
- ),
- by=0.1,
- )
- # 2. Sync with the second device.
- sync_2 = self.get_success(
- worker_presence_handler.user_syncing(
- user_id,
- "dev-2",
- affect_presence=dev_2_state != PresenceState.OFFLINE,
- presence_state=dev_2_state,
- ),
- by=0.1,
- )
- # 3. Assert the expected presence state.
- state = self.get_success(
- self.presence_handler.get_state(UserID.from_string(user_id))
- )
- self.assertEqual(state.state, expected_state_1)
- if test_with_workers:
- state = self.get_success(
- worker_presence_handler.get_state(UserID.from_string(user_id))
- )
- self.assertEqual(state.state, expected_state_1)
- # 4. Disconnect the first device.
- with sync_1:
- pass
- # 5. Advance such that the first device should be discarded (the sync timeout),
- # then pump so _handle_timeouts function to called.
- self.reactor.advance(SYNC_ONLINE_TIMEOUT / 1000)
- self.reactor.pump([5])
- # 6. Assert the expected presence state.
- state = self.get_success(
- self.presence_handler.get_state(UserID.from_string(user_id))
- )
- self.assertEqual(state.state, expected_state_2)
- if test_with_workers:
- state = self.get_success(
- worker_presence_handler.get_state(UserID.from_string(user_id))
- )
- self.assertEqual(state.state, expected_state_2)
- # 7. Disconnect the second device.
- with sync_2:
- pass
- # 8. Advance such that the second device should be discarded (the sync timeout),
- # then pump so _handle_timeouts function to called.
- if dev_1_state == PresenceState.BUSY or dev_2_state == PresenceState.BUSY:
- timeout = BUSY_ONLINE_TIMEOUT
- else:
- timeout = SYNC_ONLINE_TIMEOUT
- self.reactor.advance(timeout / 1000)
- self.reactor.pump([5])
- # 9. There are no more devices, should be offline.
- state = self.get_success(
- self.presence_handler.get_state(UserID.from_string(user_id))
- )
- self.assertEqual(state.state, PresenceState.OFFLINE)
- if test_with_workers:
- state = self.get_success(
- worker_presence_handler.get_state(UserID.from_string(user_id))
- )
- self.assertEqual(state.state, PresenceState.OFFLINE)
- def test_set_presence_from_syncing_keeps_status(self) -> None:
- """Test that presence set by syncing retains status message"""
- status_msg = "I'm here!"
- self._set_presencestate_with_status_msg(PresenceState.UNAVAILABLE, status_msg)
- self.get_success(
- self.presence_handler.user_syncing(
- self.user_id, self.device_id, True, PresenceState.ONLINE
- )
- )
- state = self.get_success(self.presence_handler.get_state(self.user_id_obj))
- # our status message should be the same as it was before
- self.assertEqual(state.status_msg, status_msg)
- @parameterized.expand([(False,), (True,)])
- @unittest.override_config({"experimental_features": {"msc3026_enabled": True}})
- def test_set_presence_from_syncing_keeps_busy(
- self, test_with_workers: bool
- ) -> None:
- """Test that presence set by syncing doesn't affect busy status
- Args:
- test_with_workers: If True, check the presence state of the user by calling
- /sync against a worker, rather than the main process.
- """
- status_msg = "I'm busy!"
- # By default, we call /sync against the main process.
- worker_to_sync_against = self.hs
- if test_with_workers:
- # Create a worker and use it to handle /sync traffic instead.
- # This is used to test that presence changes get replicated from workers
- # to the main process correctly.
- worker_to_sync_against = self.make_worker_hs(
- "synapse.app.generic_worker", {"worker_name": "synchrotron"}
- )
- # Set presence to BUSY
- self._set_presencestate_with_status_msg(PresenceState.BUSY, status_msg)
- # Perform a sync with a presence state other than busy. This should NOT change
- # our presence status; we only change from busy if we explicitly set it via
- # /presence/*.
- self.get_success(
- worker_to_sync_against.get_presence_handler().user_syncing(
- self.user_id, self.device_id, True, PresenceState.ONLINE
- ),
- by=0.1,
- )
- # Check against the main process that the user's presence did not change.
- state = self.get_success(self.presence_handler.get_state(self.user_id_obj))
- # we should still be busy
- self.assertEqual(state.state, PresenceState.BUSY)
- # Advance such that the device would be discarded if it was not busy,
- # then pump so _handle_timeouts function to called.
- self.reactor.advance(IDLE_TIMER / 1000)
- self.reactor.pump([5])
- # The account should still be busy.
- state = self.get_success(self.presence_handler.get_state(self.user_id_obj))
- self.assertEqual(state.state, PresenceState.BUSY)
- # Ensure that a /presence call can set the user *off* busy.
- self._set_presencestate_with_status_msg(PresenceState.ONLINE, status_msg)
- state = self.get_success(self.presence_handler.get_state(self.user_id_obj))
- self.assertEqual(state.state, PresenceState.ONLINE)
- def _set_presencestate_with_status_msg(
- self, state: str, status_msg: Optional[str]
- ) -> None:
- """Set a PresenceState and status_msg and check the result.
- Args:
- state: The new PresenceState.
- status_msg: Status message that is to be set.
- """
- self.get_success(
- self.presence_handler.set_state(
- self.user_id_obj,
- self.device_id,
- {"presence": state, "status_msg": status_msg},
- )
- )
- new_state = self.get_success(self.presence_handler.get_state(self.user_id_obj))
- self.assertEqual(new_state.state, state)
- self.assertEqual(new_state.status_msg, status_msg)
- class PresenceFederationQueueTestCase(unittest.HomeserverTestCase):
- def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
- self.presence_handler = hs.get_presence_handler()
- self.clock = hs.get_clock()
- self.instance_name = hs.get_instance_name()
- self.queue = self.presence_handler.get_federation_queue()
- def test_send_and_get(self) -> None:
- state1 = UserPresenceState.default("@user1:test")
- state2 = UserPresenceState.default("@user2:test")
- state3 = UserPresenceState.default("@user3:test")
- prev_token = self.queue.get_current_token(self.instance_name)
- self.get_success(
- self.queue.send_presence_to_destinations(
- (state1, state2), ("dest1", "dest2")
- )
- )
- self.get_success(
- self.queue.send_presence_to_destinations((state3,), ("dest3",))
- )
- now_token = self.queue.get_current_token(self.instance_name)
- rows, upto_token, limited = self.get_success(
- self.queue.get_replication_rows("master", prev_token, now_token, 10)
- )
- self.assertEqual(upto_token, now_token)
- self.assertFalse(limited)
- expected_rows = [
- (1, ("dest1", "@user1:test")),
- (1, ("dest2", "@user1:test")),
- (1, ("dest1", "@user2:test")),
- (1, ("dest2", "@user2:test")),
- (2, ("dest3", "@user3:test")),
- ]
- self.assertCountEqual(rows, expected_rows)
- now_token = self.queue.get_current_token(self.instance_name)
- rows, upto_token, limited = self.get_success(
- self.queue.get_replication_rows("master", upto_token, now_token, 10)
- )
- self.assertEqual(upto_token, now_token)
- self.assertFalse(limited)
- self.assertCountEqual(rows, [])
- def test_send_and_get_split(self) -> None:
- state1 = UserPresenceState.default("@user1:test")
- state2 = UserPresenceState.default("@user2:test")
- state3 = UserPresenceState.default("@user3:test")
- prev_token = self.queue.get_current_token(self.instance_name)
- self.get_success(
- self.queue.send_presence_to_destinations(
- (state1, state2), ("dest1", "dest2")
- )
- )
- now_token = self.queue.get_current_token(self.instance_name)
- self.get_success(
- self.queue.send_presence_to_destinations((state3,), ("dest3",))
- )
- rows, upto_token, limited = self.get_success(
- self.queue.get_replication_rows("master", prev_token, now_token, 10)
- )
- self.assertEqual(upto_token, now_token)
- self.assertFalse(limited)
- expected_rows = [
- (1, ("dest1", "@user1:test")),
- (1, ("dest2", "@user1:test")),
- (1, ("dest1", "@user2:test")),
- (1, ("dest2", "@user2:test")),
- ]
- self.assertCountEqual(rows, expected_rows)
- now_token = self.queue.get_current_token(self.instance_name)
- rows, upto_token, limited = self.get_success(
- self.queue.get_replication_rows("master", upto_token, now_token, 10)
- )
- self.assertEqual(upto_token, now_token)
- self.assertFalse(limited)
- expected_rows = [
- (2, ("dest3", "@user3:test")),
- ]
- self.assertCountEqual(rows, expected_rows)
- def test_clear_queue_all(self) -> None:
- state1 = UserPresenceState.default("@user1:test")
- state2 = UserPresenceState.default("@user2:test")
- state3 = UserPresenceState.default("@user3:test")
- prev_token = self.queue.get_current_token(self.instance_name)
- self.get_success(
- self.queue.send_presence_to_destinations(
- (state1, state2), ("dest1", "dest2")
- )
- )
- self.get_success(
- self.queue.send_presence_to_destinations((state3,), ("dest3",))
- )
- self.reactor.advance(10 * 60 * 1000)
- now_token = self.queue.get_current_token(self.instance_name)
- rows, upto_token, limited = self.get_success(
- self.queue.get_replication_rows("master", prev_token, now_token, 10)
- )
- self.assertEqual(upto_token, now_token)
- self.assertFalse(limited)
- self.assertCountEqual(rows, [])
- prev_token = self.queue.get_current_token(self.instance_name)
- self.get_success(
- self.queue.send_presence_to_destinations(
- (state1, state2), ("dest1", "dest2")
- )
- )
- self.get_success(
- self.queue.send_presence_to_destinations((state3,), ("dest3",))
- )
- now_token = self.queue.get_current_token(self.instance_name)
- rows, upto_token, limited = self.get_success(
- self.queue.get_replication_rows("master", prev_token, now_token, 10)
- )
- self.assertEqual(upto_token, now_token)
- self.assertFalse(limited)
- expected_rows = [
- (3, ("dest1", "@user1:test")),
- (3, ("dest2", "@user1:test")),
- (3, ("dest1", "@user2:test")),
- (3, ("dest2", "@user2:test")),
- (4, ("dest3", "@user3:test")),
- ]
- self.assertCountEqual(rows, expected_rows)
- def test_partially_clear_queue(self) -> None:
- state1 = UserPresenceState.default("@user1:test")
- state2 = UserPresenceState.default("@user2:test")
- state3 = UserPresenceState.default("@user3:test")
- prev_token = self.queue.get_current_token(self.instance_name)
- self.get_success(
- self.queue.send_presence_to_destinations(
- (state1, state2), ("dest1", "dest2")
- )
- )
- self.reactor.advance(2 * 60 * 1000)
- self.get_success(
- self.queue.send_presence_to_destinations((state3,), ("dest3",))
- )
- self.reactor.advance(4 * 60 * 1000)
- now_token = self.queue.get_current_token(self.instance_name)
- rows, upto_token, limited = self.get_success(
- self.queue.get_replication_rows("master", prev_token, now_token, 10)
- )
- self.assertEqual(upto_token, now_token)
- self.assertFalse(limited)
- self.assertCountEqual(rows, [])
- prev_token = self.queue.get_current_token(self.instance_name)
- self.get_success(
- self.queue.send_presence_to_destinations(
- (state1, state2), ("dest1", "dest2")
- )
- )
- self.get_success(
- self.queue.send_presence_to_destinations((state3,), ("dest3",))
- )
- now_token = self.queue.get_current_token(self.instance_name)
- rows, upto_token, limited = self.get_success(
- self.queue.get_replication_rows("master", prev_token, now_token, 10)
- )
- self.assertEqual(upto_token, now_token)
- self.assertFalse(limited)
- expected_rows = [
- (3, ("dest1", "@user1:test")),
- (3, ("dest2", "@user1:test")),
- (3, ("dest1", "@user2:test")),
- (3, ("dest2", "@user2:test")),
- (4, ("dest3", "@user3:test")),
- ]
- self.assertCountEqual(rows, expected_rows)
- class PresenceJoinTestCase(unittest.HomeserverTestCase):
- """Tests remote servers get told about presence of users in the room when
- they join and when new local users join.
- """
- user_id = "@test:server"
- servlets = [room.register_servlets]
- def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
- hs = self.setup_test_homeserver(
- "server",
- federation_sender=Mock(spec=FederationSender),
- )
- return hs
- def default_config(self) -> JsonDict:
- config = super().default_config()
- # Enable federation sending on the main process.
- config["federation_sender_instances"] = None
- return config
- def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
- self.federation_sender = cast(Mock, hs.get_federation_sender())
- self.event_builder_factory = hs.get_event_builder_factory()
- self.federation_event_handler = hs.get_federation_event_handler()
- self.presence_handler = hs.get_presence_handler()
- # self.event_builder_for_2 = EventBuilderFactory(hs)
- # self.event_builder_for_2.hostname = "test2"
- self.store = hs.get_datastores().main
- self.state = hs.get_state_handler()
- self._event_auth_handler = hs.get_event_auth_handler()
- # We don't actually check signatures in tests, so lets just create a
- # random key to use.
- self.random_signing_key = generate_signing_key("ver")
- def test_remote_joins(self) -> None:
- # We advance time to something that isn't 0, as we use 0 as a special
- # value.
- self.reactor.advance(1000000000000)
- # Create a room with two local users
- room_id = self.helper.create_room_as(self.user_id)
- self.helper.join(room_id, "@test2:server")
- # Mark test2 as online, test will be offline with a last_active of 0
- self.get_success(
- self.presence_handler.set_state(
- UserID.from_string("@test2:server"),
- "dev-1",
- {"presence": PresenceState.ONLINE},
- )
- )
- self.reactor.pump([0]) # Wait for presence updates to be handled
- #
- # Test that a new server gets told about existing presence
- #
- self.federation_sender.reset_mock()
- # Add a new remote server to the room
- self._add_new_user(room_id, "@alice:server2")
- # When new server is joined we send it the local users presence states.
- # We expect to only see user @test2:server, as @test:server is offline
- # and has a zero last_active_ts
- expected_state = self.get_success(
- self.presence_handler.current_state_for_user("@test2:server")
- )
- self.assertEqual(expected_state.state, PresenceState.ONLINE)
- self.federation_sender.send_presence_to_destinations.assert_called_once_with(
- destinations={"server2"}, states=[expected_state]
- )
- #
- # Test that only the new server gets sent presence and not existing servers
- #
- self.federation_sender.reset_mock()
- self._add_new_user(room_id, "@bob:server3")
- self.federation_sender.send_presence_to_destinations.assert_called_once_with(
- destinations={"server3"}, states=[expected_state]
- )
- def test_remote_gets_presence_when_local_user_joins(self) -> None:
- # We advance time to something that isn't 0, as we use 0 as a special
- # value.
- self.reactor.advance(1000000000000)
- # Create a room with one local users
- room_id = self.helper.create_room_as(self.user_id)
- # Mark test as online
- self.get_success(
- self.presence_handler.set_state(
- UserID.from_string("@test:server"),
- "dev-1",
- {"presence": PresenceState.ONLINE},
- )
- )
- # Mark test2 as online, test will be offline with a last_active of 0.
- # Note we don't join them to the room yet
- self.get_success(
- self.presence_handler.set_state(
- UserID.from_string("@test2:server"),
- "dev-1",
- {"presence": PresenceState.ONLINE},
- )
- )
- # Add servers to the room
- self._add_new_user(room_id, "@alice:server2")
- self._add_new_user(room_id, "@bob:server3")
- self.reactor.pump([0]) # Wait for presence updates to be handled
- #
- # Test that when a local join happens remote servers get told about it
- #
- self.federation_sender.reset_mock()
- # Join local user to room
- self.helper.join(room_id, "@test2:server")
- self.reactor.pump([0]) # Wait for presence updates to be handled
- # We expect to only send test2 presence to server2 and server3
- expected_state = self.get_success(
- self.presence_handler.current_state_for_user("@test2:server")
- )
- self.assertEqual(expected_state.state, PresenceState.ONLINE)
- self.federation_sender.send_presence_to_destinations.assert_called_once_with(
- destinations={"server2", "server3"}, states=[expected_state]
- )
- def _add_new_user(self, room_id: str, user_id: str) -> None:
- """Add new user to the room by creating an event and poking the federation API."""
- hostname = get_domain_from_id(user_id)
- room_version = self.get_success(self.store.get_room_version_id(room_id))
- builder = EventBuilder(
- state=self.state,
- event_auth_handler=self._event_auth_handler,
- store=self.store,
- clock=self.clock,
- hostname=hostname,
- signing_key=self.random_signing_key,
- room_version=KNOWN_ROOM_VERSIONS[room_version],
- room_id=room_id,
- type=EventTypes.Member,
- sender=user_id,
- state_key=user_id,
- content={"membership": Membership.JOIN},
- )
- prev_event_ids = self.get_success(
- self.store.get_latest_event_ids_in_room(room_id)
- )
- event = self.get_success(
- builder.build(prev_event_ids=list(prev_event_ids), auth_event_ids=None)
- )
- self.get_success(self.federation_event_handler.on_receive_pdu(hostname, event))
- # Check that it was successfully persisted.
- self.get_success(self.store.get_event(event.event_id))
- self.get_success(self.store.get_event(event.event_id))
|