123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612 |
- # Copyright 2017 Vector Creations Ltd
- # Copyright 2019 New Vector Ltd
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- import heapq
- import logging
- from typing import (
- TYPE_CHECKING,
- Any,
- Awaitable,
- Callable,
- List,
- Optional,
- Tuple,
- TypeVar,
- )
- import attr
- from synapse.replication.http.streams import ReplicationGetStreamUpdates
- from synapse.types import JsonDict
- if TYPE_CHECKING:
- from synapse.server import HomeServer
logger = logging.getLogger(__name__)

# The target number of rows to request from an update_function in one call.
_STREAM_UPDATE_TARGET_ROW_COUNT = 100


# Some type aliases to make things a bit easier.

# A stream position token
Token = int

# The type of a stream update row, after JSON deserialisation, but before
# parsing with Stream.parse_row (which turns it into a `ROW_TYPE`). Normally it's
# just a row from a database query, though this is dependent on the stream in question.
#
StreamRow = TypeVar("StreamRow", bound=Tuple)

# The type returned by the update_function of a stream, as well as by
# get_updates(), get_updates_since(), etc.
#
# It consists of a triplet `(updates, new_last_token, limited)`, where:
#   * `updates` is a list of `(token, row)` entries.
#   * `new_last_token` is the new position in the stream.
#   * `limited` is whether there are more updates to fetch.
#
StreamUpdateResult = Tuple[List[Tuple[Token, StreamRow]], Token, bool]

# The type of an update_function for a stream
#
# The arguments are:
#
#  * instance_name: the writer of the stream
#  * from_token: the previous stream token: the starting point for fetching the
#    updates
#  * to_token: the new stream token: the point to get updates up to
#  * target_row_count: a target for the number of rows to be returned.
#
# The update_function is expected to return up to _approximately_ target_row_count rows.
# If there are more updates available, it should set `limited` in the result, and
# it will be called again to get the next batch.
#
UpdateFunction = Callable[[str, Token, Token, int], Awaitable[StreamUpdateResult]]
class Stream:
    """Base class for the streams.

    Remembers the token from which updates were last requested, and provides
    `get_updates()`, which returns the updates made since the previous call.
    """

    NAME: str  # The name of the stream

    # The row type for this stream; the default `parse_row` constructs it.
    ROW_TYPE: Any = None

    @classmethod
    def parse_row(cls, row: StreamRow) -> Any:
        """Turn a row received over replication into a `ROW_TYPE` instance.

        The default implementation treats the row data as an array and unpacks
        its contents as positional arguments to the `ROW_TYPE` constructor.

        Args:
            row: row data from the incoming RDATA command, after json decoding

        Returns:
            `ROW_TYPE` object for this stream
        """
        row_type = cls.ROW_TYPE
        return row_type(*row)

    def __init__(
        self,
        local_instance_name: str,
        current_token_function: Callable[[str], Token],
        update_function: UpdateFunction,
    ):
        """Instantiate a Stream

        Subclasses are expected to supply the two callbacks,
        `current_token_function` and `update_function`.

        `current_token_function` is given an instance name (a writer to the
        stream) and returns that writer's position in the stream as seen from
        the current process. On the writer process this is where the writer
        has successfully written up to; on other processes it is the position
        up to which updates have been received over replication. (Most streams
        have a single writer, so their implementations ignore the instance
        name passed in.)

        `update_function` fetches the updates for this stream between a pair
        of stream tokens. See the `UpdateFunction` type definition for more
        info.

        Args:
            local_instance_name: The instance name of the current process
            current_token_function: callback to get the current token, as above
            update_function: callback to get stream updates, as above
        """
        self.update_function = update_function
        self.current_token = current_token_function
        self.local_instance_name = local_instance_name

        # The token from which we last asked for updates. Initially this is
        # wherever the local instance currently is.
        self.last_token = self.current_token(self.local_instance_name)

    def discard_updates_and_advance(self) -> None:
        """Move `last_token` up to the current position without fetching rows.

        Called when the stream should advance but the updates would be
        discarded, e.g. when there are no currently connected workers.
        """
        latest_token = self.current_token(self.local_instance_name)
        self.last_token = latest_token

    async def get_updates(self) -> StreamUpdateResult:
        """Gets all updates since the last time this function was called (or
        since the stream was constructed if it hadn't been called before).

        `last_token` is moved on to the returned position.

        Returns:
            A triplet `(updates, new_last_token, limited)`, where `updates` is
            a list of `(token, row)` entries, `new_last_token` is the new
            position in stream, and `limited` is whether there are more updates
            to fetch.
        """
        target_token = self.current_token(self.local_instance_name)

        rows, new_last_token, limited = await self.get_updates_since(
            self.local_instance_name, self.last_token, target_token
        )

        self.last_token = new_last_token
        return rows, new_last_token, limited

    async def get_updates_since(
        self, instance_name: str, from_token: Token, upto_token: Token
    ) -> StreamUpdateResult:
        """Like get_updates, except the caller chooses the token range and
        `last_token` is left untouched.

        Returns:
            A triplet `(updates, new_last_token, limited)`, where `updates` is
            a list of `(token, row)` entries, `new_last_token` is the new
            position in stream, and `limited` is whether there are more updates
            to fetch.
        """
        from_token = int(from_token)

        if from_token != upto_token:
            rows, new_last_token, limited = await self.update_function(
                instance_name,
                from_token,
                upto_token,
                _STREAM_UPDATE_TARGET_ROW_COUNT,
            )
            return rows, new_last_token, limited

        # Identical tokens: nothing to fetch, so skip the update_function call.
        return [], upto_token, False
def current_token_without_instance(
    current_token: Callable[[], int]
) -> Callable[[str], int]:
    """Adapt a zero-argument current-token callback to the per-instance form.

    Intended for single writer streams whose token callback takes no instance
    name: the returned function accepts an `instance_name` argument, ignores
    it, and returns whatever `current_token()` returns.
    """

    def _ignore_instance_name(instance_name: str) -> int:
        return current_token()

    return _ignore_instance_name
def make_http_update_function(hs: "HomeServer", stream_name: str) -> UpdateFunction:
    """Build an `update_function` that fetches updates for `stream_name`
    through a `ReplicationGetStreamUpdates` client rather than locally.

    Args:
        hs: the homeserver, used to construct the replication client
        stream_name: name of the stream whose updates should be requested

    Returns:
        An async function with the `UpdateFunction` signature.
    """
    fetch_stream_updates = ReplicationGetStreamUpdates.make_client(hs)

    async def update_function(
        instance_name: str, from_token: int, upto_token: int, limit: int
    ) -> StreamUpdateResult:
        # `limit` is part of the UpdateFunction signature but is not forwarded
        # to the client.
        response = await fetch_stream_updates(
            instance_name=instance_name,
            stream_name=stream_name,
            from_token=from_token,
            upto_token=upto_token,
        )

        updates = response["updates"]
        new_token = response["upto_token"]
        limited = response["limited"]
        return updates, new_token, limited

    return update_function
class BackfillStream(Stream):
    """We fetched some old events and either we had never seen that event before
    or it went from being an outlier to not.
    """

    @attr.s(slots=True, frozen=True, auto_attribs=True)
    class BackfillStreamRow:
        """One row of the backfill stream."""

        event_id: str
        room_id: str
        type: str
        state_key: Optional[str]
        redacts: Optional[str]
        relates_to: Optional[str]

    NAME = "backfill"
    ROW_TYPE = BackfillStreamRow

    def __init__(self, hs: "HomeServer"):
        """Wire the stream up to the main datastore's backfill rows."""
        self.store = hs.get_datastores().main
        super().__init__(
            local_instance_name=hs.get_instance_name(),
            current_token_function=self._current_token,
            update_function=self.store.get_all_new_backfill_event_rows,
        )

    def _current_token(self, instance_name: str) -> int:
        """Return the negated backfill ID generator position for `instance_name`."""
        writer_token = self.store._backfill_id_gen.get_current_token_for_writer(
            instance_name
        )
        # The backfill stream over replication operates on *positive* numbers,
        # so the ID generator's token has to be negated.
        return -writer_token
class PresenceStream(Stream):
    """Stream of presence rows.

    On a presence writer the updates come from the presence handler; on any
    other instance they are requested via `make_http_update_function`.
    """

    @attr.s(slots=True, frozen=True, auto_attribs=True)
    class PresenceStreamRow:
        """One row of the presence stream."""

        user_id: str
        state: str
        last_active_ts: int
        last_federation_update_ts: int
        last_user_sync_ts: int
        status_msg: str
        currently_active: bool

    NAME = "presence"
    ROW_TYPE = PresenceStreamRow

    def __init__(self, hs: "HomeServer"):
        """Pick the update source depending on whether we are a presence writer."""
        main_store = hs.get_datastores().main
        is_presence_writer = (
            hs.get_instance_name() in hs.config.worker.writers.presence
        )

        update_function: UpdateFunction
        if not is_presence_writer:
            # Query presence writer process
            update_function = make_http_update_function(hs, self.NAME)
        else:
            # on the presence writer, query the presence handler
            presence_handler = hs.get_presence_handler()

            from synapse.handlers.presence import PresenceHandler

            assert isinstance(presence_handler, PresenceHandler)
            update_function = presence_handler.get_all_presence_updates

        super().__init__(
            local_instance_name=hs.get_instance_name(),
            current_token_function=current_token_without_instance(
                main_store.get_current_presence_token
            ),
            update_function=update_function,
        )
class PresenceFederationStream(Stream):
    """A stream used to send ad hoc presence updates over federation.

    Each row carries the remote destination and the user ID of the presence
    state to send.
    """

    @attr.s(slots=True, frozen=True, auto_attribs=True)
    class PresenceFederationStreamRow:
        """One row of the presence federation stream."""

        destination: str
        user_id: str

    NAME = "presence_federation"
    ROW_TYPE = PresenceFederationStreamRow

    def __init__(self, hs: "HomeServer"):
        """Back the stream with the presence handler's federation queue."""
        presence_handler = hs.get_presence_handler()
        queue = presence_handler.get_federation_queue()
        super().__init__(
            local_instance_name=hs.get_instance_name(),
            current_token_function=queue.get_current_token,
            update_function=queue.get_replication_rows,
        )
class TypingStream(Stream):
    """Stream of typing updates; each row pairs a room ID with a list of user IDs.

    On a typing writer both callbacks come from the typing writer handler;
    elsewhere updates are requested via `make_http_update_function` and the
    current token is read from the local typing handler.
    """

    @attr.s(slots=True, frozen=True, auto_attribs=True)
    class TypingStreamRow:
        """One row of the typing stream."""

        room_id: str
        user_ids: List[str]

    NAME = "typing"
    ROW_TYPE = TypingStreamRow

    def __init__(self, hs: "HomeServer"):
        """Pick the callbacks depending on whether we are a typing writer."""
        is_typing_writer = hs.get_instance_name() in hs.config.worker.writers.typing

        update_function: Callable[
            [str, int, int, int], Awaitable[Tuple[List[Tuple[int, Any]], int, bool]]
        ]
        if not is_typing_writer:
            # Query the typing writer process
            update_function = make_http_update_function(hs, self.NAME)
            current_token_function = hs.get_typing_handler().get_current_token
        else:
            # On the writer, query the typing handler
            writer_handler = hs.get_typing_writer_handler()
            update_function = writer_handler.get_all_typing_updates
            current_token_function = writer_handler.get_current_token

        super().__init__(
            local_instance_name=hs.get_instance_name(),
            current_token_function=current_token_without_instance(
                current_token_function
            ),
            update_function=update_function,
        )
class ReceiptsStream(Stream):
    """Stream of receipt rows, read from the main datastore."""

    @attr.s(slots=True, frozen=True, auto_attribs=True)
    class ReceiptsStreamRow:
        """One row of the receipts stream."""

        room_id: str
        receipt_type: str
        user_id: str
        event_id: str
        data: dict

    NAME = "receipts"
    ROW_TYPE = ReceiptsStreamRow

    def __init__(self, hs: "HomeServer"):
        """Back the stream with the main datastore's receipt queries."""
        main_store = hs.get_datastores().main
        super().__init__(
            local_instance_name=hs.get_instance_name(),
            current_token_function=current_token_without_instance(
                main_store.get_max_receipt_stream_id
            ),
            update_function=main_store.get_all_updated_receipts,
        )
class PushRulesStream(Stream):
    """A user has changed their push rules"""

    @attr.s(slots=True, frozen=True, auto_attribs=True)
    class PushRulesStreamRow:
        """One row of the push rules stream."""

        user_id: str

    NAME = "push_rules"
    ROW_TYPE = PushRulesStreamRow

    def __init__(self, hs: "HomeServer"):
        """Back the stream with the main datastore's push rule updates."""
        self.store = hs.get_datastores().main
        super().__init__(
            local_instance_name=hs.get_instance_name(),
            current_token_function=self._current_token,
            update_function=self.store.get_all_push_rule_updates,
        )

    def _current_token(self, instance_name: str) -> int:
        """Return the store's max push rules stream ID; `instance_name` is ignored."""
        return self.store.get_max_push_rules_stream_id()
class PushersStream(Stream):
    """A user has added/changed/removed a pusher"""

    @attr.s(slots=True, frozen=True, auto_attribs=True)
    class PushersStreamRow:
        """One row of the pushers stream."""

        user_id: str
        app_id: str
        pushkey: str
        deleted: bool

    NAME = "pushers"
    ROW_TYPE = PushersStreamRow

    def __init__(self, hs: "HomeServer"):
        """Back the stream with the main datastore's pusher queries."""
        main_store = hs.get_datastores().main
        super().__init__(
            local_instance_name=hs.get_instance_name(),
            current_token_function=current_token_without_instance(
                main_store.get_pushers_stream_token
            ),
            update_function=main_store.get_all_updated_pushers_rows,
        )
class CachesStream(Stream):
    """A cache was invalidated on the master and no other stream would invalidate
    the cache on the workers
    """

    @attr.s(slots=True, frozen=True, auto_attribs=True)
    class CachesStreamRow:
        """Row telling workers that they should invalidate a cache.

        Attributes:
            cache_func: Name of the cached function.
            keys: The entry in the cache to invalidate. If None then will
                invalidate all.
            invalidation_ts: Timestamp of when the invalidation took place.
        """

        cache_func: str
        keys: Optional[List[Any]]
        invalidation_ts: int

    NAME = "caches"
    ROW_TYPE = CachesStreamRow

    def __init__(self, hs: "HomeServer"):
        """Back the stream with the main datastore's cache stream queries.

        The store's token function is passed through unwrapped, so it receives
        the instance name.
        """
        main_store = hs.get_datastores().main
        super().__init__(
            local_instance_name=hs.get_instance_name(),
            current_token_function=main_store.get_cache_stream_token_for_writer,
            update_function=main_store.get_all_updated_caches,
        )
class DeviceListsStream(Stream):
    """Either a user has updated their devices or a remote server needs to be
    told about a device update.
    """

    @attr.s(slots=True, frozen=True, auto_attribs=True)
    class DeviceListsStreamRow:
        """One row of the device lists stream."""

        entity: str

    NAME = "device_lists"
    ROW_TYPE = DeviceListsStreamRow

    def __init__(self, hs: "HomeServer"):
        """Back the stream with the main datastore's device list queries."""
        main_store = hs.get_datastores().main
        super().__init__(
            local_instance_name=hs.get_instance_name(),
            current_token_function=current_token_without_instance(
                main_store.get_device_stream_token
            ),
            update_function=main_store.get_all_device_list_changes_for_remotes,
        )
class ToDeviceStream(Stream):
    """New to_device messages for a client"""

    @attr.s(slots=True, frozen=True, auto_attribs=True)
    class ToDeviceStreamRow:
        """One row of the to_device stream."""

        entity: str

    NAME = "to_device"
    ROW_TYPE = ToDeviceStreamRow

    def __init__(self, hs: "HomeServer"):
        """Back the stream with the main datastore's device message queries."""
        main_store = hs.get_datastores().main
        super().__init__(
            local_instance_name=hs.get_instance_name(),
            current_token_function=current_token_without_instance(
                main_store.get_to_device_stream_token
            ),
            update_function=main_store.get_all_new_device_messages,
        )
class TagAccountDataStream(Stream):
    """Someone added/removed a tag for a room"""

    @attr.s(slots=True, frozen=True, auto_attribs=True)
    class TagAccountDataStreamRow:
        """One row of the tag account data stream."""

        user_id: str
        room_id: str
        data: JsonDict

    NAME = "tag_account_data"
    ROW_TYPE = TagAccountDataStreamRow

    def __init__(self, hs: "HomeServer"):
        """Back the stream with the main datastore's tag queries.

        The current token is the store's max account data stream ID.
        """
        main_store = hs.get_datastores().main
        super().__init__(
            local_instance_name=hs.get_instance_name(),
            current_token_function=current_token_without_instance(
                main_store.get_max_account_data_stream_id
            ),
            update_function=main_store.get_all_updated_tags,
        )
class AccountDataStream(Stream):
    """Global or per room account data was changed"""

    @attr.s(slots=True, frozen=True, auto_attribs=True)
    class AccountDataStreamRow:
        """One row of the account data stream; `room_id` is None for global rows."""

        user_id: str
        room_id: Optional[str]
        data_type: str

    NAME = "account_data"
    ROW_TYPE = AccountDataStreamRow

    def __init__(self, hs: "HomeServer"):
        """Back the stream with the main datastore, merging global and room data."""
        self.store = hs.get_datastores().main
        super().__init__(
            local_instance_name=hs.get_instance_name(),
            current_token_function=current_token_without_instance(
                self.store.get_max_account_data_stream_id
            ),
            update_function=self._update_function,
        )

    async def _update_function(
        self, instance_name: str, from_token: int, to_token: int, limit: int
    ) -> StreamUpdateResult:
        """Fetch global and per-room account data updates and merge them.

        Both sets are queried with the same `limit`. If either one reaches the
        limit, `to_token` is pulled back to the last stream ID of that set and
        the result is flagged as limited, so that both sets end at the same
        stream token.

        Returns:
            A triplet `(updates, to_token, limited)` where `updates` is a list
            of `(stream_id, (user_id, room_id, account_data_type))` entries
            ordered by stream ID, with `room_id` set to None for global rows.
        """
        hit_limit = False

        global_changes = await self.store.get_updated_global_account_data(
            from_token, to_token, limit
        )

        # if the global results hit the limit, we'll need to limit the room results to
        # the same stream token.
        if len(global_changes) >= limit:
            to_token = global_changes[-1][0]
            hit_limit = True

        room_changes = await self.store.get_updated_room_account_data(
            from_token, to_token, limit
        )

        # likewise, if the room results hit the limit, limit the global results to
        # the same stream token.
        if len(room_changes) >= limit:
            to_token = room_changes[-1][0]
            hit_limit = True

        # Reshape the global results into stream rows, dropping anything beyond
        # the (possibly reduced) to_token at the same time.
        global_rows = [
            (stream_id, (user_id, None, account_data_type))
            for stream_id, user_id, account_data_type in global_changes
            if stream_id <= to_token
        ]

        # The room results were fetched with an upper bound at or above the final
        # to_token and, if they hit the limit, end exactly at it, so no
        # `stream_id` filter is needed here.
        room_rows = [
            (stream_id, (user_id, room_id, account_data_type))
            for stream_id, user_id, room_id, account_data_type in room_changes
        ]

        def _by_stream_id(entry: tuple) -> int:
            return entry[0]

        # We need to return a sorted list, so merge them together.
        #
        # Note: We order only by the stream ID to work around a bug where the
        # same stream ID could appear in both `global_rows` and `room_rows`,
        # leading to a comparison between the data tuples. The comparison could
        # fail due to attempting to compare the `room_id` which results in a
        # `TypeError` from comparing a `str` vs `None`.
        merged_rows = list(heapq.merge(room_rows, global_rows, key=_by_stream_id))
        return merged_rows, to_token, hit_limit
class GroupServerStream(Stream):
    """Stream of group changes, read from the main datastore."""

    @attr.s(slots=True, frozen=True, auto_attribs=True)
    class GroupsStreamRow:
        """One row of the groups stream."""

        group_id: str
        user_id: str
        type: str
        content: JsonDict

    NAME = "groups"
    ROW_TYPE = GroupsStreamRow

    def __init__(self, hs: "HomeServer"):
        """Back the stream with the main datastore's group change queries."""
        main_store = hs.get_datastores().main
        super().__init__(
            local_instance_name=hs.get_instance_name(),
            current_token_function=current_token_without_instance(
                main_store.get_group_stream_token
            ),
            update_function=main_store.get_all_groups_changes,
        )
class UserSignatureStream(Stream):
    """A user has signed their own device with their user-signing key"""

    @attr.s(slots=True, frozen=True, auto_attribs=True)
    class UserSignatureStreamRow:
        """One row of the user signature stream."""

        user_id: str

    NAME = "user_signature"
    ROW_TYPE = UserSignatureStreamRow

    def __init__(self, hs: "HomeServer"):
        """Back the stream with the main datastore's user signature queries.

        The current token is the store's device stream token.
        """
        main_store = hs.get_datastores().main
        super().__init__(
            local_instance_name=hs.get_instance_name(),
            current_token_function=current_token_without_instance(
                main_store.get_device_stream_token
            ),
            update_function=main_store.get_all_user_signature_changes_for_remotes,
        )
|