123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556 |
- # Copyright 2019 The Matrix.org Foundation C.I.C.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- import itertools
- import logging
- from typing import TYPE_CHECKING, Any, Collection, Iterable, List, Optional, Tuple
- from synapse.api.constants import EventTypes
- from synapse.replication.tcp.streams import BackfillStream, CachesStream
- from synapse.replication.tcp.streams.events import (
- EventsStream,
- EventsStreamCurrentStateRow,
- EventsStreamEventRow,
- EventsStreamRow,
- )
- from synapse.storage._base import SQLBaseStore
- from synapse.storage.database import (
- DatabasePool,
- LoggingDatabaseConnection,
- LoggingTransaction,
- )
- from synapse.storage.engines import PostgresEngine
- from synapse.storage.util.id_generators import MultiWriterIdGenerator
- from synapse.util.caches.descriptors import CachedFunction
- from synapse.util.iterutils import batch_iter
- if TYPE_CHECKING:
- from synapse.server import HomeServer
logger = logging.getLogger(__name__)


# The names below are not real cached functions: they are sentinel values
# written into the `cache_func` column of the cache invalidation stream. A
# worker that receives a row carrying one of them runs the matching batch of
# invalidations instead of looking up a cache by that name. Rows using these
# names must always carry keys (an "invalidate all" row is rejected), and the
# first key is the room ID.

# This is a special cache name we use to batch multiple invalidations of caches
# based on the current state when notifying workers over replication. The keys
# are the room ID followed by the user IDs of the members that changed.
CURRENT_STATE_CACHE_NAME = "cs_cache_fake"

# As above, but for invalidating event caches on history deletion
PURGE_HISTORY_CACHE_NAME = "ph_cache_fake"

# As above, but for invalidating room caches on room deletion
DELETE_ROOM_CACHE_NAME = "dr_cache_fake"
class CacheInvalidationWorkerStore(SQLBaseStore):
    """Store mixin that keeps in-memory caches consistent across workers.

    It does two things:

    * applies cache invalidations implied by rows received over the events,
      backfill and caches replication streams (`process_replication_rows`);
    * records invalidations made by this process in the
      `cache_invalidation_stream_by_instance` table so that other workers can
      pick them up (`_send_invalidation_to_replication`). This is only done
      when running on Postgres.
    """

    def __init__(
        self,
        database: DatabasePool,
        db_conn: LoggingDatabaseConnection,
        hs: "HomeServer",
    ):
        super().__init__(database, db_conn, hs)

        self._instance_name = hs.get_instance_name()

        self.db_pool.updates.register_background_index_update(
            update_name="cache_invalidation_index_by_instance",
            index_name="cache_invalidation_stream_by_instance_instance_index",
            table="cache_invalidation_stream_by_instance",
            columns=("instance_name", "stream_id"),
            psql_only=True,  # The table is only on postgres DBs.
        )

        # `None` when not running on Postgres; every user of the generator
        # must handle that case.
        self._cache_id_gen: Optional[MultiWriterIdGenerator]
        if isinstance(self.database_engine, PostgresEngine):
            # We set the `writers` to an empty list here as we don't care about
            # missing updates over restarts, as we'll not have anything in our
            # caches to invalidate. (This reduces the amount of writes to the DB
            # that happen).
            self._cache_id_gen = MultiWriterIdGenerator(
                db_conn,
                database,
                notifier=hs.get_replication_notifier(),
                stream_name="caches",
                instance_name=hs.get_instance_name(),
                tables=[
                    (
                        "cache_invalidation_stream_by_instance",
                        "instance_name",
                        "stream_id",
                    )
                ],
                sequence_name="cache_invalidation_stream_seq",
                writers=[],
            )
        else:
            self._cache_id_gen = None

    async def get_all_updated_caches(
        self, instance_name: str, last_id: int, current_id: int, limit: int
    ) -> Tuple[List[Tuple[int, tuple]], int, bool]:
        """Get updates for caches replication stream.

        Args:
            instance_name: The writer we want to fetch updates from. Only rows
                written by this instance are returned.
            last_id: The token to fetch updates from. Exclusive.
            current_id: The token to fetch updates up to. Note that the query
                itself is deliberately *not* bounded by this token (see the
                comment in the transaction function); it is only used for the
                early exit and as the returned token when not limited.
            limit: The requested limit for the number of rows to return. The
                function may return more or fewer rows.

        Returns:
            A tuple consisting of: the updates, a token to use to fetch
            subsequent updates, and whether we returned fewer rows than exists
            between the requested tokens due to the limit.

            The token returned can be used in a subsequent call to this
            function to get further updates.

            The updates are a list of 2-tuples of stream ID and the row data
            (`cache_func`, `keys`, `invalidation_ts`).
        """

        if last_id == current_id:
            return [], current_id, False

        def get_all_updated_caches_txn(
            txn: LoggingTransaction,
        ) -> Tuple[List[Tuple[int, tuple]], int, bool]:
            # We purposefully don't bound by the current token, as we want to
            # send across cache invalidations as quickly as possible. Cache
            # invalidations are idempotent, so duplicates are fine.
            sql = """
                SELECT stream_id, cache_func, keys, invalidation_ts
                FROM cache_invalidation_stream_by_instance
                WHERE stream_id > ? AND instance_name = ?
                ORDER BY stream_id ASC
                LIMIT ?
            """
            txn.execute(sql, (last_id, instance_name, limit))
            updates = [(row[0], row[1:]) for row in txn]

            limited = False
            upto_token = current_id
            if len(updates) >= limit:
                # We hit the limit, so there may be more rows: tell the caller
                # to resume from the last row we actually returned.
                upto_token = updates[-1][0]
                limited = True

            return updates, upto_token, limited

        return await self.db_pool.runInteraction(
            "get_all_updated_caches", get_all_updated_caches_txn
        )

    def process_replication_rows(
        self, stream_name: str, instance_name: str, token: int, rows: Iterable[Any]
    ) -> None:
        """Apply the cache invalidations implied by a batch of replication rows.

        Handles the events, backfill and caches streams, then hands the rows on
        to the parent class.

        Args:
            stream_name: Name of the replication stream the rows came from.
            instance_name: The instance that wrote the rows.
            token: The stream token associated with the rows.
            rows: The rows to process.

        Raises:
            Exception: if a caches stream row uses one of the special batch
                cache names but carries no keys (i.e. asks to invalidate all).
        """
        if stream_name == EventsStream.NAME:
            for row in rows:
                self._process_event_stream_row(token, row)
        elif stream_name == BackfillStream.NAME:
            for row in rows:
                # The token is negated before being passed on as the stream
                # ordering for backfilled events.
                self._invalidate_caches_for_event(
                    -token,
                    row.event_id,
                    row.room_id,
                    row.type,
                    row.state_key,
                    row.redacts,
                    row.relates_to,
                    backfilled=True,
                )
        elif stream_name == CachesStream.NAME:
            for row in rows:
                if row.cache_func == CURRENT_STATE_CACHE_NAME:
                    if row.keys is None:
                        raise Exception(
                            "Can't send an 'invalidate all' for current state cache"
                        )

                    # Keys are the room ID followed by the changed members.
                    room_id = row.keys[0]
                    members_changed = set(row.keys[1:])
                    self._invalidate_state_caches(room_id, members_changed)
                elif row.cache_func == PURGE_HISTORY_CACHE_NAME:
                    if row.keys is None:
                        raise Exception(
                            "Can't send an 'invalidate all' for 'purge history' cache"
                        )

                    room_id = row.keys[0]
                    self._invalidate_caches_for_room_events(room_id)
                elif row.cache_func == DELETE_ROOM_CACHE_NAME:
                    if row.keys is None:
                        raise Exception(
                            "Can't send an 'invalidate all' for 'delete room' cache"
                        )

                    room_id = row.keys[0]
                    # `_invalidate_caches_for_room` already purges the event
                    # caches for the room, so there is no need to call
                    # `_invalidate_caches_for_room_events` separately here.
                    self._invalidate_caches_for_room(room_id)
                else:
                    self._attempt_to_invalidate_cache(row.cache_func, row.keys)

        super().process_replication_rows(stream_name, instance_name, token, rows)

    def process_replication_position(
        self, stream_name: str, instance_name: str, token: int
    ) -> None:
        """Record a new position for a replication stream.

        For the caches stream this advances the cache stream ID generator (if
        there is one) for the given writer. The call is always passed on to the
        parent class.
        """
        if stream_name == CachesStream.NAME:
            if self._cache_id_gen:
                self._cache_id_gen.advance(instance_name, token)
        super().process_replication_position(stream_name, instance_name, token)

    def _process_event_stream_row(self, token: int, row: EventsStreamRow) -> None:
        """Apply the cache invalidations implied by one events stream row.

        Args:
            token: The stream token the row was received at.
            row: The row to process; either an event row or a current state
                row.

        Raises:
            Exception: if the row type is not recognised.
        """
        data = row.data

        if row.type == EventsStreamEventRow.TypeId:
            assert isinstance(data, EventsStreamEventRow)
            self._invalidate_caches_for_event(
                token,
                data.event_id,
                data.room_id,
                data.type,
                data.state_key,
                data.redacts,
                data.relates_to,
                backfilled=False,
            )
        elif row.type == EventsStreamCurrentStateRow.TypeId:
            assert isinstance(data, EventsStreamCurrentStateRow)
            self._curr_state_delta_stream_cache.entity_has_changed(data.room_id, token)  # type: ignore[attr-defined]

            if data.type == EventTypes.Member:
                # For membership state, the state key is used as the key of the
                # per-user room list caches.
                self.get_rooms_for_user_with_stream_ordering.invalidate(  # type: ignore[attr-defined]
                    (data.state_key,)
                )
                self.get_rooms_for_user.invalidate((data.state_key,))  # type: ignore[attr-defined]
        else:
            raise Exception("Unknown events stream row type %s" % (row.type,))

    def _invalidate_caches_for_event(
        self,
        stream_ordering: int,
        event_id: str,
        room_id: str,
        etype: str,
        state_key: Optional[str],
        redacts: Optional[str],
        relates_to: Optional[str],
        backfilled: bool,
    ) -> None:
        """Invalidate the local caches affected by a single event.

        Args:
            stream_ordering: Stream position passed to the stream change caches.
            event_id: The event that was received.
            room_id: The room the event is in.
            etype: The event type.
            state_key: The event's state key, if any.
            redacts: The ID of the event this event redacts, if any.
            relates_to: The ID of the event this event relates to, if any.
            backfilled: Whether the event was backfilled. If so the events
                stream change cache is not notified.
        """
        # XXX: If you add something to this function make sure you add it to
        # `_invalidate_caches_for_room_events` as well.

        # This invalidates any local in-memory cached event objects, the original
        # process triggering the invalidation is responsible for clearing any external
        # cached objects.
        self._invalidate_local_get_event_cache(event_id)  # type: ignore[attr-defined]

        self._attempt_to_invalidate_cache("have_seen_event", (room_id, event_id))
        self._attempt_to_invalidate_cache("get_latest_event_ids_in_room", (room_id,))
        self._attempt_to_invalidate_cache(
            "get_unread_event_push_actions_by_room_for_user", (room_id,)
        )

        # The `_get_membership_from_event_id` is immutable, except for the
        # case where we look up an event *before* persisting it.
        self._attempt_to_invalidate_cache("_get_membership_from_event_id", (event_id,))

        if not backfilled:
            self._events_stream_cache.entity_has_changed(room_id, stream_ordering)  # type: ignore[attr-defined]

        if redacts:
            self._invalidate_local_get_event_cache(redacts)  # type: ignore[attr-defined]
            # Caches which might leak edits must be invalidated for the event being
            # redacted.
            self._attempt_to_invalidate_cache("get_relations_for_event", (redacts,))
            self._attempt_to_invalidate_cache("get_applicable_edit", (redacts,))
            self._attempt_to_invalidate_cache("get_thread_id", (redacts,))
            self._attempt_to_invalidate_cache("get_thread_id_for_receipts", (redacts,))

        if etype == EventTypes.Member:
            self._membership_stream_cache.entity_has_changed(state_key, stream_ordering)  # type: ignore[attr-defined]
            self._attempt_to_invalidate_cache(
                "get_invited_rooms_for_local_user", (state_key,)
            )
            self._attempt_to_invalidate_cache(
                "get_rooms_for_user_with_stream_ordering", (state_key,)
            )
            self._attempt_to_invalidate_cache("get_rooms_for_user", (state_key,))

            self._attempt_to_invalidate_cache(
                "did_forget",
                (
                    state_key,
                    room_id,
                ),
            )
            self._attempt_to_invalidate_cache(
                "get_forgotten_rooms_for_user", (state_key,)
            )

        if relates_to:
            self._attempt_to_invalidate_cache("get_relations_for_event", (relates_to,))
            self._attempt_to_invalidate_cache("get_references_for_event", (relates_to,))
            self._attempt_to_invalidate_cache("get_applicable_edit", (relates_to,))
            self._attempt_to_invalidate_cache("get_thread_summary", (relates_to,))
            self._attempt_to_invalidate_cache("get_thread_participated", (relates_to,))
            self._attempt_to_invalidate_cache("get_threads", (room_id,))

    def _invalidate_caches_for_room_events_and_stream(
        self, txn: LoggingTransaction, room_id: str
    ) -> None:
        """Invalidate caches associated with events in a room, and stream to
        replication.

        Used when we delete events in a room, but don't know which events we've
        deleted.

        The local invalidation is registered with `txn.call_after` rather than
        being run immediately.
        """

        self._send_invalidation_to_replication(txn, PURGE_HISTORY_CACHE_NAME, [room_id])
        txn.call_after(self._invalidate_caches_for_room_events, room_id)

    def _invalidate_caches_for_room_events(self, room_id: str) -> None:
        """Invalidate local caches associated with events in a room.

        This does *not* stream anything to replication; use
        `_invalidate_caches_for_room_events_and_stream` for that.

        Used when we delete events in a room, but don't know which events we've
        deleted. Caches that are not keyed by room are therefore invalidated in
        full (a key of `None`).
        """

        self._invalidate_local_get_event_cache_all()  # type: ignore[attr-defined]

        self._attempt_to_invalidate_cache("have_seen_event", (room_id,))
        self._attempt_to_invalidate_cache("get_latest_event_ids_in_room", (room_id,))
        self._attempt_to_invalidate_cache(
            "get_unread_event_push_actions_by_room_for_user", (room_id,)
        )

        self._attempt_to_invalidate_cache("_get_membership_from_event_id", None)
        self._attempt_to_invalidate_cache("get_relations_for_event", None)
        self._attempt_to_invalidate_cache("get_applicable_edit", None)
        self._attempt_to_invalidate_cache("get_thread_id", None)
        self._attempt_to_invalidate_cache("get_thread_id_for_receipts", None)
        self._attempt_to_invalidate_cache("get_invited_rooms_for_local_user", None)
        self._attempt_to_invalidate_cache(
            "get_rooms_for_user_with_stream_ordering", None
        )
        self._attempt_to_invalidate_cache("get_rooms_for_user", None)
        self._attempt_to_invalidate_cache("did_forget", None)
        self._attempt_to_invalidate_cache("get_forgotten_rooms_for_user", None)
        self._attempt_to_invalidate_cache("get_references_for_event", None)
        self._attempt_to_invalidate_cache("get_thread_summary", None)
        self._attempt_to_invalidate_cache("get_thread_participated", None)
        self._attempt_to_invalidate_cache("get_threads", (room_id,))

        self._attempt_to_invalidate_cache("_get_state_group_for_event", None)

        self._attempt_to_invalidate_cache("get_event_ordering", None)
        self._attempt_to_invalidate_cache("is_partial_state_event", None)
        self._attempt_to_invalidate_cache("_get_joined_profile_from_event_id", None)

    def _invalidate_caches_for_room_and_stream(
        self, txn: LoggingTransaction, room_id: str
    ) -> None:
        """Invalidate caches associated with rooms, and stream to replication.

        Used when we delete rooms.

        The local invalidation is registered with `txn.call_after` rather than
        being run immediately.
        """

        self._send_invalidation_to_replication(txn, DELETE_ROOM_CACHE_NAME, [room_id])
        txn.call_after(self._invalidate_caches_for_room, room_id)

    def _invalidate_caches_for_room(self, room_id: str) -> None:
        """Invalidate local caches associated with rooms.

        This does *not* stream anything to replication; use
        `_invalidate_caches_for_room_and_stream` for that.

        Used when we delete rooms.
        """

        # If we've deleted the room then we also need to purge all event caches.
        self._invalidate_caches_for_room_events(room_id)

        self._attempt_to_invalidate_cache("get_account_data_for_room", None)
        self._attempt_to_invalidate_cache("get_account_data_for_room_and_type", None)
        self._attempt_to_invalidate_cache("get_aliases_for_room", (room_id,))
        self._attempt_to_invalidate_cache("get_latest_event_ids_in_room", (room_id,))
        self._attempt_to_invalidate_cache("_get_forward_extremeties_for_room", None)
        self._attempt_to_invalidate_cache(
            "get_unread_event_push_actions_by_room_for_user", (room_id,)
        )
        self._attempt_to_invalidate_cache(
            "_get_linearized_receipts_for_room", (room_id,)
        )
        self._attempt_to_invalidate_cache("is_room_blocked", (room_id,))
        self._attempt_to_invalidate_cache("get_retention_policy_for_room", (room_id,))
        self._attempt_to_invalidate_cache(
            "_get_partial_state_servers_at_join", (room_id,)
        )
        self._attempt_to_invalidate_cache("is_partial_state_room", (room_id,))
        self._attempt_to_invalidate_cache("get_invited_rooms_for_local_user", None)
        self._attempt_to_invalidate_cache(
            "get_current_hosts_in_room_ordered", (room_id,)
        )
        self._attempt_to_invalidate_cache("did_forget", None)
        self._attempt_to_invalidate_cache("get_forgotten_rooms_for_user", None)
        self._attempt_to_invalidate_cache("_get_membership_from_event_id", None)
        self._attempt_to_invalidate_cache("get_room_version_id", (room_id,))

        # And delete state caches.

        self._invalidate_state_caches_all(room_id)

    async def invalidate_cache_and_stream(
        self, cache_name: str, keys: Tuple[Any, ...]
    ) -> None:
        """Invalidates the cache and adds it to the cache stream so other workers
        will know to invalidate their caches.

        This should only be used to invalidate caches where other workers won't
        otherwise have known from other replication streams that the cache should
        be invalidated.

        Args:
            cache_name: Name of the attribute on this store holding the cached
                function. If there is no such attribute this is a no-op.
            keys: The cache key to invalidate.
        """
        cache_func = getattr(self, cache_name, None)
        if not cache_func:
            return

        cache_func.invalidate(keys)
        await self.send_invalidation_to_replication(
            cache_func.__name__,
            keys,
        )

    def _invalidate_cache_and_stream(
        self,
        txn: LoggingTransaction,
        cache_func: CachedFunction,
        keys: Tuple[Any, ...],
    ) -> None:
        """Invalidates the cache and adds it to the cache stream so other workers
        will know to invalidate their caches.

        This should only be used to invalidate caches where other workers won't
        otherwise have known from other replication streams that the cache should
        be invalidated.

        The local invalidation is registered with `txn.call_after` rather than
        being run immediately.

        Args:
            txn: The transaction the invalidation belongs to.
            cache_func: The cached function to invalidate.
            keys: The cache key to invalidate.
        """
        txn.call_after(cache_func.invalidate, keys)
        self._send_invalidation_to_replication(txn, cache_func.__name__, keys)

    def _invalidate_all_cache_and_stream(
        self, txn: LoggingTransaction, cache_func: CachedFunction
    ) -> None:
        """Invalidates the entire cache and adds it to the cache stream so other workers
        will know to invalidate their caches.

        The local invalidation is registered with `txn.call_after` rather than
        being run immediately.
        """

        txn.call_after(cache_func.invalidate_all)
        self._send_invalidation_to_replication(txn, cache_func.__name__, None)

    def _invalidate_state_caches_and_stream(
        self, txn: LoggingTransaction, room_id: str, members_changed: Collection[str]
    ) -> None:
        """Special case invalidation of caches based on current state.

        We special case this so that we can batch the cache invalidations into a
        single replication poke.

        Args:
            txn
            room_id: Room where state changed
            members_changed: The user_ids of members that have changed
        """
        txn.call_after(self._invalidate_state_caches, room_id, members_changed)

        if members_changed:
            # We need to be careful that the size of the `members_changed` list
            # isn't so large that it causes problems sending over replication, so we
            # send them in chunks.
            # Max line length is 16K, and max user ID length is 255, so 50 should
            # be safe.
            for chunk in batch_iter(members_changed, 50):
                # Every chunk is prefixed with the room ID, since receivers
                # treat the first key as the room ID.
                keys = itertools.chain([room_id], chunk)
                self._send_invalidation_to_replication(
                    txn, CURRENT_STATE_CACHE_NAME, keys
                )
        else:
            # if no members changed, we still need to invalidate the other caches.
            self._send_invalidation_to_replication(
                txn, CURRENT_STATE_CACHE_NAME, [room_id]
            )

    async def send_invalidation_to_replication(
        self, cache_name: str, keys: Optional[Collection[Any]]
    ) -> None:
        """Run `_send_invalidation_to_replication` in its own database
        interaction.

        Note that this does *not* invalidate the cache locally.

        Args:
            cache_name: Name of the cache that has been invalidated.
            keys: Entry to invalidate. If None will invalidate all.
        """
        await self.db_pool.runInteraction(
            "send_invalidation_to_replication",
            self._send_invalidation_to_replication,
            cache_name,
            keys,
        )

    def _send_invalidation_to_replication(
        self, txn: LoggingTransaction, cache_name: str, keys: Optional[Iterable[Any]]
    ) -> None:
        """Notifies replication that given cache has been invalidated.

        Note that this does *not* invalidate the cache locally.

        Nothing is written unless the database engine is Postgres.

        Args:
            txn
            cache_name
            keys: Entry to invalidate. If None will invalidate all.

        Raises:
            Exception: if `keys` is None for one of the special batch cache
                names, which do not support "invalidate all".
        """

        if cache_name == CURRENT_STATE_CACHE_NAME and keys is None:
            raise Exception(
                "Can't stream invalidate all with magic current state cache"
            )

        if cache_name == PURGE_HISTORY_CACHE_NAME and keys is None:
            raise Exception(
                "Can't stream invalidate all with magic purge history cache"
            )

        if cache_name == DELETE_ROOM_CACHE_NAME and keys is None:
            raise Exception("Can't stream invalidate all with magic delete room cache")

        if isinstance(self.database_engine, PostgresEngine):
            assert self._cache_id_gen is not None

            # Allocate the stream ID as part of this transaction, so that we
            # only take an ID at the point we actually use it.
            stream_id = self._cache_id_gen.get_next_txn(txn)
            # Poke the notifier once the transaction has finished.
            txn.call_after(self.hs.get_notifier().on_new_replication_data)

            if keys is not None:
                # `keys` may be a one-shot iterable; materialise it before
                # handing it to the insert.
                keys = list(keys)

            self.db_pool.simple_insert_txn(
                txn,
                table="cache_invalidation_stream_by_instance",
                values={
                    "stream_id": stream_id,
                    "instance_name": self._instance_name,
                    "cache_func": cache_name,
                    "keys": keys,
                    "invalidation_ts": self._clock.time_msec(),
                },
            )

    def get_cache_stream_token_for_writer(self, instance_name: str) -> int:
        """Get the current caches stream token for the given writer.

        Returns 0 if there is no cache stream ID generator (i.e. when not
        running on Postgres).
        """
        if self._cache_id_gen:
            return self._cache_id_gen.get_current_token_for_writer(instance_name)
        else:
            return 0
|