transactions.py 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2014-2016 OpenMarket Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. import logging
  16. from collections import namedtuple
  17. import six
  18. from canonicaljson import encode_canonical_json
  19. from twisted.internet import defer
  20. from synapse.metrics.background_process_metrics import run_as_background_process
  21. from synapse.storage._base import SQLBaseStore, db_to_json
  22. from synapse.storage.database import Database
  23. from synapse.util.caches.expiringcache import ExpiringCache
  24. # py2 sqlite has buffer hardcoded as only binary type, so we must use it,
  25. # despite being deprecated and removed in favor of memoryview
  26. if six.PY2:
  27. db_binary_type = six.moves.builtins.buffer
  28. else:
  29. db_binary_type = memoryview
  30. logger = logging.getLogger(__name__)
  31. _TransactionRow = namedtuple(
  32. "_TransactionRow",
  33. ("id", "transaction_id", "destination", "ts", "response_code", "response_json"),
  34. )
  35. _UpdateTransactionRow = namedtuple(
  36. "_TransactionRow", ("response_code", "response_json")
  37. )
  38. SENTINEL = object()
  39. class TransactionStore(SQLBaseStore):
  40. """A collection of queries for handling PDUs.
  41. """
  42. def __init__(self, database: Database, db_conn, hs):
  43. super(TransactionStore, self).__init__(database, db_conn, hs)
  44. self._clock.looping_call(self._start_cleanup_transactions, 30 * 60 * 1000)
  45. self._destination_retry_cache = ExpiringCache(
  46. cache_name="get_destination_retry_timings",
  47. clock=self._clock,
  48. expiry_ms=5 * 60 * 1000,
  49. )
  50. def get_received_txn_response(self, transaction_id, origin):
  51. """For an incoming transaction from a given origin, check if we have
  52. already responded to it. If so, return the response code and response
  53. body (as a dict).
  54. Args:
  55. transaction_id (str)
  56. origin(str)
  57. Returns:
  58. tuple: None if we have not previously responded to
  59. this transaction or a 2-tuple of (int, dict)
  60. """
  61. return self.db.runInteraction(
  62. "get_received_txn_response",
  63. self._get_received_txn_response,
  64. transaction_id,
  65. origin,
  66. )
  67. def _get_received_txn_response(self, txn, transaction_id, origin):
  68. result = self.db.simple_select_one_txn(
  69. txn,
  70. table="received_transactions",
  71. keyvalues={"transaction_id": transaction_id, "origin": origin},
  72. retcols=(
  73. "transaction_id",
  74. "origin",
  75. "ts",
  76. "response_code",
  77. "response_json",
  78. "has_been_referenced",
  79. ),
  80. allow_none=True,
  81. )
  82. if result and result["response_code"]:
  83. return result["response_code"], db_to_json(result["response_json"])
  84. else:
  85. return None
  86. def set_received_txn_response(self, transaction_id, origin, code, response_dict):
  87. """Persist the response we returened for an incoming transaction, and
  88. should return for subsequent transactions with the same transaction_id
  89. and origin.
  90. Args:
  91. txn
  92. transaction_id (str)
  93. origin (str)
  94. code (int)
  95. response_json (str)
  96. """
  97. return self.db.simple_insert(
  98. table="received_transactions",
  99. values={
  100. "transaction_id": transaction_id,
  101. "origin": origin,
  102. "response_code": code,
  103. "response_json": db_binary_type(encode_canonical_json(response_dict)),
  104. "ts": self._clock.time_msec(),
  105. },
  106. or_ignore=True,
  107. desc="set_received_txn_response",
  108. )
  109. @defer.inlineCallbacks
  110. def get_destination_retry_timings(self, destination):
  111. """Gets the current retry timings (if any) for a given destination.
  112. Args:
  113. destination (str)
  114. Returns:
  115. None if not retrying
  116. Otherwise a dict for the retry scheme
  117. """
  118. result = self._destination_retry_cache.get(destination, SENTINEL)
  119. if result is not SENTINEL:
  120. return result
  121. result = yield self.db.runInteraction(
  122. "get_destination_retry_timings",
  123. self._get_destination_retry_timings,
  124. destination,
  125. )
  126. # We don't hugely care about race conditions between getting and
  127. # invalidating the cache, since we time out fairly quickly anyway.
  128. self._destination_retry_cache[destination] = result
  129. return result
  130. def _get_destination_retry_timings(self, txn, destination):
  131. result = self.db.simple_select_one_txn(
  132. txn,
  133. table="destinations",
  134. keyvalues={"destination": destination},
  135. retcols=("destination", "failure_ts", "retry_last_ts", "retry_interval"),
  136. allow_none=True,
  137. )
  138. if result and result["retry_last_ts"] > 0:
  139. return result
  140. else:
  141. return None
  142. def set_destination_retry_timings(
  143. self, destination, failure_ts, retry_last_ts, retry_interval
  144. ):
  145. """Sets the current retry timings for a given destination.
  146. Both timings should be zero if retrying is no longer occuring.
  147. Args:
  148. destination (str)
  149. failure_ts (int|None) - when the server started failing (ms since epoch)
  150. retry_last_ts (int) - time of last retry attempt in unix epoch ms
  151. retry_interval (int) - how long until next retry in ms
  152. """
  153. self._destination_retry_cache.pop(destination, None)
  154. return self.db.runInteraction(
  155. "set_destination_retry_timings",
  156. self._set_destination_retry_timings,
  157. destination,
  158. failure_ts,
  159. retry_last_ts,
  160. retry_interval,
  161. )
  162. def _set_destination_retry_timings(
  163. self, txn, destination, failure_ts, retry_last_ts, retry_interval
  164. ):
  165. if self.database_engine.can_native_upsert:
  166. # Upsert retry time interval if retry_interval is zero (i.e. we're
  167. # resetting it) or greater than the existing retry interval.
  168. sql = """
  169. INSERT INTO destinations (
  170. destination, failure_ts, retry_last_ts, retry_interval
  171. )
  172. VALUES (?, ?, ?, ?)
  173. ON CONFLICT (destination) DO UPDATE SET
  174. failure_ts = EXCLUDED.failure_ts,
  175. retry_last_ts = EXCLUDED.retry_last_ts,
  176. retry_interval = EXCLUDED.retry_interval
  177. WHERE
  178. EXCLUDED.retry_interval = 0
  179. OR destinations.retry_interval < EXCLUDED.retry_interval
  180. """
  181. txn.execute(sql, (destination, failure_ts, retry_last_ts, retry_interval))
  182. return
  183. self.database_engine.lock_table(txn, "destinations")
  184. # We need to be careful here as the data may have changed from under us
  185. # due to a worker setting the timings.
  186. prev_row = self.db.simple_select_one_txn(
  187. txn,
  188. table="destinations",
  189. keyvalues={"destination": destination},
  190. retcols=("failure_ts", "retry_last_ts", "retry_interval"),
  191. allow_none=True,
  192. )
  193. if not prev_row:
  194. self.db.simple_insert_txn(
  195. txn,
  196. table="destinations",
  197. values={
  198. "destination": destination,
  199. "failure_ts": failure_ts,
  200. "retry_last_ts": retry_last_ts,
  201. "retry_interval": retry_interval,
  202. },
  203. )
  204. elif retry_interval == 0 or prev_row["retry_interval"] < retry_interval:
  205. self.db.simple_update_one_txn(
  206. txn,
  207. "destinations",
  208. keyvalues={"destination": destination},
  209. updatevalues={
  210. "failure_ts": failure_ts,
  211. "retry_last_ts": retry_last_ts,
  212. "retry_interval": retry_interval,
  213. },
  214. )
  215. def _start_cleanup_transactions(self):
  216. return run_as_background_process(
  217. "cleanup_transactions", self._cleanup_transactions
  218. )
  219. def _cleanup_transactions(self):
  220. now = self._clock.time_msec()
  221. month_ago = now - 30 * 24 * 60 * 60 * 1000
  222. def _cleanup_transactions_txn(txn):
  223. txn.execute("DELETE FROM received_transactions WHERE ts < ?", (month_ago,))
  224. return self.db.runInteraction(
  225. "_cleanup_transactions", _cleanup_transactions_txn
  226. )