|
@@ -2110,11 +2110,29 @@ class EventsWorkerStore(SQLBaseStore):
|
|
|
def _get_partial_state_events_batch_txn(
|
|
|
txn: LoggingTransaction, room_id: str
|
|
|
) -> List[str]:
|
|
|
+ # we want to work through the events from oldest to newest, so
|
|
|
+ # we only want events whose prev_events do *not* have partial state - hence
|
|
|
+ # the 'NOT EXISTS' clause in the below.
|
|
|
+ #
|
|
|
+ # This is necessary because ordering by stream ordering isn't quite enough
|
|
|
+ # to ensure that we work from oldest to newest event (in particular,
|
|
|
+ # if an event is initially persisted as an outlier and later de-outliered,
|
|
|
+ # it can end up with a lower stream_ordering than its prev_events).
|
|
|
+ #
|
|
|
+ # Typically this means we'll only return one event per batch, but that's
|
|
|
+ # hard to do much about.
|
|
|
+ #
|
|
|
+ # See also: https://github.com/matrix-org/synapse/issues/13001
|
|
|
txn.execute(
|
|
|
"""
|
|
|
SELECT event_id FROM partial_state_events AS pse
|
|
|
JOIN events USING (event_id)
|
|
|
- WHERE pse.room_id = ?
|
|
|
+ WHERE pse.room_id = ? AND
|
|
|
+ NOT EXISTS(
|
|
|
+ SELECT 1 FROM event_edges AS ee
|
|
|
+ JOIN partial_state_events AS prev_pse ON (prev_pse.event_id=ee.prev_event_id)
|
|
|
+ WHERE ee.event_id=pse.event_id
|
|
|
+ )
|
|
|
ORDER BY events.stream_ordering
|
|
|
LIMIT 100
|
|
|
""",
|