httppusher.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2015, 2016 OpenMarket Ltd
  3. # Copyright 2017 New Vector Ltd
  4. #
  5. # Licensed under the Apache License, Version 2.0 (the "License");
  6. # you may not use this file except in compliance with the License.
  7. # You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing, software
  12. # distributed under the License is distributed on an "AS IS" BASIS,
  13. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. # See the License for the specific language governing permissions and
  15. # limitations under the License.
  16. import logging
  17. import six
  18. from prometheus_client import Counter
  19. from twisted.internet import defer
  20. from twisted.internet.error import AlreadyCalled, AlreadyCancelled
  21. from synapse.metrics.background_process_metrics import run_as_background_process
  22. from synapse.push import PusherConfigException
  23. from . import push_rule_evaluator, push_tools
  24. if six.PY3:
  25. long = int
  26. logger = logging.getLogger(__name__)
  27. http_push_processed_counter = Counter(
  28. "synapse_http_httppusher_http_pushes_processed",
  29. "Number of push notifications successfully sent",
  30. )
  31. http_push_failed_counter = Counter(
  32. "synapse_http_httppusher_http_pushes_failed",
  33. "Number of push notifications which failed",
  34. )
  35. http_badges_processed_counter = Counter(
  36. "synapse_http_httppusher_badge_updates_processed",
  37. "Number of badge updates successfully sent",
  38. )
  39. http_badges_failed_counter = Counter(
  40. "synapse_http_httppusher_badge_updates_failed",
  41. "Number of badge updates which failed",
  42. )
  43. class HttpPusher(object):
  44. INITIAL_BACKOFF_SEC = 1 # in seconds because that's what Twisted takes
  45. MAX_BACKOFF_SEC = 60 * 60
  46. # This one's in ms because we compare it against the clock
  47. GIVE_UP_AFTER_MS = 24 * 60 * 60 * 1000
  48. def __init__(self, hs, pusherdict):
  49. self.hs = hs
  50. self.store = self.hs.get_datastore()
  51. self.clock = self.hs.get_clock()
  52. self.state_handler = self.hs.get_state_handler()
  53. self.user_id = pusherdict["user_name"]
  54. self.app_id = pusherdict["app_id"]
  55. self.app_display_name = pusherdict["app_display_name"]
  56. self.device_display_name = pusherdict["device_display_name"]
  57. self.pushkey = pusherdict["pushkey"]
  58. self.pushkey_ts = pusherdict["ts"]
  59. self.data = pusherdict["data"]
  60. self.last_stream_ordering = pusherdict["last_stream_ordering"]
  61. self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
  62. self.failing_since = pusherdict["failing_since"]
  63. self.timed_call = None
  64. self._is_processing = False
  65. # This is the highest stream ordering we know it's safe to process.
  66. # When new events arrive, we'll be given a window of new events: we
  67. # should honour this rather than just looking for anything higher
  68. # because of potential out-of-order event serialisation. This starts
  69. # off as None though as we don't know any better.
  70. self.max_stream_ordering = None
  71. if "data" not in pusherdict:
  72. raise PusherConfigException("No 'data' key for HTTP pusher")
  73. self.data = pusherdict["data"]
  74. self.name = "%s/%s/%s" % (
  75. pusherdict["user_name"],
  76. pusherdict["app_id"],
  77. pusherdict["pushkey"],
  78. )
  79. if self.data is None:
  80. raise PusherConfigException("data can not be null for HTTP pusher")
  81. if "url" not in self.data:
  82. raise PusherConfigException("'url' required in data for HTTP pusher")
  83. self.url = self.data["url"]
  84. self.http_client = hs.get_simple_http_client()
  85. self.data_minus_url = {}
  86. self.data_minus_url.update(self.data)
  87. del self.data_minus_url["url"]
  88. def on_started(self, should_check_for_notifs):
  89. """Called when this pusher has been started.
  90. Args:
  91. should_check_for_notifs (bool): Whether we should immediately
  92. check for push to send. Set to False only if it's known there
  93. is nothing to send
  94. """
  95. if should_check_for_notifs:
  96. self._start_processing()
  97. def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
  98. self.max_stream_ordering = max(
  99. max_stream_ordering, self.max_stream_ordering or 0
  100. )
  101. self._start_processing()
  102. def on_new_receipts(self, min_stream_id, max_stream_id):
  103. # Note that the min here shouldn't be relied upon to be accurate.
  104. # We could check the receipts are actually m.read receipts here,
  105. # but currently that's the only type of receipt anyway...
  106. run_as_background_process("http_pusher.on_new_receipts", self._update_badge)
  107. @defer.inlineCallbacks
  108. def _update_badge(self):
  109. badge = yield push_tools.get_badge_count(self.hs.get_datastore(), self.user_id)
  110. yield self._send_badge(badge)
  111. def on_timer(self):
  112. self._start_processing()
  113. def on_stop(self):
  114. if self.timed_call:
  115. try:
  116. self.timed_call.cancel()
  117. except (AlreadyCalled, AlreadyCancelled):
  118. pass
  119. self.timed_call = None
  120. def _start_processing(self):
  121. if self._is_processing:
  122. return
  123. run_as_background_process("httppush.process", self._process)
  124. @defer.inlineCallbacks
  125. def _process(self):
  126. # we should never get here if we are already processing
  127. assert not self._is_processing
  128. try:
  129. self._is_processing = True
  130. # if the max ordering changes while we're running _unsafe_process,
  131. # call it again, and so on until we've caught up.
  132. while True:
  133. starting_max_ordering = self.max_stream_ordering
  134. try:
  135. yield self._unsafe_process()
  136. except Exception:
  137. logger.exception("Exception processing notifs")
  138. if self.max_stream_ordering == starting_max_ordering:
  139. break
  140. finally:
  141. self._is_processing = False
  142. @defer.inlineCallbacks
  143. def _unsafe_process(self):
  144. """
  145. Looks for unset notifications and dispatch them, in order
  146. Never call this directly: use _process which will only allow this to
  147. run once per pusher.
  148. """
  149. fn = self.store.get_unread_push_actions_for_user_in_range_for_http
  150. unprocessed = yield fn(
  151. self.user_id, self.last_stream_ordering, self.max_stream_ordering
  152. )
  153. logger.info(
  154. "Processing %i unprocessed push actions for %s starting at "
  155. "stream_ordering %s",
  156. len(unprocessed),
  157. self.name,
  158. self.last_stream_ordering,
  159. )
  160. for push_action in unprocessed:
  161. processed = yield self._process_one(push_action)
  162. if processed:
  163. http_push_processed_counter.inc()
  164. self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
  165. self.last_stream_ordering = push_action["stream_ordering"]
  166. yield self.store.update_pusher_last_stream_ordering_and_success(
  167. self.app_id,
  168. self.pushkey,
  169. self.user_id,
  170. self.last_stream_ordering,
  171. self.clock.time_msec(),
  172. )
  173. if self.failing_since:
  174. self.failing_since = None
  175. yield self.store.update_pusher_failing_since(
  176. self.app_id, self.pushkey, self.user_id, self.failing_since
  177. )
  178. else:
  179. http_push_failed_counter.inc()
  180. if not self.failing_since:
  181. self.failing_since = self.clock.time_msec()
  182. yield self.store.update_pusher_failing_since(
  183. self.app_id, self.pushkey, self.user_id, self.failing_since
  184. )
  185. if (
  186. self.failing_since
  187. and self.failing_since
  188. < self.clock.time_msec() - HttpPusher.GIVE_UP_AFTER_MS
  189. ):
  190. # we really only give up so that if the URL gets
  191. # fixed, we don't suddenly deliver a load
  192. # of old notifications.
  193. logger.warn(
  194. "Giving up on a notification to user %s, " "pushkey %s",
  195. self.user_id,
  196. self.pushkey,
  197. )
  198. self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
  199. self.last_stream_ordering = push_action["stream_ordering"]
  200. yield self.store.update_pusher_last_stream_ordering(
  201. self.app_id,
  202. self.pushkey,
  203. self.user_id,
  204. self.last_stream_ordering,
  205. )
  206. self.failing_since = None
  207. yield self.store.update_pusher_failing_since(
  208. self.app_id, self.pushkey, self.user_id, self.failing_since
  209. )
  210. else:
  211. logger.info("Push failed: delaying for %ds", self.backoff_delay)
  212. self.timed_call = self.hs.get_reactor().callLater(
  213. self.backoff_delay, self.on_timer
  214. )
  215. self.backoff_delay = min(
  216. self.backoff_delay * 2, self.MAX_BACKOFF_SEC
  217. )
  218. break
  219. @defer.inlineCallbacks
  220. def _process_one(self, push_action):
  221. if "notify" not in push_action["actions"]:
  222. return True
  223. tweaks = push_rule_evaluator.tweaks_for_actions(push_action["actions"])
  224. badge = yield push_tools.get_badge_count(self.hs.get_datastore(), self.user_id)
  225. event = yield self.store.get_event(push_action["event_id"], allow_none=True)
  226. if event is None:
  227. return True # It's been redacted
  228. rejected = yield self.dispatch_push(event, tweaks, badge)
  229. if rejected is False:
  230. return False
  231. if isinstance(rejected, list) or isinstance(rejected, tuple):
  232. for pk in rejected:
  233. if pk != self.pushkey:
  234. # for sanity, we only remove the pushkey if it
  235. # was the one we actually sent...
  236. logger.warn(
  237. ("Ignoring rejected pushkey %s because we" " didn't send it"),
  238. pk,
  239. )
  240. else:
  241. logger.info("Pushkey %s was rejected: removing", pk)
  242. yield self.hs.remove_pusher(self.app_id, pk, self.user_id)
  243. return True
  244. @defer.inlineCallbacks
  245. def _build_notification_dict(self, event, tweaks, badge):
  246. if self.data.get("format") == "event_id_only":
  247. d = {
  248. "notification": {
  249. "event_id": event.event_id,
  250. "room_id": event.room_id,
  251. "counts": {"unread": badge},
  252. "devices": [
  253. {
  254. "app_id": self.app_id,
  255. "pushkey": self.pushkey,
  256. "pushkey_ts": long(self.pushkey_ts / 1000),
  257. "data": self.data_minus_url,
  258. }
  259. ],
  260. }
  261. }
  262. return d
  263. ctx = yield push_tools.get_context_for_event(
  264. self.store, self.state_handler, event, self.user_id
  265. )
  266. d = {
  267. "notification": {
  268. "id": event.event_id, # deprecated: remove soon
  269. "event_id": event.event_id,
  270. "room_id": event.room_id,
  271. "type": event.type,
  272. "sender": event.user_id,
  273. "counts": { # -- we don't mark messages as read yet so
  274. # we have no way of knowing
  275. # Just set the badge to 1 until we have read receipts
  276. "unread": badge,
  277. # 'missed_calls': 2
  278. },
  279. "devices": [
  280. {
  281. "app_id": self.app_id,
  282. "pushkey": self.pushkey,
  283. "pushkey_ts": long(self.pushkey_ts / 1000),
  284. "data": self.data_minus_url,
  285. "tweaks": tweaks,
  286. }
  287. ],
  288. }
  289. }
  290. if event.type == "m.room.member" and event.is_state():
  291. d["notification"]["membership"] = event.content["membership"]
  292. d["notification"]["user_is_target"] = event.state_key == self.user_id
  293. if self.hs.config.push_include_content and event.content:
  294. d["notification"]["content"] = event.content
  295. # We no longer send aliases separately, instead, we send the human
  296. # readable name of the room, which may be an alias.
  297. if "sender_display_name" in ctx and len(ctx["sender_display_name"]) > 0:
  298. d["notification"]["sender_display_name"] = ctx["sender_display_name"]
  299. if "name" in ctx and len(ctx["name"]) > 0:
  300. d["notification"]["room_name"] = ctx["name"]
  301. return d
  302. @defer.inlineCallbacks
  303. def dispatch_push(self, event, tweaks, badge):
  304. notification_dict = yield self._build_notification_dict(event, tweaks, badge)
  305. if not notification_dict:
  306. return []
  307. try:
  308. resp = yield self.http_client.post_json_get_json(
  309. self.url, notification_dict
  310. )
  311. except Exception as e:
  312. logger.warning(
  313. "Failed to push event %s to %s: %s %s",
  314. event.event_id,
  315. self.name,
  316. type(e),
  317. e,
  318. )
  319. return False
  320. rejected = []
  321. if "rejected" in resp:
  322. rejected = resp["rejected"]
  323. return rejected
  324. @defer.inlineCallbacks
  325. def _send_badge(self, badge):
  326. """
  327. Args:
  328. badge (int): number of unread messages
  329. """
  330. logger.info("Sending updated badge count %d to %s", badge, self.name)
  331. d = {
  332. "notification": {
  333. "id": "",
  334. "type": None,
  335. "sender": "",
  336. "counts": {"unread": badge},
  337. "devices": [
  338. {
  339. "app_id": self.app_id,
  340. "pushkey": self.pushkey,
  341. "pushkey_ts": long(self.pushkey_ts / 1000),
  342. "data": self.data_minus_url,
  343. }
  344. ],
  345. }
  346. }
  347. try:
  348. yield self.http_client.post_json_get_json(self.url, d)
  349. http_badges_processed_counter.inc()
  350. except Exception as e:
  351. logger.warning(
  352. "Failed to send badge count to %s: %s %s", self.name, type(e), e
  353. )
  354. http_badges_failed_counter.inc()