123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537 |
- # Copyright 2014-2016 OpenMarket Ltd
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- import logging
- import random
- from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Set, Tuple
- import attr
- from synapse.api.constants import EduTypes
- from synapse.api.errors import AuthError, ShadowBanError, SynapseError
- from synapse.appservice import ApplicationService
- from synapse.metrics.background_process_metrics import (
- run_as_background_process,
- wrap_as_background_process,
- )
- from synapse.replication.tcp.streams import TypingStream
- from synapse.streams import EventSource
- from synapse.types import JsonDict, Requester, StreamKeyType, UserID
- from synapse.util.caches.stream_change_cache import StreamChangeCache
- from synapse.util.metrics import Measure
- from synapse.util.wheel_timer import WheelTimer
- if TYPE_CHECKING:
- from synapse.server import HomeServer
- logger = logging.getLogger(__name__)
- # A tiny object useful for storing a user's membership in a room, as a mapping
- # key
- @attr.s(slots=True, frozen=True, auto_attribs=True)
- class RoomMember:
- room_id: str
- user_id: str
- # How often we expect remote servers to resend us presence.
- FEDERATION_TIMEOUT = 60 * 1000
- # How often to resend typing across federation.
- FEDERATION_PING_INTERVAL = 40 * 1000
- class FollowerTypingHandler:
- """A typing handler on a different process than the writer that is updated
- via replication.
- """
- def __init__(self, hs: "HomeServer"):
- self.store = hs.get_datastores().main
- self._storage_controllers = hs.get_storage_controllers()
- self.server_name = hs.config.server.server_name
- self.clock = hs.get_clock()
- self.is_mine_id = hs.is_mine_id
- self.federation = None
- if hs.should_send_federation():
- self.federation = hs.get_federation_sender()
- if hs.get_instance_name() not in hs.config.worker.writers.typing:
- hs.get_federation_registry().register_instances_for_edu(
- EduTypes.TYPING,
- hs.config.worker.writers.typing,
- )
- # map room IDs to serial numbers
- self._room_serials: Dict[str, int] = {}
- # map room IDs to sets of users currently typing
- self._room_typing: Dict[str, Set[str]] = {}
- self._member_last_federation_poke: Dict[RoomMember, int] = {}
- self.wheel_timer: WheelTimer[RoomMember] = WheelTimer(bucket_size=5000)
- self._latest_room_serial = 0
- self.clock.looping_call(self._handle_timeouts, 5000)
- def _reset(self) -> None:
- """Reset the typing handler's data caches."""
- # map room IDs to serial numbers
- self._room_serials = {}
- # map room IDs to sets of users currently typing
- self._room_typing = {}
- self._member_last_federation_poke = {}
- self.wheel_timer = WheelTimer(bucket_size=5000)
- @wrap_as_background_process("typing._handle_timeouts")
- async def _handle_timeouts(self) -> None:
- logger.debug("Checking for typing timeouts")
- now = self.clock.time_msec()
- members = set(self.wheel_timer.fetch(now))
- for member in members:
- self._handle_timeout_for_member(now, member)
- def _handle_timeout_for_member(self, now: int, member: RoomMember) -> None:
- if not self.is_typing(member):
- # Nothing to do if they're no longer typing
- return
- # Check if we need to resend a keep alive over federation for this
- # user.
- if self.federation and self.is_mine_id(member.user_id):
- last_fed_poke = self._member_last_federation_poke.get(member, None)
- if not last_fed_poke or last_fed_poke + FEDERATION_PING_INTERVAL <= now:
- run_as_background_process(
- "typing._push_remote", self._push_remote, member=member, typing=True
- )
- # Add a paranoia timer to ensure that we always have a timer for
- # each person typing.
- self.wheel_timer.insert(now=now, obj=member, then=now + 60 * 1000)
- def is_typing(self, member: RoomMember) -> bool:
- return member.user_id in self._room_typing.get(member.room_id, set())
- async def _push_remote(self, member: RoomMember, typing: bool) -> None:
- if not self.federation:
- return
- try:
- self._member_last_federation_poke[member] = self.clock.time_msec()
- now = self.clock.time_msec()
- self.wheel_timer.insert(
- now=now, obj=member, then=now + FEDERATION_PING_INTERVAL
- )
- hosts = await self._storage_controllers.state.get_current_hosts_in_room(
- member.room_id
- )
- for domain in hosts:
- if domain != self.server_name:
- logger.debug("sending typing update to %s", domain)
- self.federation.build_and_send_edu(
- destination=domain,
- edu_type=EduTypes.TYPING,
- content={
- "room_id": member.room_id,
- "user_id": member.user_id,
- "typing": typing,
- },
- key=member,
- )
- except Exception:
- logger.exception("Error pushing typing notif to remotes")
- def process_replication_rows(
- self, token: int, rows: List[TypingStream.TypingStreamRow]
- ) -> None:
- """Should be called whenever we receive updates for typing stream."""
- if self._latest_room_serial > token:
- # The typing worker has gone backwards (e.g. it may have restarted).
- # To prevent inconsistent data, just clear everything.
- logger.info("Typing handler stream went backwards; resetting")
- self._reset()
- # Set the latest serial token to whatever the server gave us.
- self._latest_room_serial = token
- for row in rows:
- self._room_serials[row.room_id] = token
- prev_typing = self._room_typing.get(row.room_id, set())
- now_typing = set(row.user_ids)
- self._room_typing[row.room_id] = now_typing
- if self.federation:
- run_as_background_process(
- "_send_changes_in_typing_to_remotes",
- self._send_changes_in_typing_to_remotes,
- row.room_id,
- prev_typing,
- now_typing,
- )
- async def _send_changes_in_typing_to_remotes(
- self, room_id: str, prev_typing: Set[str], now_typing: Set[str]
- ) -> None:
- """Process a change in typing of a room from replication, sending EDUs
- for any local users.
- """
- if not self.federation:
- return
- for user_id in now_typing - prev_typing:
- if self.is_mine_id(user_id):
- await self._push_remote(RoomMember(room_id, user_id), True)
- for user_id in prev_typing - now_typing:
- if self.is_mine_id(user_id):
- await self._push_remote(RoomMember(room_id, user_id), False)
- def get_current_token(self) -> int:
- return self._latest_room_serial
- class TypingWriterHandler(FollowerTypingHandler):
- def __init__(self, hs: "HomeServer"):
- super().__init__(hs)
- assert hs.get_instance_name() in hs.config.worker.writers.typing
- self.auth = hs.get_auth()
- self.notifier = hs.get_notifier()
- self.event_auth_handler = hs.get_event_auth_handler()
- self.hs = hs
- hs.get_federation_registry().register_edu_handler(
- EduTypes.TYPING, self._recv_edu
- )
- hs.get_distributor().observe("user_left_room", self.user_left_room)
- # clock time we expect to stop
- self._member_typing_until: Dict[RoomMember, int] = {}
- # caches which room_ids changed at which serials
- self._typing_stream_change_cache = StreamChangeCache(
- "TypingStreamChangeCache", self._latest_room_serial
- )
- def _handle_timeout_for_member(self, now: int, member: RoomMember) -> None:
- super()._handle_timeout_for_member(now, member)
- if not self.is_typing(member):
- # Nothing to do if they're no longer typing
- return
- until = self._member_typing_until.get(member, None)
- if not until or until <= now:
- logger.info("Timing out typing for: %s", member.user_id)
- self._stopped_typing(member)
- return
- async def started_typing(
- self, target_user: UserID, requester: Requester, room_id: str, timeout: int
- ) -> None:
- target_user_id = target_user.to_string()
- if not self.is_mine_id(target_user_id):
- raise SynapseError(400, "User is not hosted on this homeserver")
- if target_user != requester.user:
- raise AuthError(400, "Cannot set another user's typing state")
- if requester.shadow_banned:
- # We randomly sleep a bit just to annoy the requester.
- await self.clock.sleep(random.randint(1, 10))
- raise ShadowBanError()
- await self.auth.check_user_in_room(room_id, requester)
- logger.debug("%s has started typing in %s", target_user_id, room_id)
- member = RoomMember(room_id=room_id, user_id=target_user_id)
- was_present = member.user_id in self._room_typing.get(room_id, set())
- now = self.clock.time_msec()
- self._member_typing_until[member] = now + timeout
- self.wheel_timer.insert(now=now, obj=member, then=now + timeout)
- if was_present:
- # No point sending another notification
- return
- self._push_update(member=member, typing=True)
- async def stopped_typing(
- self, target_user: UserID, requester: Requester, room_id: str
- ) -> None:
- target_user_id = target_user.to_string()
- if not self.is_mine_id(target_user_id):
- raise SynapseError(400, "User is not hosted on this homeserver")
- if target_user != requester.user:
- raise AuthError(400, "Cannot set another user's typing state")
- if requester.shadow_banned:
- # We randomly sleep a bit just to annoy the requester.
- await self.clock.sleep(random.randint(1, 10))
- raise ShadowBanError()
- await self.auth.check_user_in_room(room_id, requester)
- logger.debug("%s has stopped typing in %s", target_user_id, room_id)
- member = RoomMember(room_id=room_id, user_id=target_user_id)
- self._stopped_typing(member)
- def user_left_room(self, user: UserID, room_id: str) -> None:
- user_id = user.to_string()
- if self.is_mine_id(user_id):
- member = RoomMember(room_id=room_id, user_id=user_id)
- self._stopped_typing(member)
- def _stopped_typing(self, member: RoomMember) -> None:
- if member.user_id not in self._room_typing.get(member.room_id, set()):
- # No point
- return
- self._member_typing_until.pop(member, None)
- self._member_last_federation_poke.pop(member, None)
- self._push_update(member=member, typing=False)
- def _push_update(self, member: RoomMember, typing: bool) -> None:
- if self.hs.is_mine_id(member.user_id):
- # Only send updates for changes to our own users.
- run_as_background_process(
- "typing._push_remote", self._push_remote, member, typing
- )
- self._push_update_local(member=member, typing=typing)
- async def _recv_edu(self, origin: str, content: JsonDict) -> None:
- room_id = content["room_id"]
- user_id = content["user_id"]
- # If we're not in the room just ditch the event entirely. This is
- # probably an old server that has come back and thinks we're still in
- # the room (or we've been rejoined to the room by a state reset).
- is_in_room = await self.event_auth_handler.is_host_in_room(
- room_id, self.server_name
- )
- if not is_in_room:
- logger.info(
- "Ignoring typing update for room %r from server %s as we're not in the room",
- room_id,
- origin,
- )
- return
- member = RoomMember(user_id=user_id, room_id=room_id)
- # Check that the string is a valid user id
- user = UserID.from_string(user_id)
- if user.domain != origin:
- logger.info(
- "Got typing update from %r with bad 'user_id': %r", origin, user_id
- )
- return
- # Let's check that the origin server is in the room before accepting the typing
- # event. We don't want to block waiting on a partial state so take an
- # approximation if needed.
- domains = await self._storage_controllers.state.get_current_hosts_in_room_or_partial_state_approximation(
- room_id
- )
- if user.domain in domains:
- logger.info("Got typing update from %s: %r", user_id, content)
- now = self.clock.time_msec()
- self._member_typing_until[member] = now + FEDERATION_TIMEOUT
- self.wheel_timer.insert(now=now, obj=member, then=now + FEDERATION_TIMEOUT)
- self._push_update_local(member=member, typing=content["typing"])
- def _push_update_local(self, member: RoomMember, typing: bool) -> None:
- room_set = self._room_typing.setdefault(member.room_id, set())
- if typing:
- room_set.add(member.user_id)
- else:
- room_set.discard(member.user_id)
- self._latest_room_serial += 1
- self._room_serials[member.room_id] = self._latest_room_serial
- self._typing_stream_change_cache.entity_has_changed(
- member.room_id, self._latest_room_serial
- )
- self.notifier.on_new_event(
- StreamKeyType.TYPING, self._latest_room_serial, rooms=[member.room_id]
- )
- async def get_all_typing_updates(
- self, instance_name: str, last_id: int, current_id: int, limit: int
- ) -> Tuple[List[Tuple[int, list]], int, bool]:
- """Get updates for typing replication stream.
- Args:
- instance_name: The writer we want to fetch updates from. Unused
- here since there is only ever one writer.
- last_id: The token to fetch updates from. Exclusive.
- current_id: The token to fetch updates up to. Inclusive.
- limit: The requested limit for the number of rows to return. The
- function may return more or fewer rows.
- Returns:
- A tuple consisting of: the updates, a token to use to fetch
- subsequent updates, and whether we returned fewer rows than exists
- between the requested tokens due to the limit.
- The token returned can be used in a subsequent call to this
- function to get further updates.
- The updates are a list of 2-tuples of stream ID and the row data
- """
- if last_id == current_id:
- return [], current_id, False
- changed_rooms: Optional[
- Iterable[str]
- ] = self._typing_stream_change_cache.get_all_entities_changed(last_id)
- if changed_rooms is None:
- changed_rooms = self._room_serials
- rows = []
- for room_id in changed_rooms:
- serial = self._room_serials[room_id]
- if last_id < serial <= current_id:
- typing = self._room_typing[room_id]
- rows.append((serial, [room_id, list(typing)]))
- rows.sort()
- limited = False
- # We, unusually, use a strict limit here as we have all the rows in
- # memory rather than pulling them out of the database with a `LIMIT ?`
- # clause.
- if len(rows) > limit:
- rows = rows[:limit]
- current_id = rows[-1][0]
- limited = True
- return rows, current_id, limited
- def process_replication_rows(
- self, token: int, rows: List[TypingStream.TypingStreamRow]
- ) -> None:
- # The writing process should never get updates from replication.
- raise Exception("Typing writer instance got typing info over replication")
- class TypingNotificationEventSource(EventSource[int, JsonDict]):
- def __init__(self, hs: "HomeServer"):
- self._main_store = hs.get_datastores().main
- self.clock = hs.get_clock()
- # We can't call get_typing_handler here because there's a cycle:
- #
- # Typing -> Notifier -> TypingNotificationEventSource -> Typing
- #
- self.get_typing_handler = hs.get_typing_handler
- def _make_event_for(self, room_id: str) -> JsonDict:
- typing = self.get_typing_handler()._room_typing[room_id]
- return {
- "type": EduTypes.TYPING,
- "room_id": room_id,
- "content": {"user_ids": list(typing)},
- }
- async def get_new_events_as(
- self, from_key: int, service: ApplicationService
- ) -> Tuple[List[JsonDict], int]:
- """Returns a set of new typing events that an appservice
- may be interested in.
- Args:
- from_key: the stream position at which events should be fetched from.
- service: The appservice which may be interested.
- Returns:
- A two-tuple containing the following:
- * A list of json dictionaries derived from typing events that the
- appservice may be interested in.
- * The latest known room serial.
- """
- with Measure(self.clock, "typing.get_new_events_as"):
- handler = self.get_typing_handler()
- events = []
- # Work on a copy of things here as these may change in the handler while
- # waiting for the AS `is_interested_in_room` call to complete.
- # Shallow copy is safe as no nested data is present.
- latest_room_serial = handler._latest_room_serial
- room_serials = handler._room_serials.copy()
- for room_id, serial in room_serials.items():
- if serial <= from_key:
- continue
- if not await service.is_interested_in_room(room_id, self._main_store):
- continue
- events.append(self._make_event_for(room_id))
- return events, latest_room_serial
- async def get_new_events(
- self,
- user: UserID,
- from_key: int,
- limit: int,
- room_ids: Iterable[str],
- is_guest: bool,
- explicit_room_id: Optional[str] = None,
- ) -> Tuple[List[JsonDict], int]:
- with Measure(self.clock, "typing.get_new_events"):
- from_key = int(from_key)
- handler = self.get_typing_handler()
- events = []
- for room_id in room_ids:
- if room_id not in handler._room_serials:
- continue
- if handler._room_serials[room_id] <= from_key:
- continue
- events.append(self._make_event_for(room_id))
- return events, handler._latest_room_serial
- def get_current_key(self) -> int:
- return self.get_typing_handler()._latest_room_serial
|