|
@@ -14,26 +14,19 @@
|
|
|
|
|
|
import abc
|
|
|
import logging
|
|
|
-from typing import (
|
|
|
- TYPE_CHECKING,
|
|
|
- Collection,
|
|
|
- Dict,
|
|
|
- Hashable,
|
|
|
- Iterable,
|
|
|
- List,
|
|
|
- Optional,
|
|
|
- Set,
|
|
|
- Tuple,
|
|
|
-)
|
|
|
+from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Set, Tuple
|
|
|
|
|
|
from prometheus_client import Counter
|
|
|
|
|
|
+from twisted.internet import defer
|
|
|
+
|
|
|
import synapse.metrics
|
|
|
from synapse.api.presence import UserPresenceState
|
|
|
from synapse.events import EventBase
|
|
|
from synapse.federation.sender.per_destination_queue import PerDestinationQueue
|
|
|
from synapse.federation.sender.transaction_manager import TransactionManager
|
|
|
from synapse.federation.units import Edu
|
|
|
+from synapse.logging.context import make_deferred_yieldable, run_in_background
|
|
|
from synapse.metrics import (
|
|
|
LaterGauge,
|
|
|
event_processing_loop_counter,
|
|
@@ -262,27 +255,15 @@ class FederationSender(AbstractFederationSender):
|
|
|
if not events and next_token >= self._last_poked_id:
|
|
|
break
|
|
|
|
|
|
- async def get_destinations_for_event(
|
|
|
- event: EventBase,
|
|
|
- ) -> Collection[str]:
|
|
|
- """Computes the destinations to which this event must be sent.
|
|
|
-
|
|
|
- This returns an empty tuple when there are no destinations to send to,
|
|
|
- or if this event is not from this homeserver and it is not sending
|
|
|
- it on behalf of another server.
|
|
|
-
|
|
|
- Will also filter out destinations which this sender is not responsible for,
|
|
|
- if multiple federation senders exist.
|
|
|
- """
|
|
|
-
|
|
|
+ async def handle_event(event: EventBase) -> None:
|
|
|
# Only send events for this server.
|
|
|
send_on_behalf_of = event.internal_metadata.get_send_on_behalf_of()
|
|
|
is_mine = self.is_mine_id(event.sender)
|
|
|
if not is_mine and send_on_behalf_of is None:
|
|
|
- return ()
|
|
|
+ return
|
|
|
|
|
|
if not event.internal_metadata.should_proactively_send():
|
|
|
- return ()
|
|
|
+ return
|
|
|
|
|
|
destinations = None # type: Optional[Set[str]]
|
|
|
if not event.prev_event_ids():
|
|
@@ -317,7 +298,7 @@ class FederationSender(AbstractFederationSender):
|
|
|
"Failed to calculate hosts in room for event: %s",
|
|
|
event.event_id,
|
|
|
)
|
|
|
- return ()
|
|
|
+ return
|
|
|
|
|
|
destinations = {
|
|
|
d
|
|
@@ -327,15 +308,17 @@ class FederationSender(AbstractFederationSender):
|
|
|
)
|
|
|
}
|
|
|
|
|
|
- destinations.discard(self.server_name)
|
|
|
-
|
|
|
if send_on_behalf_of is not None:
|
|
|
# If we are sending the event on behalf of another server
|
|
|
# then it already has the event and there is no reason to
|
|
|
# send the event to it.
|
|
|
destinations.discard(send_on_behalf_of)
|
|
|
|
|
|
+ logger.debug("Sending %s to %r", event, destinations)
|
|
|
+
|
|
|
if destinations:
|
|
|
+ await self._send_pdu(event, destinations)
|
|
|
+
|
|
|
now = self.clock.time_msec()
|
|
|
ts = await self.store.get_received_ts(event.event_id)
|
|
|
|
|
@@ -343,29 +326,24 @@ class FederationSender(AbstractFederationSender):
|
|
|
"federation_sender"
|
|
|
).observe((now - ts) / 1000)
|
|
|
|
|
|
- return destinations
|
|
|
- return ()
|
|
|
-
|
|
|
- async def get_federatable_events_and_destinations(
|
|
|
- events: Iterable[EventBase],
|
|
|
- ) -> List[Tuple[EventBase, Collection[str]]]:
|
|
|
- with Measure(self.clock, "get_destinations_for_events"):
|
|
|
- # Fetch federation destinations per event,
|
|
|
- # skip if get_destinations_for_event returns an empty collection,
|
|
|
- # return list of event->destinations pairs.
|
|
|
- return [
|
|
|
- (event, dests)
|
|
|
- for (event, dests) in [
|
|
|
- (event, await get_destinations_for_event(event))
|
|
|
- for event in events
|
|
|
- ]
|
|
|
- if dests
|
|
|
- ]
|
|
|
-
|
|
|
- events_and_dests = await get_federatable_events_and_destinations(events)
|
|
|
-
|
|
|
- # Send corresponding events to each destination queue
|
|
|
- await self._distribute_events(events_and_dests)
|
|
|
+ async def handle_room_events(events: Iterable[EventBase]) -> None:
|
|
|
+ with Measure(self.clock, "handle_room_events"):
|
|
|
+ for event in events:
|
|
|
+ await handle_event(event)
|
|
|
+
|
|
|
+ events_by_room = {} # type: Dict[str, List[EventBase]]
|
|
|
+ for event in events:
|
|
|
+ events_by_room.setdefault(event.room_id, []).append(event)
|
|
|
+
|
|
|
+ await make_deferred_yieldable(
|
|
|
+ defer.gatherResults(
|
|
|
+ [
|
|
|
+ run_in_background(handle_room_events, evs)
|
|
|
+ for evs in events_by_room.values()
|
|
|
+ ],
|
|
|
+ consumeErrors=True,
|
|
|
+ )
|
|
|
+ )
|
|
|
|
|
|
await self.store.update_federation_out_pos("events", next_token)
|
|
|
|
|
@@ -383,7 +361,7 @@ class FederationSender(AbstractFederationSender):
|
|
|
events_processed_counter.inc(len(events))
|
|
|
|
|
|
event_processing_loop_room_count.labels("federation_sender").inc(
|
|
|
- len({event.room_id for event in events})
|
|
|
+ len(events_by_room)
|
|
|
)
|
|
|
|
|
|
event_processing_loop_counter.labels("federation_sender").inc()
|
|
@@ -395,53 +373,34 @@ class FederationSender(AbstractFederationSender):
|
|
|
finally:
|
|
|
self._is_processing = False
|
|
|
|
|
|
- async def _distribute_events(
|
|
|
- self,
|
|
|
- events_and_dests: Iterable[Tuple[EventBase, Collection[str]]],
|
|
|
- ) -> None:
|
|
|
- """Distribute events to the respective per_destination queues.
|
|
|
-
|
|
|
- Also persists last-seen per-room stream_ordering to 'destination_rooms'.
|
|
|
-
|
|
|
- Args:
|
|
|
- events_and_dests: A list of tuples, which are (event: EventBase, destinations: Collection[str]).
|
|
|
- Every event is paired with its intended destinations (in federation).
|
|
|
- """
|
|
|
- # Tuples of room_id + destination to their max-seen stream_ordering
|
|
|
- room_with_dest_stream_ordering = {} # type: Dict[Tuple[str, str], int]
|
|
|
-
|
|
|
- # List of events to send to each destination
|
|
|
- events_by_dest = {} # type: Dict[str, List[EventBase]]
|
|
|
+ async def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None:
|
|
|
+ # We loop through all destinations to see whether we already have
|
|
|
+ # a transaction in progress. If we do, stick it in the pending_pdus
|
|
|
+ # table and we'll get back to it later.
|
|
|
|
|
|
- # For each event-destinations pair...
|
|
|
- for event, destinations in events_and_dests:
|
|
|
+ destinations = set(destinations)
|
|
|
+ destinations.discard(self.server_name)
|
|
|
+ logger.debug("Sending to: %s", str(destinations))
|
|
|
|
|
|
- # (we got this from the database, it's filled)
|
|
|
- assert event.internal_metadata.stream_ordering
|
|
|
-
|
|
|
- sent_pdus_destination_dist_total.inc(len(destinations))
|
|
|
- sent_pdus_destination_dist_count.inc()
|
|
|
+ if not destinations:
|
|
|
+ return
|
|
|
|
|
|
- # ...iterate over those destinations..
|
|
|
- for destination in destinations:
|
|
|
- # ...update their stream-ordering...
|
|
|
- room_with_dest_stream_ordering[(event.room_id, destination)] = max(
|
|
|
- event.internal_metadata.stream_ordering,
|
|
|
- room_with_dest_stream_ordering.get((event.room_id, destination), 0),
|
|
|
- )
|
|
|
+ sent_pdus_destination_dist_total.inc(len(destinations))
|
|
|
+ sent_pdus_destination_dist_count.inc()
|
|
|
|
|
|
- # ...and add the event to each destination queue.
|
|
|
- events_by_dest.setdefault(destination, []).append(event)
|
|
|
+ assert pdu.internal_metadata.stream_ordering
|
|
|
|
|
|
- # Bulk-store destination_rooms stream_ids
|
|
|
- await self.store.bulk_store_destination_rooms_entries(
|
|
|
- room_with_dest_stream_ordering
|
|
|
+ # track the fact that we have a PDU for these destinations,
|
|
|
+ # to allow us to perform catch-up later on if the remote is unreachable
|
|
|
+ # for a while.
|
|
|
+ await self.store.store_destination_rooms_entries(
|
|
|
+ destinations,
|
|
|
+ pdu.room_id,
|
|
|
+ pdu.internal_metadata.stream_ordering,
|
|
|
)
|
|
|
|
|
|
- for destination, pdus in events_by_dest.items():
|
|
|
- logger.debug("Sending %d pdus to %s", len(pdus), destination)
|
|
|
-
|
|
|
- self._get_per_destination_queue(destination).send_pdus(pdus)
|
|
|
+ for destination in destinations:
|
|
|
+ self._get_per_destination_queue(destination).send_pdu(pdu)
|
|
|
|
|
|
async def send_read_receipt(self, receipt: ReadReceipt) -> None:
|
|
|
"""Send a RR to any other servers in the room
|