pusherpool.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. # Copyright 2015, 2016 OpenMarket Ltd
  4. #
  5. # Licensed under the Apache License, Version 2.0 (the "License");
  6. # you may not use this file except in compliance with the License.
  7. # You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing, software
  12. # distributed under the License is distributed on an "AS IS" BASIS,
  13. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. # See the License for the specific language governing permissions and
  15. # limitations under the License.
  16. import logging
  17. from twisted.internet import defer
  18. from synapse.metrics.background_process_metrics import run_as_background_process
  19. from synapse.push import PusherConfigException
  20. from synapse.push.pusher import PusherFactory
  21. from synapse.util.async_helpers import concurrently_execute
  22. logger = logging.getLogger(__name__)
  23. class PusherPool:
  24. """
  25. The pusher pool. This is responsible for dispatching notifications of new events to
  26. the http and email pushers.
  27. It provides three methods which are designed to be called by the rest of the
  28. application: `start`, `on_new_notifications`, and `on_new_receipts`: each of these
  29. delegates to each of the relevant pushers.
  30. Note that it is expected that each pusher will have its own 'processing' loop which
  31. will send out the notifications in the background, rather than blocking until the
  32. notifications are sent; accordingly Pusher.on_started, Pusher.on_new_notifications and
  33. Pusher.on_new_receipts are not expected to return deferreds.
  34. """
  35. def __init__(self, _hs):
  36. self.hs = _hs
  37. self.pusher_factory = PusherFactory(_hs)
  38. self._should_start_pushers = _hs.config.start_pushers
  39. self.store = self.hs.get_datastore()
  40. self.clock = self.hs.get_clock()
  41. self.pushers = {}
  42. def start(self):
  43. """Starts the pushers off in a background process.
  44. """
  45. if not self._should_start_pushers:
  46. logger.info("Not starting pushers because they are disabled in the config")
  47. return
  48. run_as_background_process("start_pushers", self._start_pushers)
  49. @defer.inlineCallbacks
  50. def add_pusher(
  51. self,
  52. user_id,
  53. access_token,
  54. kind,
  55. app_id,
  56. app_display_name,
  57. device_display_name,
  58. pushkey,
  59. lang,
  60. data,
  61. profile_tag="",
  62. ):
  63. """Creates a new pusher and adds it to the pool
  64. Returns:
  65. Deferred[EmailPusher|HttpPusher]
  66. """
  67. time_now_msec = self.clock.time_msec()
  68. # we try to create the pusher just to validate the config: it
  69. # will then get pulled out of the database,
  70. # recreated, added and started: this means we have only one
  71. # code path adding pushers.
  72. self.pusher_factory.create_pusher(
  73. {
  74. "id": None,
  75. "user_name": user_id,
  76. "kind": kind,
  77. "app_id": app_id,
  78. "app_display_name": app_display_name,
  79. "device_display_name": device_display_name,
  80. "pushkey": pushkey,
  81. "ts": time_now_msec,
  82. "lang": lang,
  83. "data": data,
  84. "last_stream_ordering": None,
  85. "last_success": None,
  86. "failing_since": None,
  87. }
  88. )
  89. # create the pusher setting last_stream_ordering to the current maximum
  90. # stream ordering in event_push_actions, so it will process
  91. # pushes from this point onwards.
  92. last_stream_ordering = (
  93. yield self.store.get_latest_push_action_stream_ordering()
  94. )
  95. yield self.store.add_pusher(
  96. user_id=user_id,
  97. access_token=access_token,
  98. kind=kind,
  99. app_id=app_id,
  100. app_display_name=app_display_name,
  101. device_display_name=device_display_name,
  102. pushkey=pushkey,
  103. pushkey_ts=time_now_msec,
  104. lang=lang,
  105. data=data,
  106. last_stream_ordering=last_stream_ordering,
  107. profile_tag=profile_tag,
  108. )
  109. pusher = yield self.start_pusher_by_id(app_id, pushkey, user_id)
  110. return pusher
  111. @defer.inlineCallbacks
  112. def remove_pushers_by_app_id_and_pushkey_not_user(
  113. self, app_id, pushkey, not_user_id
  114. ):
  115. to_remove = yield self.store.get_pushers_by_app_id_and_pushkey(app_id, pushkey)
  116. for p in to_remove:
  117. if p["user_name"] != not_user_id:
  118. logger.info(
  119. "Removing pusher for app id %s, pushkey %s, user %s",
  120. app_id,
  121. pushkey,
  122. p["user_name"],
  123. )
  124. yield self.remove_pusher(p["app_id"], p["pushkey"], p["user_name"])
  125. @defer.inlineCallbacks
  126. def remove_pushers_by_access_token(self, user_id, access_tokens):
  127. """Remove the pushers for a given user corresponding to a set of
  128. access_tokens.
  129. Args:
  130. user_id (str): user to remove pushers for
  131. access_tokens (Iterable[int]): access token *ids* to remove pushers
  132. for
  133. """
  134. tokens = set(access_tokens)
  135. for p in (yield self.store.get_pushers_by_user_id(user_id)):
  136. if p["access_token"] in tokens:
  137. logger.info(
  138. "Removing pusher for app id %s, pushkey %s, user %s",
  139. p["app_id"],
  140. p["pushkey"],
  141. p["user_name"],
  142. )
  143. yield self.remove_pusher(p["app_id"], p["pushkey"], p["user_name"])
  144. @defer.inlineCallbacks
  145. def on_new_notifications(self, min_stream_id, max_stream_id):
  146. if not self.pushers:
  147. # nothing to do here.
  148. return
  149. try:
  150. users_affected = yield self.store.get_push_action_users_in_range(
  151. min_stream_id, max_stream_id
  152. )
  153. for u in users_affected:
  154. if u in self.pushers:
  155. for p in self.pushers[u].values():
  156. p.on_new_notifications(min_stream_id, max_stream_id)
  157. except Exception:
  158. logger.exception("Exception in pusher on_new_notifications")
  159. @defer.inlineCallbacks
  160. def on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids):
  161. if not self.pushers:
  162. # nothing to do here.
  163. return
  164. try:
  165. # Need to subtract 1 from the minimum because the lower bound here
  166. # is not inclusive
  167. updated_receipts = yield self.store.get_all_updated_receipts(
  168. min_stream_id - 1, max_stream_id
  169. )
  170. # This returns a tuple, user_id is at index 3
  171. users_affected = set([r[3] for r in updated_receipts])
  172. for u in users_affected:
  173. if u in self.pushers:
  174. for p in self.pushers[u].values():
  175. p.on_new_receipts(min_stream_id, max_stream_id)
  176. except Exception:
  177. logger.exception("Exception in pusher on_new_receipts")
  178. @defer.inlineCallbacks
  179. def start_pusher_by_id(self, app_id, pushkey, user_id):
  180. """Look up the details for the given pusher, and start it
  181. Returns:
  182. Deferred[EmailPusher|HttpPusher|None]: The pusher started, if any
  183. """
  184. if not self._should_start_pushers:
  185. return
  186. resultlist = yield self.store.get_pushers_by_app_id_and_pushkey(app_id, pushkey)
  187. pusher_dict = None
  188. for r in resultlist:
  189. if r["user_name"] == user_id:
  190. pusher_dict = r
  191. pusher = None
  192. if pusher_dict:
  193. pusher = yield self._start_pusher(pusher_dict)
  194. return pusher
  195. @defer.inlineCallbacks
  196. def _start_pushers(self):
  197. """Start all the pushers
  198. Returns:
  199. Deferred
  200. """
  201. pushers = yield self.store.get_all_pushers()
  202. logger.info("Starting %d pushers", len(pushers))
  203. # Stagger starting up the pushers so we don't completely drown the
  204. # process on start up.
  205. yield concurrently_execute(self._start_pusher, pushers, 10)
  206. logger.info("Started pushers")
  207. @defer.inlineCallbacks
  208. def _start_pusher(self, pusherdict):
  209. """Start the given pusher
  210. Args:
  211. pusherdict (dict):
  212. Returns:
  213. Deferred[EmailPusher|HttpPusher]
  214. """
  215. try:
  216. p = self.pusher_factory.create_pusher(pusherdict)
  217. except PusherConfigException as e:
  218. logger.warning(
  219. "Pusher incorrectly configured user=%s, appid=%s, pushkey=%s: %s",
  220. pusherdict.get("user_name"),
  221. pusherdict.get("app_id"),
  222. pusherdict.get("pushkey"),
  223. e,
  224. )
  225. return
  226. except Exception:
  227. logger.exception("Couldn't start a pusher: caught Exception")
  228. return
  229. if not p:
  230. return
  231. appid_pushkey = "%s:%s" % (pusherdict["app_id"], pusherdict["pushkey"])
  232. byuser = self.pushers.setdefault(pusherdict["user_name"], {})
  233. if appid_pushkey in byuser:
  234. byuser[appid_pushkey].on_stop()
  235. byuser[appid_pushkey] = p
  236. # Check if there *may* be push to process. We do this as this check is a
  237. # lot cheaper to do than actually fetching the exact rows we need to
  238. # push.
  239. user_id = pusherdict["user_name"]
  240. last_stream_ordering = pusherdict["last_stream_ordering"]
  241. if last_stream_ordering:
  242. have_notifs = yield self.store.get_if_maybe_push_in_range_for_user(
  243. user_id, last_stream_ordering
  244. )
  245. else:
  246. # We always want to default to starting up the pusher rather than
  247. # risk missing push.
  248. have_notifs = True
  249. p.on_started(have_notifs)
  250. return p
  251. @defer.inlineCallbacks
  252. def remove_pusher(self, app_id, pushkey, user_id):
  253. appid_pushkey = "%s:%s" % (app_id, pushkey)
  254. byuser = self.pushers.get(user_id, {})
  255. if appid_pushkey in byuser:
  256. logger.info("Stopping pusher %s / %s", user_id, appid_pushkey)
  257. byuser[appid_pushkey].on_stop()
  258. del byuser[appid_pushkey]
  259. yield self.store.delete_pusher_by_app_id_pushkey_user_id(
  260. app_id, pushkey, user_id
  261. )