# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""A federation sender that forwards things to be sent across replication to
a worker process.

It assumes there is a single worker process feeding off of it.

Each row in the replication stream consists of a type and some json, where the
types indicate whether they are presence, or edus, etc.

Ephemeral or non-event data are queued up in-memory. When the worker requests
updates since a particular point, all in-memory data since before that point is
dropped. We also expire things in the queue after 5 minutes, to ensure that a
dead worker doesn't cause the queues to grow limitlessly.

Events are replicated via a separate events stream.
"""
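
# As an illustrative sketch (the concrete values below are made up, but the
# shape matches what get_replication_rows() returns further down), a single
# replication update is a (token, (type_id, data)) pair, e.g. for a keyed EDU:
#
#     (1234, ("k", {"key": [...],
#                   "edu": {"origin": "us.example.org",
#                           "destination": "them.example.org",
#                           "edu_type": "m.typing",
#                           "content": {...}}}))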

import logging
from collections import namedtuple
from typing import Dict, List, Tuple, Type

from sortedcontainers import SortedDict

from twisted.internet import defer

from synapse.api.presence import UserPresenceState
from synapse.metrics import LaterGauge
from synapse.util.metrics import Measure

from .units import Edu

logger = logging.getLogger(__name__)


class FederationRemoteSendQueue:
    """A drop in replacement for FederationSender"""

    def __init__(self, hs):
        self.server_name = hs.hostname
        self.clock = hs.get_clock()
        self.notifier = hs.get_notifier()
        self.is_mine_id = hs.is_mine_id

        # We may have multiple federation sender instances, so we need to track
        # their positions separately.
        self._sender_instances = hs.config.worker.federation_shard_config.instances
        self._sender_positions = {}

        # Pending presence map user_id -> UserPresenceState
        self.presence_map = {}  # type: Dict[str, UserPresenceState]

        # Stream position -> list[user_id]
        self.presence_changed = SortedDict()  # type: SortedDict[int, List[str]]

        # Stores the destinations we need to explicitly send presence to about a
        # given user.
        # Stream position -> (user_id, destinations)
        self.presence_destinations = (
            SortedDict()
        )  # type: SortedDict[int, Tuple[str, List[str]]]

        # (destination, key) -> EDU
        self.keyed_edu = {}  # type: Dict[Tuple[str, tuple], Edu]

        # stream position -> (destination, key)
        self.keyed_edu_changed = (
            SortedDict()
        )  # type: SortedDict[int, Tuple[str, tuple]]

        self.edus = SortedDict()  # type: SortedDict[int, Edu]

        # stream ID for the next entry into presence_changed/keyed_edu_changed/edus.
        self.pos = 1

        # map from the time each stream entry was generated to its stream ID,
        # so that we can clear out entries after a while
        self.pos_time = SortedDict()  # type: SortedDict[int, int]

        # EVERYTHING IS SAD. In particular, python only makes new scopes when
        # we make a new function, so we need to make a new function so the inner
        # lambda binds to the queue rather than to the name of the queue which
        # changes. ARGH.
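        # (Illustrative sketch, not executed, with made-up names: if the lambdas
        # below closed over the loop variable directly, every gauge would report
        # the length of the *last* queue, because the variable is rebound on each
        # iteration and the lambda only looks it up when called:
        #
        #     gauges = []
        #     for q in (queue_a, queue_b):
        #         gauges.append(lambda: len(q))
        #     # both lambdas now report len(queue_b)
        #
        # Passing the queue in as a function argument gives each lambda its own
        # binding.)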
        def register(name, queue):
            LaterGauge(
                "synapse_federation_send_queue_%s_size" % (name,),
                "",
                [],
                lambda: len(queue),
            )

        for queue_name in [
            "presence_map",
            "presence_changed",
            "keyed_edu",
            "keyed_edu_changed",
            "edus",
            "pos_time",
            "presence_destinations",
        ]:
            register(queue_name, getattr(self, queue_name))

        self.clock.looping_call(self._clear_queue, 30 * 1000)

    def _next_pos(self):
        pos = self.pos
        self.pos += 1
        self.pos_time[self.clock.time_msec()] = pos
        return pos

    def _clear_queue(self):
        """Clear the queues for anything older than 5 minutes"""
        FIVE_MINUTES_AGO = 5 * 60 * 1000
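        # Illustrative example (times and positions made up): if pos_time maps
        # {t1: 10, t2: 11, t3: 12} and only t3 falls within the last five
        # minutes, the bisect below selects t1 and t2, those entries are
        # dropped, and the other queues are then cleared of everything from
        # before position 11.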
        now = self.clock.time_msec()

        keys = self.pos_time.keys()
        time = self.pos_time.bisect_left(now - FIVE_MINUTES_AGO)
        if not keys[:time]:
            return

        position_to_delete = max(keys[:time])
        for key in keys[:time]:
            del self.pos_time[key]

        self._clear_queue_before_pos(position_to_delete)

    def _clear_queue_before_pos(self, position_to_delete):
        """Clear all the queues from before a given position"""
        with Measure(self.clock, "send_queue._clear"):
            # Delete things out of presence maps
            keys = self.presence_changed.keys()
            i = self.presence_changed.bisect_left(position_to_delete)
            for key in keys[:i]:
                del self.presence_changed[key]

            user_ids = {
                user_id for uids in self.presence_changed.values() for user_id in uids
            }

            keys = self.presence_destinations.keys()
            i = self.presence_destinations.bisect_left(position_to_delete)
            for key in keys[:i]:
                del self.presence_destinations[key]

            user_ids.update(
                user_id for user_id, _ in self.presence_destinations.values()
            )

            to_del = [
                user_id for user_id in self.presence_map if user_id not in user_ids
            ]
            for user_id in to_del:
                del self.presence_map[user_id]

            # Delete things out of keyed edus
            keys = self.keyed_edu_changed.keys()
            i = self.keyed_edu_changed.bisect_left(position_to_delete)
            for key in keys[:i]:
                del self.keyed_edu_changed[key]

            live_keys = set(self.keyed_edu_changed.values())
            keys_to_del = [
                edu_key for edu_key in self.keyed_edu if edu_key not in live_keys
            ]
            for edu_key in keys_to_del:
                del self.keyed_edu[edu_key]

            # Delete things out of edu map
            keys = self.edus.keys()
            i = self.edus.bisect_left(position_to_delete)
            for key in keys[:i]:
                del self.edus[key]

    def notify_new_events(self, max_token):
        """As per FederationSender"""
        # We don't need to replicate this as it gets sent down a different
        # stream.
        pass

    def build_and_send_edu(self, destination, edu_type, content, key=None):
        """As per FederationSender"""
        if destination == self.server_name:
            logger.info("Not sending EDU to ourselves")
            return

        pos = self._next_pos()

        edu = Edu(
            origin=self.server_name,
            destination=destination,
            edu_type=edu_type,
            content=content,
        )

        if key:
            assert isinstance(key, tuple)
            self.keyed_edu[(destination, key)] = edu
            self.keyed_edu_changed[pos] = (destination, key)
        else:
            self.edus[pos] = edu

        self.notifier.on_new_replication_data()

    def send_read_receipt(self, receipt):
        """As per FederationSender

        Args:
            receipt (synapse.types.ReadReceipt):
        """
        # nothing to do here: the replication listener will handle it.
        return defer.succeed(None)

    def send_presence(self, states):
        """As per FederationSender

        Args:
            states (list(UserPresenceState))
        """
        pos = self._next_pos()

        # We only want to send presence for our own users, so let's always
        # filter here just in case.
        local_states = [s for s in states if self.is_mine_id(s.user_id)]

        self.presence_map.update({state.user_id: state for state in local_states})
        self.presence_changed[pos] = [state.user_id for state in local_states]

        self.notifier.on_new_replication_data()

    def send_presence_to_destinations(self, states, destinations):
        """As per FederationSender

        Args:
            states (list[UserPresenceState])
            destinations (list[str])
        """
        for state in states:
            pos = self._next_pos()
            self.presence_map[state.user_id] = state
            self.presence_destinations[pos] = (state.user_id, destinations)

        self.notifier.on_new_replication_data()

    def send_device_messages(self, destination):
        """As per FederationSender"""
        # We don't need to replicate this as it gets sent down a different
        # stream.

    def get_current_token(self):
        return self.pos - 1

    def federation_ack(self, instance_name, token):
        if self._sender_instances:
            # If we have configured multiple federation sender instances we need
            # to track their positions separately, and only clear the queue up
            # to the token all instances have acked.
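            # Illustrative example (instance names made up): if "sender1" has
            # acked up to token 13 and "sender2" only up to token 10, we must
            # not clear anything past 10, since sender2 may still need to
            # replay rows 11-13.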
            self._sender_positions[instance_name] = token
            token = min(self._sender_positions.values())

        self._clear_queue_before_pos(token)

    async def get_replication_rows(
        self, instance_name: str, from_token: int, to_token: int, target_row_count: int
    ) -> Tuple[List[Tuple[int, Tuple]], int, bool]:
        """Get rows to be sent over federation between the two tokens

        Args:
            instance_name: the name of the current process
            from_token: the previous stream token: the starting point for fetching the
                updates
            to_token: the new stream token: the point to get updates up to
            target_row_count: a target for the number of rows to be returned.

        Returns: a triplet `(updates, new_last_token, limited)`, where:
            * `updates` is a list of `(token, row)` entries.
            * `new_last_token` is the new position in stream.
            * `limited` is whether there are more updates to fetch.
        """
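        # Rough shape of the return value (tokens and payloads are illustrative
        # only):
        #
        #     ([(5, ("p", {<presence dict>})), (7, ("e", {<edu dict>}))],
        #      to_token,
        #      False)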
        # TODO: Handle target_row_count.

        # To handle restarts where we wrap around
        if from_token > self.pos:
            from_token = -1

        # list of tuple(int, BaseFederationRow), where the first element is the
        # position in the federation stream.
        rows = []  # type: List[Tuple[int, BaseFederationRow]]

        # Fetch changed presence
        i = self.presence_changed.bisect_right(from_token)
        j = self.presence_changed.bisect_right(to_token) + 1
        dest_user_ids = [
            (pos, user_id)
            for pos, user_id_list in self.presence_changed.items()[i:j]
            for user_id in user_id_list
        ]

        for (pos, user_id) in dest_user_ids:
            rows.append((pos, PresenceRow(state=self.presence_map[user_id])))

        # Fetch presence to send to destinations
        i = self.presence_destinations.bisect_right(from_token)
        j = self.presence_destinations.bisect_right(to_token) + 1

        for pos, (user_id, dests) in self.presence_destinations.items()[i:j]:
            rows.append(
                (
                    pos,
                    PresenceDestinationsRow(
                        state=self.presence_map[user_id], destinations=list(dests)
                    ),
                )
            )

        # Fetch changed keyed edus
        i = self.keyed_edu_changed.bisect_right(from_token)
        j = self.keyed_edu_changed.bisect_right(to_token) + 1

        # We purposefully clobber based on the key here: python dict comprehensions
        # always use the last value, so this will correctly point to the last
        # stream position.
        keyed_edus = {v: k for k, v in self.keyed_edu_changed.items()[i:j]}

        for ((destination, edu_key), pos) in keyed_edus.items():
            rows.append(
                (
                    pos,
                    KeyedEduRow(
                        key=edu_key, edu=self.keyed_edu[(destination, edu_key)]
                    ),
                )
            )

        # Fetch changed edus
        i = self.edus.bisect_right(from_token)
        j = self.edus.bisect_right(to_token) + 1

        edus = self.edus.items()[i:j]

        for (pos, edu) in edus:
            rows.append((pos, EduRow(edu)))

        # Sort rows based on pos
        rows.sort()

        return (
            [(pos, (row.TypeId, row.to_data())) for pos, row in rows],
            to_token,
            False,
        )


class BaseFederationRow:
    """Base class for rows to be sent in the federation stream.

    Specifies how to identify, serialize and deserialize the different types.
    """

    TypeId = ""  # Unique string that ids the type. Must be overridden in subclasses.
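
    # For reference, the concrete TypeIds used by the subclasses below are
    # "p" (PresenceRow), "pd" (PresenceDestinationsRow), "k" (KeyedEduRow) and
    # "e" (EduRow); TypeToRow, defined later in this module, maps them back to
    # the row classes.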

    @staticmethod
    def from_data(data):
        """Parse the data from the federation stream into a row.

        Args:
            data: The value of ``data`` from FederationStreamRow.data, type
                depends on the type of stream
        """
        raise NotImplementedError()

    def to_data(self):
        """Serialize this row to be sent over the federation stream.

        Returns:
            The value to be sent in FederationStreamRow.data. The type depends
            on the type of stream.
        """
        raise NotImplementedError()

    def add_to_buffer(self, buff):
        """Add this row to the appropriate field in the buffer ready for this
        to be sent over federation.

        We use a buffer so that we can batch up events that have come in at
        the same time and send them all at once.

        Args:
            buff (ParsedFederationStreamData)
        """
        raise NotImplementedError()


class PresenceRow(
    BaseFederationRow, namedtuple("PresenceRow", ("state",))  # UserPresenceState
):
    TypeId = "p"

    @staticmethod
    def from_data(data):
        return PresenceRow(state=UserPresenceState.from_dict(data))

    def to_data(self):
        return self.state.as_dict()

    def add_to_buffer(self, buff):
        buff.presence.append(self.state)


class PresenceDestinationsRow(
    BaseFederationRow,
    namedtuple(
        "PresenceDestinationsRow",
        ("state", "destinations"),  # UserPresenceState  # list[str]
    ),
):
    TypeId = "pd"

    @staticmethod
    def from_data(data):
        return PresenceDestinationsRow(
            state=UserPresenceState.from_dict(data["state"]), destinations=data["dests"]
        )

    def to_data(self):
        return {"state": self.state.as_dict(), "dests": self.destinations}

    def add_to_buffer(self, buff):
        buff.presence_destinations.append((self.state, self.destinations))


class KeyedEduRow(
    BaseFederationRow,
    namedtuple(
        "KeyedEduRow",
        ("key", "edu"),  # tuple(str) - the edu key passed to send_edu  # Edu
    ),
):
    """Streams EDUs that have an associated key that is used to clobber. For example,
    typing EDUs clobber based on room_id.
    """
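
    # Illustrative example: if two typing EDUs for the same room are queued for
    # the same destination, they share a (destination, key) entry in
    # FederationRemoteSendQueue.keyed_edu, so the later one simply overwrites
    # the earlier one and only the most recent state is replicated.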

    TypeId = "k"

    @staticmethod
    def from_data(data):
        return KeyedEduRow(key=tuple(data["key"]), edu=Edu(**data["edu"]))

    def to_data(self):
        return {"key": self.key, "edu": self.edu.get_internal_dict()}

    def add_to_buffer(self, buff):
        buff.keyed_edus.setdefault(self.edu.destination, {})[self.key] = self.edu


class EduRow(BaseFederationRow, namedtuple("EduRow", ("edu",))):  # Edu
    """Streams EDUs that don't have keys. See KeyedEduRow"""

    TypeId = "e"

    @staticmethod
    def from_data(data):
        return EduRow(Edu(**data))

    def to_data(self):
        return self.edu.get_internal_dict()

    def add_to_buffer(self, buff):
        buff.edus.setdefault(self.edu.destination, []).append(self.edu)


_rowtypes = (
    PresenceRow,
    PresenceDestinationsRow,
    KeyedEduRow,
    EduRow,
)  # type: Tuple[Type[BaseFederationRow], ...]

TypeToRow = {Row.TypeId: Row for Row in _rowtypes}


ParsedFederationStreamData = namedtuple(
    "ParsedFederationStreamData",
    (
        "presence",  # list(UserPresenceState)
        "presence_destinations",  # list of tuples of UserPresenceState and destinations
        "keyed_edus",  # dict of destination -> { key -> Edu }
        "edus",  # dict of destination -> [Edu]
    ),
)
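
# Rough sketch of a populated buffer (server name and EDU key are made up for
# illustration):
#
#     ParsedFederationStreamData(
#         presence=[<UserPresenceState>],
#         presence_destinations=[(<UserPresenceState>, ["remote.example.org"])],
#         keyed_edus={"remote.example.org": {("some-key",): <Edu>}},
#         edus={"remote.example.org": [<Edu>, <Edu>]},
#     )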


def process_rows_for_federation(transaction_queue, rows):
    """Parse a list of rows from the federation stream and put them in the
    transaction queue ready for sending to the relevant homeservers.

    Args:
        transaction_queue (FederationSender)
        rows (list(synapse.replication.tcp.streams.federation.FederationStream.FederationStreamRow))
    """

    # The federation stream contains a bunch of different types of
    # rows that need to be handled differently. We parse the rows, put
    # them into the appropriate collection and then send them off.

    buff = ParsedFederationStreamData(
        presence=[],
        presence_destinations=[],
        keyed_edus={},
        edus={},
    )

    # Parse the rows in the stream and add to the buffer
    for row in rows:
        if row.type not in TypeToRow:
            logger.error("Unrecognized federation row type %r", row.type)
            continue

        RowType = TypeToRow[row.type]
        parsed_row = RowType.from_data(row.data)
        parsed_row.add_to_buffer(buff)

    if buff.presence:
        transaction_queue.send_presence(buff.presence)

    for state, destinations in buff.presence_destinations:
        transaction_queue.send_presence_to_destinations(
            states=[state], destinations=destinations
        )

    for destination, edu_map in buff.keyed_edus.items():
        for key, edu in edu_map.items():
            transaction_queue.send_edu(edu, key)

    for destination, edu_list in buff.edus.items():
        for edu in edu_list:
            transaction_queue.send_edu(edu, None)
|