Sfoglia il codice sorgente

Catch up after Federation Outage (split, 2): Track last successful stream ordering after transmission (#8247)

Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com>
reivilibre 4 anni fa
parent
commit
17fa4c7ca7

+ 1 - 0
changelog.d/8247.misc

@@ -0,0 +1 @@
+Track the `stream_ordering` of the last successfully-sent event to every destination, so we can use this information to 'catch up' a remote server after an outage.

+ 11 - 0
synapse/federation/sender/per_destination_queue.py

@@ -325,6 +325,17 @@ class PerDestinationQueue:
 
                     self._last_device_stream_id = device_stream_id
                     self._last_device_list_stream_id = dev_list_id
+
+                    if pending_pdus:
+                        # we sent some PDUs and it was successful, so update our
+                        # last_successful_stream_ordering in the destinations table.
+                        final_pdu = pending_pdus[-1]
+                        last_successful_stream_ordering = (
+                            final_pdu.internal_metadata.stream_ordering
+                        )
+                        await self._store.set_destination_last_successful_stream_ordering(
+                            self._destination, last_successful_stream_ordering
+                        )
                 else:
                     break
         except NotRetryingDestination as e:

+ 21 - 0
synapse/storage/databases/main/schema/delta/58/17_catchup_last_successful.sql

@@ -0,0 +1,21 @@
+/* Copyright 2020 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- This column tracks the stream_ordering of the event that was most recently
+-- successfully transmitted to the destination.
+-- A value of NULL means that we have not sent an event successfully yet
+-- (at least, not since the introduction of this column).
+ALTER TABLE destinations
+    ADD COLUMN last_successful_stream_ordering BIGINT;

+ 38 - 0
synapse/storage/databases/main/transactions.py

@@ -333,3 +333,41 @@ class TransactionStore(SQLBaseStore):
             ["stream_ordering"],
             [(stream_ordering,)] * len(rows),
         )
+
+    async def get_destination_last_successful_stream_ordering(
+        self, destination: str
+    ) -> Optional[int]:
+        """
+        Gets the stream ordering of the PDU most-recently successfully sent
+        to the specified destination, or None if this information has not been
+        tracked yet.
+
+        Args:
+            destination: the destination to query
+        """
+        return await self.db_pool.simple_select_one_onecol(
+            "destinations",
+            {"destination": destination},
+            "last_successful_stream_ordering",
+            allow_none=True,
+            desc="get_last_successful_stream_ordering",
+        )
+
+    async def set_destination_last_successful_stream_ordering(
+        self, destination: str, last_successful_stream_ordering: int
+    ) -> None:
+        """
+        Marks that we have successfully sent the PDUs up to and including the
+        one specified.
+
+        Args:
+            destination: the destination we have successfully sent to
+            last_successful_stream_ordering: the stream_ordering of the most
+                recent successfully-sent PDU
+        """
+        return await self.db_pool.simple_upsert(
+            "destinations",
+            keyvalues={"destination": destination},
+            values={"last_successful_stream_ordering": last_successful_stream_ordering},
+            desc="set_last_successful_stream_ordering",
+        )