|
@@ -118,8 +118,6 @@ class StateStore(SQLBaseStore):
|
|
|
if self._have_persisted_state_group_txn(txn, context.state_group):
|
|
|
continue
|
|
|
|
|
|
- state_event_ids = dict(context.current_state_ids)
|
|
|
-
|
|
|
self._simple_insert_txn(
|
|
|
txn,
|
|
|
table="state_groups",
|
|
@@ -130,49 +128,36 @@ class StateStore(SQLBaseStore):
|
|
|
},
|
|
|
)
|
|
|
|
|
|
+ # We persist as a delta if we can, while also ensuring the chain
|
|
|
+ # of deltas isn't tooo long, as otherwise read performance degrades.
|
|
|
if context.prev_group:
|
|
|
potential_hops = self._count_state_group_hops_txn(
|
|
|
txn, context.prev_group
|
|
|
)
|
|
|
- if potential_hops < MAX_STATE_DELTA_HOPS:
|
|
|
- self._simple_insert_txn(
|
|
|
- txn,
|
|
|
- table="state_group_edges",
|
|
|
- values={
|
|
|
- "state_group": context.state_group,
|
|
|
- "prev_state_group": context.prev_group,
|
|
|
- },
|
|
|
- )
|
|
|
+ if context.prev_group and potential_hops < MAX_STATE_DELTA_HOPS:
|
|
|
+ self._simple_insert_txn(
|
|
|
+ txn,
|
|
|
+ table="state_group_edges",
|
|
|
+ values={
|
|
|
+ "state_group": context.state_group,
|
|
|
+ "prev_state_group": context.prev_group,
|
|
|
+ },
|
|
|
+ )
|
|
|
|
|
|
- self._simple_insert_many_txn(
|
|
|
- txn,
|
|
|
- table="state_groups_state",
|
|
|
- values=[
|
|
|
- {
|
|
|
- "state_group": context.state_group,
|
|
|
- "room_id": event.room_id,
|
|
|
- "type": key[0],
|
|
|
- "state_key": key[1],
|
|
|
- "event_id": state_id,
|
|
|
- }
|
|
|
- for key, state_id in context.delta_ids.items()
|
|
|
- ],
|
|
|
- )
|
|
|
- else:
|
|
|
- self._simple_insert_many_txn(
|
|
|
- txn,
|
|
|
- table="state_groups_state",
|
|
|
- values=[
|
|
|
- {
|
|
|
- "state_group": context.state_group,
|
|
|
- "room_id": event.room_id,
|
|
|
- "type": key[0],
|
|
|
- "state_key": key[1],
|
|
|
- "event_id": state_id,
|
|
|
- }
|
|
|
- for key, state_id in context.current_state_ids.items()
|
|
|
- ],
|
|
|
- )
|
|
|
+ self._simple_insert_many_txn(
|
|
|
+ txn,
|
|
|
+ table="state_groups_state",
|
|
|
+ values=[
|
|
|
+ {
|
|
|
+ "state_group": context.state_group,
|
|
|
+ "room_id": event.room_id,
|
|
|
+ "type": key[0],
|
|
|
+ "state_key": key[1],
|
|
|
+ "event_id": state_id,
|
|
|
+ }
|
|
|
+ for key, state_id in context.delta_ids.items()
|
|
|
+ ],
|
|
|
+ )
|
|
|
else:
|
|
|
self._simple_insert_many_txn(
|
|
|
txn,
|
|
@@ -185,7 +170,7 @@ class StateStore(SQLBaseStore):
|
|
|
"state_key": key[1],
|
|
|
"event_id": state_id,
|
|
|
}
|
|
|
- for key, state_id in state_event_ids.items()
|
|
|
+ for key, state_id in context.current_state_ids.items()
|
|
|
],
|
|
|
)
|
|
|
|
|
@@ -202,6 +187,10 @@ class StateStore(SQLBaseStore):
|
|
|
)
|
|
|
|
|
|
def _count_state_group_hops_txn(self, txn, state_group):
|
|
|
+ """Given a state group, count how many hops there are in the tree.
|
|
|
+
|
|
|
+ This is used to ensure the delta chains don't get too long.
|
|
|
+ """
|
|
|
if isinstance(self.database_engine, PostgresEngine):
|
|
|
sql = ("""
|
|
|
WITH RECURSIVE state(state_group) AS (
|
|
@@ -319,6 +308,11 @@ class StateStore(SQLBaseStore):
|
|
|
|
|
|
results = {group: {} for group in groups}
|
|
|
if isinstance(self.database_engine, PostgresEngine):
|
|
|
+ # The below query walks the state_group tree so that the "state"
|
|
|
+ # table includes all state_groups in the tree. It then joins
|
|
|
+ # against `state_groups_state` to fetch the latest state.
|
|
|
+ # It assumes that previous state groups are always numerically
|
|
|
+ # lesser.
|
|
|
sql = ("""
|
|
|
WITH RECURSIVE state(state_group) AS (
|
|
|
VALUES(?::bigint)
|
|
@@ -644,6 +638,9 @@ class StateStore(SQLBaseStore):
|
|
|
|
|
|
@defer.inlineCallbacks
|
|
|
def _background_deduplicate_state(self, progress, batch_size):
|
|
|
+ """This background update will slowly deduplicate state by reencoding
|
|
|
+ them as deltas.
|
|
|
+ """
|
|
|
last_state_group = progress.get("last_state_group", 0)
|
|
|
rows_inserted = progress.get("rows_inserted", 0)
|
|
|
max_group = progress.get("max_group", None)
|