|
@@ -38,6 +38,7 @@ from synapse.storage.background_updates import BackgroundUpdateStore
|
|
|
from synapse.storage.event_federation import EventFederationStore
|
|
|
from synapse.storage.events_worker import EventsWorkerStore
|
|
|
from synapse.types import RoomStreamToken, get_domain_from_id
|
|
|
+from synapse.util import batch_iter
|
|
|
from synapse.util.async_helpers import ObservableDeferred
|
|
|
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
|
|
|
from synapse.util.frozenutils import frozendict_json_encoder
|
|
@@ -386,12 +387,10 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
|
|
|
)
|
|
|
|
|
|
for room_id, ev_ctx_rm in iteritems(events_by_room):
|
|
|
- # Work out new extremities by recursively adding and removing
|
|
|
- # the new events.
|
|
|
latest_event_ids = yield self.get_latest_event_ids_in_room(
|
|
|
room_id
|
|
|
)
|
|
|
- new_latest_event_ids = yield self._calculate_new_extremeties(
|
|
|
+ new_latest_event_ids = yield self._calculate_new_extremities(
|
|
|
room_id, ev_ctx_rm, latest_event_ids
|
|
|
)
|
|
|
|
|
@@ -400,6 +399,12 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
|
|
|
# No change in extremities, so no change in state
|
|
|
continue
|
|
|
|
|
|
+ # there should always be at least one forward extremity.
|
|
|
+ # (except during the initial persistence of the send_join
|
|
|
+ # results, in which case there will be no existing
|
|
|
+ # extremities, so we'll `continue` above and skip this bit.)
|
|
|
+ assert new_latest_event_ids, "No forward extremities left!"
|
|
|
+
|
|
|
new_forward_extremeties[room_id] = new_latest_event_ids
|
|
|
|
|
|
len_1 = (
|
|
@@ -517,44 +522,88 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
|
|
|
)
|
|
|
|
|
|
@defer.inlineCallbacks
|
|
|
- def _calculate_new_extremeties(self, room_id, event_contexts, latest_event_ids):
|
|
|
- """Calculates the new forward extremeties for a room given events to
|
|
|
+ def _calculate_new_extremities(self, room_id, event_contexts, latest_event_ids):
|
|
|
+ """Calculates the new forward extremities for a room given events to
|
|
|
persist.
|
|
|
|
|
|
Assumes that we are only persisting events for one room at a time.
|
|
|
"""
|
|
|
- new_latest_event_ids = set(latest_event_ids)
|
|
|
- # First, add all the new events to the list
|
|
|
- new_latest_event_ids.update(
|
|
|
- event.event_id for event, ctx in event_contexts
|
|
|
+
|
|
|
+ # we're only interested in new events which aren't outliers and which aren't
|
|
|
+ # being rejected.
|
|
|
+ new_events = [
|
|
|
+ event for event, ctx in event_contexts
|
|
|
if not event.internal_metadata.is_outlier() and not ctx.rejected
|
|
|
+ ]
|
|
|
+
|
|
|
+ # start with the existing forward extremities
|
|
|
+ result = set(latest_event_ids)
|
|
|
+
|
|
|
+ # add all the new events to the list
|
|
|
+ result.update(
|
|
|
+ event.event_id for event in new_events
|
|
|
)
|
|
|
- # Now remove all events that are referenced by the to-be-added events
|
|
|
- new_latest_event_ids.difference_update(
|
|
|
+
|
|
|
+ # Now remove all events which are prev_events of any of the new events
|
|
|
+ result.difference_update(
|
|
|
e_id
|
|
|
- for event, ctx in event_contexts
|
|
|
+ for event in new_events
|
|
|
for e_id, _ in event.prev_events
|
|
|
- if not event.internal_metadata.is_outlier() and not ctx.rejected
|
|
|
)
|
|
|
|
|
|
- # And finally remove any events that are referenced by previously added
|
|
|
- # events.
|
|
|
- rows = yield self._simple_select_many_batch(
|
|
|
- table="event_edges",
|
|
|
- column="prev_event_id",
|
|
|
- iterable=list(new_latest_event_ids),
|
|
|
- retcols=["prev_event_id"],
|
|
|
- keyvalues={
|
|
|
- "is_state": False,
|
|
|
- },
|
|
|
- desc="_calculate_new_extremeties",
|
|
|
- )
|
|
|
+ # Finally, remove any events which are prev_events of any existing events.
|
|
|
+ existing_prevs = yield self._get_events_which_are_prevs(result)
|
|
|
+ result.difference_update(existing_prevs)
|
|
|
|
|
|
- new_latest_event_ids.difference_update(
|
|
|
- row["prev_event_id"] for row in rows
|
|
|
- )
|
|
|
+ if not result:
|
|
|
+ logger.warn(
|
|
|
+ "Forward extremity list A+B-C-D is now empty in %s. "
|
|
|
+ "Old extremities (A): %s, new events (B): %s, "
|
|
|
+ "existing events which are reffed by new events (C): %s, "
|
|
|
+ "new events which are reffed by existing events (D): %s",
|
|
|
+ room_id, latest_event_ids, new_events,
|
|
|
+ [e_id for event in new_events for e_id, _ in event.prev_events],
|
|
|
+ existing_prevs,
|
|
|
+ )
|
|
|
+ defer.returnValue(result)
|
|
|
|
|
|
- defer.returnValue(new_latest_event_ids)
|
|
|
+ @defer.inlineCallbacks
|
|
|
+ def _get_events_which_are_prevs(self, event_ids):
|
|
|
+ """Filter the supplied list of event_ids to get those which are prev_events of
|
|
|
+ existing (non-outlier) events.
|
|
|
+
|
|
|
+ Args:
|
|
|
+ event_ids (Iterable[str]): event ids to filter
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ Deferred[List[str]]: filtered event ids
|
|
|
+ """
|
|
|
+ results = []
|
|
|
+
|
|
|
+ def _get_events(txn, batch):
|
|
|
+ sql = """
|
|
|
+ SELECT prev_event_id
|
|
|
+ FROM event_edges
|
|
|
+ INNER JOIN events USING (event_id)
|
|
|
+ LEFT JOIN rejections USING (event_id)
|
|
|
+ WHERE
|
|
|
+ prev_event_id IN (%s)
|
|
|
+ AND rejections.event_id IS NULL
|
|
|
+ """ % (
|
|
|
+ ",".join("?" for _ in batch),
|
|
|
+ )
|
|
|
+
|
|
|
+ txn.execute(sql, batch)
|
|
|
+ results.extend(r[0] for r in txn)
|
|
|
+
|
|
|
+ for chunk in batch_iter(event_ids, 100):
|
|
|
+ yield self.runInteraction(
|
|
|
+ "_get_events_which_are_prevs",
|
|
|
+ _get_events,
|
|
|
+ chunk,
|
|
|
+ )
|
|
|
+
|
|
|
+ defer.returnValue(results)
|
|
|
|
|
|
@defer.inlineCallbacks
|
|
|
def _get_new_state_after_events(self, room_id, events_context, old_latest_event_ids,
|
|
@@ -586,10 +635,6 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
|
|
|
the new current state is only returned if we've already calculated
|
|
|
it.
|
|
|
"""
|
|
|
-
|
|
|
- if not new_latest_event_ids:
|
|
|
- return
|
|
|
-
|
|
|
# map from state_group to ((type, key) -> event_id) state map
|
|
|
state_groups_map = {}
|
|
|
|