presence.py 50 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2014-2016 OpenMarket Ltd
  3. # Copyright 2020 The Matrix.org Foundation C.I.C.
  4. #
  5. # Licensed under the Apache License, Version 2.0 (the "License");
  6. # you may not use this file except in compliance with the License.
  7. # You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing, software
  12. # distributed under the License is distributed on an "AS IS" BASIS,
  13. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. # See the License for the specific language governing permissions and
  15. # limitations under the License.
  16. """This module is responsible for keeping track of presence status of local
  17. and remote users.
  18. The methods that define policy are:
  19. - PresenceHandler._update_states
  20. - PresenceHandler._handle_timeouts
  21. - should_notify
  22. """
  23. import abc
  24. import logging
  25. from contextlib import contextmanager
  26. from typing import Dict, Iterable, List, Set, Tuple
  27. from prometheus_client import Counter
  28. from typing_extensions import ContextManager
  29. import synapse.metrics
  30. from synapse.api.constants import EventTypes, Membership, PresenceState
  31. from synapse.api.errors import SynapseError
  32. from synapse.api.presence import UserPresenceState
  33. from synapse.logging.context import run_in_background
  34. from synapse.logging.utils import log_function
  35. from synapse.metrics import LaterGauge
  36. from synapse.metrics.background_process_metrics import run_as_background_process
  37. from synapse.state import StateHandler
  38. from synapse.storage.databases.main import DataStore
  39. from synapse.types import Collection, JsonDict, UserID, get_domain_from_id
  40. from synapse.util.async_helpers import Linearizer
  41. from synapse.util.caches.descriptors import cached
  42. from synapse.util.metrics import Measure
  43. from synapse.util.wheel_timer import WheelTimer
  44. MYPY = False
  45. if MYPY:
  46. import synapse.server
  47. logger = logging.getLogger(__name__)
  48. notified_presence_counter = Counter("synapse_handler_presence_notified_presence", "")
  49. federation_presence_out_counter = Counter(
  50. "synapse_handler_presence_federation_presence_out", ""
  51. )
  52. presence_updates_counter = Counter("synapse_handler_presence_presence_updates", "")
  53. timers_fired_counter = Counter("synapse_handler_presence_timers_fired", "")
  54. federation_presence_counter = Counter(
  55. "synapse_handler_presence_federation_presence", ""
  56. )
  57. bump_active_time_counter = Counter("synapse_handler_presence_bump_active_time", "")
  58. get_updates_counter = Counter("synapse_handler_presence_get_updates", "", ["type"])
  59. notify_reason_counter = Counter(
  60. "synapse_handler_presence_notify_reason", "", ["reason"]
  61. )
  62. state_transition_counter = Counter(
  63. "synapse_handler_presence_state_transition", "", ["from", "to"]
  64. )
  65. # If a user was last active in the last LAST_ACTIVE_GRANULARITY, consider them
  66. # "currently_active"
  67. LAST_ACTIVE_GRANULARITY = 60 * 1000
  68. # How long to wait until a new /events or /sync request before assuming
  69. # the client has gone.
  70. SYNC_ONLINE_TIMEOUT = 30 * 1000
  71. # How long to wait before marking the user as idle. Compared against last active
  72. IDLE_TIMER = 5 * 60 * 1000
  73. # How often we expect remote servers to resend us presence.
  74. FEDERATION_TIMEOUT = 30 * 60 * 1000
  75. # How often to resend presence to remote servers
  76. FEDERATION_PING_INTERVAL = 25 * 60 * 1000
  77. # How long we will wait before assuming that the syncs from an external process
  78. # are dead.
  79. EXTERNAL_PROCESS_EXPIRY = 5 * 60 * 1000
  80. assert LAST_ACTIVE_GRANULARITY < IDLE_TIMER
  81. class BasePresenceHandler(abc.ABC):
  82. """Parts of the PresenceHandler that are shared between workers and master"""
  83. def __init__(self, hs: "synapse.server.HomeServer"):
  84. self.clock = hs.get_clock()
  85. self.store = hs.get_datastore()
  86. active_presence = self.store.take_presence_startup_info()
  87. self.user_to_current_state = {state.user_id: state for state in active_presence}
  88. @abc.abstractmethod
  89. async def user_syncing(
  90. self, user_id: str, affect_presence: bool
  91. ) -> ContextManager[None]:
  92. """Returns a context manager that should surround any stream requests
  93. from the user.
  94. This allows us to keep track of who is currently streaming and who isn't
  95. without having to have timers outside of this module to avoid flickering
  96. when users disconnect/reconnect.
  97. Args:
  98. user_id: the user that is starting a sync
  99. affect_presence: If false this function will be a no-op.
  100. Useful for streams that are not associated with an actual
  101. client that is being used by a user.
  102. """
  103. @abc.abstractmethod
  104. def get_currently_syncing_users_for_replication(self) -> Iterable[str]:
  105. """Get an iterable of syncing users on this worker, to send to the presence handler
  106. This is called when a replication connection is established. It should return
  107. a list of user ids, which are then sent as USER_SYNC commands to inform the
  108. process handling presence about those users.
  109. Returns:
  110. An iterable of user_id strings.
  111. """
  112. async def get_state(self, target_user: UserID) -> UserPresenceState:
  113. results = await self.get_states([target_user.to_string()])
  114. return results[0]
  115. async def get_states(
  116. self, target_user_ids: Iterable[str]
  117. ) -> List[UserPresenceState]:
  118. """Get the presence state for users."""
  119. updates_d = await self.current_state_for_users(target_user_ids)
  120. updates = list(updates_d.values())
  121. for user_id in set(target_user_ids) - {u.user_id for u in updates}:
  122. updates.append(UserPresenceState.default(user_id))
  123. return updates
  124. async def current_state_for_users(
  125. self, user_ids: Iterable[str]
  126. ) -> Dict[str, UserPresenceState]:
  127. """Get the current presence state for multiple users.
  128. Returns:
  129. dict: `user_id` -> `UserPresenceState`
  130. """
  131. states = {
  132. user_id: self.user_to_current_state.get(user_id, None)
  133. for user_id in user_ids
  134. }
  135. missing = [user_id for user_id, state in states.items() if not state]
  136. if missing:
  137. # There are things not in our in memory cache. Lets pull them out of
  138. # the database.
  139. res = await self.store.get_presence_for_users(missing)
  140. states.update(res)
  141. missing = [user_id for user_id, state in states.items() if not state]
  142. if missing:
  143. new = {
  144. user_id: UserPresenceState.default(user_id) for user_id in missing
  145. }
  146. states.update(new)
  147. self.user_to_current_state.update(new)
  148. return states
  149. @abc.abstractmethod
  150. async def set_state(
  151. self, target_user: UserID, state: JsonDict, ignore_status_msg: bool = False
  152. ) -> None:
  153. """Set the presence state of the user. """
  154. @abc.abstractmethod
  155. async def bump_presence_active_time(self, user: UserID):
  156. """We've seen the user do something that indicates they're interacting
  157. with the app.
  158. """
  159. class PresenceHandler(BasePresenceHandler):
  160. def __init__(self, hs: "synapse.server.HomeServer"):
  161. super().__init__(hs)
  162. self.hs = hs
  163. self.is_mine_id = hs.is_mine_id
  164. self.server_name = hs.hostname
  165. self.wheel_timer = WheelTimer()
  166. self.notifier = hs.get_notifier()
  167. self.federation = hs.get_federation_sender()
  168. self.state = hs.get_state_handler()
  169. self._presence_enabled = hs.config.use_presence
  170. federation_registry = hs.get_federation_registry()
  171. federation_registry.register_edu_handler("m.presence", self.incoming_presence)
  172. LaterGauge(
  173. "synapse_handlers_presence_user_to_current_state_size",
  174. "",
  175. [],
  176. lambda: len(self.user_to_current_state),
  177. )
  178. now = self.clock.time_msec()
  179. for state in self.user_to_current_state.values():
  180. self.wheel_timer.insert(
  181. now=now, obj=state.user_id, then=state.last_active_ts + IDLE_TIMER
  182. )
  183. self.wheel_timer.insert(
  184. now=now,
  185. obj=state.user_id,
  186. then=state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT,
  187. )
  188. if self.is_mine_id(state.user_id):
  189. self.wheel_timer.insert(
  190. now=now,
  191. obj=state.user_id,
  192. then=state.last_federation_update_ts + FEDERATION_PING_INTERVAL,
  193. )
  194. else:
  195. self.wheel_timer.insert(
  196. now=now,
  197. obj=state.user_id,
  198. then=state.last_federation_update_ts + FEDERATION_TIMEOUT,
  199. )
  200. # Set of users who have presence in the `user_to_current_state` that
  201. # have not yet been persisted
  202. self.unpersisted_users_changes = set() # type: Set[str]
  203. hs.get_reactor().addSystemEventTrigger(
  204. "before",
  205. "shutdown",
  206. run_as_background_process,
  207. "presence.on_shutdown",
  208. self._on_shutdown,
  209. )
  210. self._next_serial = 1
  211. # Keeps track of the number of *ongoing* syncs on this process. While
  212. # this is non zero a user will never go offline.
  213. self.user_to_num_current_syncs = {} # type: Dict[str, int]
  214. # Keeps track of the number of *ongoing* syncs on other processes.
  215. # While any sync is ongoing on another process the user will never
  216. # go offline.
  217. # Each process has a unique identifier and an update frequency. If
  218. # no update is received from that process within the update period then
  219. # we assume that all the sync requests on that process have stopped.
  220. # Stored as a dict from process_id to set of user_id, and a dict of
  221. # process_id to millisecond timestamp last updated.
  222. self.external_process_to_current_syncs = {} # type: Dict[int, Set[str]]
  223. self.external_process_last_updated_ms = {} # type: Dict[int, int]
  224. self.external_sync_linearizer = Linearizer(name="external_sync_linearizer")
  225. # Start a LoopingCall in 30s that fires every 5s.
  226. # The initial delay is to allow disconnected clients a chance to
  227. # reconnect before we treat them as offline.
  228. def run_timeout_handler():
  229. return run_as_background_process(
  230. "handle_presence_timeouts", self._handle_timeouts
  231. )
  232. self.clock.call_later(30, self.clock.looping_call, run_timeout_handler, 5000)
  233. def run_persister():
  234. return run_as_background_process(
  235. "persist_presence_changes", self._persist_unpersisted_changes
  236. )
  237. self.clock.call_later(60, self.clock.looping_call, run_persister, 60 * 1000)
  238. LaterGauge(
  239. "synapse_handlers_presence_wheel_timer_size",
  240. "",
  241. [],
  242. lambda: len(self.wheel_timer),
  243. )
  244. # Used to handle sending of presence to newly joined users/servers
  245. if hs.config.use_presence:
  246. self.notifier.add_replication_callback(self.notify_new_event)
  247. # Presence is best effort and quickly heals itself, so lets just always
  248. # stream from the current state when we restart.
  249. self._event_pos = self.store.get_current_events_token()
  250. self._event_processing = False
  251. async def _on_shutdown(self):
  252. """Gets called when shutting down. This lets us persist any updates that
  253. we haven't yet persisted, e.g. updates that only changes some internal
  254. timers. This allows changes to persist across startup without having to
  255. persist every single change.
  256. If this does not run it simply means that some of the timers will fire
  257. earlier than they should when synapse is restarted. This affect of this
  258. is some spurious presence changes that will self-correct.
  259. """
  260. # If the DB pool has already terminated, don't try updating
  261. if not self.store.db_pool.is_running():
  262. return
  263. logger.info(
  264. "Performing _on_shutdown. Persisting %d unpersisted changes",
  265. len(self.user_to_current_state),
  266. )
  267. if self.unpersisted_users_changes:
  268. await self.store.update_presence(
  269. [
  270. self.user_to_current_state[user_id]
  271. for user_id in self.unpersisted_users_changes
  272. ]
  273. )
  274. logger.info("Finished _on_shutdown")
  275. async def _persist_unpersisted_changes(self):
  276. """We periodically persist the unpersisted changes, as otherwise they
  277. may stack up and slow down shutdown times.
  278. """
  279. unpersisted = self.unpersisted_users_changes
  280. self.unpersisted_users_changes = set()
  281. if unpersisted:
  282. logger.info("Persisting %d unpersisted presence updates", len(unpersisted))
  283. await self.store.update_presence(
  284. [self.user_to_current_state[user_id] for user_id in unpersisted]
  285. )
  286. async def _update_states(self, new_states):
  287. """Updates presence of users. Sets the appropriate timeouts. Pokes
  288. the notifier and federation if and only if the changed presence state
  289. should be sent to clients/servers.
  290. """
  291. now = self.clock.time_msec()
  292. with Measure(self.clock, "presence_update_states"):
  293. # NOTE: We purposefully don't await between now and when we've
  294. # calculated what we want to do with the new states, to avoid races.
  295. to_notify = {} # Changes we want to notify everyone about
  296. to_federation_ping = {} # These need sending keep-alives
  297. # Only bother handling the last presence change for each user
  298. new_states_dict = {}
  299. for new_state in new_states:
  300. new_states_dict[new_state.user_id] = new_state
  301. new_state = new_states_dict.values()
  302. for new_state in new_states:
  303. user_id = new_state.user_id
  304. # Its fine to not hit the database here, as the only thing not in
  305. # the current state cache are OFFLINE states, where the only field
  306. # of interest is last_active which is safe enough to assume is 0
  307. # here.
  308. prev_state = self.user_to_current_state.get(
  309. user_id, UserPresenceState.default(user_id)
  310. )
  311. new_state, should_notify, should_ping = handle_update(
  312. prev_state,
  313. new_state,
  314. is_mine=self.is_mine_id(user_id),
  315. wheel_timer=self.wheel_timer,
  316. now=now,
  317. )
  318. self.user_to_current_state[user_id] = new_state
  319. if should_notify:
  320. to_notify[user_id] = new_state
  321. elif should_ping:
  322. to_federation_ping[user_id] = new_state
  323. # TODO: We should probably ensure there are no races hereafter
  324. presence_updates_counter.inc(len(new_states))
  325. if to_notify:
  326. notified_presence_counter.inc(len(to_notify))
  327. await self._persist_and_notify(list(to_notify.values()))
  328. self.unpersisted_users_changes |= {s.user_id for s in new_states}
  329. self.unpersisted_users_changes -= set(to_notify.keys())
  330. to_federation_ping = {
  331. user_id: state
  332. for user_id, state in to_federation_ping.items()
  333. if user_id not in to_notify
  334. }
  335. if to_federation_ping:
  336. federation_presence_out_counter.inc(len(to_federation_ping))
  337. self._push_to_remotes(to_federation_ping.values())
  338. async def _handle_timeouts(self):
  339. """Checks the presence of users that have timed out and updates as
  340. appropriate.
  341. """
  342. logger.debug("Handling presence timeouts")
  343. now = self.clock.time_msec()
  344. # Fetch the list of users that *may* have timed out. Things may have
  345. # changed since the timeout was set, so we won't necessarily have to
  346. # take any action.
  347. users_to_check = set(self.wheel_timer.fetch(now))
  348. # Check whether the lists of syncing processes from an external
  349. # process have expired.
  350. expired_process_ids = [
  351. process_id
  352. for process_id, last_update in self.external_process_last_updated_ms.items()
  353. if now - last_update > EXTERNAL_PROCESS_EXPIRY
  354. ]
  355. for process_id in expired_process_ids:
  356. # For each expired process drop tracking info and check the users
  357. # that were syncing on that process to see if they need to be timed
  358. # out.
  359. users_to_check.update(
  360. self.external_process_to_current_syncs.pop(process_id, ())
  361. )
  362. self.external_process_last_updated_ms.pop(process_id)
  363. states = [
  364. self.user_to_current_state.get(user_id, UserPresenceState.default(user_id))
  365. for user_id in users_to_check
  366. ]
  367. timers_fired_counter.inc(len(states))
  368. syncing_user_ids = {
  369. user_id
  370. for user_id, count in self.user_to_num_current_syncs.items()
  371. if count
  372. }
  373. for user_ids in self.external_process_to_current_syncs.values():
  374. syncing_user_ids.update(user_ids)
  375. changes = handle_timeouts(
  376. states,
  377. is_mine_fn=self.is_mine_id,
  378. syncing_user_ids=syncing_user_ids,
  379. now=now,
  380. )
  381. return await self._update_states(changes)
  382. async def bump_presence_active_time(self, user):
  383. """We've seen the user do something that indicates they're interacting
  384. with the app.
  385. """
  386. # If presence is disabled, no-op
  387. if not self.hs.config.use_presence:
  388. return
  389. user_id = user.to_string()
  390. bump_active_time_counter.inc()
  391. prev_state = await self.current_state_for_user(user_id)
  392. new_fields = {"last_active_ts": self.clock.time_msec()}
  393. if prev_state.state == PresenceState.UNAVAILABLE:
  394. new_fields["state"] = PresenceState.ONLINE
  395. await self._update_states([prev_state.copy_and_replace(**new_fields)])
  396. async def user_syncing(
  397. self, user_id: str, affect_presence: bool = True
  398. ) -> ContextManager[None]:
  399. """Returns a context manager that should surround any stream requests
  400. from the user.
  401. This allows us to keep track of who is currently streaming and who isn't
  402. without having to have timers outside of this module to avoid flickering
  403. when users disconnect/reconnect.
  404. Args:
  405. user_id (str)
  406. affect_presence (bool): If false this function will be a no-op.
  407. Useful for streams that are not associated with an actual
  408. client that is being used by a user.
  409. """
  410. # Override if it should affect the user's presence, if presence is
  411. # disabled.
  412. if not self.hs.config.use_presence:
  413. affect_presence = False
  414. if affect_presence:
  415. curr_sync = self.user_to_num_current_syncs.get(user_id, 0)
  416. self.user_to_num_current_syncs[user_id] = curr_sync + 1
  417. prev_state = await self.current_state_for_user(user_id)
  418. if prev_state.state == PresenceState.OFFLINE:
  419. # If they're currently offline then bring them online, otherwise
  420. # just update the last sync times.
  421. await self._update_states(
  422. [
  423. prev_state.copy_and_replace(
  424. state=PresenceState.ONLINE,
  425. last_active_ts=self.clock.time_msec(),
  426. last_user_sync_ts=self.clock.time_msec(),
  427. )
  428. ]
  429. )
  430. else:
  431. await self._update_states(
  432. [
  433. prev_state.copy_and_replace(
  434. last_user_sync_ts=self.clock.time_msec()
  435. )
  436. ]
  437. )
  438. async def _end():
  439. try:
  440. self.user_to_num_current_syncs[user_id] -= 1
  441. prev_state = await self.current_state_for_user(user_id)
  442. await self._update_states(
  443. [
  444. prev_state.copy_and_replace(
  445. last_user_sync_ts=self.clock.time_msec()
  446. )
  447. ]
  448. )
  449. except Exception:
  450. logger.exception("Error updating presence after sync")
  451. @contextmanager
  452. def _user_syncing():
  453. try:
  454. yield
  455. finally:
  456. if affect_presence:
  457. run_in_background(_end)
  458. return _user_syncing()
  459. def get_currently_syncing_users_for_replication(self) -> Iterable[str]:
  460. # since we are the process handling presence, there is nothing to do here.
  461. return []
  462. async def update_external_syncs_row(
  463. self, process_id, user_id, is_syncing, sync_time_msec
  464. ):
  465. """Update the syncing users for an external process as a delta.
  466. Args:
  467. process_id (str): An identifier for the process the users are
  468. syncing against. This allows synapse to process updates
  469. as user start and stop syncing against a given process.
  470. user_id (str): The user who has started or stopped syncing
  471. is_syncing (bool): Whether or not the user is now syncing
  472. sync_time_msec(int): Time in ms when the user was last syncing
  473. """
  474. with (await self.external_sync_linearizer.queue(process_id)):
  475. prev_state = await self.current_state_for_user(user_id)
  476. process_presence = self.external_process_to_current_syncs.setdefault(
  477. process_id, set()
  478. )
  479. updates = []
  480. if is_syncing and user_id not in process_presence:
  481. if prev_state.state == PresenceState.OFFLINE:
  482. updates.append(
  483. prev_state.copy_and_replace(
  484. state=PresenceState.ONLINE,
  485. last_active_ts=sync_time_msec,
  486. last_user_sync_ts=sync_time_msec,
  487. )
  488. )
  489. else:
  490. updates.append(
  491. prev_state.copy_and_replace(last_user_sync_ts=sync_time_msec)
  492. )
  493. process_presence.add(user_id)
  494. elif user_id in process_presence:
  495. updates.append(
  496. prev_state.copy_and_replace(last_user_sync_ts=sync_time_msec)
  497. )
  498. if not is_syncing:
  499. process_presence.discard(user_id)
  500. if updates:
  501. await self._update_states(updates)
  502. self.external_process_last_updated_ms[process_id] = self.clock.time_msec()
  503. async def update_external_syncs_clear(self, process_id):
  504. """Marks all users that had been marked as syncing by a given process
  505. as offline.
  506. Used when the process has stopped/disappeared.
  507. """
  508. with (await self.external_sync_linearizer.queue(process_id)):
  509. process_presence = self.external_process_to_current_syncs.pop(
  510. process_id, set()
  511. )
  512. prev_states = await self.current_state_for_users(process_presence)
  513. time_now_ms = self.clock.time_msec()
  514. await self._update_states(
  515. [
  516. prev_state.copy_and_replace(last_user_sync_ts=time_now_ms)
  517. for prev_state in prev_states.values()
  518. ]
  519. )
  520. self.external_process_last_updated_ms.pop(process_id, None)
  521. async def current_state_for_user(self, user_id):
  522. """Get the current presence state for a user.
  523. """
  524. res = await self.current_state_for_users([user_id])
  525. return res[user_id]
  526. async def _persist_and_notify(self, states):
  527. """Persist states in the database, poke the notifier and send to
  528. interested remote servers
  529. """
  530. stream_id, max_token = await self.store.update_presence(states)
  531. parties = await get_interested_parties(self.store, states)
  532. room_ids_to_states, users_to_states = parties
  533. self.notifier.on_new_event(
  534. "presence_key",
  535. stream_id,
  536. rooms=room_ids_to_states.keys(),
  537. users=[UserID.from_string(u) for u in users_to_states],
  538. )
  539. self._push_to_remotes(states)
  540. async def notify_for_states(self, state, stream_id):
  541. parties = await get_interested_parties(self.store, [state])
  542. room_ids_to_states, users_to_states = parties
  543. self.notifier.on_new_event(
  544. "presence_key",
  545. stream_id,
  546. rooms=room_ids_to_states.keys(),
  547. users=[UserID.from_string(u) for u in users_to_states],
  548. )
  549. def _push_to_remotes(self, states):
  550. """Sends state updates to remote servers.
  551. Args:
  552. states (list(UserPresenceState))
  553. """
  554. self.federation.send_presence(states)
  555. async def incoming_presence(self, origin, content):
  556. """Called when we receive a `m.presence` EDU from a remote server.
  557. """
  558. if not self._presence_enabled:
  559. return
  560. now = self.clock.time_msec()
  561. updates = []
  562. for push in content.get("push", []):
  563. # A "push" contains a list of presence that we are probably interested
  564. # in.
  565. user_id = push.get("user_id", None)
  566. if not user_id:
  567. logger.info(
  568. "Got presence update from %r with no 'user_id': %r", origin, push
  569. )
  570. continue
  571. if get_domain_from_id(user_id) != origin:
  572. logger.info(
  573. "Got presence update from %r with bad 'user_id': %r",
  574. origin,
  575. user_id,
  576. )
  577. continue
  578. presence_state = push.get("presence", None)
  579. if not presence_state:
  580. logger.info(
  581. "Got presence update from %r with no 'presence_state': %r",
  582. origin,
  583. push,
  584. )
  585. continue
  586. new_fields = {"state": presence_state, "last_federation_update_ts": now}
  587. last_active_ago = push.get("last_active_ago", None)
  588. if last_active_ago is not None:
  589. new_fields["last_active_ts"] = now - last_active_ago
  590. new_fields["status_msg"] = push.get("status_msg", None)
  591. new_fields["currently_active"] = push.get("currently_active", False)
  592. prev_state = await self.current_state_for_user(user_id)
  593. updates.append(prev_state.copy_and_replace(**new_fields))
  594. if updates:
  595. federation_presence_counter.inc(len(updates))
  596. await self._update_states(updates)
  597. async def set_state(self, target_user, state, ignore_status_msg=False):
  598. """Set the presence state of the user.
  599. """
  600. status_msg = state.get("status_msg", None)
  601. presence = state["presence"]
  602. valid_presence = (
  603. PresenceState.ONLINE,
  604. PresenceState.UNAVAILABLE,
  605. PresenceState.OFFLINE,
  606. )
  607. if presence not in valid_presence:
  608. raise SynapseError(400, "Invalid presence state")
  609. user_id = target_user.to_string()
  610. prev_state = await self.current_state_for_user(user_id)
  611. new_fields = {"state": presence}
  612. if not ignore_status_msg:
  613. msg = status_msg if presence != PresenceState.OFFLINE else None
  614. new_fields["status_msg"] = msg
  615. if presence == PresenceState.ONLINE:
  616. new_fields["last_active_ts"] = self.clock.time_msec()
  617. await self._update_states([prev_state.copy_and_replace(**new_fields)])
  618. async def is_visible(self, observed_user, observer_user):
  619. """Returns whether a user can see another user's presence.
  620. """
  621. observer_room_ids = await self.store.get_rooms_for_user(
  622. observer_user.to_string()
  623. )
  624. observed_room_ids = await self.store.get_rooms_for_user(
  625. observed_user.to_string()
  626. )
  627. if observer_room_ids & observed_room_ids:
  628. return True
  629. return False
  630. async def get_all_presence_updates(
  631. self, instance_name: str, last_id: int, current_id: int, limit: int
  632. ) -> Tuple[List[Tuple[int, list]], int, bool]:
  633. """
  634. Gets a list of presence update rows from between the given stream ids.
  635. Each row has:
  636. - stream_id(str)
  637. - user_id(str)
  638. - state(str)
  639. - last_active_ts(int)
  640. - last_federation_update_ts(int)
  641. - last_user_sync_ts(int)
  642. - status_msg(int)
  643. - currently_active(int)
  644. Args:
  645. instance_name: The writer we want to fetch updates from. Unused
  646. here since there is only ever one writer.
  647. last_id: The token to fetch updates from. Exclusive.
  648. current_id: The token to fetch updates up to. Inclusive.
  649. limit: The requested limit for the number of rows to return. The
  650. function may return more or fewer rows.
  651. Returns:
  652. A tuple consisting of: the updates, a token to use to fetch
  653. subsequent updates, and whether we returned fewer rows than exists
  654. between the requested tokens due to the limit.
  655. The token returned can be used in a subsequent call to this
  656. function to get further updatees.
  657. The updates are a list of 2-tuples of stream ID and the row data
  658. """
  659. # TODO(markjh): replicate the unpersisted changes.
  660. # This could use the in-memory stores for recent changes.
  661. rows = await self.store.get_all_presence_updates(
  662. instance_name, last_id, current_id, limit
  663. )
  664. return rows
  665. def notify_new_event(self):
  666. """Called when new events have happened. Handles users and servers
  667. joining rooms and require being sent presence.
  668. """
  669. if self._event_processing:
  670. return
  671. async def _process_presence():
  672. assert not self._event_processing
  673. self._event_processing = True
  674. try:
  675. await self._unsafe_process()
  676. finally:
  677. self._event_processing = False
  678. run_as_background_process("presence.notify_new_event", _process_presence)
  679. async def _unsafe_process(self):
  680. # Loop round handling deltas until we're up to date
  681. while True:
  682. with Measure(self.clock, "presence_delta"):
  683. room_max_stream_ordering = self.store.get_room_max_stream_ordering()
  684. if self._event_pos == room_max_stream_ordering:
  685. return
  686. logger.debug(
  687. "Processing presence stats %s->%s",
  688. self._event_pos,
  689. room_max_stream_ordering,
  690. )
  691. max_pos, deltas = await self.store.get_current_state_deltas(
  692. self._event_pos, room_max_stream_ordering
  693. )
  694. await self._handle_state_delta(deltas)
  695. self._event_pos = max_pos
  696. # Expose current event processing position to prometheus
  697. synapse.metrics.event_processing_positions.labels("presence").set(
  698. max_pos
  699. )
  700. async def _handle_state_delta(self, deltas):
  701. """Process current state deltas to find new joins that need to be
  702. handled.
  703. """
  704. for delta in deltas:
  705. typ = delta["type"]
  706. state_key = delta["state_key"]
  707. room_id = delta["room_id"]
  708. event_id = delta["event_id"]
  709. prev_event_id = delta["prev_event_id"]
  710. logger.debug("Handling: %r %r, %s", typ, state_key, event_id)
  711. if typ != EventTypes.Member:
  712. continue
  713. if event_id is None:
  714. # state has been deleted, so this is not a join. We only care about
  715. # joins.
  716. continue
  717. event = await self.store.get_event(event_id, allow_none=True)
  718. if not event or event.content.get("membership") != Membership.JOIN:
  719. # We only care about joins
  720. continue
  721. if prev_event_id:
  722. prev_event = await self.store.get_event(prev_event_id, allow_none=True)
  723. if (
  724. prev_event
  725. and prev_event.content.get("membership") == Membership.JOIN
  726. ):
  727. # Ignore changes to join events.
  728. continue
  729. await self._on_user_joined_room(room_id, state_key)
  730. async def _on_user_joined_room(self, room_id: str, user_id: str) -> None:
  731. """Called when we detect a user joining the room via the current state
  732. delta stream.
  733. """
  734. if self.is_mine_id(user_id):
  735. # If this is a local user then we need to send their presence
  736. # out to hosts in the room (who don't already have it)
  737. # TODO: We should be able to filter the hosts down to those that
  738. # haven't previously seen the user
  739. state = await self.current_state_for_user(user_id)
  740. hosts = await self.state.get_current_hosts_in_room(room_id)
  741. # Filter out ourselves.
  742. hosts = {host for host in hosts if host != self.server_name}
  743. self.federation.send_presence_to_destinations(
  744. states=[state], destinations=hosts
  745. )
  746. else:
  747. # A remote user has joined the room, so we need to:
  748. # 1. Check if this is a new server in the room
  749. # 2. If so send any presence they don't already have for
  750. # local users in the room.
  751. # TODO: We should be able to filter the users down to those that
  752. # the server hasn't previously seen
  753. # TODO: Check that this is actually a new server joining the
  754. # room.
  755. users = await self.state.get_current_users_in_room(room_id)
  756. user_ids = list(filter(self.is_mine_id, users))
  757. states_d = await self.current_state_for_users(user_ids)
  758. # Filter out old presence, i.e. offline presence states where
  759. # the user hasn't been active for a week. We can change this
  760. # depending on what we want the UX to be, but at the least we
  761. # should filter out offline presence where the state is just the
  762. # default state.
  763. now = self.clock.time_msec()
  764. states = [
  765. state
  766. for state in states_d.values()
  767. if state.state != PresenceState.OFFLINE
  768. or now - state.last_active_ts < 7 * 24 * 60 * 60 * 1000
  769. or state.status_msg is not None
  770. ]
  771. if states:
  772. self.federation.send_presence_to_destinations(
  773. states=states, destinations=[get_domain_from_id(user_id)]
  774. )
  775. def should_notify(old_state, new_state):
  776. """Decides if a presence state change should be sent to interested parties.
  777. """
  778. if old_state == new_state:
  779. return False
  780. if old_state.status_msg != new_state.status_msg:
  781. notify_reason_counter.labels("status_msg_change").inc()
  782. return True
  783. if old_state.state != new_state.state:
  784. notify_reason_counter.labels("state_change").inc()
  785. state_transition_counter.labels(old_state.state, new_state.state).inc()
  786. return True
  787. if old_state.state == PresenceState.ONLINE:
  788. if new_state.currently_active != old_state.currently_active:
  789. notify_reason_counter.labels("current_active_change").inc()
  790. return True
  791. if (
  792. new_state.last_active_ts - old_state.last_active_ts
  793. > LAST_ACTIVE_GRANULARITY
  794. ):
  795. # Only notify about last active bumps if we're not currently acive
  796. if not new_state.currently_active:
  797. notify_reason_counter.labels("last_active_change_online").inc()
  798. return True
  799. elif new_state.last_active_ts - old_state.last_active_ts > LAST_ACTIVE_GRANULARITY:
  800. # Always notify for a transition where last active gets bumped.
  801. notify_reason_counter.labels("last_active_change_not_online").inc()
  802. return True
  803. return False
  804. def format_user_presence_state(state, now, include_user_id=True):
  805. """Convert UserPresenceState to a format that can be sent down to clients
  806. and to other servers.
  807. The "user_id" is optional so that this function can be used to format presence
  808. updates for client /sync responses and for federation /send requests.
  809. """
  810. content = {"presence": state.state}
  811. if include_user_id:
  812. content["user_id"] = state.user_id
  813. if state.last_active_ts:
  814. content["last_active_ago"] = now - state.last_active_ts
  815. if state.status_msg and state.state != PresenceState.OFFLINE:
  816. content["status_msg"] = state.status_msg
  817. if state.state == PresenceState.ONLINE:
  818. content["currently_active"] = state.currently_active
  819. return content
  820. class PresenceEventSource:
  821. def __init__(self, hs):
  822. # We can't call get_presence_handler here because there's a cycle:
  823. #
  824. # Presence -> Notifier -> PresenceEventSource -> Presence
  825. #
  826. self.get_presence_handler = hs.get_presence_handler
  827. self.clock = hs.get_clock()
  828. self.store = hs.get_datastore()
  829. self.state = hs.get_state_handler()
  830. @log_function
  831. async def get_new_events(
  832. self,
  833. user,
  834. from_key,
  835. room_ids=None,
  836. include_offline=True,
  837. explicit_room_id=None,
  838. **kwargs
  839. ):
  840. # The process for getting presence events are:
  841. # 1. Get the rooms the user is in.
  842. # 2. Get the list of user in the rooms.
  843. # 3. Get the list of users that are in the user's presence list.
  844. # 4. If there is a from_key set, cross reference the list of users
  845. # with the `presence_stream_cache` to see which ones we actually
  846. # need to check.
  847. # 5. Load current state for the users.
  848. #
  849. # We don't try and limit the presence updates by the current token, as
  850. # sending down the rare duplicate is not a concern.
  851. with Measure(self.clock, "presence.get_new_events"):
  852. if from_key is not None:
  853. from_key = int(from_key)
  854. max_token = self.store.get_current_presence_token()
  855. if from_key == max_token:
  856. # This is necessary as due to the way stream ID generators work
  857. # we may get updates that have a stream ID greater than the max
  858. # token (e.g. max_token is N but stream generator may return
  859. # results for N+2, due to N+1 not having finished being
  860. # persisted yet).
  861. #
  862. # This is usually fine, as it just means that we may send down
  863. # some presence updates multiple times. However, we need to be
  864. # careful that the sync stream either actually does make some
  865. # progress or doesn't return, otherwise clients will end up
  866. # tight looping calling /sync due to it immediately returning
  867. # the same token repeatedly.
  868. #
  869. # Hence this guard where we just return nothing so that the sync
  870. # doesn't return. C.f. #5503.
  871. return [], max_token
  872. presence = self.get_presence_handler()
  873. stream_change_cache = self.store.presence_stream_cache
  874. users_interested_in = await self._get_interested_in(user, explicit_room_id)
  875. user_ids_changed = set()
  876. changed = None
  877. if from_key:
  878. changed = stream_change_cache.get_all_entities_changed(from_key)
  879. if changed is not None and len(changed) < 500:
  880. # For small deltas, its quicker to get all changes and then
  881. # work out if we share a room or they're in our presence list
  882. get_updates_counter.labels("stream").inc()
  883. for other_user_id in changed:
  884. if other_user_id in users_interested_in:
  885. user_ids_changed.add(other_user_id)
  886. else:
  887. # Too many possible updates. Find all users we can see and check
  888. # if any of them have changed.
  889. get_updates_counter.labels("full").inc()
  890. if from_key:
  891. user_ids_changed = stream_change_cache.get_entities_changed(
  892. users_interested_in, from_key
  893. )
  894. else:
  895. user_ids_changed = users_interested_in
  896. updates = await presence.current_state_for_users(user_ids_changed)
  897. if include_offline:
  898. return (list(updates.values()), max_token)
  899. else:
  900. return (
  901. [s for s in updates.values() if s.state != PresenceState.OFFLINE],
  902. max_token,
  903. )
  904. def get_current_key(self):
  905. return self.store.get_current_presence_token()
  906. async def get_pagination_rows(self, user, pagination_config, key):
  907. return await self.get_new_events(user, from_key=None, include_offline=False)
  908. @cached(num_args=2, cache_context=True)
  909. async def _get_interested_in(self, user, explicit_room_id, cache_context):
  910. """Returns the set of users that the given user should see presence
  911. updates for
  912. """
  913. user_id = user.to_string()
  914. users_interested_in = set()
  915. users_interested_in.add(user_id) # So that we receive our own presence
  916. users_who_share_room = await self.store.get_users_who_share_room_with_user(
  917. user_id, on_invalidate=cache_context.invalidate
  918. )
  919. users_interested_in.update(users_who_share_room)
  920. if explicit_room_id:
  921. user_ids = await self.store.get_users_in_room(
  922. explicit_room_id, on_invalidate=cache_context.invalidate
  923. )
  924. users_interested_in.update(user_ids)
  925. return users_interested_in
  926. def handle_timeouts(user_states, is_mine_fn, syncing_user_ids, now):
  927. """Checks the presence of users that have timed out and updates as
  928. appropriate.
  929. Args:
  930. user_states(list): List of UserPresenceState's to check.
  931. is_mine_fn (fn): Function that returns if a user_id is ours
  932. syncing_user_ids (set): Set of user_ids with active syncs.
  933. now (int): Current time in ms.
  934. Returns:
  935. List of UserPresenceState updates
  936. """
  937. changes = {} # Actual changes we need to notify people about
  938. for state in user_states:
  939. is_mine = is_mine_fn(state.user_id)
  940. new_state = handle_timeout(state, is_mine, syncing_user_ids, now)
  941. if new_state:
  942. changes[state.user_id] = new_state
  943. return list(changes.values())
  944. def handle_timeout(state, is_mine, syncing_user_ids, now):
  945. """Checks the presence of the user to see if any of the timers have elapsed
  946. Args:
  947. state (UserPresenceState)
  948. is_mine (bool): Whether the user is ours
  949. syncing_user_ids (set): Set of user_ids with active syncs.
  950. now (int): Current time in ms.
  951. Returns:
  952. A UserPresenceState update or None if no update.
  953. """
  954. if state.state == PresenceState.OFFLINE:
  955. # No timeouts are associated with offline states.
  956. return None
  957. changed = False
  958. user_id = state.user_id
  959. if is_mine:
  960. if state.state == PresenceState.ONLINE:
  961. if now - state.last_active_ts > IDLE_TIMER:
  962. # Currently online, but last activity ages ago so auto
  963. # idle
  964. state = state.copy_and_replace(state=PresenceState.UNAVAILABLE)
  965. changed = True
  966. elif now - state.last_active_ts > LAST_ACTIVE_GRANULARITY:
  967. # So that we send down a notification that we've
  968. # stopped updating.
  969. changed = True
  970. if now - state.last_federation_update_ts > FEDERATION_PING_INTERVAL:
  971. # Need to send ping to other servers to ensure they don't
  972. # timeout and set us to offline
  973. changed = True
  974. # If there are have been no sync for a while (and none ongoing),
  975. # set presence to offline
  976. if user_id not in syncing_user_ids:
  977. # If the user has done something recently but hasn't synced,
  978. # don't set them as offline.
  979. sync_or_active = max(state.last_user_sync_ts, state.last_active_ts)
  980. if now - sync_or_active > SYNC_ONLINE_TIMEOUT:
  981. state = state.copy_and_replace(
  982. state=PresenceState.OFFLINE, status_msg=None
  983. )
  984. changed = True
  985. else:
  986. # We expect to be poked occasionally by the other side.
  987. # This is to protect against forgetful/buggy servers, so that
  988. # no one gets stuck online forever.
  989. if now - state.last_federation_update_ts > FEDERATION_TIMEOUT:
  990. # The other side seems to have disappeared.
  991. state = state.copy_and_replace(state=PresenceState.OFFLINE, status_msg=None)
  992. changed = True
  993. return state if changed else None
def handle_update(prev_state, new_state, is_mine, wheel_timer, now):
    """Given a presence update:
        1. Add any appropriate timers.
        2. Check if we should notify anyone.

    Timers are inserted into ``wheel_timer`` keyed by user ID.
    - For local users (``is_mine``):
      - while online, an idle timer, plus a last-active-granularity timer if
        currently active;
      - while not offline, a sync-timeout timer.
    - For remote users, a single federation-timeout timer.

    NOTE(review): presumably the timeout-checking code re-examines the user
    when these fire (cf. ``handle_timeout``); confirm against the caller.

    Args:
        prev_state (UserPresenceState)
        new_state (UserPresenceState)
        is_mine (bool): Whether the user is ours
        wheel_timer (WheelTimer)
        now (int): Time now in ms

    Returns:
        3-tuple: `(new_state, persist_and_notify, federation_ping)` where:
            - new_state: is the state to actually persist
            - persist_and_notify (bool): whether to persist and notify people
            - federation_ping (bool): whether we should send a ping over federation
    """
    user_id = new_state.user_id
    persist_and_notify = False
    federation_ping = False

    # If the users are ours then we want to set up a bunch of timers
    # to time things out.
    if is_mine:
        if new_state.state == PresenceState.ONLINE:
            # Idle timer
            wheel_timer.insert(
                now=now, obj=user_id, then=new_state.last_active_ts + IDLE_TIMER
            )

            # "Currently active" means the last activity was within
            # LAST_ACTIVE_GRANULARITY of now.
            active = now - new_state.last_active_ts < LAST_ACTIVE_GRANULARITY
            new_state = new_state.copy_and_replace(currently_active=active)

            if active:
                # Fire again when the active window lapses.
                wheel_timer.insert(
                    now=now,
                    obj=user_id,
                    then=new_state.last_active_ts + LAST_ACTIVE_GRANULARITY,
                )

        if new_state.state != PresenceState.OFFLINE:
            # User has stopped syncing
            wheel_timer.insert(
                now=now,
                obj=user_id,
                then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT,
            )

            last_federate = new_state.last_federation_update_ts
            if now - last_federate > FEDERATION_PING_INTERVAL:
                # Been a while since we've poked remote servers
                new_state = new_state.copy_and_replace(last_federation_update_ts=now)
                federation_ping = True

    else:
        # Remote user: fire once their server should have sent us a newer
        # update.
        wheel_timer.insert(
            now=now,
            obj=user_id,
            then=new_state.last_federation_update_ts + FEDERATION_TIMEOUT,
        )

    # Check whether the change was something worth notifying about
    if should_notify(prev_state, new_state):
        # Record `now` as the latest federation update. This is presumably
        # because notifying also pushes the state out over federation
        # (confirm against the caller).
        new_state = new_state.copy_and_replace(last_federation_update_ts=now)
        persist_and_notify = True

    return new_state, persist_and_notify, federation_ping
  1052. async def get_interested_parties(
  1053. store: DataStore, states: List[UserPresenceState]
  1054. ) -> Tuple[Dict[str, List[UserPresenceState]], Dict[str, List[UserPresenceState]]]:
  1055. """Given a list of states return which entities (rooms, users)
  1056. are interested in the given states.
  1057. Args:
  1058. store
  1059. states
  1060. Returns:
  1061. A 2-tuple of `(room_ids_to_states, users_to_states)`,
  1062. with each item being a dict of `entity_name` -> `[UserPresenceState]`
  1063. """
  1064. room_ids_to_states = {} # type: Dict[str, List[UserPresenceState]]
  1065. users_to_states = {} # type: Dict[str, List[UserPresenceState]]
  1066. for state in states:
  1067. room_ids = await store.get_rooms_for_user(state.user_id)
  1068. for room_id in room_ids:
  1069. room_ids_to_states.setdefault(room_id, []).append(state)
  1070. # Always notify self
  1071. users_to_states.setdefault(state.user_id, []).append(state)
  1072. return room_ids_to_states, users_to_states
  1073. async def get_interested_remotes(
  1074. store: DataStore, states: List[UserPresenceState], state_handler: StateHandler
  1075. ) -> List[Tuple[Collection[str], List[UserPresenceState]]]:
  1076. """Given a list of presence states figure out which remote servers
  1077. should be sent which.
  1078. All the presence states should be for local users only.
  1079. Args:
  1080. store
  1081. states
  1082. state_handler
  1083. Returns:
  1084. A list of 2-tuples of destinations and states, where for
  1085. each tuple the list of UserPresenceState should be sent to each
  1086. destination
  1087. """
  1088. hosts_and_states = [] # type: List[Tuple[Collection[str], List[UserPresenceState]]]
  1089. # First we look up the rooms each user is in (as well as any explicit
  1090. # subscriptions), then for each distinct room we look up the remote
  1091. # hosts in those rooms.
  1092. room_ids_to_states, users_to_states = await get_interested_parties(store, states)
  1093. for room_id, states in room_ids_to_states.items():
  1094. hosts = await state_handler.get_current_hosts_in_room(room_id)
  1095. hosts_and_states.append((hosts, states))
  1096. for user_id, states in users_to_states.items():
  1097. host = get_domain_from_id(user_id)
  1098. hosts_and_states.append(([host], states))
  1099. return hosts_and_states