|
@@ -224,7 +224,7 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore):
|
|
|
|
|
|
await self.db_pool.runInteraction(
|
|
|
"set_destination_retry_timings",
|
|
|
- self._set_destination_retry_timings_native,
|
|
|
+ self._set_destination_retry_timings_txn,
|
|
|
destination,
|
|
|
failure_ts,
|
|
|
retry_last_ts,
|
|
@@ -232,7 +232,7 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore):
|
|
|
db_autocommit=True, # Safe as it's a single upsert
|
|
|
)
|
|
|
|
|
|
- def _set_destination_retry_timings_native(
|
|
|
+ def _set_destination_retry_timings_txn(
|
|
|
self,
|
|
|
txn: LoggingTransaction,
|
|
|
destination: str,
|
|
@@ -266,58 +266,6 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore):
|
|
|
txn, self.get_destination_retry_timings, (destination,)
|
|
|
)
|
|
|
|
|
|
- def _set_destination_retry_timings_emulated(
|
|
|
- self,
|
|
|
- txn: LoggingTransaction,
|
|
|
- destination: str,
|
|
|
- failure_ts: Optional[int],
|
|
|
- retry_last_ts: int,
|
|
|
- retry_interval: int,
|
|
|
- ) -> None:
|
|
|
- self.database_engine.lock_table(txn, "destinations")
|
|
|
-
|
|
|
- # We need to be careful here as the data may have changed from under us
|
|
|
- # due to a worker setting the timings.
|
|
|
-
|
|
|
- prev_row = self.db_pool.simple_select_one_txn(
|
|
|
- txn,
|
|
|
- table="destinations",
|
|
|
- keyvalues={"destination": destination},
|
|
|
- retcols=("failure_ts", "retry_last_ts", "retry_interval"),
|
|
|
- allow_none=True,
|
|
|
- )
|
|
|
-
|
|
|
- if not prev_row:
|
|
|
- self.db_pool.simple_insert_txn(
|
|
|
- txn,
|
|
|
- table="destinations",
|
|
|
- values={
|
|
|
- "destination": destination,
|
|
|
- "failure_ts": failure_ts,
|
|
|
- "retry_last_ts": retry_last_ts,
|
|
|
- "retry_interval": retry_interval,
|
|
|
- },
|
|
|
- )
|
|
|
- elif (
|
|
|
- retry_interval == 0
|
|
|
- or prev_row["retry_interval"] is None
|
|
|
- or prev_row["retry_interval"] < retry_interval
|
|
|
- ):
|
|
|
- self.db_pool.simple_update_one_txn(
|
|
|
- txn,
|
|
|
- "destinations",
|
|
|
- keyvalues={"destination": destination},
|
|
|
- updatevalues={
|
|
|
- "failure_ts": failure_ts,
|
|
|
- "retry_last_ts": retry_last_ts,
|
|
|
- "retry_interval": retry_interval,
|
|
|
- },
|
|
|
- )
|
|
|
-
|
|
|
- self._invalidate_cache_and_stream(
|
|
|
- txn, self.get_destination_retry_timings, (destination,)
|
|
|
- )
|
|
|
-
|
|
|
async def store_destination_rooms_entries(
|
|
|
self,
|
|
|
destinations: Iterable[str],
|