presence.py 47 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2014-2016 OpenMarket Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. """This module is responsible for keeping track of presence status of local
  16. and remote users.
  17. The methods that define policy are:
  18. - PresenceHandler._update_states
  19. - PresenceHandler._handle_timeouts
  20. - should_notify
  21. """
  22. import logging
  23. from contextlib import contextmanager
  24. from six import iteritems, itervalues
  25. from prometheus_client import Counter
  26. from twisted.internet import defer
  27. import synapse.metrics
  28. from synapse.api.constants import EventTypes, Membership, PresenceState
  29. from synapse.api.errors import SynapseError
  30. from synapse.logging.context import run_in_background
  31. from synapse.logging.utils import log_function
  32. from synapse.metrics import LaterGauge
  33. from synapse.metrics.background_process_metrics import run_as_background_process
  34. from synapse.storage.presence import UserPresenceState
  35. from synapse.types import UserID, get_domain_from_id
  36. from synapse.util.async_helpers import Linearizer
  37. from synapse.util.caches.descriptors import cachedInlineCallbacks
  38. from synapse.util.metrics import Measure
  39. from synapse.util.wheel_timer import WheelTimer
logger = logging.getLogger(__name__)

# Prometheus counters for the presence handler.

# Incremented in PresenceHandler._update_states by the number of states that
# are persisted and notified.
notified_presence_counter = Counter("synapse_handler_presence_notified_presence", "")
# Incremented in PresenceHandler._update_states by the number of states sent
# to remote servers purely as keep-alive pings.
federation_presence_out_counter = Counter(
    "synapse_handler_presence_federation_presence_out", ""
)
# Incremented in PresenceHandler._update_states by the number of states
# processed.
presence_updates_counter = Counter("synapse_handler_presence_presence_updates", "")
# Incremented in PresenceHandler._handle_timeouts by the number of user states
# that are checked for a timeout.
timers_fired_counter = Counter("synapse_handler_presence_timers_fired", "")
# Incremented in PresenceHandler.incoming_presence by the number of updates
# accepted from a remote server.
federation_presence_counter = Counter(
    "synapse_handler_presence_federation_presence", ""
)
# Incremented once per effective call to
# PresenceHandler.bump_presence_active_time.
bump_active_time_counter = Counter("synapse_handler_presence_bump_active_time", "")
# NOTE(review): the three labelled counters below are not incremented by the
# PresenceHandler methods near the top of this module - presumably they are
# used further down the file; confirm before relying on them.
get_updates_counter = Counter("synapse_handler_presence_get_updates", "", ["type"])
notify_reason_counter = Counter(
    "synapse_handler_presence_notify_reason", "", ["reason"]
)
state_transition_counter = Counter(
    "synapse_handler_presence_state_transition", "", ["from", "to"]
)
# All durations below are in milliseconds.

# If a user was last active in the last LAST_ACTIVE_GRANULARITY, consider them
# "currently_active"
LAST_ACTIVE_GRANULARITY = 60 * 1000

# How long to wait until a new /events or /sync request before assuming
# the client has gone.
SYNC_ONLINE_TIMEOUT = 30 * 1000

# How long to wait before marking the user as idle. Compared against the
# user's last active timestamp.
IDLE_TIMER = 5 * 60 * 1000

# How often we expect remote servers to resend us presence.
FEDERATION_TIMEOUT = 30 * 60 * 1000

# How often to resend presence to remote servers
FEDERATION_PING_INTERVAL = 25 * 60 * 1000

# How long we will wait before assuming that the syncs from an external process
# are dead.
EXTERNAL_PROCESS_EXPIRY = 5 * 60 * 1000

# Sanity check on the configuration above: the "currently active" window must
# be shorter than the idle timeout.
assert LAST_ACTIVE_GRANULARITY < IDLE_TIMER
  74. class PresenceHandler(object):
  75. def __init__(self, hs):
  76. """
  77. Args:
  78. hs (synapse.server.HomeServer):
  79. """
  80. self.hs = hs
  81. self.is_mine = hs.is_mine
  82. self.is_mine_id = hs.is_mine_id
  83. self.server_name = hs.hostname
  84. self.clock = hs.get_clock()
  85. self.store = hs.get_datastore()
  86. self.wheel_timer = WheelTimer()
  87. self.notifier = hs.get_notifier()
  88. self.federation = hs.get_federation_sender()
  89. self.state = hs.get_state_handler()
  90. federation_registry = hs.get_federation_registry()
  91. federation_registry.register_edu_handler("m.presence", self.incoming_presence)
  92. active_presence = self.store.take_presence_startup_info()
  93. # A dictionary of the current state of users. This is prefilled with
  94. # non-offline presence from the DB. We should fetch from the DB if
  95. # we can't find a users presence in here.
  96. self.user_to_current_state = {state.user_id: state for state in active_presence}
  97. LaterGauge(
  98. "synapse_handlers_presence_user_to_current_state_size",
  99. "",
  100. [],
  101. lambda: len(self.user_to_current_state),
  102. )
  103. now = self.clock.time_msec()
  104. for state in active_presence:
  105. self.wheel_timer.insert(
  106. now=now, obj=state.user_id, then=state.last_active_ts + IDLE_TIMER
  107. )
  108. self.wheel_timer.insert(
  109. now=now,
  110. obj=state.user_id,
  111. then=state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT,
  112. )
  113. if self.is_mine_id(state.user_id):
  114. self.wheel_timer.insert(
  115. now=now,
  116. obj=state.user_id,
  117. then=state.last_federation_update_ts + FEDERATION_PING_INTERVAL,
  118. )
  119. else:
  120. self.wheel_timer.insert(
  121. now=now,
  122. obj=state.user_id,
  123. then=state.last_federation_update_ts + FEDERATION_TIMEOUT,
  124. )
  125. # Set of users who have presence in the `user_to_current_state` that
  126. # have not yet been persisted
  127. self.unpersisted_users_changes = set()
  128. hs.get_reactor().addSystemEventTrigger(
  129. "before",
  130. "shutdown",
  131. run_as_background_process,
  132. "presence.on_shutdown",
  133. self._on_shutdown,
  134. )
  135. self.serial_to_user = {}
  136. self._next_serial = 1
  137. # Keeps track of the number of *ongoing* syncs on this process. While
  138. # this is non zero a user will never go offline.
  139. self.user_to_num_current_syncs = {}
  140. # Keeps track of the number of *ongoing* syncs on other processes.
  141. # While any sync is ongoing on another process the user will never
  142. # go offline.
  143. # Each process has a unique identifier and an update frequency. If
  144. # no update is received from that process within the update period then
  145. # we assume that all the sync requests on that process have stopped.
  146. # Stored as a dict from process_id to set of user_id, and a dict of
  147. # process_id to millisecond timestamp last updated.
  148. self.external_process_to_current_syncs = {}
  149. self.external_process_last_updated_ms = {}
  150. self.external_sync_linearizer = Linearizer(name="external_sync_linearizer")
  151. # Start a LoopingCall in 30s that fires every 5s.
  152. # The initial delay is to allow disconnected clients a chance to
  153. # reconnect before we treat them as offline.
  154. def run_timeout_handler():
  155. return run_as_background_process(
  156. "handle_presence_timeouts", self._handle_timeouts
  157. )
  158. self.clock.call_later(30, self.clock.looping_call, run_timeout_handler, 5000)
  159. def run_persister():
  160. return run_as_background_process(
  161. "persist_presence_changes", self._persist_unpersisted_changes
  162. )
  163. self.clock.call_later(60, self.clock.looping_call, run_persister, 60 * 1000)
  164. LaterGauge(
  165. "synapse_handlers_presence_wheel_timer_size",
  166. "",
  167. [],
  168. lambda: len(self.wheel_timer),
  169. )
  170. # Used to handle sending of presence to newly joined users/servers
  171. if hs.config.use_presence:
  172. self.notifier.add_replication_callback(self.notify_new_event)
  173. # Presence is best effort and quickly heals itself, so lets just always
  174. # stream from the current state when we restart.
  175. self._event_pos = self.store.get_current_events_token()
  176. self._event_processing = False
  177. @defer.inlineCallbacks
  178. def _on_shutdown(self):
  179. """Gets called when shutting down. This lets us persist any updates that
  180. we haven't yet persisted, e.g. updates that only changes some internal
  181. timers. This allows changes to persist across startup without having to
  182. persist every single change.
  183. If this does not run it simply means that some of the timers will fire
  184. earlier than they should when synapse is restarted. This affect of this
  185. is some spurious presence changes that will self-correct.
  186. """
  187. # If the DB pool has already terminated, don't try updating
  188. if not self.hs.get_db_pool().running:
  189. return
  190. logger.info(
  191. "Performing _on_shutdown. Persisting %d unpersisted changes",
  192. len(self.user_to_current_state),
  193. )
  194. if self.unpersisted_users_changes:
  195. yield self.store.update_presence(
  196. [
  197. self.user_to_current_state[user_id]
  198. for user_id in self.unpersisted_users_changes
  199. ]
  200. )
  201. logger.info("Finished _on_shutdown")
  202. @defer.inlineCallbacks
  203. def _persist_unpersisted_changes(self):
  204. """We periodically persist the unpersisted changes, as otherwise they
  205. may stack up and slow down shutdown times.
  206. """
  207. unpersisted = self.unpersisted_users_changes
  208. self.unpersisted_users_changes = set()
  209. if unpersisted:
  210. logger.info("Persisting %d upersisted presence updates", len(unpersisted))
  211. yield self.store.update_presence(
  212. [self.user_to_current_state[user_id] for user_id in unpersisted]
  213. )
  214. @defer.inlineCallbacks
  215. def _update_states(self, new_states):
  216. """Updates presence of users. Sets the appropriate timeouts. Pokes
  217. the notifier and federation if and only if the changed presence state
  218. should be sent to clients/servers.
  219. """
  220. now = self.clock.time_msec()
  221. with Measure(self.clock, "presence_update_states"):
  222. # NOTE: We purposefully don't yield between now and when we've
  223. # calculated what we want to do with the new states, to avoid races.
  224. to_notify = {} # Changes we want to notify everyone about
  225. to_federation_ping = {} # These need sending keep-alives
  226. # Only bother handling the last presence change for each user
  227. new_states_dict = {}
  228. for new_state in new_states:
  229. new_states_dict[new_state.user_id] = new_state
  230. new_state = new_states_dict.values()
  231. for new_state in new_states:
  232. user_id = new_state.user_id
  233. # Its fine to not hit the database here, as the only thing not in
  234. # the current state cache are OFFLINE states, where the only field
  235. # of interest is last_active which is safe enough to assume is 0
  236. # here.
  237. prev_state = self.user_to_current_state.get(
  238. user_id, UserPresenceState.default(user_id)
  239. )
  240. new_state, should_notify, should_ping = handle_update(
  241. prev_state,
  242. new_state,
  243. is_mine=self.is_mine_id(user_id),
  244. wheel_timer=self.wheel_timer,
  245. now=now,
  246. )
  247. self.user_to_current_state[user_id] = new_state
  248. if should_notify:
  249. to_notify[user_id] = new_state
  250. elif should_ping:
  251. to_federation_ping[user_id] = new_state
  252. # TODO: We should probably ensure there are no races hereafter
  253. presence_updates_counter.inc(len(new_states))
  254. if to_notify:
  255. notified_presence_counter.inc(len(to_notify))
  256. yield self._persist_and_notify(list(to_notify.values()))
  257. self.unpersisted_users_changes |= set(s.user_id for s in new_states)
  258. self.unpersisted_users_changes -= set(to_notify.keys())
  259. to_federation_ping = {
  260. user_id: state
  261. for user_id, state in to_federation_ping.items()
  262. if user_id not in to_notify
  263. }
  264. if to_federation_ping:
  265. federation_presence_out_counter.inc(len(to_federation_ping))
  266. self._push_to_remotes(to_federation_ping.values())
  267. def _handle_timeouts(self):
  268. """Checks the presence of users that have timed out and updates as
  269. appropriate.
  270. """
  271. logger.info("Handling presence timeouts")
  272. now = self.clock.time_msec()
  273. # Fetch the list of users that *may* have timed out. Things may have
  274. # changed since the timeout was set, so we won't necessarily have to
  275. # take any action.
  276. users_to_check = set(self.wheel_timer.fetch(now))
  277. # Check whether the lists of syncing processes from an external
  278. # process have expired.
  279. expired_process_ids = [
  280. process_id
  281. for process_id, last_update in self.external_process_last_updated_ms.items()
  282. if now - last_update > EXTERNAL_PROCESS_EXPIRY
  283. ]
  284. for process_id in expired_process_ids:
  285. users_to_check.update(
  286. self.external_process_last_updated_ms.pop(process_id, ())
  287. )
  288. self.external_process_last_update.pop(process_id)
  289. states = [
  290. self.user_to_current_state.get(user_id, UserPresenceState.default(user_id))
  291. for user_id in users_to_check
  292. ]
  293. timers_fired_counter.inc(len(states))
  294. changes = handle_timeouts(
  295. states,
  296. is_mine_fn=self.is_mine_id,
  297. syncing_user_ids=self.get_currently_syncing_users(),
  298. now=now,
  299. )
  300. return self._update_states(changes)
  301. @defer.inlineCallbacks
  302. def bump_presence_active_time(self, user):
  303. """We've seen the user do something that indicates they're interacting
  304. with the app.
  305. """
  306. # If presence is disabled, no-op
  307. if not self.hs.config.use_presence:
  308. return
  309. user_id = user.to_string()
  310. bump_active_time_counter.inc()
  311. prev_state = yield self.current_state_for_user(user_id)
  312. new_fields = {"last_active_ts": self.clock.time_msec()}
  313. if prev_state.state == PresenceState.UNAVAILABLE:
  314. new_fields["state"] = PresenceState.ONLINE
  315. yield self._update_states([prev_state.copy_and_replace(**new_fields)])
  316. @defer.inlineCallbacks
  317. def user_syncing(self, user_id, affect_presence=True):
  318. """Returns a context manager that should surround any stream requests
  319. from the user.
  320. This allows us to keep track of who is currently streaming and who isn't
  321. without having to have timers outside of this module to avoid flickering
  322. when users disconnect/reconnect.
  323. Args:
  324. user_id (str)
  325. affect_presence (bool): If false this function will be a no-op.
  326. Useful for streams that are not associated with an actual
  327. client that is being used by a user.
  328. """
  329. # Override if it should affect the user's presence, if presence is
  330. # disabled.
  331. if not self.hs.config.use_presence:
  332. affect_presence = False
  333. if affect_presence:
  334. curr_sync = self.user_to_num_current_syncs.get(user_id, 0)
  335. self.user_to_num_current_syncs[user_id] = curr_sync + 1
  336. prev_state = yield self.current_state_for_user(user_id)
  337. if prev_state.state == PresenceState.OFFLINE:
  338. # If they're currently offline then bring them online, otherwise
  339. # just update the last sync times.
  340. yield self._update_states(
  341. [
  342. prev_state.copy_and_replace(
  343. state=PresenceState.ONLINE,
  344. last_active_ts=self.clock.time_msec(),
  345. last_user_sync_ts=self.clock.time_msec(),
  346. )
  347. ]
  348. )
  349. else:
  350. yield self._update_states(
  351. [
  352. prev_state.copy_and_replace(
  353. last_user_sync_ts=self.clock.time_msec()
  354. )
  355. ]
  356. )
  357. @defer.inlineCallbacks
  358. def _end():
  359. try:
  360. self.user_to_num_current_syncs[user_id] -= 1
  361. prev_state = yield self.current_state_for_user(user_id)
  362. yield self._update_states(
  363. [
  364. prev_state.copy_and_replace(
  365. last_user_sync_ts=self.clock.time_msec()
  366. )
  367. ]
  368. )
  369. except Exception:
  370. logger.exception("Error updating presence after sync")
  371. @contextmanager
  372. def _user_syncing():
  373. try:
  374. yield
  375. finally:
  376. if affect_presence:
  377. run_in_background(_end)
  378. return _user_syncing()
  379. def get_currently_syncing_users(self):
  380. """Get the set of user ids that are currently syncing on this HS.
  381. Returns:
  382. set(str): A set of user_id strings.
  383. """
  384. if self.hs.config.use_presence:
  385. syncing_user_ids = {
  386. user_id
  387. for user_id, count in self.user_to_num_current_syncs.items()
  388. if count
  389. }
  390. for user_ids in self.external_process_to_current_syncs.values():
  391. syncing_user_ids.update(user_ids)
  392. return syncing_user_ids
  393. else:
  394. return set()
  395. @defer.inlineCallbacks
  396. def update_external_syncs_row(
  397. self, process_id, user_id, is_syncing, sync_time_msec
  398. ):
  399. """Update the syncing users for an external process as a delta.
  400. Args:
  401. process_id (str): An identifier for the process the users are
  402. syncing against. This allows synapse to process updates
  403. as user start and stop syncing against a given process.
  404. user_id (str): The user who has started or stopped syncing
  405. is_syncing (bool): Whether or not the user is now syncing
  406. sync_time_msec(int): Time in ms when the user was last syncing
  407. """
  408. with (yield self.external_sync_linearizer.queue(process_id)):
  409. prev_state = yield self.current_state_for_user(user_id)
  410. process_presence = self.external_process_to_current_syncs.setdefault(
  411. process_id, set()
  412. )
  413. updates = []
  414. if is_syncing and user_id not in process_presence:
  415. if prev_state.state == PresenceState.OFFLINE:
  416. updates.append(
  417. prev_state.copy_and_replace(
  418. state=PresenceState.ONLINE,
  419. last_active_ts=sync_time_msec,
  420. last_user_sync_ts=sync_time_msec,
  421. )
  422. )
  423. else:
  424. updates.append(
  425. prev_state.copy_and_replace(last_user_sync_ts=sync_time_msec)
  426. )
  427. process_presence.add(user_id)
  428. elif user_id in process_presence:
  429. updates.append(
  430. prev_state.copy_and_replace(last_user_sync_ts=sync_time_msec)
  431. )
  432. if not is_syncing:
  433. process_presence.discard(user_id)
  434. if updates:
  435. yield self._update_states(updates)
  436. self.external_process_last_updated_ms[process_id] = self.clock.time_msec()
  437. @defer.inlineCallbacks
  438. def update_external_syncs_clear(self, process_id):
  439. """Marks all users that had been marked as syncing by a given process
  440. as offline.
  441. Used when the process has stopped/disappeared.
  442. """
  443. with (yield self.external_sync_linearizer.queue(process_id)):
  444. process_presence = self.external_process_to_current_syncs.pop(
  445. process_id, set()
  446. )
  447. prev_states = yield self.current_state_for_users(process_presence)
  448. time_now_ms = self.clock.time_msec()
  449. yield self._update_states(
  450. [
  451. prev_state.copy_and_replace(last_user_sync_ts=time_now_ms)
  452. for prev_state in itervalues(prev_states)
  453. ]
  454. )
  455. self.external_process_last_updated_ms.pop(process_id, None)
  456. @defer.inlineCallbacks
  457. def current_state_for_user(self, user_id):
  458. """Get the current presence state for a user.
  459. """
  460. res = yield self.current_state_for_users([user_id])
  461. return res[user_id]
  462. @defer.inlineCallbacks
  463. def current_state_for_users(self, user_ids):
  464. """Get the current presence state for multiple users.
  465. Returns:
  466. dict: `user_id` -> `UserPresenceState`
  467. """
  468. states = {
  469. user_id: self.user_to_current_state.get(user_id, None)
  470. for user_id in user_ids
  471. }
  472. missing = [user_id for user_id, state in iteritems(states) if not state]
  473. if missing:
  474. # There are things not in our in memory cache. Lets pull them out of
  475. # the database.
  476. res = yield self.store.get_presence_for_users(missing)
  477. states.update(res)
  478. missing = [user_id for user_id, state in iteritems(states) if not state]
  479. if missing:
  480. new = {
  481. user_id: UserPresenceState.default(user_id) for user_id in missing
  482. }
  483. states.update(new)
  484. self.user_to_current_state.update(new)
  485. return states
  486. @defer.inlineCallbacks
  487. def _persist_and_notify(self, states):
  488. """Persist states in the database, poke the notifier and send to
  489. interested remote servers
  490. """
  491. stream_id, max_token = yield self.store.update_presence(states)
  492. parties = yield get_interested_parties(self.store, states)
  493. room_ids_to_states, users_to_states = parties
  494. self.notifier.on_new_event(
  495. "presence_key",
  496. stream_id,
  497. rooms=room_ids_to_states.keys(),
  498. users=[UserID.from_string(u) for u in users_to_states],
  499. )
  500. self._push_to_remotes(states)
  501. @defer.inlineCallbacks
  502. def notify_for_states(self, state, stream_id):
  503. parties = yield get_interested_parties(self.store, [state])
  504. room_ids_to_states, users_to_states = parties
  505. self.notifier.on_new_event(
  506. "presence_key",
  507. stream_id,
  508. rooms=room_ids_to_states.keys(),
  509. users=[UserID.from_string(u) for u in users_to_states],
  510. )
  511. def _push_to_remotes(self, states):
  512. """Sends state updates to remote servers.
  513. Args:
  514. states (list(UserPresenceState))
  515. """
  516. self.federation.send_presence(states)
  517. @defer.inlineCallbacks
  518. def incoming_presence(self, origin, content):
  519. """Called when we receive a `m.presence` EDU from a remote server.
  520. """
  521. now = self.clock.time_msec()
  522. updates = []
  523. for push in content.get("push", []):
  524. # A "push" contains a list of presence that we are probably interested
  525. # in.
  526. # TODO: Actually check if we're interested, rather than blindly
  527. # accepting presence updates.
  528. user_id = push.get("user_id", None)
  529. if not user_id:
  530. logger.info(
  531. "Got presence update from %r with no 'user_id': %r", origin, push
  532. )
  533. continue
  534. if get_domain_from_id(user_id) != origin:
  535. logger.info(
  536. "Got presence update from %r with bad 'user_id': %r",
  537. origin,
  538. user_id,
  539. )
  540. continue
  541. presence_state = push.get("presence", None)
  542. if not presence_state:
  543. logger.info(
  544. "Got presence update from %r with no 'presence_state': %r",
  545. origin,
  546. push,
  547. )
  548. continue
  549. new_fields = {"state": presence_state, "last_federation_update_ts": now}
  550. last_active_ago = push.get("last_active_ago", None)
  551. if last_active_ago is not None:
  552. new_fields["last_active_ts"] = now - last_active_ago
  553. new_fields["status_msg"] = push.get("status_msg", None)
  554. new_fields["currently_active"] = push.get("currently_active", False)
  555. prev_state = yield self.current_state_for_user(user_id)
  556. updates.append(prev_state.copy_and_replace(**new_fields))
  557. if updates:
  558. federation_presence_counter.inc(len(updates))
  559. yield self._update_states(updates)
  560. @defer.inlineCallbacks
  561. def get_state(self, target_user, as_event=False):
  562. results = yield self.get_states([target_user.to_string()], as_event=as_event)
  563. return results[0]
  564. @defer.inlineCallbacks
  565. def get_states(self, target_user_ids, as_event=False):
  566. """Get the presence state for users.
  567. Args:
  568. target_user_ids (list)
  569. as_event (bool): Whether to format it as a client event or not.
  570. Returns:
  571. list
  572. """
  573. updates = yield self.current_state_for_users(target_user_ids)
  574. updates = list(updates.values())
  575. for user_id in set(target_user_ids) - set(u.user_id for u in updates):
  576. updates.append(UserPresenceState.default(user_id))
  577. now = self.clock.time_msec()
  578. if as_event:
  579. return [
  580. {
  581. "type": "m.presence",
  582. "content": format_user_presence_state(state, now),
  583. }
  584. for state in updates
  585. ]
  586. else:
  587. return updates
  588. @defer.inlineCallbacks
  589. def set_state(self, target_user, state, ignore_status_msg=False):
  590. """Set the presence state of the user.
  591. """
  592. status_msg = state.get("status_msg", None)
  593. presence = state["presence"]
  594. valid_presence = (
  595. PresenceState.ONLINE,
  596. PresenceState.UNAVAILABLE,
  597. PresenceState.OFFLINE,
  598. )
  599. if presence not in valid_presence:
  600. raise SynapseError(400, "Invalid presence state")
  601. user_id = target_user.to_string()
  602. prev_state = yield self.current_state_for_user(user_id)
  603. new_fields = {"state": presence}
  604. if not ignore_status_msg:
  605. msg = status_msg if presence != PresenceState.OFFLINE else None
  606. new_fields["status_msg"] = msg
  607. if presence == PresenceState.ONLINE:
  608. new_fields["last_active_ts"] = self.clock.time_msec()
  609. yield self._update_states([prev_state.copy_and_replace(**new_fields)])
  610. @defer.inlineCallbacks
  611. def is_visible(self, observed_user, observer_user):
  612. """Returns whether a user can see another user's presence.
  613. """
  614. observer_room_ids = yield self.store.get_rooms_for_user(
  615. observer_user.to_string()
  616. )
  617. observed_room_ids = yield self.store.get_rooms_for_user(
  618. observed_user.to_string()
  619. )
  620. if observer_room_ids & observed_room_ids:
  621. return True
  622. return False
  623. @defer.inlineCallbacks
  624. def get_all_presence_updates(self, last_id, current_id):
  625. """
  626. Gets a list of presence update rows from between the given stream ids.
  627. Each row has:
  628. - stream_id(str)
  629. - user_id(str)
  630. - state(str)
  631. - last_active_ts(int)
  632. - last_federation_update_ts(int)
  633. - last_user_sync_ts(int)
  634. - status_msg(int)
  635. - currently_active(int)
  636. """
  637. # TODO(markjh): replicate the unpersisted changes.
  638. # This could use the in-memory stores for recent changes.
  639. rows = yield self.store.get_all_presence_updates(last_id, current_id)
  640. return rows
  641. def notify_new_event(self):
  642. """Called when new events have happened. Handles users and servers
  643. joining rooms and require being sent presence.
  644. """
  645. if self._event_processing:
  646. return
  647. @defer.inlineCallbacks
  648. def _process_presence():
  649. assert not self._event_processing
  650. self._event_processing = True
  651. try:
  652. yield self._unsafe_process()
  653. finally:
  654. self._event_processing = False
  655. run_as_background_process("presence.notify_new_event", _process_presence)
  656. @defer.inlineCallbacks
  657. def _unsafe_process(self):
  658. # Loop round handling deltas until we're up to date
  659. while True:
  660. with Measure(self.clock, "presence_delta"):
  661. deltas = yield self.store.get_current_state_deltas(self._event_pos)
  662. if not deltas:
  663. return
  664. yield self._handle_state_delta(deltas)
  665. self._event_pos = deltas[-1]["stream_id"]
  666. # Expose current event processing position to prometheus
  667. synapse.metrics.event_processing_positions.labels("presence").set(
  668. self._event_pos
  669. )
  670. @defer.inlineCallbacks
  671. def _handle_state_delta(self, deltas):
  672. """Process current state deltas to find new joins that need to be
  673. handled.
  674. """
  675. for delta in deltas:
  676. typ = delta["type"]
  677. state_key = delta["state_key"]
  678. room_id = delta["room_id"]
  679. event_id = delta["event_id"]
  680. prev_event_id = delta["prev_event_id"]
  681. logger.debug("Handling: %r %r, %s", typ, state_key, event_id)
  682. if typ != EventTypes.Member:
  683. continue
  684. if event_id is None:
  685. # state has been deleted, so this is not a join. We only care about
  686. # joins.
  687. continue
  688. event = yield self.store.get_event(event_id, allow_none=True)
  689. if not event or event.content.get("membership") != Membership.JOIN:
  690. # We only care about joins
  691. continue
  692. if prev_event_id:
  693. prev_event = yield self.store.get_event(prev_event_id, allow_none=True)
  694. if (
  695. prev_event
  696. and prev_event.content.get("membership") == Membership.JOIN
  697. ):
  698. # Ignore changes to join events.
  699. continue
  700. yield self._on_user_joined_room(room_id, state_key)
  701. @defer.inlineCallbacks
  702. def _on_user_joined_room(self, room_id, user_id):
  703. """Called when we detect a user joining the room via the current state
  704. delta stream.
  705. Args:
  706. room_id (str)
  707. user_id (str)
  708. Returns:
  709. Deferred
  710. """
  711. if self.is_mine_id(user_id):
  712. # If this is a local user then we need to send their presence
  713. # out to hosts in the room (who don't already have it)
  714. # TODO: We should be able to filter the hosts down to those that
  715. # haven't previously seen the user
  716. state = yield self.current_state_for_user(user_id)
  717. hosts = yield self.state.get_current_hosts_in_room(room_id)
  718. # Filter out ourselves.
  719. hosts = set(host for host in hosts if host != self.server_name)
  720. self.federation.send_presence_to_destinations(
  721. states=[state], destinations=hosts
  722. )
  723. else:
  724. # A remote user has joined the room, so we need to:
  725. # 1. Check if this is a new server in the room
  726. # 2. If so send any presence they don't already have for
  727. # local users in the room.
  728. # TODO: We should be able to filter the users down to those that
  729. # the server hasn't previously seen
  730. # TODO: Check that this is actually a new server joining the
  731. # room.
  732. user_ids = yield self.state.get_current_users_in_room(room_id)
  733. user_ids = list(filter(self.is_mine_id, user_ids))
  734. states = yield self.current_state_for_users(user_ids)
  735. # Filter out old presence, i.e. offline presence states where
  736. # the user hasn't been active for a week. We can change this
  737. # depending on what we want the UX to be, but at the least we
  738. # should filter out offline presence where the state is just the
  739. # default state.
  740. now = self.clock.time_msec()
  741. states = [
  742. state
  743. for state in states.values()
  744. if state.state != PresenceState.OFFLINE
  745. or now - state.last_active_ts < 7 * 24 * 60 * 60 * 1000
  746. or state.status_msg is not None
  747. ]
  748. if states:
  749. self.federation.send_presence_to_destinations(
  750. states=states, destinations=[get_domain_from_id(user_id)]
  751. )
  752. def should_notify(old_state, new_state):
  753. """Decides if a presence state change should be sent to interested parties.
  754. """
  755. if old_state == new_state:
  756. return False
  757. if old_state.status_msg != new_state.status_msg:
  758. notify_reason_counter.labels("status_msg_change").inc()
  759. return True
  760. if old_state.state != new_state.state:
  761. notify_reason_counter.labels("state_change").inc()
  762. state_transition_counter.labels(old_state.state, new_state.state).inc()
  763. return True
  764. if old_state.state == PresenceState.ONLINE:
  765. if new_state.currently_active != old_state.currently_active:
  766. notify_reason_counter.labels("current_active_change").inc()
  767. return True
  768. if (
  769. new_state.last_active_ts - old_state.last_active_ts
  770. > LAST_ACTIVE_GRANULARITY
  771. ):
  772. # Only notify about last active bumps if we're not currently acive
  773. if not new_state.currently_active:
  774. notify_reason_counter.labels("last_active_change_online").inc()
  775. return True
  776. elif new_state.last_active_ts - old_state.last_active_ts > LAST_ACTIVE_GRANULARITY:
  777. # Always notify for a transition where last active gets bumped.
  778. notify_reason_counter.labels("last_active_change_not_online").inc()
  779. return True
  780. return False
  781. def format_user_presence_state(state, now, include_user_id=True):
  782. """Convert UserPresenceState to a format that can be sent down to clients
  783. and to other servers.
  784. The "user_id" is optional so that this function can be used to format presence
  785. updates for client /sync responses and for federation /send requests.
  786. """
  787. content = {"presence": state.state}
  788. if include_user_id:
  789. content["user_id"] = state.user_id
  790. if state.last_active_ts:
  791. content["last_active_ago"] = now - state.last_active_ts
  792. if state.status_msg and state.state != PresenceState.OFFLINE:
  793. content["status_msg"] = state.status_msg
  794. if state.state == PresenceState.ONLINE:
  795. content["currently_active"] = state.currently_active
  796. return content
  797. class PresenceEventSource(object):
  798. def __init__(self, hs):
  799. # We can't call get_presence_handler here because there's a cycle:
  800. #
  801. # Presence -> Notifier -> PresenceEventSource -> Presence
  802. #
  803. self.get_presence_handler = hs.get_presence_handler
  804. self.clock = hs.get_clock()
  805. self.store = hs.get_datastore()
  806. self.state = hs.get_state_handler()
  807. @defer.inlineCallbacks
  808. @log_function
  809. def get_new_events(
  810. self,
  811. user,
  812. from_key,
  813. room_ids=None,
  814. include_offline=True,
  815. explicit_room_id=None,
  816. **kwargs
  817. ):
  818. # The process for getting presence events are:
  819. # 1. Get the rooms the user is in.
  820. # 2. Get the list of user in the rooms.
  821. # 3. Get the list of users that are in the user's presence list.
  822. # 4. If there is a from_key set, cross reference the list of users
  823. # with the `presence_stream_cache` to see which ones we actually
  824. # need to check.
  825. # 5. Load current state for the users.
  826. #
  827. # We don't try and limit the presence updates by the current token, as
  828. # sending down the rare duplicate is not a concern.
  829. with Measure(self.clock, "presence.get_new_events"):
  830. if from_key is not None:
  831. from_key = int(from_key)
  832. max_token = self.store.get_current_presence_token()
  833. if from_key == max_token:
  834. # This is necessary as due to the way stream ID generators work
  835. # we may get updates that have a stream ID greater than the max
  836. # token (e.g. max_token is N but stream generator may return
  837. # results for N+2, due to N+1 not having finished being
  838. # persisted yet).
  839. #
  840. # This is usually fine, as it just means that we may send down
  841. # some presence updates multiple times. However, we need to be
  842. # careful that the sync stream either actually does make some
  843. # progress or doesn't return, otherwise clients will end up
  844. # tight looping calling /sync due to it immediately returning
  845. # the same token repeatedly.
  846. #
  847. # Hence this guard where we just return nothing so that the sync
  848. # doesn't return. C.f. #5503.
  849. return ([], max_token)
  850. presence = self.get_presence_handler()
  851. stream_change_cache = self.store.presence_stream_cache
  852. users_interested_in = yield self._get_interested_in(user, explicit_room_id)
  853. user_ids_changed = set()
  854. changed = None
  855. if from_key:
  856. changed = stream_change_cache.get_all_entities_changed(from_key)
  857. if changed is not None and len(changed) < 500:
  858. # For small deltas, its quicker to get all changes and then
  859. # work out if we share a room or they're in our presence list
  860. get_updates_counter.labels("stream").inc()
  861. for other_user_id in changed:
  862. if other_user_id in users_interested_in:
  863. user_ids_changed.add(other_user_id)
  864. else:
  865. # Too many possible updates. Find all users we can see and check
  866. # if any of them have changed.
  867. get_updates_counter.labels("full").inc()
  868. if from_key:
  869. user_ids_changed = stream_change_cache.get_entities_changed(
  870. users_interested_in, from_key
  871. )
  872. else:
  873. user_ids_changed = users_interested_in
  874. updates = yield presence.current_state_for_users(user_ids_changed)
  875. if include_offline:
  876. return (list(updates.values()), max_token)
  877. else:
  878. return (
  879. [s for s in itervalues(updates) if s.state != PresenceState.OFFLINE],
  880. max_token,
  881. )
  882. def get_current_key(self):
  883. return self.store.get_current_presence_token()
  884. def get_pagination_rows(self, user, pagination_config, key):
  885. return self.get_new_events(user, from_key=None, include_offline=False)
  886. @cachedInlineCallbacks(num_args=2, cache_context=True)
  887. def _get_interested_in(self, user, explicit_room_id, cache_context):
  888. """Returns the set of users that the given user should see presence
  889. updates for
  890. """
  891. user_id = user.to_string()
  892. users_interested_in = set()
  893. users_interested_in.add(user_id) # So that we receive our own presence
  894. users_who_share_room = yield self.store.get_users_who_share_room_with_user(
  895. user_id, on_invalidate=cache_context.invalidate
  896. )
  897. users_interested_in.update(users_who_share_room)
  898. if explicit_room_id:
  899. user_ids = yield self.store.get_users_in_room(
  900. explicit_room_id, on_invalidate=cache_context.invalidate
  901. )
  902. users_interested_in.update(user_ids)
  903. return users_interested_in
  904. def handle_timeouts(user_states, is_mine_fn, syncing_user_ids, now):
  905. """Checks the presence of users that have timed out and updates as
  906. appropriate.
  907. Args:
  908. user_states(list): List of UserPresenceState's to check.
  909. is_mine_fn (fn): Function that returns if a user_id is ours
  910. syncing_user_ids (set): Set of user_ids with active syncs.
  911. now (int): Current time in ms.
  912. Returns:
  913. List of UserPresenceState updates
  914. """
  915. changes = {} # Actual changes we need to notify people about
  916. for state in user_states:
  917. is_mine = is_mine_fn(state.user_id)
  918. new_state = handle_timeout(state, is_mine, syncing_user_ids, now)
  919. if new_state:
  920. changes[state.user_id] = new_state
  921. return list(changes.values())
  922. def handle_timeout(state, is_mine, syncing_user_ids, now):
  923. """Checks the presence of the user to see if any of the timers have elapsed
  924. Args:
  925. state (UserPresenceState)
  926. is_mine (bool): Whether the user is ours
  927. syncing_user_ids (set): Set of user_ids with active syncs.
  928. now (int): Current time in ms.
  929. Returns:
  930. A UserPresenceState update or None if no update.
  931. """
  932. if state.state == PresenceState.OFFLINE:
  933. # No timeouts are associated with offline states.
  934. return None
  935. changed = False
  936. user_id = state.user_id
  937. if is_mine:
  938. if state.state == PresenceState.ONLINE:
  939. if now - state.last_active_ts > IDLE_TIMER:
  940. # Currently online, but last activity ages ago so auto
  941. # idle
  942. state = state.copy_and_replace(state=PresenceState.UNAVAILABLE)
  943. changed = True
  944. elif now - state.last_active_ts > LAST_ACTIVE_GRANULARITY:
  945. # So that we send down a notification that we've
  946. # stopped updating.
  947. changed = True
  948. if now - state.last_federation_update_ts > FEDERATION_PING_INTERVAL:
  949. # Need to send ping to other servers to ensure they don't
  950. # timeout and set us to offline
  951. changed = True
  952. # If there are have been no sync for a while (and none ongoing),
  953. # set presence to offline
  954. if user_id not in syncing_user_ids:
  955. # If the user has done something recently but hasn't synced,
  956. # don't set them as offline.
  957. sync_or_active = max(state.last_user_sync_ts, state.last_active_ts)
  958. if now - sync_or_active > SYNC_ONLINE_TIMEOUT:
  959. state = state.copy_and_replace(
  960. state=PresenceState.OFFLINE, status_msg=None
  961. )
  962. changed = True
  963. else:
  964. # We expect to be poked occasionally by the other side.
  965. # This is to protect against forgetful/buggy servers, so that
  966. # no one gets stuck online forever.
  967. if now - state.last_federation_update_ts > FEDERATION_TIMEOUT:
  968. # The other side seems to have disappeared.
  969. state = state.copy_and_replace(state=PresenceState.OFFLINE, status_msg=None)
  970. changed = True
  971. return state if changed else None
  972. def handle_update(prev_state, new_state, is_mine, wheel_timer, now):
  973. """Given a presence update:
  974. 1. Add any appropriate timers.
  975. 2. Check if we should notify anyone.
  976. Args:
  977. prev_state (UserPresenceState)
  978. new_state (UserPresenceState)
  979. is_mine (bool): Whether the user is ours
  980. wheel_timer (WheelTimer)
  981. now (int): Time now in ms
  982. Returns:
  983. 3-tuple: `(new_state, persist_and_notify, federation_ping)` where:
  984. - new_state: is the state to actually persist
  985. - persist_and_notify (bool): whether to persist and notify people
  986. - federation_ping (bool): whether we should send a ping over federation
  987. """
  988. user_id = new_state.user_id
  989. persist_and_notify = False
  990. federation_ping = False
  991. # If the users are ours then we want to set up a bunch of timers
  992. # to time things out.
  993. if is_mine:
  994. if new_state.state == PresenceState.ONLINE:
  995. # Idle timer
  996. wheel_timer.insert(
  997. now=now, obj=user_id, then=new_state.last_active_ts + IDLE_TIMER
  998. )
  999. active = now - new_state.last_active_ts < LAST_ACTIVE_GRANULARITY
  1000. new_state = new_state.copy_and_replace(currently_active=active)
  1001. if active:
  1002. wheel_timer.insert(
  1003. now=now,
  1004. obj=user_id,
  1005. then=new_state.last_active_ts + LAST_ACTIVE_GRANULARITY,
  1006. )
  1007. if new_state.state != PresenceState.OFFLINE:
  1008. # User has stopped syncing
  1009. wheel_timer.insert(
  1010. now=now,
  1011. obj=user_id,
  1012. then=new_state.last_user_sync_ts + SYNC_ONLINE_TIMEOUT,
  1013. )
  1014. last_federate = new_state.last_federation_update_ts
  1015. if now - last_federate > FEDERATION_PING_INTERVAL:
  1016. # Been a while since we've poked remote servers
  1017. new_state = new_state.copy_and_replace(last_federation_update_ts=now)
  1018. federation_ping = True
  1019. else:
  1020. wheel_timer.insert(
  1021. now=now,
  1022. obj=user_id,
  1023. then=new_state.last_federation_update_ts + FEDERATION_TIMEOUT,
  1024. )
  1025. # Check whether the change was something worth notifying about
  1026. if should_notify(prev_state, new_state):
  1027. new_state = new_state.copy_and_replace(last_federation_update_ts=now)
  1028. persist_and_notify = True
  1029. return new_state, persist_and_notify, federation_ping
  1030. @defer.inlineCallbacks
  1031. def get_interested_parties(store, states):
  1032. """Given a list of states return which entities (rooms, users)
  1033. are interested in the given states.
  1034. Args:
  1035. states (list(UserPresenceState))
  1036. Returns:
  1037. 2-tuple: `(room_ids_to_states, users_to_states)`,
  1038. with each item being a dict of `entity_name` -> `[UserPresenceState]`
  1039. """
  1040. room_ids_to_states = {}
  1041. users_to_states = {}
  1042. for state in states:
  1043. room_ids = yield store.get_rooms_for_user(state.user_id)
  1044. for room_id in room_ids:
  1045. room_ids_to_states.setdefault(room_id, []).append(state)
  1046. # Always notify self
  1047. users_to_states.setdefault(state.user_id, []).append(state)
  1048. return (room_ids_to_states, users_to_states)
  1049. @defer.inlineCallbacks
  1050. def get_interested_remotes(store, states, state_handler):
  1051. """Given a list of presence states figure out which remote servers
  1052. should be sent which.
  1053. All the presence states should be for local users only.
  1054. Args:
  1055. store (DataStore)
  1056. states (list(UserPresenceState))
  1057. Returns:
  1058. Deferred list of ([destinations], [UserPresenceState]), where for
  1059. each row the list of UserPresenceState should be sent to each
  1060. destination
  1061. """
  1062. hosts_and_states = []
  1063. # First we look up the rooms each user is in (as well as any explicit
  1064. # subscriptions), then for each distinct room we look up the remote
  1065. # hosts in those rooms.
  1066. room_ids_to_states, users_to_states = yield get_interested_parties(store, states)
  1067. for room_id, states in iteritems(room_ids_to_states):
  1068. hosts = yield state_handler.get_current_hosts_in_room(room_id)
  1069. hosts_and_states.append((hosts, states))
  1070. for user_id, states in iteritems(users_to_states):
  1071. host = get_domain_from_id(user_id)
  1072. hosts_and_states.append(([host], states))
  1073. return hosts_and_states