|
@@ -301,15 +301,24 @@ class EventsStore(SQLBaseStore):
|
|
|
latest_event_ids = yield self.get_latest_event_ids_in_room(
|
|
|
room_id
|
|
|
)
|
|
|
+ latest_event_ids = frozenset(latest_event_ids)
|
|
|
new_latest_event_ids = yield self._calculate_new_extremeties(
|
|
|
- room_id, [ev for ev, _ in ev_ctx_rm]
|
|
|
+ room_id, [ev for ev, _ in ev_ctx_rm], latest_event_ids
|
|
|
)
|
|
|
|
|
|
- if new_latest_event_ids == set(latest_event_ids):
|
|
|
+ if new_latest_event_ids == latest_event_ids:
|
|
|
# No change in extremities, so no change in state
|
|
|
continue
|
|
|
|
|
|
- new_forward_extremeties[room_id] = new_latest_event_ids
|
|
|
+ to_add = new_latest_event_ids - latest_event_ids
|
|
|
+ to_remove = latest_event_ids - new_latest_event_ids
|
|
|
+
|
|
|
+ new_forward_extremeties[room_id] = {
|
|
|
+ "full_list": new_latest_event_ids,
|
|
|
+ "to_add": to_add,
|
|
|
+ "to_remove": to_remove,
|
|
|
+ "prev_latest": latest_event_ids,
|
|
|
+ }
|
|
|
|
|
|
state = yield self._calculate_state_delta(
|
|
|
room_id, ev_ctx_rm, new_latest_event_ids
|
|
@@ -329,15 +338,12 @@ class EventsStore(SQLBaseStore):
|
|
|
persist_event_counter.inc_by(len(chunk))
|
|
|
|
|
|
@defer.inlineCallbacks
|
|
|
- def _calculate_new_extremeties(self, room_id, events):
|
|
|
+ def _calculate_new_extremeties(self, room_id, events, latest_event_ids):
|
|
|
"""Calculates the new forward extremeties for a room given events to
|
|
|
persist.
|
|
|
|
|
|
Assumes that we are only persisting events for one room at a time.
|
|
|
"""
|
|
|
- latest_event_ids = yield self.get_latest_event_ids_in_room(
|
|
|
- room_id
|
|
|
- )
|
|
|
new_latest_event_ids = set(latest_event_ids)
|
|
|
# First, add all the new events to the list
|
|
|
new_latest_event_ids.update(
|
|
@@ -573,12 +579,26 @@ class EventsStore(SQLBaseStore):
|
|
|
txn, self.get_users_in_room, (room_id,)
|
|
|
)
|
|
|
|
|
|
- for room_id, new_extrem in new_forward_extremeties.items():
|
|
|
- self._simple_delete_txn(
|
|
|
+ for room_id, new_extrem_dict in new_forward_extremeties.items():
|
|
|
+ current_latest = self._simple_select_onecol_txn(
|
|
|
txn,
|
|
|
table="event_forward_extremities",
|
|
|
keyvalues={"room_id": room_id},
|
|
|
+ retcol="event_id"
|
|
|
)
|
|
|
+
|
|
|
+ if set(current_latest) != new_extrem_dict["prev_latest"]:
|
|
|
+ raise RuntimeError(
|
|
|
+ "event_forward_extremities don't match that when we"
|
|
|
+ " calculated new extrems"
|
|
|
+ )
|
|
|
+
|
|
|
+ txn.executemany(
|
|
|
+ "DELETE FROM event_forward_extremities"
|
|
|
+ " WHERE room_id = ? AND event_id = ?",
|
|
|
+ ((room_id, event_id) for event_id in new_extrem_dict["to_remove"])
|
|
|
+ )
|
|
|
+
|
|
|
txn.call_after(
|
|
|
self.get_latest_event_ids_in_room.invalidate, (room_id,)
|
|
|
)
|
|
@@ -592,7 +612,7 @@ class EventsStore(SQLBaseStore):
|
|
|
"room_id": room_id,
|
|
|
}
|
|
|
for room_id, new_extrem in new_forward_extremeties.items()
|
|
|
- for ev_id in new_extrem
|
|
|
+ for ev_id in new_extrem["to_add"]
|
|
|
],
|
|
|
)
|
|
|
# We now insert into stream_ordering_to_exterm a mapping from room_id,
|
|
@@ -609,7 +629,7 @@ class EventsStore(SQLBaseStore):
|
|
|
"stream_ordering": max_stream_order,
|
|
|
}
|
|
|
for room_id, new_extrem in new_forward_extremeties.items()
|
|
|
- for event_id in new_extrem
|
|
|
+ for event_id in new_extrem["full_list"]
|
|
|
]
|
|
|
)
|
|
|
|