|
@@ -332,6 +332,35 @@ class ReceiptsStore(ReceiptsWorkerStore):
|
|
|
|
|
|
def insert_linearized_receipt_txn(self, txn, room_id, receipt_type,
|
|
|
user_id, event_id, data, stream_id):
|
|
|
+ res = self._simple_select_one_txn(
|
|
|
+ txn,
|
|
|
+ table="events",
|
|
|
+ retcols=["topological_ordering", "stream_ordering"],
|
|
|
+ keyvalues={"event_id": event_id},
|
|
|
+ allow_none=True
|
|
|
+ )
|
|
|
+
|
|
|
+ stream_ordering = int(res["stream_ordering"]) if res else None
|
|
|
+
|
|
|
+ # We don't want to clobber receipts for more recent events, so we
|
|
|
+ # have to compare orderings of existing receipts
|
|
|
+ if stream_ordering is not None:
|
|
|
+ sql = (
|
|
|
+ "SELECT stream_ordering, event_id FROM events"
|
|
|
+ " INNER JOIN receipts_linearized as r USING (event_id, room_id)"
|
|
|
+ " WHERE r.room_id = ? AND r.receipt_type = ? AND r.user_id = ?"
|
|
|
+ )
|
|
|
+ txn.execute(sql, (room_id, receipt_type, user_id))
|
|
|
+
|
|
|
+ for so, eid in txn:
|
|
|
+ if int(so) >= stream_ordering:
|
|
|
+ logger.debug(
|
|
|
+ "Ignoring new receipt for %s in favour of existing "
|
|
|
+ "one for later event %s",
|
|
|
+ event_id, eid,
|
|
|
+ )
|
|
|
+ return False
|
|
|
+
|
|
|
txn.call_after(
|
|
|
self.get_receipts_for_room.invalidate, (room_id, receipt_type)
|
|
|
)
|
|
@@ -355,34 +384,6 @@ class ReceiptsStore(ReceiptsWorkerStore):
|
|
|
(user_id, room_id, receipt_type)
|
|
|
)
|
|
|
|
|
|
- res = self._simple_select_one_txn(
|
|
|
- txn,
|
|
|
- table="events",
|
|
|
- retcols=["topological_ordering", "stream_ordering"],
|
|
|
- keyvalues={"event_id": event_id},
|
|
|
- allow_none=True
|
|
|
- )
|
|
|
-
|
|
|
- topological_ordering = int(res["topological_ordering"]) if res else None
|
|
|
- stream_ordering = int(res["stream_ordering"]) if res else None
|
|
|
-
|
|
|
- # We don't want to clobber receipts for more recent events, so we
|
|
|
- # have to compare orderings of existing receipts
|
|
|
- sql = (
|
|
|
- "SELECT topological_ordering, stream_ordering, event_id FROM events"
|
|
|
- " INNER JOIN receipts_linearized as r USING (event_id, room_id)"
|
|
|
- " WHERE r.room_id = ? AND r.receipt_type = ? AND r.user_id = ?"
|
|
|
- )
|
|
|
-
|
|
|
- txn.execute(sql, (room_id, receipt_type, user_id))
|
|
|
-
|
|
|
- if topological_ordering:
|
|
|
- for to, so, _ in txn:
|
|
|
- if int(to) > topological_ordering:
|
|
|
- return False
|
|
|
- elif int(to) == topological_ordering and int(so) >= stream_ordering:
|
|
|
- return False
|
|
|
-
|
|
|
self._simple_delete_txn(
|
|
|
txn,
|
|
|
table="receipts_linearized",
|
|
@@ -406,7 +407,7 @@ class ReceiptsStore(ReceiptsWorkerStore):
|
|
|
}
|
|
|
)
|
|
|
|
|
|
- if receipt_type == "m.read" and topological_ordering:
|
|
|
+ if receipt_type == "m.read" and stream_ordering is not None:
|
|
|
self._remove_old_push_actions_before_txn(
|
|
|
txn,
|
|
|
room_id=room_id,
|