@@ -18,7 +18,6 @@
import itertools
import logging
-import sys

import six
from six import iteritems, itervalues

@@ -54,7 +53,7 @@ from synapse.replication.http.federation import (
    ReplicationFederationSendEventsRestServlet,
)
from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet
-from synapse.state import resolve_events_with_factory
+from synapse.state import StateResolutionStore, resolve_events_with_store
from synapse.types import UserID, get_domain_from_id
from synapse.util import logcontext, unwrapFirstError
from synapse.util.async_helpers import Linearizer
@@ -69,6 +68,27 @@ from ._base import BaseHandler
logger = logging.getLogger(__name__)


+def shortstr(iterable, maxitems=5):
+    """If iterable has maxitems or fewer, return the stringification of a list
+    containing those items.
+
+    Otherwise, return the stringification of a list with the first maxitems items,
+    followed by "...".
+
+    Args:
+        iterable (Iterable): iterable to truncate
+        maxitems (int): number of items to return before truncating
+
+    Returns:
+        unicode
+    """
+
+    items = list(itertools.islice(iterable, maxitems + 1))
+    if len(items) <= maxitems:
+        return str(items)
+    return u"[" + u", ".join(repr(r) for r in items[:maxitems]) + u", ...]"
+
+
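For illustration, and assuming only the shortstr helper defined just above, its output looks like this (the event IDs are invented):

    print(shortstr(["$ev1", "$ev2"]))
    # -> "['$ev1', '$ev2']"
    print(shortstr(["$ev%i" % i for i in range(10)], maxitems=3))
    # -> "['$ev0', '$ev1', '$ev2', ...]"
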
class FederationHandler(BaseHandler):
|
|
|
"""Handles events that originated from federation.
|
|
|
Responsible for:
|
|
@@ -85,7 +105,7 @@ class FederationHandler(BaseHandler):
|
|
|
|
|
|
self.hs = hs
|
|
|
|
|
|
- self.store = hs.get_datastore()
|
|
|
+ self.store = hs.get_datastore() # type: synapse.storage.DataStore
|
|
|
self.federation_client = hs.get_federation_client()
|
|
|
self.state_handler = hs.get_state_handler()
|
|
|
self.server_name = hs.hostname
|
|
@@ -114,9 +134,8 @@ class FederationHandler(BaseHandler):
|
|
|
self._room_pdu_linearizer = Linearizer("fed_room_pdu")
|
|
|
|
|
|
@defer.inlineCallbacks
|
|
|
- @log_function
|
|
|
def on_receive_pdu(
|
|
|
- self, origin, pdu, get_missing=True, sent_to_us_directly=False,
|
|
|
+ self, origin, pdu, sent_to_us_directly=False,
|
|
|
):
|
|
|
""" Process a PDU received via a federation /send/ transaction, or
|
|
|
via backfill of missing prev_events
|
|
@@ -125,14 +144,23 @@ class FederationHandler(BaseHandler):
|
|
|
origin (str): server which initiated the /send/ transaction. Will
|
|
|
be used to fetch missing events or state.
|
|
|
pdu (FrozenEvent): received PDU
|
|
|
- get_missing (bool): True if we should fetch missing prev_events
|
|
|
+ sent_to_us_directly (bool): True if this event was pushed to us; False if
|
|
|
+ we pulled it as the result of a missing prev_event.
|
|
|
|
|
|
Returns (Deferred): completes with None
|
|
|
"""
|
|
|
|
|
|
+ room_id = pdu.room_id
|
|
|
+ event_id = pdu.event_id
|
|
|
+
|
|
|
+ logger.info(
|
|
|
+ "[%s %s] handling received PDU: %s",
|
|
|
+ room_id, event_id, pdu,
|
|
|
+ )
|
|
|
+
|
|
|
# We reprocess pdus when we have seen them only as outliers
|
|
|
existing = yield self.store.get_event(
|
|
|
- pdu.event_id,
|
|
|
+ event_id,
|
|
|
allow_none=True,
|
|
|
allow_rejected=True,
|
|
|
)
|
|
@@ -147,7 +175,7 @@ class FederationHandler(BaseHandler):
|
|
|
)
|
|
|
)
|
|
|
if already_seen:
|
|
|
- logger.debug("Already seen pdu %s", pdu.event_id)
|
|
|
+ logger.debug("[%s %s]: Already seen pdu", room_id, event_id)
|
|
|
return
|
|
|
|
|
|
# do some initial sanity-checking of the event. In particular, make
|
|
@@ -156,6 +184,7 @@ class FederationHandler(BaseHandler):
|
|
|
try:
|
|
|
self._sanity_check_event(pdu)
|
|
|
except SynapseError as err:
|
|
|
+ logger.warn("[%s %s] Received event failed sanity checks", room_id, event_id)
|
|
|
raise FederationError(
|
|
|
"ERROR",
|
|
|
err.code,
|
|
@@ -165,10 +194,12 @@ class FederationHandler(BaseHandler):
|
|
|
|
|
|
# If we are currently in the process of joining this room, then we
|
|
|
# queue up events for later processing.
|
|
|
- if pdu.room_id in self.room_queues:
|
|
|
- logger.info("Ignoring PDU %s for room %s from %s for now; join "
|
|
|
- "in progress", pdu.event_id, pdu.room_id, origin)
|
|
|
- self.room_queues[pdu.room_id].append((pdu, origin))
|
|
|
+ if room_id in self.room_queues:
|
|
|
+ logger.info(
|
|
|
+ "[%s %s] Queuing PDU from %s for now: join in progress",
|
|
|
+ room_id, event_id, origin,
|
|
|
+ )
|
|
|
+ self.room_queues[room_id].append((pdu, origin))
|
|
|
return
|
|
|
|
|
|
# If we're no longer in the room just ditch the event entirely. This
|
|
@@ -179,7 +210,7 @@ class FederationHandler(BaseHandler):
|
|
|
# we should check if we *are* in fact in the room. If we are then we
|
|
|
# can magically rejoin the room.
|
|
|
is_in_room = yield self.auth.check_host_in_room(
|
|
|
- pdu.room_id,
|
|
|
+ room_id,
|
|
|
self.server_name
|
|
|
)
|
|
|
if not is_in_room:
|
|
@@ -188,8 +219,8 @@ class FederationHandler(BaseHandler):
|
|
|
)
|
|
|
if was_in_room:
|
|
|
logger.info(
|
|
|
- "Ignoring PDU %s for room %s from %s as we've left the room!",
|
|
|
- pdu.event_id, pdu.room_id, origin,
|
|
|
+ "[%s %s] Ignoring PDU from %s as we've left the room",
|
|
|
+ room_id, event_id, origin,
|
|
|
)
|
|
|
defer.returnValue(None)
|
|
|
|
|
@@ -204,8 +235,8 @@ class FederationHandler(BaseHandler):
|
|
|
)
|
|
|
|
|
|
logger.debug(
|
|
|
- "_handle_new_pdu min_depth for %s: %d",
|
|
|
- pdu.room_id, min_depth
|
|
|
+ "[%s %s] min_depth: %d",
|
|
|
+ room_id, event_id, min_depth,
|
|
|
)
|
|
|
|
|
|
prevs = {e_id for e_id, _ in pdu.prev_events}
|
|
@@ -218,17 +249,18 @@ class FederationHandler(BaseHandler):
|
|
|
# send to the clients.
|
|
|
pdu.internal_metadata.outlier = True
|
|
|
elif min_depth and pdu.depth > min_depth:
|
|
|
- if get_missing and prevs - seen:
|
|
|
+ missing_prevs = prevs - seen
|
|
|
+ if sent_to_us_directly and missing_prevs:
|
|
|
# If we're missing stuff, ensure we only fetch stuff one
|
|
|
# at a time.
|
|
|
logger.info(
|
|
|
- "Acquiring lock for room %r to fetch %d missing events: %r...",
|
|
|
- pdu.room_id, len(prevs - seen), list(prevs - seen)[:5],
|
|
|
+ "[%s %s] Acquiring room lock to fetch %d missing prev_events: %s",
|
|
|
+ room_id, event_id, len(missing_prevs), shortstr(missing_prevs),
|
|
|
)
|
|
|
with (yield self._room_pdu_linearizer.queue(pdu.room_id)):
|
|
|
logger.info(
|
|
|
- "Acquired lock for room %r to fetch %d missing events",
|
|
|
- pdu.room_id, len(prevs - seen),
|
|
|
+ "[%s %s] Acquired room lock to fetch %d missing prev_events",
|
|
|
+ room_id, event_id, len(missing_prevs),
|
|
|
)
|
|
|
|
|
|
yield self._get_missing_events_for_pdu(
|
|
@@ -241,68 +273,150 @@ class FederationHandler(BaseHandler):
|
|
|
|
|
|
if not prevs - seen:
|
|
|
logger.info(
|
|
|
- "Found all missing prev events for %s", pdu.event_id
|
|
|
+ "[%s %s] Found all missing prev_events",
|
|
|
+ room_id, event_id,
|
|
|
)
|
|
|
- elif prevs - seen:
|
|
|
+ elif missing_prevs:
|
|
|
logger.info(
|
|
|
- "Not fetching %d missing events for room %r,event %s: %r...",
|
|
|
- len(prevs - seen), pdu.room_id, pdu.event_id,
|
|
|
- list(prevs - seen)[:5],
|
|
|
+ "[%s %s] Not recursively fetching %d missing prev_events: %s",
|
|
|
+ room_id, event_id, len(missing_prevs), shortstr(missing_prevs),
|
|
|
)
|
|
|
|
|
|
- if sent_to_us_directly and prevs - seen:
|
|
|
- # If they have sent it to us directly, and the server
|
|
|
- # isn't telling us about the auth events that it's
|
|
|
- # made a message referencing, we explode
|
|
|
- raise FederationError(
|
|
|
- "ERROR",
|
|
|
- 403,
|
|
|
- (
|
|
|
- "Your server isn't divulging details about prev_events "
|
|
|
- "referenced in this event."
|
|
|
- ),
|
|
|
- affected=pdu.event_id,
|
|
|
- )
|
|
|
- elif prevs - seen:
|
|
|
- # Calculate the state of the previous events, and
|
|
|
- # de-conflict them to find the current state.
|
|
|
- state_groups = []
|
|
|
+ if prevs - seen:
|
|
|
+ # We've still not been able to get all of the prev_events for this event.
|
|
|
+ #
|
|
|
+ # In this case, we need to fall back to asking another server in the
|
|
|
+ # federation for the state at this event. That's ok provided we then
|
|
|
+ # resolve the state against other bits of the DAG before using it (which
|
|
|
+ # will ensure that you can't just take over a room by sending an event,
|
|
|
+ # withholding its prev_events, and declaring yourself to be an admin in
|
|
|
+ # the subsequent state request).
|
|
|
+ #
|
|
|
+ # Now, if we're pulling this event as a missing prev_event, then clearly
|
|
|
+ # this event is not going to become the only forward-extremity and we are
|
|
|
+ # guaranteed to resolve its state against our existing forward
|
|
|
+ # extremities, so that should be fine.
|
|
|
+ #
|
|
|
+ # On the other hand, if this event was pushed to us, it is possible for
|
|
|
+ # it to become the only forward-extremity in the room, and we would then
|
|
|
+ # trust its state to be the state for the whole room. This is very bad.
|
|
|
+ # Further, if the event was pushed to us, there is no excuse for us not to
|
|
|
+ # have all the prev_events. We therefore reject any such events.
|
|
|
+ #
|
|
|
+ # XXX this really feels like it could/should be merged with the above,
|
|
|
+ # but there is an interaction with min_depth that I'm not really
|
|
|
+ # following.
|
|
|
+
|
|
|
+ if sent_to_us_directly:
|
|
|
+ logger.warn(
|
|
|
+ "[%s %s] Rejecting: failed to fetch %d prev events: %s",
|
|
|
+ room_id, event_id, len(prevs - seen), shortstr(prevs - seen)
|
|
|
+ )
|
|
|
+ raise FederationError(
|
|
|
+ "ERROR",
|
|
|
+ 403,
|
|
|
+ (
|
|
|
+ "Your server isn't divulging details about prev_events "
|
|
|
+ "referenced in this event."
|
|
|
+ ),
|
|
|
+ affected=pdu.event_id,
|
|
|
+ )
|
|
|
+
|
|
|
+ # Calculate the state after each of the previous events, and
|
|
|
+ # resolve them to find the correct state at the current event.
|
|
|
auth_chains = set()
|
|
|
+ event_map = {
|
|
|
+ event_id: pdu,
|
|
|
+ }
|
|
|
try:
|
|
|
# Get the state of the events we know about
|
|
|
- ours = yield self.store.get_state_groups(pdu.room_id, list(seen))
|
|
|
- state_groups.append(ours)
|
|
|
+ ours = yield self.store.get_state_groups_ids(room_id, seen)
|
|
|
+
|
|
|
+ # state_maps is a list of mappings from (type, state_key) to event_id
|
|
|
+ # type: list[dict[tuple[str, str], str]]
|
|
|
+ state_maps = list(ours.values())
|
|
|
+
|
|
|
+ # we don't need this any more, let's delete it.
|
|
|
+ del ours
|
|
|
|
|
|
# Ask the remote server for the states we don't
|
|
|
# know about
|
|
|
for p in prevs - seen:
|
|
|
- state, got_auth_chain = (
|
|
|
- yield self.federation_client.get_state_for_room(
|
|
|
- origin, pdu.room_id, p
|
|
|
- )
|
|
|
- )
|
|
|
- auth_chains.update(got_auth_chain)
|
|
|
- state_group = {(x.type, x.state_key): x.event_id for x in state}
|
|
|
- state_groups.append(state_group)
|
|
|
-
|
|
|
- # Resolve any conflicting state
|
|
|
- def fetch(ev_ids):
|
|
|
- return self.store.get_events(
|
|
|
- ev_ids, get_prev_content=False, check_redacted=False
|
|
|
+ logger.info(
|
|
|
+ "[%s %s] Requesting state at missing prev_event %s",
|
|
|
+ room_id, event_id, p,
|
|
|
)
|
|
|
|
|
|
- state_map = yield resolve_events_with_factory(
|
|
|
- state_groups, {pdu.event_id: pdu}, fetch
|
|
|
+ with logcontext.nested_logging_context(p):
|
|
|
+ # note that if any of the missing prevs share missing state or
|
|
|
+ # auth events, the requests to fetch those events are deduped
|
|
|
+ # by the get_pdu_cache in federation_client.
|
|
|
+ remote_state, got_auth_chain = (
|
|
|
+ yield self.federation_client.get_state_for_room(
|
|
|
+ origin, room_id, p,
|
|
|
+ )
|
|
|
+ )
|
|
|
+
|
|
|
+ # we want the state *after* p; get_state_for_room returns the
|
|
|
+ # state *before* p.
|
|
|
+ remote_event = yield self.federation_client.get_pdu(
|
|
|
+ [origin], p, outlier=True,
|
|
|
+ )
|
|
|
+
|
|
|
+ if remote_event is None:
|
|
|
+ raise Exception(
|
|
|
+ "Unable to get missing prev_event %s" % (p, )
|
|
|
+ )
|
|
|
+
|
|
|
+ if remote_event.is_state():
|
|
|
+ remote_state.append(remote_event)
|
|
|
+
|
|
|
+ # XXX hrm I'm not convinced that duplicate events will compare
|
|
|
+ # for equality, so I'm not sure this does what the author
|
|
|
+ # hoped.
|
|
|
+ auth_chains.update(got_auth_chain)
|
|
|
+
|
|
|
+ remote_state_map = {
|
|
|
+ (x.type, x.state_key): x.event_id for x in remote_state
|
|
|
+ }
|
|
|
+ state_maps.append(remote_state_map)
|
|
|
+
|
|
|
+ for x in remote_state:
|
|
|
+ event_map[x.event_id] = x
|
|
|
+
|
|
|
+ room_version = yield self.store.get_room_version(room_id)
|
|
|
+ state_map = yield resolve_events_with_store(
|
|
|
+ room_version, state_maps, event_map,
|
|
|
+ state_res_store=StateResolutionStore(self.store),
|
|
|
+ )
|
|
|
+
|
|
|
+ # We need to give _process_received_pdu the actual state events
|
|
|
+ # rather than event ids, so generate that now.
|
|
|
+
|
|
|
+ # First though we need to fetch all the events that are in
|
|
|
+ # state_map, so we can build up the state below.
|
|
|
+ evs = yield self.store.get_events(
|
|
|
+ list(state_map.values()),
|
|
|
+ get_prev_content=False,
|
|
|
+ check_redacted=False,
|
|
|
)
|
|
|
+ event_map.update(evs)
|
|
|
|
|
|
- state = (yield self.store.get_events(state_map.values())).values()
|
|
|
+ state = [
|
|
|
+ event_map[e] for e in six.itervalues(state_map)
|
|
|
+ ]
|
|
|
auth_chain = list(auth_chains)
|
|
|
except Exception:
|
|
|
+ logger.warn(
|
|
|
+ "[%s %s] Error attempting to resolve state at missing "
|
|
|
+ "prev_events",
|
|
|
+ room_id, event_id, exc_info=True,
|
|
|
+ )
|
|
|
raise FederationError(
|
|
|
"ERROR",
|
|
|
403,
|
|
|
"We can't get valid state history.",
|
|
|
- affected=pdu.event_id,
|
|
|
+ affected=event_id,
|
|
|
)
|
|
|
|
|
|
yield self._process_received_pdu(
|
|
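As a side note on the data shapes used above: each entry of state_maps is a mapping from (event type, state_key) to an event ID, one map per prev_event whose state we either already had or fetched from the remote server. A hypothetical pair of such maps (all IDs invented) could look like:

    state_maps = [
        {   # state after a prev_event we already knew about
            ("m.room.power_levels", ""): "$pl_a",
            ("m.room.member", "@alice:example.org"): "$join_alice",
        },
        {   # state fetched from the remote server for a missing prev_event
            ("m.room.power_levels", ""): "$pl_b",  # conflicts with $pl_a
            ("m.room.member", "@alice:example.org"): "$join_alice",  # unconflicted
        },
    ]

resolve_events_with_store then reduces these to a single such mapping for the event being handled, fetching any events it needs through the StateResolutionStore wrapper around the datastore.
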
@@ -321,15 +435,16 @@ class FederationHandler(BaseHandler):
|
|
|
prevs (set(str)): List of event ids which we are missing
|
|
|
min_depth (int): Minimum depth of events to return.
|
|
|
"""
|
|
|
- # We recalculate seen, since it may have changed.
|
|
|
+
|
|
|
+ room_id = pdu.room_id
|
|
|
+ event_id = pdu.event_id
|
|
|
+
|
|
|
seen = yield self.store.have_seen_events(prevs)
|
|
|
|
|
|
if not prevs - seen:
|
|
|
return
|
|
|
|
|
|
- latest = yield self.store.get_latest_event_ids_in_room(
|
|
|
- pdu.room_id
|
|
|
- )
|
|
|
+ latest = yield self.store.get_latest_event_ids_in_room(room_id)
|
|
|
|
|
|
# We add the prev events that we have seen to the latest
|
|
|
# list to ensure the remote server doesn't give them to us
|
|
@@ -337,8 +452,8 @@ class FederationHandler(BaseHandler):
|
|
|
latest |= seen
|
|
|
|
|
|
logger.info(
|
|
|
- "Missing %d events for room %r pdu %s: %r...",
|
|
|
- len(prevs - seen), pdu.room_id, pdu.event_id, list(prevs - seen)[:5]
|
|
|
+ "[%s %s]: Requesting missing events between %s and %s",
|
|
|
+ room_id, event_id, shortstr(latest), event_id,
|
|
|
)
|
|
|
|
|
|
# XXX: we set timeout to 10s to help workaround
|
|
@@ -359,49 +474,88 @@ class FederationHandler(BaseHandler):
|
|
|
# apparently.
|
|
|
#
|
|
|
# see https://github.com/matrix-org/synapse/pull/1744
|
|
|
+ #
|
|
|
+ # ----
|
|
|
+ #
|
|
|
+ # Update richvdh 2018/09/18: There are a number of problems with timing this
|
|
|
+ # request out aggressively on the client side:
|
|
|
+ #
|
|
|
+ # - it plays badly with the server-side rate-limiter, which starts tarpitting you
|
|
|
+ # if you send too many requests at once, so you end up with the server carefully
|
|
|
+ # working through the backlog of your requests, which you have already timed
|
|
|
+ # out.
|
|
|
+ #
|
|
|
+ # - for this request in particular, we now (as of
|
|
|
+ # https://github.com/matrix-org/synapse/pull/3456) reject any PDUs where the
|
|
|
+ # server can't produce a plausible-looking set of prev_events - so we become
|
|
|
+ # much more likely to reject the event.
|
|
|
+ #
|
|
|
+ # - contrary to what it says above, we do *not* fall back to fetching fresh state
|
|
|
+ # for the room if get_missing_events times out. Rather, we give up processing
|
|
|
+ # the PDU whose prevs we are missing, which then makes it much more likely that
|
|
|
+ # we'll end up back here for the *next* PDU in the list, which exacerbates the
|
|
|
+ # problem.
|
|
|
+ #
|
|
|
+ # - the aggressive 10s timeout was introduced to deal with incoming federation
|
|
|
+ # requests taking 8 hours to process. It's not entirely clear why that was going
|
|
|
+ # on; certainly there were other issues causing traffic storms which are now
|
|
|
+ # resolved, and I think in any case we may be more sensible about our locking
|
|
|
+ # now. We're *certainly* more sensible about our logging.
|
|
|
+ #
|
|
|
+ # All that said: Let's try increasing the timeout to 60s and see what happens.
|
|
|
|
|
|
missing_events = yield self.federation_client.get_missing_events(
|
|
|
origin,
|
|
|
- pdu.room_id,
|
|
|
+ room_id,
|
|
|
earliest_events_ids=list(latest),
|
|
|
latest_events=[pdu],
|
|
|
limit=10,
|
|
|
min_depth=min_depth,
|
|
|
- timeout=10000,
|
|
|
+ timeout=60000,
|
|
|
)
|
|
|
|
|
|
logger.info(
|
|
|
- "Got %d events: %r...",
|
|
|
- len(missing_events), [e.event_id for e in missing_events[:5]]
|
|
|
+ "[%s %s]: Got %d prev_events: %s",
|
|
|
+ room_id, event_id, len(missing_events), shortstr(missing_events),
|
|
|
)
|
|
|
|
|
|
# We want to sort these by depth so we process them and
|
|
|
# tell clients about them in order.
|
|
|
missing_events.sort(key=lambda x: x.depth)
|
|
|
|
|
|
- for e in missing_events:
|
|
|
- logger.info("Handling found event %s", e.event_id)
|
|
|
- try:
|
|
|
- yield self.on_receive_pdu(
|
|
|
- origin,
|
|
|
- e,
|
|
|
- get_missing=False
|
|
|
- )
|
|
|
- except FederationError as e:
|
|
|
- if e.code == 403:
|
|
|
- logger.warn("Event %s failed history check.")
|
|
|
- else:
|
|
|
- raise
|
|
|
+ for ev in missing_events:
|
|
|
+ logger.info(
|
|
|
+ "[%s %s] Handling received prev_event %s",
|
|
|
+ room_id, event_id, ev.event_id,
|
|
|
+ )
|
|
|
+ with logcontext.nested_logging_context(ev.event_id):
|
|
|
+ try:
|
|
|
+ yield self.on_receive_pdu(
|
|
|
+ origin,
|
|
|
+ ev,
|
|
|
+ sent_to_us_directly=False,
|
|
|
+ )
|
|
|
+ except FederationError as e:
|
|
|
+ if e.code == 403:
|
|
|
+ logger.warn(
|
|
|
+ "[%s %s] Received prev_event %s failed history check.",
|
|
|
+ room_id, event_id, ev.event_id,
|
|
|
+ )
|
|
|
+ else:
|
|
|
+ raise
|
|
|
|
|
|
- @log_function
|
|
|
@defer.inlineCallbacks
|
|
|
- def _process_received_pdu(self, origin, pdu, state, auth_chain):
|
|
|
+ def _process_received_pdu(self, origin, event, state, auth_chain):
|
|
|
""" Called when we have a new pdu. We need to do auth checks and put it
|
|
|
through the StateHandler.
|
|
|
"""
|
|
|
- event = pdu
|
|
|
+ room_id = event.room_id
|
|
|
+ event_id = event.event_id
|
|
|
|
|
|
- logger.debug("Processing event: %s", event)
|
|
|
+ logger.debug(
|
|
|
+ "[%s %s] Processing event: %s",
|
|
|
+ room_id, event_id, event,
|
|
|
+ )
|
|
|
|
|
|
# FIXME (erikj): Awful hack to make the case where we are not currently
|
|
|
# in the room work
|
|
@@ -410,15 +564,16 @@ class FederationHandler(BaseHandler):
|
|
|
# event.
|
|
|
if state and auth_chain and not event.internal_metadata.is_outlier():
|
|
|
is_in_room = yield self.auth.check_host_in_room(
|
|
|
- event.room_id,
|
|
|
+ room_id,
|
|
|
self.server_name
|
|
|
)
|
|
|
else:
|
|
|
is_in_room = True
|
|
|
+
|
|
|
if not is_in_room:
|
|
|
logger.info(
|
|
|
- "Got event for room we're not in: %r %r",
|
|
|
- event.room_id, event.event_id
|
|
|
+ "[%s %s] Got event for room we're not in",
|
|
|
+ room_id, event_id,
|
|
|
)
|
|
|
|
|
|
try:
|
|
@@ -430,7 +585,7 @@ class FederationHandler(BaseHandler):
|
|
|
"ERROR",
|
|
|
e.code,
|
|
|
e.msg,
|
|
|
- affected=event.event_id,
|
|
|
+ affected=event_id,
|
|
|
)
|
|
|
|
|
|
else:
|
|
@@ -463,6 +618,10 @@ class FederationHandler(BaseHandler):
|
|
|
})
|
|
|
seen_ids.add(e.event_id)
|
|
|
|
|
|
+ logger.info(
|
|
|
+ "[%s %s] persisting newly-received auth/state events %s",
|
|
|
+ room_id, event_id, [e["event"].event_id for e in event_infos]
|
|
|
+ )
|
|
|
yield self._handle_new_events(origin, event_infos)
|
|
|
|
|
|
try:
|
|
@@ -479,12 +638,12 @@ class FederationHandler(BaseHandler):
|
|
|
affected=event.event_id,
|
|
|
)
|
|
|
|
|
|
- room = yield self.store.get_room(event.room_id)
|
|
|
+ room = yield self.store.get_room(room_id)
|
|
|
|
|
|
if not room:
|
|
|
try:
|
|
|
yield self.store.store_room(
|
|
|
- room_id=event.room_id,
|
|
|
+ room_id=room_id,
|
|
|
room_creator_user_id="",
|
|
|
is_public=False,
|
|
|
)
|
|
@@ -512,7 +671,7 @@ class FederationHandler(BaseHandler):
|
|
|
|
|
|
if newly_joined:
|
|
|
user = UserID.from_string(event.state_key)
|
|
|
- yield self.user_joined_room(user, event.room_id)
|
|
|
+ yield self.user_joined_room(user, room_id)
|
|
|
|
|
|
@log_function
|
|
|
@defer.inlineCallbacks
|
|
@@ -593,7 +752,7 @@ class FederationHandler(BaseHandler):
|
|
|
|
|
|
required_auth = set(
|
|
|
a_id
|
|
|
- for event in events + state_events.values() + auth_events.values()
|
|
|
+ for event in events + list(state_events.values()) + list(auth_events.values())
|
|
|
for a_id, _ in event.auth_events
|
|
|
)
|
|
|
auth_events.update({
|
|
@@ -801,7 +960,7 @@ class FederationHandler(BaseHandler):
|
|
|
)
|
|
|
continue
|
|
|
except NotRetryingDestination as e:
|
|
|
- logger.info(e.message)
|
|
|
+ logger.info(str(e))
|
|
|
continue
|
|
|
except FederationDeniedError as e:
|
|
|
logger.info(e)
|
|
@@ -1026,7 +1185,8 @@ class FederationHandler(BaseHandler):
|
|
|
try:
|
|
|
logger.info("Processing queued PDU %s which was received "
|
|
|
"while we were joining %s", p.event_id, p.room_id)
|
|
|
- yield self.on_receive_pdu(origin, p)
|
|
|
+ with logcontext.nested_logging_context(p.event_id):
|
|
|
+ yield self.on_receive_pdu(origin, p, sent_to_us_directly=True)
|
|
|
except Exception as e:
|
|
|
logger.warn(
|
|
|
"Error handling queued PDU %s from %s: %s",
|
|
@@ -1357,7 +1517,7 @@ class FederationHandler(BaseHandler):
|
|
|
)
|
|
|
|
|
|
if state_groups:
|
|
|
- _, state = state_groups.items().pop()
|
|
|
+ _, state = list(state_groups.items()).pop()
|
|
|
results = state
|
|
|
|
|
|
if event.is_state():
|
|
@@ -1429,12 +1589,10 @@ class FederationHandler(BaseHandler):
|
|
|
else:
|
|
|
defer.returnValue(None)
|
|
|
|
|
|
- @log_function
|
|
|
def get_min_depth_for_context(self, context):
|
|
|
return self.store.get_min_depth(context)
|
|
|
|
|
|
@defer.inlineCallbacks
|
|
|
- @log_function
|
|
|
def _handle_new_event(self, origin, event, state=None, auth_events=None,
|
|
|
backfilled=False):
|
|
|
context = yield self._prep_event(
|
|
@@ -1443,6 +1601,9 @@ class FederationHandler(BaseHandler):
|
|
|
auth_events=auth_events,
|
|
|
)
|
|
|
|
|
|
+ # reraise does not allow inlineCallbacks to preserve the stacktrace, so we
|
|
|
+ # hack around with a try/finally instead.
|
|
|
+ success = False
|
|
|
try:
|
|
|
if not event.internal_metadata.is_outlier() and not backfilled:
|
|
|
yield self.action_generator.handle_push_actions_for_event(
|
|
@@ -1453,15 +1614,13 @@ class FederationHandler(BaseHandler):
|
|
|
[(event, context)],
|
|
|
backfilled=backfilled,
|
|
|
)
|
|
|
- except: # noqa: E722, as we reraise the exception this is fine.
|
|
|
- tp, value, tb = sys.exc_info()
|
|
|
-
|
|
|
- logcontext.run_in_background(
|
|
|
- self.store.remove_push_actions_from_staging,
|
|
|
- event.event_id,
|
|
|
- )
|
|
|
-
|
|
|
- six.reraise(tp, value, tb)
|
|
|
+ success = True
|
|
|
+ finally:
|
|
|
+ if not success:
|
|
|
+ logcontext.run_in_background(
|
|
|
+ self.store.remove_push_actions_from_staging,
|
|
|
+ event.event_id,
|
|
|
+ )
|
|
|
|
|
|
defer.returnValue(context)
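The success-flag try/finally above lets the original exception (and its stack trace) propagate unchanged while the push-action cleanup still runs only on failure, which the bare except plus six.reraise version could not do under inlineCallbacks. A minimal standalone sketch of the same pattern, with hypothetical operation/cleanup callables:

    def run_with_cleanup_on_failure(operation, cleanup):
        """Run operation(); if it raises, run cleanup() and let the error propagate."""
        success = False
        try:
            result = operation()
            success = True
            return result
        finally:
            if not success:
                cleanup()
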
|
|
|
|
|
@@ -1474,15 +1633,22 @@ class FederationHandler(BaseHandler):
|
|
|
|
|
|
Notifies about the events where appropriate.
|
|
|
"""
|
|
|
- contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults(
|
|
|
- [
|
|
|
- logcontext.run_in_background(
|
|
|
- self._prep_event,
|
|
|
+
|
|
|
+ @defer.inlineCallbacks
|
|
|
+ def prep(ev_info):
|
|
|
+ event = ev_info["event"]
|
|
|
+ with logcontext.nested_logging_context(suffix=event.event_id):
|
|
|
+ res = yield self._prep_event(
|
|
|
origin,
|
|
|
- ev_info["event"],
|
|
|
+ event,
|
|
|
state=ev_info.get("state"),
|
|
|
auth_events=ev_info.get("auth_events"),
|
|
|
)
|
|
|
+ defer.returnValue(res)
|
|
|
+
|
|
|
+ contexts = yield logcontext.make_deferred_yieldable(defer.gatherResults(
|
|
|
+ [
|
|
|
+ logcontext.run_in_background(prep, ev_info)
|
|
|
for ev_info in event_infos
|
|
|
], consumeErrors=True,
|
|
|
))
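The prep() wrapper above follows a general shape: give each item its own nested logging context, kick the per-item work off in the background, then gather the results. A rough sketch of that shape using the same helpers the patch itself uses (process_one and the items are hypothetical):

    from twisted.internet import defer

    from synapse.util import logcontext


    @defer.inlineCallbacks
    def process_all(items, process_one):
        @defer.inlineCallbacks
        def one(item):
            # each item gets its own nested log context, named after the item
            with logcontext.nested_logging_context(suffix=str(item)):
                res = yield process_one(item)
            defer.returnValue(res)

        results = yield logcontext.make_deferred_yieldable(defer.gatherResults(
            [logcontext.run_in_background(one, item) for item in items],
            consumeErrors=True,
        ))
        defer.returnValue(results)
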
|
|
@@ -1634,8 +1800,8 @@ class FederationHandler(BaseHandler):
|
|
|
)
|
|
|
except AuthError as e:
|
|
|
logger.warn(
|
|
|
- "Rejecting %s because %s",
|
|
|
- event.event_id, e.msg
|
|
|
+ "[%s %s] Rejecting: %s",
|
|
|
+ event.room_id, event.event_id, e.msg
|
|
|
)
|
|
|
|
|
|
context.rejected = RejectedReason.AUTH_ERROR
|
|
@@ -1686,7 +1852,7 @@ class FederationHandler(BaseHandler):
|
|
|
|
|
|
@defer.inlineCallbacks
|
|
|
def on_get_missing_events(self, origin, room_id, earliest_events,
|
|
|
- latest_events, limit, min_depth):
|
|
|
+ latest_events, limit):
|
|
|
in_room = yield self.auth.check_host_in_room(
|
|
|
room_id,
|
|
|
origin
|
|
@@ -1695,14 +1861,12 @@ class FederationHandler(BaseHandler):
|
|
|
raise AuthError(403, "Host not in room.")
|
|
|
|
|
|
limit = min(limit, 20)
|
|
|
- min_depth = max(min_depth, 0)
|
|
|
|
|
|
missing_events = yield self.store.get_missing_events(
|
|
|
room_id=room_id,
|
|
|
earliest_events=earliest_events,
|
|
|
latest_events=latest_events,
|
|
|
limit=limit,
|
|
|
- min_depth=min_depth,
|
|
|
)
|
|
|
|
|
|
missing_events = yield filter_events_for_server(
|
|
@@ -1828,7 +1992,10 @@ class FederationHandler(BaseHandler):
|
|
|
(d.type, d.state_key): d for d in different_events if d
|
|
|
})
|
|
|
|
|
|
- new_state = self.state_handler.resolve_events(
|
|
|
+ room_version = yield self.store.get_room_version(event.room_id)
|
|
|
+
|
|
|
+ new_state = yield self.state_handler.resolve_events(
|
|
|
+ room_version,
|
|
|
[list(local_view.values()), list(remote_view.values())],
|
|
|
event
|
|
|
)
|
|
@@ -2353,7 +2520,7 @@ class FederationHandler(BaseHandler):
|
|
|
|
|
|
if not backfilled: # Never notify for backfilled events
|
|
|
for event, _ in event_and_contexts:
|
|
|
- self._notify_persisted_event(event, max_stream_id)
|
|
|
+ yield self._notify_persisted_event(event, max_stream_id)
|
|
|
|
|
|
def _notify_persisted_event(self, event, max_stream_id):
|
|
|
"""Checks to see if notifier/pushers should be notified about the
|
|
@@ -2386,7 +2553,7 @@ class FederationHandler(BaseHandler):
|
|
|
extra_users=extra_users
|
|
|
)
|
|
|
|
|
|
- self.pusher_pool.on_new_notifications(
|
|
|
+ return self.pusher_pool.on_new_notifications(
|
|
|
event_stream_id, max_stream_id,
|
|
|
)