@@ -34,14 +34,16 @@ from canonicaljson import encode_canonical_json
 from collections import deque, namedtuple, OrderedDict
 from functools import wraps

-import synapse
 import synapse.metrics

-
 import logging
 import math
 import ujson as json

+# these are only included to make the type annotations work
+from synapse.events import EventBase  # noqa: F401
+from synapse.events.snapshot import EventContext  # noqa: F401
+
 logger = logging.getLogger(__name__)

@@ -82,6 +84,11 @@ class _EventPeristenceQueue(object):

     def add_to_queue(self, room_id, events_and_contexts, backfilled):
         """Add events to the queue, with the given persist_event options.
+
+        Args:
+            room_id (str):
+            events_and_contexts (list[(EventBase, EventContext)]):
+            backfilled (bool):
         """
         queue = self._event_persist_queues.setdefault(room_id, deque())
         if queue:
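
The Args block above documents the entry point of the per-room write queue: batches are appended to a deque keyed by room_id, and one drain loop per room persists them strictly in arrival order. A minimal synchronous sketch of that pattern (illustrative only, not part of the patch; the real class returns deferreds rather than running synchronously):

    from collections import deque

    class RoomWriteQueue(object):
        """Toy model of the per-room ordering in _EventPeristenceQueue."""

        def __init__(self):
            self._queues = {}     # room_id -> deque of pending batches
            self._active = set()  # rooms whose drain loop is running

        def add(self, room_id, batch, persist_fn):
            self._queues.setdefault(room_id, deque()).append(batch)
            if room_id not in self._active:
                self._drain(room_id, persist_fn)

        def _drain(self, room_id, persist_fn):
            # A single drain loop per room means batches for one room can
            # never be persisted out of order, while different rooms
            # proceed independently.
            self._active.add(room_id)
            queue = self._queues[room_id]
            try:
                while queue:
                    persist_fn(queue.popleft())
            finally:
                self._active.discard(room_id)
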
@@ -210,14 +217,14 @@ class EventsStore(SQLBaseStore):
             partitioned.setdefault(event.room_id, []).append((event, ctx))

         deferreds = []
-        for room_id, evs_ctxs in partitioned.items():
+        for room_id, evs_ctxs in partitioned.iteritems():
             d = preserve_fn(self._event_persist_queue.add_to_queue)(
                 room_id, evs_ctxs,
                 backfilled=backfilled,
             )
             deferreds.append(d)

-        for room_id in partitioned.keys():
+        for room_id in partitioned:
             self._maybe_start_persisting(room_id)

         return preserve_context_over_deferred(
@@ -227,6 +234,17 @@ class EventsStore(SQLBaseStore):
     @defer.inlineCallbacks
     @log_function
     def persist_event(self, event, context, backfilled=False):
+        """
+
+        Args:
+            event (EventBase):
+            context (EventContext):
+            backfilled (bool):
+
+        Returns:
+            Deferred: resolves to (int, int): the stream ordering of ``event``,
+            and the stream ordering of the latest persisted event
+        """
         deferred = self._event_persist_queue.add_to_queue(
             event.room_id, [(event, context)],
             backfilled=backfilled,
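
Given the newly documented return value, a caller would consume persist_event along these lines (a hypothetical caller, in the @defer.inlineCallbacks style used throughout this file):

    from twisted.internet import defer

    @defer.inlineCallbacks
    def on_event_persisted(store, event, context):
        # Resolves once the write has hit the database: first the stream
        # ordering assigned to this event, then the highest stream
        # ordering persisted so far.
        event_ordering, max_ordering = yield store.persist_event(
            event, context, backfilled=False,
        )
        defer.returnValue((event_ordering, max_ordering))
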
@@ -253,6 +271,16 @@ class EventsStore(SQLBaseStore):
     @defer.inlineCallbacks
     def _persist_events(self, events_and_contexts, backfilled=False,
                         delete_existing=False):
+        """Persist events to db
+
+        Args:
+            events_and_contexts (list[(EventBase, EventContext)]):
+            backfilled (bool):
+            delete_existing (bool):
+
+        Returns:
+            Deferred: resolves when the events have been persisted
+        """
         if not events_and_contexts:
             return

@@ -295,7 +323,7 @@ class EventsStore(SQLBaseStore):
                    (event, context)
                )

-            for room_id, ev_ctx_rm in events_by_room.items():
+            for room_id, ev_ctx_rm in events_by_room.iteritems():
                # Work out new extremities by recursively adding and removing
                # the new events.
                latest_event_ids = yield self.get_latest_event_ids_in_room(
@@ -400,6 +428,7 @@ class EventsStore(SQLBaseStore):
         # Now we need to work out the different state sets for
         # each state extremities
         state_sets = []
+        state_groups = set()
         missing_event_ids = []
         was_updated = False
         for event_id in new_latest_event_ids:
@@ -409,9 +438,17 @@
                 if event_id == ev.event_id:
                     if ctx.current_state_ids is None:
                         raise Exception("Unknown current state")
-                    state_sets.append(ctx.current_state_ids)
-                    if ctx.delta_ids or hasattr(ev, "state_key"):
-                        was_updated = True
+
+                    # If we've already seen the state group don't bother adding
+                    # it to the state sets again
+                    if ctx.state_group not in state_groups:
+                        state_sets.append(ctx.current_state_ids)
+                        if ctx.delta_ids or hasattr(ev, "state_key"):
+                            was_updated = True
+                        if ctx.state_group:
+                            # Add this as a seen state group (if it has a state
+                            # group)
+                            state_groups.add(ctx.state_group)
                     break
             else:
                 # If we couldn't find it, then we'll need to pull
@@ -425,31 +462,57 @@
                 missing_event_ids,
             )

-            groups = set(event_to_groups.values())
-            group_to_state = yield self._get_state_for_groups(groups)
+            groups = set(event_to_groups.itervalues()) - state_groups

-            state_sets.extend(group_to_state.values())
+            if groups:
+                group_to_state = yield self._get_state_for_groups(groups)
+                state_sets.extend(group_to_state.itervalues())

         if not new_latest_event_ids:
             current_state = {}
         elif was_updated:
-            current_state = yield resolve_events(
-                state_sets,
-                state_map_factory=lambda ev_ids: self.get_events(
-                    ev_ids, get_prev_content=False, check_redacted=False,
-                ),
-            )
+            if len(state_sets) == 1:
+                # If there is only one state set, then we know what the current
+                # state is.
+                current_state = state_sets[0]
+            else:
+                # We work out the current state by passing the state sets to the
+                # state resolution algorithm. It may ask for some events, including
+                # the events we have yet to persist, so we need a slightly more
+                # complicated event lookup function than simply looking the events
+                # up in the db.
+                events_map = {ev.event_id: ev for ev, _ in events_context}
+
+                @defer.inlineCallbacks
+                def get_events(ev_ids):
+                    # We get the events by first looking at the list of events we
+                    # are trying to persist, and then fetching the rest from the DB.
+                    db = []
+                    to_return = {}
+                    for ev_id in ev_ids:
+                        ev = events_map.get(ev_id, None)
+                        if ev:
+                            to_return[ev_id] = ev
+                        else:
+                            db.append(ev_id)
+
+                    if db:
+                        evs = yield self.get_events(
+                            db, get_prev_content=False, check_redacted=False,
+                        )
+                        to_return.update(evs)
+                    defer.returnValue(to_return)
+
+                current_state = yield resolve_events(
+                    state_sets,
+                    state_map_factory=get_events,
+                )
         else:
             return

-        existing_state_rows = yield self._simple_select_list(
-            table="current_state_events",
-            keyvalues={"room_id": room_id},
-            retcols=["event_id", "type", "state_key"],
-            desc="_calculate_state_delta",
-        )
+        existing_state = yield self.get_current_state_ids(room_id)

-        existing_events = set(row["event_id"] for row in existing_state_rows)
+        existing_events = set(existing_state.itervalues())
         new_events = set(ev_id for ev_id in current_state.itervalues())
         changed_events = existing_events ^ new_events

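
The get_events closure added above is a two-tier lookup: satisfy what you can from the batch being persisted, and only query the database for the remainder (the db list; passing the full ev_ids back to self.get_events would defeat the split). Outside the twisted plumbing the pattern reduces to this sketch (make_two_tier_lookup and fetch_from_db are illustrative names, not part of the patch):

    def make_two_tier_lookup(local_events, fetch_from_db):
        """Build a lookup that checks an in-memory map before the db.

        local_events: dict of event_id -> event for the in-flight batch.
        fetch_from_db: callable taking a list of event_ids, returning a dict.
        """
        def get_events(ev_ids):
            found = {}
            missing = []
            for ev_id in ev_ids:
                ev = local_events.get(ev_id)
                if ev is not None:
                    found[ev_id] = ev
                else:
                    missing.append(ev_id)
            if missing:
                # Only the ids not satisfied locally hit the database.
                found.update(fetch_from_db(missing))
            return found
        return get_events
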
@@ -457,9 +520,8 @@ class EventsStore(SQLBaseStore):
             return

         to_delete = {
-            (row["type"], row["state_key"]): row["event_id"]
-            for row in existing_state_rows
-            if row["event_id"] in changed_events
+            key: ev_id for key, ev_id in existing_state.iteritems()
+            if ev_id in changed_events
         }
         events_to_insert = (new_events - existing_events)
         to_insert = {
@@ -535,11 +597,91 @@ class EventsStore(SQLBaseStore):
         and the rejections table. Things reading from those table will need to check
         whether the event was rejected.

-        If delete_existing is True then existing events will be purged from the
-        database before insertion. This is useful when retrying due to IntegrityError.
+        Args:
+            txn (twisted.enterprise.adbapi.Connection): db connection
+            events_and_contexts (list[(EventBase, EventContext)]):
+                events to persist
+            backfilled (bool): True if the events were backfilled
+            delete_existing (bool): True to purge existing table rows for the
+                events from the database. This is useful when retrying due to
+                IntegrityError.
+            current_state_for_room (dict[str, (list[str], list[str])]):
+                The current-state delta for each room. For each room, a tuple
+                (to_delete, to_insert), being a list of event ids to be removed
+                from the current state, and a list of event ids to be added to
+                the current state.
+            new_forward_extremeties (dict[str, list[str]]):
+                The new forward extremities for each room. For each room, a
+                list of the event ids which are the forward extremities.
+
         """
+        self._update_current_state_txn(txn, current_state_for_room)
+
         max_stream_order = events_and_contexts[-1][0].internal_metadata.stream_ordering
-        for room_id, current_state_tuple in current_state_for_room.iteritems():
+        self._update_forward_extremities_txn(
+            txn,
+            new_forward_extremities=new_forward_extremeties,
+            max_stream_order=max_stream_order,
+        )
+
+        # Ensure that we don't have the same event twice.
+        events_and_contexts = self._filter_events_and_contexts_for_duplicates(
+            events_and_contexts,
+        )
+
+        self._update_room_depths_txn(
+            txn,
+            events_and_contexts=events_and_contexts,
+            backfilled=backfilled,
+        )
+
+        # _update_outliers_txn filters out any events which have already been
+        # persisted, and returns the filtered list.
+        events_and_contexts = self._update_outliers_txn(
+            txn,
+            events_and_contexts=events_and_contexts,
+        )
+
+        # From this point onwards the events are only events that we haven't
+        # seen before.
+
+        if delete_existing:
+            # For paranoia reasons, we go and delete all the existing entries
+            # for these events so we can reinsert them.
+            # This gets around any problems with some tables already having
+            # entries.
+            self._delete_existing_rows_txn(
+                txn,
+                events_and_contexts=events_and_contexts,
+            )
+
+        self._store_event_txn(
+            txn,
+            events_and_contexts=events_and_contexts,
+        )
+
+        # Insert into the state_groups, state_groups_state, and
+        # event_to_state_groups tables.
+        self._store_mult_state_groups_txn(txn, events_and_contexts)
+
+        # _store_rejected_events_txn filters out any events which were
+        # rejected, and returns the filtered list.
+        events_and_contexts = self._store_rejected_events_txn(
+            txn,
+            events_and_contexts=events_and_contexts,
+        )
+
+        # From this point onwards the events are only ones that weren't
+        # rejected.
+
+        self._update_metadata_tables_txn(
+            txn,
+            events_and_contexts=events_and_contexts,
+            backfilled=backfilled,
+        )
+
+    def _update_current_state_txn(self, txn, state_delta_by_room):
+        for room_id, current_state_tuple in state_delta_by_room.iteritems():
             to_delete, to_insert = current_state_tuple
             txn.executemany(
                 "DELETE FROM current_state_events WHERE event_id = ?",
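
The rewritten body turns the main persistence transaction into a pipeline: every helper takes the (event, context) list, and the filtering stages hand back a shorter list, so each later stage can rely on the earlier invariants (no duplicates, nothing already persisted, nothing rejected). Schematically (a sketch; the stage names are just labels for the calls above):

    def run_persist_pipeline(events_and_contexts, stages):
        """Apply each stage in order; a stage may return a filtered list."""
        for stage in stages:
            result = stage(events_and_contexts)
            if result is not None:
                # Filtering stages (duplicates, outliers, rejections)
                # return the surviving list; mutating stages return None.
                events_and_contexts = result
        return events_and_contexts

The order of calls is therefore load-bearing: _update_outliers_txn must run before _store_event_txn, and _store_rejected_events_txn before _update_metadata_tables_txn.
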
@@ -585,7 +727,13 @@ class EventsStore(SQLBaseStore):
                 txn, self.get_users_in_room, (room_id,)
             )

-        for room_id, new_extrem in new_forward_extremeties.items():
+            self._invalidate_cache_and_stream(
+                txn, self.get_current_state_ids, (room_id,)
+            )
+
+    def _update_forward_extremities_txn(self, txn, new_forward_extremities,
+                                        max_stream_order):
+        for room_id, new_extrem in new_forward_extremities.iteritems():
             self._simple_delete_txn(
                 txn,
                 table="event_forward_extremities",
@@ -603,7 +751,7 @@ class EventsStore(SQLBaseStore):
                     "event_id": ev_id,
                     "room_id": room_id,
                 }
-                for room_id, new_extrem in new_forward_extremeties.items()
+                for room_id, new_extrem in new_forward_extremities.iteritems()
                 for ev_id in new_extrem
             ],
         )
@@ -620,13 +768,22 @@ class EventsStore(SQLBaseStore):
                     "event_id": event_id,
                     "stream_ordering": max_stream_order,
                 }
-                for room_id, new_extrem in new_forward_extremeties.items()
+                for room_id, new_extrem in new_forward_extremities.iteritems()
                 for event_id in new_extrem
             ]
         )

-        # Ensure that we don't have the same event twice.
-        # Pick the earliest non-outlier if there is one, else the earliest one.
+    @classmethod
+    def _filter_events_and_contexts_for_duplicates(cls, events_and_contexts):
+        """Ensure that we don't have the same event twice.
+
+        Pick the earliest non-outlier if there is one, else the earliest one.
+
+        Args:
+            events_and_contexts (list[(EventBase, EventContext)]):
+        Returns:
+            list[(EventBase, EventContext)]: filtered list
+        """
         new_events_and_contexts = OrderedDict()
         for event, context in events_and_contexts:
             prev_event_context = new_events_and_contexts.get(event.event_id)
@@ -639,9 +796,17 @@ class EventsStore(SQLBaseStore):
                         new_events_and_contexts[event.event_id] = (event, context)
             else:
                 new_events_and_contexts[event.event_id] = (event, context)
+        return new_events_and_contexts.values()

-        events_and_contexts = new_events_and_contexts.values()
+    def _update_room_depths_txn(self, txn, events_and_contexts, backfilled):
+        """Update min_depth for each room

+        Args:
+            txn (twisted.enterprise.adbapi.Connection): db connection
+            events_and_contexts (list[(EventBase, EventContext)]): events
+                we are persisting
+            backfilled (bool): True if the events were backfilled
+        """
         depth_updates = {}
         for event, context in events_and_contexts:
             # Remove the any existing cache entries for the event_ids
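
The duplicate filter's rule ("earliest non-outlier if there is one, else the earliest") can be exercised in isolation with plain tuples (a self-contained sketch; Ev stands in for EventBase, and the exact reinsertion behaviour of the elided middle of the hunk may differ):

    from collections import OrderedDict, namedtuple

    Ev = namedtuple("Ev", ["event_id", "outlier"])

    def filter_duplicates(events):
        seen = OrderedDict()
        for ev in events:
            prev = seen.get(ev.event_id)
            if prev is None:
                seen[ev.event_id] = ev
            elif prev.outlier and not ev.outlier:
                # A full copy displaces an earlier outlier copy.
                seen.pop(ev.event_id)
                seen[ev.event_id] = ev
        return list(seen.values())

    # The outlier copy of $a is replaced by the later full copy:
    deduped = filter_duplicates([Ev("$a", True), Ev("$b", False), Ev("$a", False)])
    assert deduped == [Ev("$b", False), Ev("$a", False)]
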
@@ -657,9 +822,24 @@ class EventsStore(SQLBaseStore):
                 event.depth, depth_updates.get(event.room_id, event.depth)
             )

-        for room_id, depth in depth_updates.items():
+        for room_id, depth in depth_updates.iteritems():
             self._update_min_depth_for_room_txn(txn, room_id, depth)

+    def _update_outliers_txn(self, txn, events_and_contexts):
+        """Update any outliers with new event info.
+
+        This turns outliers into ex-outliers (unless the new event was
+        rejected).
+
+        Args:
+            txn (twisted.enterprise.adbapi.Connection): db connection
+            events_and_contexts (list[(EventBase, EventContext)]): events
+                we are persisting
+
+        Returns:
+            list[(EventBase, EventContext)]: new list, without events which
+                are already in the events table.
+        """
         txn.execute(
             "SELECT event_id, outlier FROM events WHERE event_id in (%s)" % (
                 ",".join(["?"] * len(events_and_contexts)),
@@ -669,24 +849,21 @@ class EventsStore(SQLBaseStore):

         have_persisted = {
             event_id: outlier
-            for event_id, outlier in txn.fetchall()
+            for event_id, outlier in txn
         }

         to_remove = set()
         for event, context in events_and_contexts:
-            if context.rejected:
-                # If the event is rejected then we don't care if the event
-                # was an outlier or not.
-                if event.event_id in have_persisted:
-                    # If we have already seen the event then ignore it.
-                    to_remove.add(event)
-                continue
-
             if event.event_id not in have_persisted:
                 continue

             to_remove.add(event)

+            if context.rejected:
+                # If the event is rejected then we don't care if the event
+                # was an outlier or not.
+                continue
+
             outlier_persisted = have_persisted[event.event_id]
             if not event.internal_metadata.is_outlier() and outlier_persisted:
                 # We received a copy of an event that we had already stored as
@@ -741,37 +918,19 @@ class EventsStore(SQLBaseStore):
                 # event isn't an outlier any more.
                 self._update_backward_extremeties(txn, [event])

-        events_and_contexts = [
+        return [
             ec for ec in events_and_contexts if ec[0] not in to_remove
         ]

+    @classmethod
+    def _delete_existing_rows_txn(cls, txn, events_and_contexts):
         if not events_and_contexts:
-            # Make sure we don't pass an empty list to functions that expect to
-            # be storing at least one element.
+            # nothing to do here
            return

-        # From this point onwards the events are only events that we haven't
-        # seen before.
-
-        def event_dict(event):
-            return {
-                k: v
-                for k, v in event.get_dict().items()
-                if k not in [
-                    "redacted",
-                    "redacted_because",
-                ]
-            }
-
-        if delete_existing:
-            # For paranoia reasons, we go and delete all the existing entries
-            # for these events so we can reinsert them.
-            # This gets around any problems with some tables already having
-            # entries.
+        logger.info("Deleting existing")

-            logger.info("Deleting existing")
-
-            for table in (
+        for table in (
                 "events",
                 "event_auth",
                 "event_json",
@@ -794,11 +953,30 @@ class EventsStore(SQLBaseStore):
                 "redactions",
                 "room_memberships",
                 "topics"
-            ):
-                txn.executemany(
-                    "DELETE FROM %s WHERE event_id = ?" % (table,),
-                    [(ev.event_id,) for ev, _ in events_and_contexts]
-                )
+        ):
+            txn.executemany(
+                "DELETE FROM %s WHERE event_id = ?" % (table,),
+                [(ev.event_id,) for ev, _ in events_and_contexts]
+            )
+
+    def _store_event_txn(self, txn, events_and_contexts):
+        """Insert new events into the event and event_json tables
+
+        Args:
+            txn (twisted.enterprise.adbapi.Connection): db connection
+            events_and_contexts (list[(EventBase, EventContext)]): events
+                we are persisting
+        """
+
+        if not events_and_contexts:
+            # nothing to do here
+            return
+
+        def event_dict(event):
+            d = event.get_dict()
+            d.pop("redacted", None)
+            d.pop("redacted_because", None)
+            return d

         self._simple_insert_many_txn(
             txn,
@@ -842,6 +1020,19 @@ class EventsStore(SQLBaseStore):
             ],
         )

+    def _store_rejected_events_txn(self, txn, events_and_contexts):
+        """Add rows to the 'rejections' table for received events which were
+        rejected
+
+        Args:
+            txn (twisted.enterprise.adbapi.Connection): db connection
+            events_and_contexts (list[(EventBase, EventContext)]): events
+                we are persisting
+
+        Returns:
+            list[(EventBase, EventContext)]: new list, without the rejected
+                events.
+        """
         # Remove the rejected events from the list now that we've added them
         # to the events table and the events_json table.
         to_remove = set()
@@ -853,17 +1044,24 @@ class EventsStore(SQLBaseStore):
             )
             to_remove.add(event)

-        events_and_contexts = [
+        return [
             ec for ec in events_and_contexts if ec[0] not in to_remove
         ]

+    def _update_metadata_tables_txn(self, txn, events_and_contexts, backfilled):
+        """Update all the miscellaneous tables for new events
+
+        Args:
+            txn (twisted.enterprise.adbapi.Connection): db connection
+            events_and_contexts (list[(EventBase, EventContext)]): events
+                we are persisting
+            backfilled (bool): True if the events were backfilled
+        """
+
         if not events_and_contexts:
-            # Make sure we don't pass an empty list to functions that expect to
-            # be storing at least one element.
+            # nothing to do here
             return

-        # From this point onwards the events are only ones that weren't rejected.
-
         for event, context in events_and_contexts:
             # Insert all the push actions into the event_push_actions table.
             if context.push_actions:
@@ -892,10 +1090,6 @@ class EventsStore(SQLBaseStore):
                 ],
             )

-        # Insert into the state_groups, state_groups_state, and
-        # event_to_state_groups tables.
-        self._store_mult_state_groups_txn(txn, events_and_contexts)
-
         # Update the event_forward_extremities, event_backward_extremities and
         # event_edges tables.
         self._handle_mult_prev_events(
@@ -982,13 +1176,6 @@ class EventsStore(SQLBaseStore):
         # Prefill the event cache
         self._add_to_cache(txn, events_and_contexts)

-        if backfilled:
-            # Backfilled events come before the current state so we don't need
-            # to update the current state table
-            return
-
-        return
-
     def _add_to_cache(self, txn, events_and_contexts):
         to_prefill = []

@@ -1597,14 +1784,13 @@ class EventsStore(SQLBaseStore):

         def get_all_new_events_txn(txn):
             sql = (
-                "SELECT e.stream_ordering, ej.internal_metadata, ej.json, eg.state_group"
-                " FROM events as e"
-                " JOIN event_json as ej"
-                " ON e.event_id = ej.event_id AND e.room_id = ej.room_id"
-                " LEFT JOIN event_to_state_groups as eg"
-                " ON e.event_id = eg.event_id"
-                " WHERE ? < e.stream_ordering AND e.stream_ordering <= ?"
-                " ORDER BY e.stream_ordering ASC"
+                "SELECT e.stream_ordering, e.event_id, e.room_id, e.type,"
+                " state_key, redacts"
+                " FROM events AS e"
+                " LEFT JOIN redactions USING (event_id)"
+                " LEFT JOIN state_events USING (event_id)"
+                " WHERE ? < stream_ordering AND stream_ordering <= ?"
+                " ORDER BY stream_ordering ASC"
                 " LIMIT ?"
             )
             if have_forward_events:
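
The reworked query returns one narrow row per event instead of the full json, with state_key and redacts left NULL by the LEFT JOINs unless the event is a state event or a redaction. A hypothetical consumer of those rows (names are illustrative, not part of the patch):

    def classify_new_event_rows(rows):
        """Split rows from the rewritten query by kind.

        Each row is (stream_ordering, event_id, room_id, type, state_key,
        redacts); the last two are None unless the joins matched.
        """
        state_events, redactions, others = [], [], []
        for stream_ordering, event_id, room_id, etype, state_key, redacts in rows:
            if state_key is not None:
                state_events.append(event_id)
            elif redacts is not None:
                redactions.append((event_id, redacts))
            else:
                others.append(event_id)
        return state_events, redactions, others
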
@@ -1630,15 +1816,13 @@ class EventsStore(SQLBaseStore):
                 forward_ex_outliers = []

             sql = (
-                "SELECT -e.stream_ordering, ej.internal_metadata, ej.json,"
-                " eg.state_group"
-                " FROM events as e"
-                " JOIN event_json as ej"
-                " ON e.event_id = ej.event_id AND e.room_id = ej.room_id"
-                " LEFT JOIN event_to_state_groups as eg"
-                " ON e.event_id = eg.event_id"
-                " WHERE ? > e.stream_ordering AND e.stream_ordering >= ?"
-                " ORDER BY e.stream_ordering DESC"
+                "SELECT -e.stream_ordering, e.event_id, e.room_id, e.type,"
+                " state_key, redacts"
+                " FROM events AS e"
+                " LEFT JOIN redactions USING (event_id)"
+                " LEFT JOIN state_events USING (event_id)"
+                " WHERE ? > stream_ordering AND stream_ordering >= ?"
+                " ORDER BY stream_ordering DESC"
                 " LIMIT ?"
             )
             if have_backfill_events:
@@ -1825,7 +2009,7 @@ class EventsStore(SQLBaseStore):
                     "state_key": key[1],
                     "event_id": state_id,
                 }
-                for key, state_id in curr_state.items()
+                for key, state_id in curr_state.iteritems()
             ],
         )
