|
@@ -16,7 +16,7 @@ import abc
|
|
|
import logging
|
|
|
import random
|
|
|
from http import HTTPStatus
|
|
|
-from typing import TYPE_CHECKING, Iterable, List, Optional, Set, Tuple
|
|
|
+from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Set, Tuple
|
|
|
|
|
|
from synapse import types
|
|
|
from synapse.api.constants import (
|
|
@@ -38,7 +38,10 @@ from synapse.event_auth import get_named_level, get_power_level_event
|
|
|
from synapse.events import EventBase
|
|
|
from synapse.events.snapshot import EventContext
|
|
|
from synapse.handlers.profile import MAX_AVATAR_URL_LEN, MAX_DISPLAYNAME_LEN
|
|
|
+from synapse.handlers.state_deltas import MatchChange, StateDeltasHandler
|
|
|
from synapse.logging import opentracing
|
|
|
+from synapse.metrics import event_processing_positions
|
|
|
+from synapse.metrics.background_process_metrics import run_as_background_process
|
|
|
from synapse.module_api import NOT_SPAM
|
|
|
from synapse.types import (
|
|
|
JsonDict,
|
|
@@ -280,9 +283,25 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
|
|
|
"""
|
|
|
raise NotImplementedError()
|
|
|
|
|
|
- @abc.abstractmethod
|
|
|
async def forget(self, user: UserID, room_id: str) -> None:
|
|
|
- raise NotImplementedError()
|
|
|
+ user_id = user.to_string()
|
|
|
+
|
|
|
+ member = await self._storage_controllers.state.get_current_state_event(
|
|
|
+ room_id=room_id, event_type=EventTypes.Member, state_key=user_id
|
|
|
+ )
|
|
|
+ membership = member.membership if member else None
|
|
|
+
|
|
|
+ if membership is not None and membership not in [
|
|
|
+ Membership.LEAVE,
|
|
|
+ Membership.BAN,
|
|
|
+ ]:
|
|
|
+ raise SynapseError(400, "User %s in room %s" % (user_id, room_id))
|
|
|
+
|
|
|
+ # In normal case this call is only required if `membership` is not `None`.
|
|
|
+ # But: After the last member had left the room, the background update
|
|
|
+ # `_background_remove_left_rooms` is deleting rows related to this room from
|
|
|
+ # the table `current_state_events` and `get_current_state_events` is `None`.
|
|
|
+ await self.store.forget(user_id, room_id)
|
|
|
|
|
|
async def ratelimit_multiple_invites(
|
|
|
self,
|
|
@@ -2046,25 +2065,141 @@ class RoomMemberMasterHandler(RoomMemberHandler):
|
|
|
"""Implements RoomMemberHandler._user_left_room"""
|
|
|
user_left_room(self.distributor, target, room_id)
|
|
|
|
|
|
- async def forget(self, user: UserID, room_id: str) -> None:
|
|
|
- user_id = user.to_string()
|
|
|
|
|
|
- member = await self._storage_controllers.state.get_current_state_event(
|
|
|
- room_id=room_id, event_type=EventTypes.Member, state_key=user_id
|
|
|
- )
|
|
|
- membership = member.membership if member else None
|
|
|
+class RoomForgetterHandler(StateDeltasHandler):
|
|
|
+ """Forgets rooms when they are left, when enabled in the homeserver config.
|
|
|
|
|
|
- if membership is not None and membership not in [
|
|
|
- Membership.LEAVE,
|
|
|
- Membership.BAN,
|
|
|
- ]:
|
|
|
- raise SynapseError(400, "User %s in room %s" % (user_id, room_id))
|
|
|
+ For the purposes of this feature, kicks, bans and "leaves" via state resolution
|
|
|
+ weirdness are all considered to be leaves.
|
|
|
|
|
|
- # In normal case this call is only required if `membership` is not `None`.
|
|
|
- # But: After the last member had left the room, the background update
|
|
|
- # `_background_remove_left_rooms` is deleting rows related to this room from
|
|
|
- # the table `current_state_events` and `get_current_state_events` is `None`.
|
|
|
- await self.store.forget(user_id, room_id)
|
|
|
+ Derived from `StatsHandler` and `UserDirectoryHandler`.
|
|
|
+ """
|
|
|
+
|
|
|
+ def __init__(self, hs: "HomeServer"):
|
|
|
+ super().__init__(hs)
|
|
|
+
|
|
|
+ self._hs = hs
|
|
|
+ self._store = hs.get_datastores().main
|
|
|
+ self._storage_controllers = hs.get_storage_controllers()
|
|
|
+ self._clock = hs.get_clock()
|
|
|
+ self._notifier = hs.get_notifier()
|
|
|
+ self._room_member_handler = hs.get_room_member_handler()
|
|
|
+
|
|
|
+ # The current position in the current_state_delta stream
|
|
|
+ self.pos: Optional[int] = None
|
|
|
+
|
|
|
+ # Guard to ensure we only process deltas one at a time
|
|
|
+ self._is_processing = False
|
|
|
+
|
|
|
+ if hs.config.worker.run_background_tasks:
|
|
|
+ self._notifier.add_replication_callback(self.notify_new_event)
|
|
|
+
|
|
|
+ # We kick this off to pick up outstanding work from before the last restart.
|
|
|
+ self._clock.call_later(0, self.notify_new_event)
|
|
|
+
|
|
|
+ def notify_new_event(self) -> None:
|
|
|
+ """Called when there may be more deltas to process"""
|
|
|
+ if self._is_processing:
|
|
|
+ return
|
|
|
+
|
|
|
+ self._is_processing = True
|
|
|
+
|
|
|
+ async def process() -> None:
|
|
|
+ try:
|
|
|
+ await self._unsafe_process()
|
|
|
+ finally:
|
|
|
+ self._is_processing = False
|
|
|
+
|
|
|
+ run_as_background_process("room_forgetter.notify_new_event", process)
|
|
|
+
|
|
|
+ async def _unsafe_process(self) -> None:
|
|
|
+ # If self.pos is None then means we haven't fetched it from DB
|
|
|
+ if self.pos is None:
|
|
|
+ self.pos = await self._store.get_room_forgetter_stream_pos()
|
|
|
+ room_max_stream_ordering = self._store.get_room_max_stream_ordering()
|
|
|
+ if self.pos > room_max_stream_ordering:
|
|
|
+ # apparently, we've processed more events than exist in the database!
|
|
|
+ # this can happen if events are removed with history purge or similar.
|
|
|
+ logger.warning(
|
|
|
+ "Event stream ordering appears to have gone backwards (%i -> %i): "
|
|
|
+ "rewinding room forgetter processor",
|
|
|
+ self.pos,
|
|
|
+ room_max_stream_ordering,
|
|
|
+ )
|
|
|
+ self.pos = room_max_stream_ordering
|
|
|
+
|
|
|
+ if not self._hs.config.room.forget_on_leave:
|
|
|
+ # Update the processing position, so that if the server admin turns the
|
|
|
+ # feature on at a later date, we don't decide to forget every room that
|
|
|
+ # has ever been left in the past.
|
|
|
+ self.pos = self._store.get_room_max_stream_ordering()
|
|
|
+ await self._store.update_room_forgetter_stream_pos(self.pos)
|
|
|
+ return
|
|
|
+
|
|
|
+ # Loop round handling deltas until we're up to date
|
|
|
+
|
|
|
+ while True:
|
|
|
+ # Be sure to read the max stream_ordering *before* checking if there are any outstanding
|
|
|
+ # deltas, since there is otherwise a chance that we could miss updates which arrive
|
|
|
+ # after we check the deltas.
|
|
|
+ room_max_stream_ordering = self._store.get_room_max_stream_ordering()
|
|
|
+ if self.pos == room_max_stream_ordering:
|
|
|
+ break
|
|
|
+
|
|
|
+ logger.debug(
|
|
|
+ "Processing room forgetting %s->%s", self.pos, room_max_stream_ordering
|
|
|
+ )
|
|
|
+ (
|
|
|
+ max_pos,
|
|
|
+ deltas,
|
|
|
+ ) = await self._storage_controllers.state.get_current_state_deltas(
|
|
|
+ self.pos, room_max_stream_ordering
|
|
|
+ )
|
|
|
+
|
|
|
+ logger.debug("Handling %d state deltas", len(deltas))
|
|
|
+ await self._handle_deltas(deltas)
|
|
|
+
|
|
|
+ self.pos = max_pos
|
|
|
+
|
|
|
+ # Expose current event processing position to prometheus
|
|
|
+ event_processing_positions.labels("room_forgetter").set(max_pos)
|
|
|
+
|
|
|
+ await self._store.update_room_forgetter_stream_pos(max_pos)
|
|
|
+
|
|
|
+ async def _handle_deltas(self, deltas: List[Dict[str, Any]]) -> None:
|
|
|
+ """Called with the state deltas to process"""
|
|
|
+ for delta in deltas:
|
|
|
+ typ = delta["type"]
|
|
|
+ state_key = delta["state_key"]
|
|
|
+ room_id = delta["room_id"]
|
|
|
+ event_id = delta["event_id"]
|
|
|
+ prev_event_id = delta["prev_event_id"]
|
|
|
+
|
|
|
+ if typ != EventTypes.Member:
|
|
|
+ continue
|
|
|
+
|
|
|
+ if not self._hs.is_mine_id(state_key):
|
|
|
+ continue
|
|
|
+
|
|
|
+ change = await self._get_key_change(
|
|
|
+ prev_event_id,
|
|
|
+ event_id,
|
|
|
+ key_name="membership",
|
|
|
+ public_value=Membership.JOIN,
|
|
|
+ )
|
|
|
+ is_leave = change is MatchChange.now_false
|
|
|
+
|
|
|
+ if is_leave:
|
|
|
+ try:
|
|
|
+ await self._room_member_handler.forget(
|
|
|
+ UserID.from_string(state_key), room_id
|
|
|
+ )
|
|
|
+ except SynapseError as e:
|
|
|
+ if e.code == 400:
|
|
|
+ # The user is back in the room.
|
|
|
+ pass
|
|
|
+ else:
|
|
|
+ raise
|
|
|
|
|
|
|
|
|
def get_users_which_can_issue_invite(auth_events: StateMap[EventBase]) -> List[str]:
|