  1. # -*- coding: utf-8 -*-
  2. # Copyright 2014-2016 OpenMarket Ltd
  3. # Copyright 2018 New Vector Ltd
  4. #
  5. # Licensed under the Apache License, Version 2.0 (the "License");
  6. # you may not use this file except in compliance with the License.
  7. # You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing, software
  12. # distributed under the License is distributed on an "AS IS" BASIS,
  13. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. # See the License for the specific language governing permissions and
  15. # limitations under the License.
  16. import logging
  17. from typing import Iterable, Iterator
  18. from canonicaljson import encode_canonical_json, json
  19. from twisted.internet import defer
  20. from synapse.storage._base import SQLBaseStore
  21. from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList
  22. logger = logging.getLogger(__name__)
  23. class PusherWorkerStore(SQLBaseStore):
  24. def _decode_pushers_rows(self, rows: Iterable[dict]) -> Iterator[dict]:
  25. """JSON-decode the data in the rows returned from the `pushers` table
  26. Drops any rows whose data cannot be decoded
  27. """
  28. for r in rows:
  29. dataJson = r["data"]
  30. try:
  31. r["data"] = json.loads(dataJson)
  32. except Exception as e:
  33. logger.warning(
  34. "Invalid JSON in data for pusher %d: %s, %s",
  35. r["id"],
  36. dataJson,
  37. e.args[0],
  38. )
  39. continue
  40. yield r
  41. @defer.inlineCallbacks
  42. def user_has_pusher(self, user_id):
  43. ret = yield self.db.simple_select_one_onecol(
  44. "pushers", {"user_name": user_id}, "id", allow_none=True
  45. )
  46. return ret is not None
  47. def get_pushers_by_app_id_and_pushkey(self, app_id, pushkey):
  48. return self.get_pushers_by({"app_id": app_id, "pushkey": pushkey})
  49. def get_pushers_by_user_id(self, user_id):
  50. return self.get_pushers_by({"user_name": user_id})
  51. @defer.inlineCallbacks
  52. def get_pushers_by(self, keyvalues):
  53. ret = yield self.db.simple_select_list(
  54. "pushers",
  55. keyvalues,
  56. [
  57. "id",
  58. "user_name",
  59. "access_token",
  60. "profile_tag",
  61. "kind",
  62. "app_id",
  63. "app_display_name",
  64. "device_display_name",
  65. "pushkey",
  66. "ts",
  67. "lang",
  68. "data",
  69. "last_stream_ordering",
  70. "last_success",
  71. "failing_since",
  72. ],
  73. desc="get_pushers_by",
  74. )
  75. return self._decode_pushers_rows(ret)
  76. @defer.inlineCallbacks
  77. def get_all_pushers(self):
  78. def get_pushers(txn):
  79. txn.execute("SELECT * FROM pushers")
  80. rows = self.db.cursor_to_dict(txn)
  81. return self._decode_pushers_rows(rows)
  82. rows = yield self.db.runInteraction("get_all_pushers", get_pushers)
  83. return rows
  84. def get_all_updated_pushers(self, last_id, current_id, limit):
  85. if last_id == current_id:
  86. return defer.succeed(([], []))
  87. def get_all_updated_pushers_txn(txn):
  88. sql = (
  89. "SELECT id, user_name, access_token, profile_tag, kind,"
  90. " app_id, app_display_name, device_display_name, pushkey, ts,"
  91. " lang, data"
  92. " FROM pushers"
  93. " WHERE ? < id AND id <= ?"
  94. " ORDER BY id ASC LIMIT ?"
  95. )
  96. txn.execute(sql, (last_id, current_id, limit))
  97. updated = txn.fetchall()
  98. sql = (
  99. "SELECT stream_id, user_id, app_id, pushkey"
  100. " FROM deleted_pushers"
  101. " WHERE ? < stream_id AND stream_id <= ?"
  102. " ORDER BY stream_id ASC LIMIT ?"
  103. )
  104. txn.execute(sql, (last_id, current_id, limit))
  105. deleted = txn.fetchall()
  106. return updated, deleted
  107. return self.db.runInteraction(
  108. "get_all_updated_pushers", get_all_updated_pushers_txn
  109. )
  110. def get_all_updated_pushers_rows(self, last_id, current_id, limit):
  111. """Get all the pushers that have changed between the given tokens.
  112. Returns:
  113. Deferred(list(tuple)): each tuple consists of:
  114. stream_id (str)
  115. user_id (str)
  116. app_id (str)
  117. pushkey (str)
  118. was_deleted (bool): whether the pusher was added/updated (False)
  119. or deleted (True)
  120. """
  121. if last_id == current_id:
  122. return defer.succeed([])
  123. def get_all_updated_pushers_rows_txn(txn):
  124. sql = (
  125. "SELECT id, user_name, app_id, pushkey"
  126. " FROM pushers"
  127. " WHERE ? < id AND id <= ?"
  128. " ORDER BY id ASC LIMIT ?"
  129. )
  130. txn.execute(sql, (last_id, current_id, limit))
  131. results = [list(row) + [False] for row in txn]
  132. sql = (
  133. "SELECT stream_id, user_id, app_id, pushkey"
  134. " FROM deleted_pushers"
  135. " WHERE ? < stream_id AND stream_id <= ?"
  136. " ORDER BY stream_id ASC LIMIT ?"
  137. )
  138. txn.execute(sql, (last_id, current_id, limit))
  139. results.extend(list(row) + [True] for row in txn)
  140. results.sort() # Sort so that they're ordered by stream id
  141. return results
  142. return self.db.runInteraction(
  143. "get_all_updated_pushers_rows", get_all_updated_pushers_rows_txn
  144. )
  145. @cachedInlineCallbacks(num_args=1, max_entries=15000)
  146. def get_if_user_has_pusher(self, user_id):
  147. # This only exists for the cachedList decorator
  148. raise NotImplementedError()
  149. @cachedList(
  150. cached_method_name="get_if_user_has_pusher",
  151. list_name="user_ids",
  152. num_args=1,
  153. inlineCallbacks=True,
  154. )
  155. def get_if_users_have_pushers(self, user_ids):
  156. rows = yield self.db.simple_select_many_batch(
  157. table="pushers",
  158. column="user_name",
  159. iterable=user_ids,
  160. retcols=["user_name"],
  161. desc="get_if_users_have_pushers",
  162. )
  163. result = {user_id: False for user_id in user_ids}
  164. result.update({r["user_name"]: True for r in rows})
  165. return result
class PusherStore(PusherWorkerStore):
    """Write-side storage methods for creating, updating and deleting
    pushers, layered on top of the read-only PusherWorkerStore."""

    def get_pushers_stream_token(self):
        # Current position of the pushers stream id generator.
        return self._pushers_id_gen.get_current_token()

    @defer.inlineCallbacks
    def add_pusher(
        self,
        user_id,
        access_token,
        kind,
        app_id,
        app_display_name,
        device_display_name,
        pushkey,
        pushkey_ts,
        lang,
        data,
        last_stream_ordering,
        profile_tag="",
    ):
        """Insert or update the pusher identified by
        (app_id, pushkey, user_name), assigning it a fresh stream id, and
        invalidate the `get_if_user_has_pusher` cache entry for the user
        unless it is already known to be True.
        """
        with self._pushers_id_gen.get_next() as stream_id:
            # no need to lock because `pushers` has a unique key on
            # (app_id, pushkey, user_name) so simple_upsert will retry
            yield self.db.simple_upsert(
                table="pushers",
                keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
                values={
                    "access_token": access_token,
                    "kind": kind,
                    "app_display_name": app_display_name,
                    "device_display_name": device_display_name,
                    "ts": pushkey_ts,
                    "lang": lang,
                    # canonical JSON, stored as a byte blob
                    "data": bytearray(encode_canonical_json(data)),
                    "last_stream_ordering": last_stream_ordering,
                    "profile_tag": profile_tag,
                    "id": stream_id,
                },
                desc="add_pusher",
                lock=False,
            )

            user_has_pusher = self.get_if_user_has_pusher.cache.get(
                (user_id,), None, update_metrics=False
            )

            if user_has_pusher is not True:
                # invalidate, since the user might not have had a pusher before
                yield self.db.runInteraction(
                    "add_pusher",
                    self._invalidate_cache_and_stream,
                    self.get_if_user_has_pusher,
                    (user_id,),
                )

    @defer.inlineCallbacks
    def delete_pusher_by_app_id_pushkey_user_id(self, app_id, pushkey, user_id):
        """Delete the given pusher and record the deletion in
        `deleted_pushers` (at a fresh stream id) so other workers can be
        notified of it.
        """

        def delete_pusher_txn(txn, stream_id):
            self._invalidate_cache_and_stream(
                txn, self.get_if_user_has_pusher, (user_id,)
            )

            self.db.simple_delete_one_txn(
                txn,
                "pushers",
                {"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
            )

            # it's possible for us to end up with duplicate rows for
            # (app_id, pushkey, user_id) at different stream_ids, but that
            # doesn't really matter.
            self.db.simple_insert_txn(
                txn,
                table="deleted_pushers",
                values={
                    "stream_id": stream_id,
                    "app_id": app_id,
                    "pushkey": pushkey,
                    "user_id": user_id,
                },
            )

        with self._pushers_id_gen.get_next() as stream_id:
            yield self.db.runInteraction("delete_pusher", delete_pusher_txn, stream_id)

    @defer.inlineCallbacks
    def update_pusher_last_stream_ordering(
        self, app_id, pushkey, user_id, last_stream_ordering
    ):
        """Record the last stream ordering processed by the given pusher."""
        yield self.db.simple_update_one(
            "pushers",
            {"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
            {"last_stream_ordering": last_stream_ordering},
            desc="update_pusher_last_stream_ordering",
        )

    @defer.inlineCallbacks
    def update_pusher_last_stream_ordering_and_success(
        self, app_id, pushkey, user_id, last_stream_ordering, last_success
    ):
        """Update the last stream ordering position we've processed up to for
        the given pusher.

        Args:
            app_id (str)
            pushkey (str)
            user_id (str)
            last_stream_ordering (int)
            last_success (int)

        Returns:
            Deferred[bool]: True if the pusher still exists; False if it has been deleted.
        """
        updated = yield self.db.simple_update(
            table="pushers",
            keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
            updatevalues={
                "last_stream_ordering": last_stream_ordering,
                "last_success": last_success,
            },
            desc="update_pusher_last_stream_ordering_and_success",
        )

        # zero rows updated means the pusher was deleted in the meantime
        return bool(updated)

    @defer.inlineCallbacks
    def update_pusher_failing_since(self, app_id, pushkey, user_id, failing_since):
        """Record when this pusher started failing (or clear it)."""
        yield self.db.simple_update(
            table="pushers",
            keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
            updatevalues={"failing_since": failing_since},
            desc="update_pusher_failing_since",
        )

    @defer.inlineCallbacks
    def get_throttle_params_by_room(self, pusher_id):
        """Fetch the per-room throttling parameters for the given pusher.

        Returns:
            Deferred[dict]: room_id -> dict with keys "last_sent_ts" and
                "throttle_ms".
        """
        res = yield self.db.simple_select_list(
            "pusher_throttle",
            {"pusher": pusher_id},
            ["room_id", "last_sent_ts", "throttle_ms"],
            desc="get_throttle_params_by_room",
        )

        params_by_room = {}
        for row in res:
            params_by_room[row["room_id"]] = {
                "last_sent_ts": row["last_sent_ts"],
                "throttle_ms": row["throttle_ms"],
            }

        return params_by_room

    @defer.inlineCallbacks
    def set_throttle_params(self, pusher_id, room_id, params):
        """Upsert the throttling parameters for a (pusher, room) pair."""
        # no need to lock because `pusher_throttle` has a primary key on
        # (pusher, room_id) so simple_upsert will retry
        yield self.db.simple_upsert(
            "pusher_throttle",
            {"pusher": pusher_id, "room_id": room_id},
            params,
            desc="set_throttle_params",
            lock=False,
        )