pusher.py 9.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2014-2016 OpenMarket Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. from ._base import SQLBaseStore
  16. from twisted.internet import defer
  17. from canonicaljson import encode_canonical_json
  18. from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList
  19. import logging
  20. import simplejson as json
  21. import types
  22. logger = logging.getLogger(__name__)
  23. class PusherStore(SQLBaseStore):
  24. def _decode_pushers_rows(self, rows):
  25. for r in rows:
  26. dataJson = r['data']
  27. r['data'] = None
  28. try:
  29. if isinstance(dataJson, types.BufferType):
  30. dataJson = str(dataJson).decode("UTF8")
  31. r['data'] = json.loads(dataJson)
  32. except Exception as e:
  33. logger.warn(
  34. "Invalid JSON in data for pusher %d: %s, %s",
  35. r['id'], dataJson, e.message,
  36. )
  37. pass
  38. if isinstance(r['pushkey'], types.BufferType):
  39. r['pushkey'] = str(r['pushkey']).decode("UTF8")
  40. return rows
  41. @defer.inlineCallbacks
  42. def user_has_pusher(self, user_id):
  43. ret = yield self._simple_select_one_onecol(
  44. "pushers", {"user_name": user_id}, "id", allow_none=True
  45. )
  46. defer.returnValue(ret is not None)
  47. def get_pushers_by_app_id_and_pushkey(self, app_id, pushkey):
  48. return self.get_pushers_by({
  49. "app_id": app_id,
  50. "pushkey": pushkey,
  51. })
  52. def get_pushers_by_user_id(self, user_id):
  53. return self.get_pushers_by({
  54. "user_name": user_id,
  55. })
  56. @defer.inlineCallbacks
  57. def get_pushers_by(self, keyvalues):
  58. ret = yield self._simple_select_list(
  59. "pushers", keyvalues,
  60. [
  61. "id",
  62. "user_name",
  63. "access_token",
  64. "profile_tag",
  65. "kind",
  66. "app_id",
  67. "app_display_name",
  68. "device_display_name",
  69. "pushkey",
  70. "ts",
  71. "lang",
  72. "data",
  73. "last_stream_ordering",
  74. "last_success",
  75. "failing_since",
  76. ], desc="get_pushers_by"
  77. )
  78. defer.returnValue(self._decode_pushers_rows(ret))
  79. @defer.inlineCallbacks
  80. def get_all_pushers(self):
  81. def get_pushers(txn):
  82. txn.execute("SELECT * FROM pushers")
  83. rows = self.cursor_to_dict(txn)
  84. return self._decode_pushers_rows(rows)
  85. rows = yield self.runInteraction("get_all_pushers", get_pushers)
  86. defer.returnValue(rows)
  87. def get_pushers_stream_token(self):
  88. return self._pushers_id_gen.get_current_token()
  89. def get_all_updated_pushers(self, last_id, current_id, limit):
  90. if last_id == current_id:
  91. return defer.succeed(([], []))
  92. def get_all_updated_pushers_txn(txn):
  93. sql = (
  94. "SELECT id, user_name, access_token, profile_tag, kind,"
  95. " app_id, app_display_name, device_display_name, pushkey, ts,"
  96. " lang, data"
  97. " FROM pushers"
  98. " WHERE ? < id AND id <= ?"
  99. " ORDER BY id ASC LIMIT ?"
  100. )
  101. txn.execute(sql, (last_id, current_id, limit))
  102. updated = txn.fetchall()
  103. sql = (
  104. "SELECT stream_id, user_id, app_id, pushkey"
  105. " FROM deleted_pushers"
  106. " WHERE ? < stream_id AND stream_id <= ?"
  107. " ORDER BY stream_id ASC LIMIT ?"
  108. )
  109. txn.execute(sql, (last_id, current_id, limit))
  110. deleted = txn.fetchall()
  111. return (updated, deleted)
  112. return self.runInteraction(
  113. "get_all_updated_pushers", get_all_updated_pushers_txn
  114. )
  115. @cachedInlineCallbacks(lru=True, num_args=1, max_entries=15000)
  116. def get_if_user_has_pusher(self, user_id):
  117. result = yield self._simple_select_many_batch(
  118. table='pushers',
  119. keyvalues={
  120. 'user_name': 'user_id',
  121. },
  122. retcol='user_name',
  123. desc='get_if_user_has_pusher',
  124. allow_none=True,
  125. )
  126. defer.returnValue(bool(result))
  127. @cachedList(cached_method_name="get_if_user_has_pusher",
  128. list_name="user_ids", num_args=1, inlineCallbacks=True)
  129. def get_if_users_have_pushers(self, user_ids):
  130. rows = yield self._simple_select_many_batch(
  131. table='pushers',
  132. column='user_name',
  133. iterable=user_ids,
  134. retcols=['user_name'],
  135. desc='get_if_users_have_pushers'
  136. )
  137. result = {user_id: False for user_id in user_ids}
  138. result.update({r['user_name']: True for r in rows})
  139. defer.returnValue(result)
  140. @defer.inlineCallbacks
  141. def add_pusher(self, user_id, access_token, kind, app_id,
  142. app_display_name, device_display_name,
  143. pushkey, pushkey_ts, lang, data, last_stream_ordering,
  144. profile_tag=""):
  145. with self._pushers_id_gen.get_next() as stream_id:
  146. def f(txn):
  147. newly_inserted = self._simple_upsert_txn(
  148. txn,
  149. "pushers",
  150. {
  151. "app_id": app_id,
  152. "pushkey": pushkey,
  153. "user_name": user_id,
  154. },
  155. {
  156. "access_token": access_token,
  157. "kind": kind,
  158. "app_display_name": app_display_name,
  159. "device_display_name": device_display_name,
  160. "ts": pushkey_ts,
  161. "lang": lang,
  162. "data": encode_canonical_json(data),
  163. "last_stream_ordering": last_stream_ordering,
  164. "profile_tag": profile_tag,
  165. "id": stream_id,
  166. },
  167. )
  168. if newly_inserted:
  169. # get_if_user_has_pusher only cares if the user has
  170. # at least *one* pusher.
  171. txn.call_after(self.get_if_user_has_pusher.invalidate, (user_id,))
  172. yield self.runInteraction("add_pusher", f)
  173. @defer.inlineCallbacks
  174. def delete_pusher_by_app_id_pushkey_user_id(self, app_id, pushkey, user_id):
  175. def delete_pusher_txn(txn, stream_id):
  176. txn.call_after(self.get_if_user_has_pusher.invalidate, (user_id,))
  177. self._simple_delete_one_txn(
  178. txn,
  179. "pushers",
  180. {"app_id": app_id, "pushkey": pushkey, "user_name": user_id}
  181. )
  182. self._simple_upsert_txn(
  183. txn,
  184. "deleted_pushers",
  185. {"app_id": app_id, "pushkey": pushkey, "user_id": user_id},
  186. {"stream_id": stream_id},
  187. )
  188. with self._pushers_id_gen.get_next() as stream_id:
  189. yield self.runInteraction(
  190. "delete_pusher", delete_pusher_txn, stream_id
  191. )
  192. @defer.inlineCallbacks
  193. def update_pusher_last_stream_ordering(self, app_id, pushkey, user_id,
  194. last_stream_ordering):
  195. yield self._simple_update_one(
  196. "pushers",
  197. {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_id},
  198. {'last_stream_ordering': last_stream_ordering},
  199. desc="update_pusher_last_stream_ordering",
  200. )
  201. @defer.inlineCallbacks
  202. def update_pusher_last_stream_ordering_and_success(self, app_id, pushkey,
  203. user_id,
  204. last_stream_ordering,
  205. last_success):
  206. yield self._simple_update_one(
  207. "pushers",
  208. {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_id},
  209. {
  210. 'last_stream_ordering': last_stream_ordering,
  211. 'last_success': last_success
  212. },
  213. desc="update_pusher_last_stream_ordering_and_success",
  214. )
  215. @defer.inlineCallbacks
  216. def update_pusher_failing_since(self, app_id, pushkey, user_id,
  217. failing_since):
  218. yield self._simple_update_one(
  219. "pushers",
  220. {'app_id': app_id, 'pushkey': pushkey, 'user_name': user_id},
  221. {'failing_since': failing_since},
  222. desc="update_pusher_failing_since",
  223. )
  224. @defer.inlineCallbacks
  225. def get_throttle_params_by_room(self, pusher_id):
  226. res = yield self._simple_select_list(
  227. "pusher_throttle",
  228. {"pusher": pusher_id},
  229. ["room_id", "last_sent_ts", "throttle_ms"],
  230. desc="get_throttle_params_by_room"
  231. )
  232. params_by_room = {}
  233. for row in res:
  234. params_by_room[row["room_id"]] = {
  235. "last_sent_ts": row["last_sent_ts"],
  236. "throttle_ms": row["throttle_ms"]
  237. }
  238. defer.returnValue(params_by_room)
  239. @defer.inlineCallbacks
  240. def set_throttle_params(self, pusher_id, room_id, params):
  241. yield self._simple_upsert(
  242. "pusher_throttle",
  243. {"pusher": pusher_id, "room_id": room_id},
  244. params,
  245. desc="set_throttle_params"
  246. )