123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549 |
- # -*- coding: utf-8 -*-
- # Copyright 2014 - 2016 OpenMarket Ltd
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- import logging
- from collections import namedtuple
- from typing import Callable, List
- from prometheus_client import Counter
- from twisted.internet import defer
- import synapse.server
- from synapse.api.constants import EventTypes, Membership
- from synapse.api.errors import AuthError
- from synapse.handlers.presence import format_user_presence_state
- from synapse.logging.context import PreserveLoggingContext
- from synapse.logging.utils import log_function
- from synapse.metrics import LaterGauge
- from synapse.metrics.background_process_metrics import run_as_background_process
- from synapse.types import StreamToken
- from synapse.util.async_helpers import ObservableDeferred, timeout_deferred
- from synapse.util.metrics import Measure
- from synapse.visibility import filter_events_for_client
logger = logging.getLogger(__name__)

# NOTE(review): nothing in the code visible in this module increments this
# counter; confirm whether it is still used elsewhere before relying on it.
notified_events_counter = Counter(
    "synapse_notifier_notified_events",
    "",
)

# Bumped once for every user stream woken by _NotifierUserStream.notify,
# labelled with the stream key (e.g. "room_key") that caused the wake-up.
users_woken_by_stream_counter = Counter(
    "synapse_notifier_users_woken_by_stream",
    "",
    labelnames=["stream"],
)
# TODO(paul): Should be shared somewhere
def count(func, l):
    """Count how many elements of ``l`` satisfy the predicate ``func``.

    Args:
        func: one-argument callable; its return value is tested for truthiness.
        l: any iterable of elements to test.

    Returns:
        int: the number of elements for which ``func(element)`` is truthy.
    """
    return sum(1 for item in l if func(item))
class _NotificationListener(object):
    """A single client connection waiting on the events stream.

    Whoever handles the events stream waits on ``deferred``; resolving that
    deferred is all that is required to wake the handler up.
    """

    # Only one attribute is ever stored, so skip the per-instance __dict__.
    __slots__ = ["deferred"]

    def __init__(self, deferred):
        # The deferred the stream handler is waiting on.
        self.deferred = deferred
class _NotifierUserStream(object):
    """This represents a user connected to the event stream.

    It tracks the most recent stream token for that user. At a given point a
    user may have a number of streams listening for events; they all observe
    the same ``notify_deferred``.

    This listener also keeps track of which rooms it is listening in so that
    it can remove itself from the indexes in the Notifier class.
    """

    def __init__(self, user_id, rooms, current_token, time_now_ms):
        self.user_id = user_id
        self.rooms = set(rooms)
        self.current_token = current_token

        # The last token for which we should wake up any streams that have a
        # token that comes before it. This gets updated every time we get poked.
        # We start it at the current token since if we get any streams
        # that have a token from before we have no idea whether they should be
        # woken up or not, so lets just wake them up.
        self.last_notified_token = current_token
        self.last_notified_ms = time_now_ms

        with PreserveLoggingContext():
            self.notify_deferred = ObservableDeferred(defer.Deferred())

    def notify(self, stream_key, stream_id, time_now_ms):
        """Notify any listeners for this user of a new event from an
        event source.

        Args:
            stream_key(str): The stream the event came from.
            stream_id(str): The new id for the stream the event came from.
            time_now_ms(int): The current time in milliseconds.
        """
        self.current_token = self.current_token.copy_and_advance(stream_key, stream_id)
        self.last_notified_token = self.current_token
        self.last_notified_ms = time_now_ms
        notify_deferred = self.notify_deferred

        users_woken_by_stream_counter.labels(stream_key).inc()

        with PreserveLoggingContext():
            # Install a fresh deferred *before* firing the old one, so that any
            # listener created while the old deferred's callbacks run observes
            # the new deferred rather than one that has already fired.
            self.notify_deferred = ObservableDeferred(defer.Deferred())
            notify_deferred.callback(self.current_token)

    def remove(self, notifier):
        """Remove this listener from all the indexes in the Notifier
        it knows about.

        Room entries that become empty are dropped from
        ``notifier.room_to_user_streams`` so that the index does not keep
        growing as streams expire.

        Args:
            notifier: the Notifier whose ``room_to_user_streams`` and
                ``user_to_user_stream`` indexes reference this stream.

        Raises:
            KeyError: if this stream's user_id is not in
                ``notifier.user_to_user_stream``.
        """
        for room in self.rooms:
            room_streams = notifier.room_to_user_streams.get(room)
            if room_streams is None:
                continue
            room_streams.discard(self)
            if not room_streams:
                # No one is listening in this room any more; drop the entry
                # instead of leaving an empty set behind.
                del notifier.room_to_user_streams[room]

        notifier.user_to_user_stream.pop(self.user_id)

    def count_listeners(self):
        """Return how many observers are waiting on the current deferred."""
        return len(self.notify_deferred.observers())

    def new_listener(self, token):
        """Returns a deferred that is resolved when there is a new token
        greater than the given token.

        Args:
            token: The token from which we are streaming from, i.e. we shouldn't
                notify for things that happened before this.

        Returns:
            _NotificationListener: wraps an already-fired deferred if something
            has been notified since ``token``, otherwise an observer of the
            current ``notify_deferred``.
        """
        # Immediately wake up stream if something has already happened
        # since their last token.
        if self.last_notified_token.is_after(token):
            return _NotificationListener(defer.succeed(self.current_token))
        else:
            return _NotificationListener(self.notify_deferred.observe())
class EventStreamResult(namedtuple("EventStreamResult", ("events", "tokens"))):
    """A batch of events together with a ``(from_token, end_token)`` pair.

    The result is truthy only when it carries at least one event, so an empty
    batch reads as "nothing new yet".
    """

    def __bool__(self):
        return bool(self.events)

    __nonzero__ = __bool__  # python2 spelling of the same hook
class Notifier(object):
    """ This class is responsible for notifying any listeners when there are
    new events available for it.

    Primarily used from the /events stream.
    """

    UNUSED_STREAM_EXPIRY_MS = 10 * 60 * 1000

    def __init__(self, hs: "synapse.server.HomeServer"):
        # user_id -> _NotifierUserStream
        self.user_to_user_stream = {}
        # room_id -> set of _NotifierUserStream listening in that room
        self.room_to_user_streams = {}

        self.hs = hs
        self.storage = hs.get_storage()
        self.event_sources = hs.get_event_sources()
        self.store = hs.get_datastore()
        # (room_stream_id, event, extra_users) tuples that are waiting for
        # earlier events to be persisted; see _notify_pending_new_room_events.
        self.pending_new_room_events = []

        # Called when there are new things to stream over replication
        self.replication_callbacks = []  # type: List[Callable[[], None]]

        # Called when remote servers have come back online after having been
        # down.
        self.remote_server_up_callbacks = []  # type: List[Callable[[str], None]]

        self.clock = hs.get_clock()
        self.appservice_handler = hs.get_application_service_handler()

        if hs.should_send_federation():
            self.federation_sender = hs.get_federation_sender()
        else:
            self.federation_sender = None

        self.state_handler = hs.get_state_handler()

        self.clock.looping_call(
            self.remove_expired_streams, self.UNUSED_STREAM_EXPIRY_MS
        )

        # This is not a very cheap test to perform, but it's only executed
        # when rendering the metrics page, which is likely once per minute at
        # most when scraping it.
        def count_listeners():
            all_user_streams = set()

            for x in list(self.room_to_user_streams.values()):
                all_user_streams |= x
            for x in list(self.user_to_user_stream.values()):
                all_user_streams.add(x)

            return sum(stream.count_listeners() for stream in all_user_streams)

        LaterGauge("synapse_notifier_listeners", "", [], count_listeners)

        LaterGauge(
            "synapse_notifier_rooms",
            "",
            [],
            lambda: count(bool, list(self.room_to_user_streams.values())),
        )
        LaterGauge(
            "synapse_notifier_users", "", [], lambda: len(self.user_to_user_stream)
        )

    def add_replication_callback(self, cb: Callable[[], None]):
        """Add a callback that will be called when some new data is available.
        Callback is not given any arguments. It should *not* return a Deferred - if
        it needs to do any asynchronous work, a background thread should be started and
        wrapped with run_as_background_process.
        """
        self.replication_callbacks.append(cb)

    def add_remote_server_up_callback(self, cb: Callable[[str], None]):
        """Add a callback that will be called when synapse detects a server
        has come back up after having been down.

        The callback is invoked by `notify_remote_server_up` with the server
        name as its only argument.
        """
        self.remote_server_up_callbacks.append(cb)

    def on_new_room_event(
        self, event, room_stream_id, max_room_stream_id, extra_users=None
    ):
        """ Used by handlers to inform the notifier something has happened
        in the room, room event wise.

        This triggers the notifier to wake up any listeners that are
        listening to the room, and any listeners for the users in the
        `extra_users` param.

        The events can be persisted out of order. The notifier will wait
        until all previous events have been persisted before notifying
        the client streams.

        Args:
            event: the event that was persisted.
            room_stream_id (int): the room stream id assigned to `event`.
            max_room_stream_id (int): the highest stream_id below which all
                events have been persisted.
            extra_users (list|None): additional users whose streams should be
                woken. Defaults to no extra users.
        """
        if extra_users is None:
            extra_users = []
        self.pending_new_room_events.append((room_stream_id, event, extra_users))
        self._notify_pending_new_room_events(max_room_stream_id)

        self.notify_replication()

    def _notify_pending_new_room_events(self, max_room_stream_id):
        """Notify for the room events that were queued waiting for a previous
        event to be persisted.

        Args:
            max_room_stream_id(int): The highest stream_id below which all
                events have been persisted.
        """
        pending = self.pending_new_room_events
        self.pending_new_room_events = []
        for room_stream_id, event, extra_users in pending:
            if room_stream_id > max_room_stream_id:
                # Still ahead of the persisted position: keep it queued.
                self.pending_new_room_events.append(
                    (room_stream_id, event, extra_users)
                )
            else:
                self._on_new_room_event(event, room_stream_id, extra_users)

    def _on_new_room_event(self, event, room_stream_id, extra_users=None):
        """Notify any user streams that are interested in this room event"""
        if extra_users is None:
            extra_users = []

        # poke any interested application service.
        run_as_background_process(
            "notify_app_services", self._notify_app_services, room_stream_id
        )

        if self.federation_sender:
            self.federation_sender.notify_new_events(room_stream_id)

        if event.type == EventTypes.Member and event.membership == Membership.JOIN:
            # Index the joining user's stream under the room before waking
            # streams, so the joiner is included in the notification below.
            self._user_joined_room(event.state_key, event.room_id)

        self.on_new_event(
            "room_key", room_stream_id, users=extra_users, rooms=[event.room_id]
        )

    @defer.inlineCallbacks
    def _notify_app_services(self, room_stream_id):
        """Tell the appservice handler about new events; errors are logged,
        not raised, because this runs as a background process."""
        try:
            yield self.appservice_handler.notify_interested_services(room_stream_id)
        except Exception:
            logger.exception("Error notifying application services of event")

    def on_new_event(self, stream_key, new_token, users=None, rooms=None):
        """ Used to inform listeners that something has happened event wise.

        Will wake up all listeners for the given users and rooms.

        Args:
            stream_key (str): the stream that advanced, e.g. "room_key".
            new_token: the new position of that stream.
            users (iterable|None): users whose streams should be woken; each is
                converted with ``str()`` to look up the stream.
            rooms (iterable|None): room ids whose listening streams should be
                woken.
        """
        if users is None:
            users = []
        if rooms is None:
            rooms = []

        with PreserveLoggingContext():
            with Measure(self.clock, "on_new_event"):
                user_streams = set()

                for user in users:
                    user_stream = self.user_to_user_stream.get(str(user))
                    if user_stream is not None:
                        user_streams.add(user_stream)

                for room in rooms:
                    user_streams |= self.room_to_user_streams.get(room, set())

                time_now_ms = self.clock.time_msec()
                for user_stream in user_streams:
                    try:
                        user_stream.notify(stream_key, new_token, time_now_ms)
                    except Exception:
                        # One failing stream must not stop the others from
                        # being woken.
                        logger.exception("Failed to notify listener")

                self.notify_replication()

    def on_new_replication_data(self):
        """Used to inform replication listeners that something has happened
        without waking up any of the normal user event streams"""
        self.notify_replication()

    async def wait_for_events(
        self, user_id, timeout, callback, room_ids=None, from_token=StreamToken.START
    ):
        """Wait until the callback returns a non empty response or the
        timeout fires.

        Args:
            user_id (str): key of the user stream to wait on. A stream is
                created and registered if none exists yet.
            timeout (int): maximum time to wait, in milliseconds. If falsy,
                ``callback`` is invoked once without waiting.
            callback: async callable ``(before_token, after_token)``; a falsy
                result means "nothing yet, keep waiting".
            room_ids (list|None): rooms to index a newly created stream under;
                fetched from the store when None.
            from_token (StreamToken): the token the caller has seen up to.

        Returns:
            whatever the last invocation of ``callback`` returned.
        """
        user_stream = self.user_to_user_stream.get(user_id)
        if user_stream is None:
            current_token = await self.event_sources.get_current_token()
            if room_ids is None:
                room_ids = await self.store.get_rooms_for_user(user_id)
            user_stream = _NotifierUserStream(
                user_id=user_id,
                rooms=room_ids,
                current_token=current_token,
                time_now_ms=self.clock.time_msec(),
            )
            self._register_with_keys(user_stream)

        result = None
        prev_token = from_token
        if timeout:
            end_time = self.clock.time_msec() + timeout

            while not result:
                try:
                    now = self.clock.time_msec()
                    if end_time <= now:
                        break

                    # Now we wait for the _NotifierUserStream to be told there
                    # is a new token.
                    listener = user_stream.new_listener(prev_token)
                    listener.deferred = timeout_deferred(
                        listener.deferred,
                        (end_time - now) / 1000.0,
                        self.hs.get_reactor(),
                    )
                    with PreserveLoggingContext():
                        await listener.deferred

                    current_token = user_stream.current_token

                    result = await callback(prev_token, current_token)
                    if result:
                        break

                    # Update the prev_token to the current_token since nothing
                    # has happened between the old prev_token and the current_token
                    prev_token = current_token
                except defer.TimeoutError:
                    break
                except defer.CancelledError:
                    break

        if result is None:
            # This happened if there was no timeout or if the timeout had
            # already expired.
            current_token = user_stream.current_token
            result = await callback(prev_token, current_token)

        return result

    async def get_events_for(
        self,
        user,
        pagination_config,
        timeout,
        only_keys=None,
        is_guest=False,
        explicit_room_id=None,
    ):
        """ For the given user and rooms, return any new events for them. If
        there are no new events wait for up to `timeout` milliseconds for any
        new events to happen before returning.

        If `only_keys` is non-empty, only events from the event sources it
        names will be sent down.

        If explicit_room_id is not set, the user's joined rooms will be polled
        for events.
        If explicit_room_id is set, that room will be polled for events only if
        it is world readable or the user has joined the room.

        Returns:
            EventStreamResult: the events, and a ``(from_token, end_token)``
            pair.

        Raises:
            AuthError: if explicit_room_id is set, the user has not joined it
                and it is not world readable.
        """
        from_token = pagination_config.from_token
        if not from_token:
            from_token = await self.event_sources.get_current_token()

        limit = pagination_config.limit

        room_ids, is_joined = await self._get_room_ids(user, explicit_room_id)
        is_peeking = not is_joined

        async def check_for_updates(before_token, after_token):
            if not after_token.is_after(before_token):
                return EventStreamResult([], (from_token, from_token))

            events = []
            end_token = from_token

            for name, source in self.event_sources.sources.items():
                keyname = "%s_key" % name
                before_id = getattr(before_token, keyname)
                after_id = getattr(after_token, keyname)
                if before_id == after_id:
                    continue
                if only_keys and name not in only_keys:
                    continue

                new_events, new_key = await source.get_new_events(
                    user=user,
                    from_key=getattr(from_token, keyname),
                    limit=limit,
                    is_guest=is_peeking,
                    room_ids=room_ids,
                    explicit_room_id=explicit_room_id,
                )

                if name == "room":
                    new_events = await filter_events_for_client(
                        self.storage,
                        user.to_string(),
                        new_events,
                        is_peeking=is_peeking,
                    )
                elif name == "presence":
                    now = self.clock.time_msec()
                    # Build a new list rather than overwriting the one the
                    # event source returned.
                    new_events = [
                        {
                            "type": "m.presence",
                            "content": format_user_presence_state(event, now),
                        }
                        for event in new_events
                    ]

                events.extend(new_events)
                end_token = end_token.copy_and_replace(keyname, new_key)

            return EventStreamResult(events, (from_token, end_token))

        user_id_for_stream = user.to_string()
        if is_peeking:
            # Internally, the notifier keeps an event stream per user_id.
            # This is used by both /sync and /events.
            # We want /events to be used for peeking independently of /sync,
            # without polluting its contents. So we invent an illegal user ID
            # (which thus cannot clash with any real users) for keying peeking
            # over /events.
            #
            # I am sorry for what I have done.
            user_id_for_stream = "_PEEKING_%s_%s" % (
                explicit_room_id,
                user_id_for_stream,
            )

        result = await self.wait_for_events(
            user_id_for_stream,
            timeout,
            check_for_updates,
            room_ids=room_ids,
            from_token=from_token,
        )

        return result

    @defer.inlineCallbacks
    def _get_room_ids(self, user, explicit_room_id):
        """Work out which rooms to poll for `user`.

        Returns:
            Deferred[tuple[list, bool]]: the room ids, and whether the user is
            joined (False means the user is peeking a world-readable room).

        Raises:
            AuthError: explicit_room_id is neither joined nor world readable.
        """
        joined_room_ids = yield self.store.get_rooms_for_user(user.to_string())
        if explicit_room_id:
            if explicit_room_id in joined_room_ids:
                return [explicit_room_id], True
            if (yield self._is_world_readable(explicit_room_id)):
                return [explicit_room_id], False
            raise AuthError(403, "Non-joined access not allowed")
        return joined_room_ids, True

    @defer.inlineCallbacks
    def _is_world_readable(self, room_id):
        """Return whether the room's current history visibility state event
        says "world_readable" (False if there is no such state)."""
        state = yield self.state_handler.get_current_state(
            room_id, EventTypes.RoomHistoryVisibility, ""
        )
        if state and "history_visibility" in state.content:
            return state.content["history_visibility"] == "world_readable"
        else:
            return False

    @log_function
    def remove_expired_streams(self):
        """Drop user streams that have no listeners and have not been notified
        for UNUSED_STREAM_EXPIRY_MS."""
        time_now_ms = self.clock.time_msec()
        expire_before_ts = time_now_ms - self.UNUSED_STREAM_EXPIRY_MS

        # Collect first, remove afterwards: removing mutates
        # user_to_user_stream, which must not change while we iterate it.
        expired_streams = [
            stream
            for stream in self.user_to_user_stream.values()
            if not stream.count_listeners()
            and stream.last_notified_ms < expire_before_ts
        ]

        for expired_stream in expired_streams:
            expired_stream.remove(self)

    @log_function
    def _register_with_keys(self, user_stream):
        """Index `user_stream` by its user id and by every room it listens in."""
        self.user_to_user_stream[user_stream.user_id] = user_stream

        for room in user_stream.rooms:
            s = self.room_to_user_streams.setdefault(room, set())
            s.add(user_stream)

    def _user_joined_room(self, user_id, room_id):
        """If `user_id` has a live stream, start listening to `room_id` on it."""
        new_user_stream = self.user_to_user_stream.get(user_id)
        if new_user_stream is not None:
            room_streams = self.room_to_user_streams.setdefault(room_id, set())
            room_streams.add(new_user_stream)
            new_user_stream.rooms.add(room_id)

    def notify_replication(self):
        """Notify the any replication listeners that there's a new event"""
        for cb in self.replication_callbacks:
            cb()

    def notify_remote_server_up(self, server: str):
        """Notify any replication that a remote server has come back up
        """
        # We call federation_sender directly rather than registering as a
        # callback as a) we already have a reference to it and b) it introduces
        # circular dependencies.
        if self.federation_sender:
            self.federation_sender.wake_destination(server)

        for cb in self.remote_server_up_callbacks:
            cb(server)
|