123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180 |
- # Copyright 2022 The Matrix.org Foundation C.I.C.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- import logging
- from collections import defaultdict
- from typing import Collection, Dict, Set
- from twisted.internet import defer
- from twisted.internet.defer import Deferred
- from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
- from synapse.storage.databases.main.events_worker import EventsWorkerStore
- from synapse.storage.databases.main.room import RoomWorkerStore
- from synapse.util import unwrapFirstError
- logger = logging.getLogger(__name__)
class PartialStateEventsTracker:
    """Keeps track of which events have partial state, after a partial-state join"""

    def __init__(self, store: EventsWorkerStore):
        """
        Args:
            store: the datastore used to look up which events still have partial
                state (via `get_partial_state_events`).
        """
        self._store = store

        # a map from event id to a set of Deferreds which are waiting for that event to be
        # un-partial-stated.
        self._observers: Dict[str, Set[Deferred[None]]] = defaultdict(set)

    def notify_un_partial_stated(self, event_id: str) -> None:
        """Notify that we now have full state for a given event

        Called by the state-resynchronization loop whenever we resynchronize the state
        for a particular event. Unblocks any callers to await_full_state() for that
        event.

        Args:
            event_id: the event that now has full state.
        """
        observers = self._observers.pop(event_id, None)
        if not observers:
            return

        logger.info(
            "Notifying %i things waiting for un-partial-stating of event %s",
            len(observers),
            event_id,
        )

        with PreserveLoggingContext():
            for o in observers:
                # await_full_state() fires an observer itself when its re-check of
                # the database shows the event is no longer partial, but leaves the
                # observer registered until its `finally` block runs. If it is
                # still waiting on other events at that point, we can see an
                # already-fired Deferred here; calling it back a second time would
                # raise AlreadyCalledError and prevent the remaining observers
                # from being notified.
                if not o.called:
                    o.callback(None)

    async def await_full_state(self, event_ids: Collection[str]) -> None:
        """Wait for all the given events to have full state.

        Returns immediately if none of the events currently have partial state.
        Otherwise, registers an observer for each partial-state event and waits
        until every one of them has been fired.

        Args:
            event_ids: the list of event ids that we want full state for
        """
        # first try the happy path: if there are no partial-state events, we can return
        # quickly
        partial_state_event_ids = [
            ev
            for ev, p in (await self._store.get_partial_state_events(event_ids)).items()
            if p
        ]

        if not partial_state_event_ids:
            return

        logger.info(
            "Awaiting un-partial-stating of events %s",
            partial_state_event_ids,
            stack_info=True,
        )

        # create an observer for each lazy-joined event
        observers: Dict[str, Deferred[None]] = {
            event_id: Deferred() for event_id in partial_state_event_ids
        }
        for event_id, observer in observers.items():
            self._observers[event_id].add(observer)

        try:
            # some of them may have been un-lazy-joined between us checking the db and
            # registering the observer, in which case we'd wait forever for the
            # notification. Call back the observers now.
            for event_id, partial in (
                await self._store.get_partial_state_events(observers.keys())
            ).items():
                # there may have been a call to notify_un_partial_stated during the
                # db query, so the observers may already have been called.
                if not partial and not observers[event_id].called:
                    observers[event_id].callback(None)

            # wait for every observer to fire. If any of them fails, unwrap the
            # FirstError raised by gatherResults so the underlying failure is
            # what propagates to our caller.
            await make_deferred_yieldable(
                defer.gatherResults(
                    observers.values(),
                    consumeErrors=True,
                )
            ).addErrback(unwrapFirstError)
            logger.info("Events %s all un-partial-stated", observers.keys())
        finally:
            # remove any observers we created. This should happen when the notification
            # is received, but that might not happen for two reasons:
            #   (a) we're bailing out early on an exception (including us being
            #       cancelled during the await)
            #   (b) the event got de-lazy-joined before we set up the observer.
            for event_id, observer in observers.items():
                observer_set = self._observers.get(event_id)
                if observer_set:
                    observer_set.discard(observer)
                    if not observer_set:
                        # don't leave empty sets behind in the map
                        del self._observers[event_id]
class PartialCurrentStateTracker:
    """Tracks rooms whose current state is still partial following a partial-state join"""

    def __init__(self, store: RoomWorkerStore):
        self._store = store

        # room id -> the Deferreds of callers currently blocked until that room
        # has been un-partial-stated.
        self._observers: Dict[str, Set[Deferred[None]]] = defaultdict(set)

    def notify_un_partial_stated(self, room_id: str) -> None:
        """Signal that a room's current state is now complete.

        Releases every caller blocked in await_full_state() for that room.

        Args:
            room_id: the room that now has full current state.
        """
        waiting = self._observers.pop(room_id, None)
        if waiting:
            logger.info(
                "Notifying %i things waiting for un-partial-stating of room %s",
                len(waiting),
                room_id,
            )

            with PreserveLoggingContext():
                for deferred in waiting:
                    deferred.callback(None)

    async def await_full_state(self, room_id: str) -> None:
        """Block until the given room has full current state.

        Returns straight away if the store reports that the room does not have
        partial state.

        Args:
            room_id: the room to wait for.
        """
        # Register our Deferred *before* querying the database, so that a
        # notification arriving while the query is in flight is not missed.
        waiter: Deferred[None] = Deferred()
        self._observers[room_id].add(waiter)

        try:
            # Only wait if the room actually has partial current state.
            if await self._store.is_partial_state_room(room_id):
                logger.info(
                    "Awaiting un-partial-stating of room %s",
                    room_id,
                )

                await make_deferred_yieldable(waiter)

                logger.info("Room has un-partial-stated")
        finally:
            # Deregister our Deferred, and drop the room's entry altogether once
            # nobody is left waiting on it.
            remaining = self._observers.get(room_id)
            if remaining is not None:
                remaining.discard(waiter)
                if not remaining:
                    self._observers.pop(room_id, None)
|