# -*- coding: utf-8 -*-
# httppusher.py
# Copyright 2015, 2016 OpenMarket Ltd
# Copyright 2017 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging

from prometheus_client import Counter

from twisted.internet.error import AlreadyCalled, AlreadyCancelled

from synapse.api.constants import EventTypes
from synapse.logging import opentracing
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.push import PusherConfigException

from . import push_rule_evaluator, push_tools
  24. logger = logging.getLogger(__name__)
  25. http_push_processed_counter = Counter(
  26. "synapse_http_httppusher_http_pushes_processed",
  27. "Number of push notifications successfully sent",
  28. )
  29. http_push_failed_counter = Counter(
  30. "synapse_http_httppusher_http_pushes_failed",
  31. "Number of push notifications which failed",
  32. )
  33. http_badges_processed_counter = Counter(
  34. "synapse_http_httppusher_badge_updates_processed",
  35. "Number of badge updates successfully sent",
  36. )
  37. http_badges_failed_counter = Counter(
  38. "synapse_http_httppusher_badge_updates_failed",
  39. "Number of badge updates which failed",
  40. )
  41. class HttpPusher:
  42. INITIAL_BACKOFF_SEC = 1 # in seconds because that's what Twisted takes
  43. MAX_BACKOFF_SEC = 60 * 60
  44. # This one's in ms because we compare it against the clock
  45. GIVE_UP_AFTER_MS = 24 * 60 * 60 * 1000
  46. def __init__(self, hs, pusherdict):
  47. self.hs = hs
  48. self.store = self.hs.get_datastore()
  49. self.storage = self.hs.get_storage()
  50. self.clock = self.hs.get_clock()
  51. self.state_handler = self.hs.get_state_handler()
  52. self.user_id = pusherdict["user_name"]
  53. self.app_id = pusherdict["app_id"]
  54. self.app_display_name = pusherdict["app_display_name"]
  55. self.device_display_name = pusherdict["device_display_name"]
  56. self.pushkey = pusherdict["pushkey"]
  57. self.pushkey_ts = pusherdict["ts"]
  58. self.data = pusherdict["data"]
  59. self.last_stream_ordering = pusherdict["last_stream_ordering"]
  60. self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
  61. self.failing_since = pusherdict["failing_since"]
  62. self.timed_call = None
  63. self._is_processing = False
  64. # This is the highest stream ordering we know it's safe to process.
  65. # When new events arrive, we'll be given a window of new events: we
  66. # should honour this rather than just looking for anything higher
  67. # because of potential out-of-order event serialisation. This starts
  68. # off as None though as we don't know any better.
  69. self.max_stream_ordering = None
  70. if "data" not in pusherdict:
  71. raise PusherConfigException("No 'data' key for HTTP pusher")
  72. self.data = pusherdict["data"]
  73. self.name = "%s/%s/%s" % (
  74. pusherdict["user_name"],
  75. pusherdict["app_id"],
  76. pusherdict["pushkey"],
  77. )
  78. if self.data is None:
  79. raise PusherConfigException("data can not be null for HTTP pusher")
  80. if "url" not in self.data:
  81. raise PusherConfigException("'url' required in data for HTTP pusher")
  82. self.url = self.data["url"]
  83. self.http_client = hs.get_proxied_http_client()
  84. self.data_minus_url = {}
  85. self.data_minus_url.update(self.data)
  86. del self.data_minus_url["url"]
  87. def on_started(self, should_check_for_notifs):
  88. """Called when this pusher has been started.
  89. Args:
  90. should_check_for_notifs (bool): Whether we should immediately
  91. check for push to send. Set to False only if it's known there
  92. is nothing to send
  93. """
  94. if should_check_for_notifs:
  95. self._start_processing()
  96. def on_new_notifications(self, max_stream_ordering):
  97. self.max_stream_ordering = max(
  98. max_stream_ordering, self.max_stream_ordering or 0
  99. )
  100. self._start_processing()
  101. def on_new_receipts(self, min_stream_id, max_stream_id):
  102. # Note that the min here shouldn't be relied upon to be accurate.
  103. # We could check the receipts are actually m.read receipts here,
  104. # but currently that's the only type of receipt anyway...
  105. run_as_background_process("http_pusher.on_new_receipts", self._update_badge)
  106. async def _update_badge(self):
  107. # XXX as per https://github.com/matrix-org/matrix-doc/issues/2627, this seems
  108. # to be largely redundant. perhaps we can remove it.
  109. badge = await push_tools.get_badge_count(self.hs.get_datastore(), self.user_id)
  110. await self._send_badge(badge)
  111. def on_timer(self):
  112. self._start_processing()
  113. def on_stop(self):
  114. if self.timed_call:
  115. try:
  116. self.timed_call.cancel()
  117. except (AlreadyCalled, AlreadyCancelled):
  118. pass
  119. self.timed_call = None
  120. def _start_processing(self):
  121. if self._is_processing:
  122. return
  123. run_as_background_process("httppush.process", self._process)
  124. async def _process(self):
  125. # we should never get here if we are already processing
  126. assert not self._is_processing
  127. try:
  128. self._is_processing = True
  129. # if the max ordering changes while we're running _unsafe_process,
  130. # call it again, and so on until we've caught up.
  131. while True:
  132. starting_max_ordering = self.max_stream_ordering
  133. try:
  134. await self._unsafe_process()
  135. except Exception:
  136. logger.exception("Exception processing notifs")
  137. if self.max_stream_ordering == starting_max_ordering:
  138. break
  139. finally:
  140. self._is_processing = False
  141. async def _unsafe_process(self):
  142. """
  143. Looks for unset notifications and dispatch them, in order
  144. Never call this directly: use _process which will only allow this to
  145. run once per pusher.
  146. """
  147. fn = self.store.get_unread_push_actions_for_user_in_range_for_http
  148. unprocessed = await fn(
  149. self.user_id, self.last_stream_ordering, self.max_stream_ordering
  150. )
  151. logger.info(
  152. "Processing %i unprocessed push actions for %s starting at "
  153. "stream_ordering %s",
  154. len(unprocessed),
  155. self.name,
  156. self.last_stream_ordering,
  157. )
  158. for push_action in unprocessed:
  159. with opentracing.start_active_span(
  160. "http-push",
  161. tags={
  162. "authenticated_entity": self.user_id,
  163. "event_id": push_action["event_id"],
  164. "app_id": self.app_id,
  165. "app_display_name": self.app_display_name,
  166. },
  167. ):
  168. processed = await self._process_one(push_action)
  169. if processed:
  170. http_push_processed_counter.inc()
  171. self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
  172. self.last_stream_ordering = push_action["stream_ordering"]
  173. pusher_still_exists = await self.store.update_pusher_last_stream_ordering_and_success(
  174. self.app_id,
  175. self.pushkey,
  176. self.user_id,
  177. self.last_stream_ordering,
  178. self.clock.time_msec(),
  179. )
  180. if not pusher_still_exists:
  181. # The pusher has been deleted while we were processing, so
  182. # lets just stop and return.
  183. self.on_stop()
  184. return
  185. if self.failing_since:
  186. self.failing_since = None
  187. await self.store.update_pusher_failing_since(
  188. self.app_id, self.pushkey, self.user_id, self.failing_since
  189. )
  190. else:
  191. http_push_failed_counter.inc()
  192. if not self.failing_since:
  193. self.failing_since = self.clock.time_msec()
  194. await self.store.update_pusher_failing_since(
  195. self.app_id, self.pushkey, self.user_id, self.failing_since
  196. )
  197. if (
  198. self.failing_since
  199. and self.failing_since
  200. < self.clock.time_msec() - HttpPusher.GIVE_UP_AFTER_MS
  201. ):
  202. # we really only give up so that if the URL gets
  203. # fixed, we don't suddenly deliver a load
  204. # of old notifications.
  205. logger.warning(
  206. "Giving up on a notification to user %s, pushkey %s",
  207. self.user_id,
  208. self.pushkey,
  209. )
  210. self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
  211. self.last_stream_ordering = push_action["stream_ordering"]
  212. pusher_still_exists = await self.store.update_pusher_last_stream_ordering(
  213. self.app_id,
  214. self.pushkey,
  215. self.user_id,
  216. self.last_stream_ordering,
  217. )
  218. if not pusher_still_exists:
  219. # The pusher has been deleted while we were processing, so
  220. # lets just stop and return.
  221. self.on_stop()
  222. return
  223. self.failing_since = None
  224. await self.store.update_pusher_failing_since(
  225. self.app_id, self.pushkey, self.user_id, self.failing_since
  226. )
  227. else:
  228. logger.info("Push failed: delaying for %ds", self.backoff_delay)
  229. self.timed_call = self.hs.get_reactor().callLater(
  230. self.backoff_delay, self.on_timer
  231. )
  232. self.backoff_delay = min(
  233. self.backoff_delay * 2, self.MAX_BACKOFF_SEC
  234. )
  235. break
  236. async def _process_one(self, push_action):
  237. if "notify" not in push_action["actions"]:
  238. return True
  239. tweaks = push_rule_evaluator.tweaks_for_actions(push_action["actions"])
  240. badge = await push_tools.get_badge_count(self.hs.get_datastore(), self.user_id)
  241. event = await self.store.get_event(push_action["event_id"], allow_none=True)
  242. if event is None:
  243. return True # It's been redacted
  244. rejected = await self.dispatch_push(event, tweaks, badge)
  245. if rejected is False:
  246. return False
  247. if isinstance(rejected, list) or isinstance(rejected, tuple):
  248. for pk in rejected:
  249. if pk != self.pushkey:
  250. # for sanity, we only remove the pushkey if it
  251. # was the one we actually sent...
  252. logger.warning(
  253. ("Ignoring rejected pushkey %s because we didn't send it"), pk,
  254. )
  255. else:
  256. logger.info("Pushkey %s was rejected: removing", pk)
  257. await self.hs.remove_pusher(self.app_id, pk, self.user_id)
  258. return True
  259. async def _build_notification_dict(self, event, tweaks, badge):
  260. priority = "low"
  261. if (
  262. event.type == EventTypes.Encrypted
  263. or tweaks.get("highlight")
  264. or tweaks.get("sound")
  265. ):
  266. # HACK send our push as high priority only if it generates a sound, highlight
  267. # or may do so (i.e. is encrypted so has unknown effects).
  268. priority = "high"
  269. if self.data.get("format") == "event_id_only":
  270. d = {
  271. "notification": {
  272. "event_id": event.event_id,
  273. "room_id": event.room_id,
  274. "counts": {"unread": badge},
  275. "prio": priority,
  276. "devices": [
  277. {
  278. "app_id": self.app_id,
  279. "pushkey": self.pushkey,
  280. "pushkey_ts": int(self.pushkey_ts / 1000),
  281. "data": self.data_minus_url,
  282. }
  283. ],
  284. }
  285. }
  286. return d
  287. ctx = await push_tools.get_context_for_event(
  288. self.storage, self.state_handler, event, self.user_id
  289. )
  290. d = {
  291. "notification": {
  292. "id": event.event_id, # deprecated: remove soon
  293. "event_id": event.event_id,
  294. "room_id": event.room_id,
  295. "type": event.type,
  296. "sender": event.user_id,
  297. "prio": priority,
  298. "counts": {
  299. "unread": badge,
  300. # 'missed_calls': 2
  301. },
  302. "devices": [
  303. {
  304. "app_id": self.app_id,
  305. "pushkey": self.pushkey,
  306. "pushkey_ts": int(self.pushkey_ts / 1000),
  307. "data": self.data_minus_url,
  308. "tweaks": tweaks,
  309. }
  310. ],
  311. }
  312. }
  313. if event.type == "m.room.member" and event.is_state():
  314. d["notification"]["membership"] = event.content["membership"]
  315. d["notification"]["user_is_target"] = event.state_key == self.user_id
  316. if self.hs.config.push_include_content and event.content:
  317. d["notification"]["content"] = event.content
  318. # We no longer send aliases separately, instead, we send the human
  319. # readable name of the room, which may be an alias.
  320. if "sender_display_name" in ctx and len(ctx["sender_display_name"]) > 0:
  321. d["notification"]["sender_display_name"] = ctx["sender_display_name"]
  322. if "name" in ctx and len(ctx["name"]) > 0:
  323. d["notification"]["room_name"] = ctx["name"]
  324. return d
  325. async def dispatch_push(self, event, tweaks, badge):
  326. notification_dict = await self._build_notification_dict(event, tweaks, badge)
  327. if not notification_dict:
  328. return []
  329. try:
  330. resp = await self.http_client.post_json_get_json(
  331. self.url, notification_dict
  332. )
  333. except Exception as e:
  334. logger.warning(
  335. "Failed to push event %s to %s: %s %s",
  336. event.event_id,
  337. self.name,
  338. type(e),
  339. e,
  340. )
  341. return False
  342. rejected = []
  343. if "rejected" in resp:
  344. rejected = resp["rejected"]
  345. return rejected
  346. async def _send_badge(self, badge):
  347. """
  348. Args:
  349. badge (int): number of unread messages
  350. """
  351. logger.debug("Sending updated badge count %d to %s", badge, self.name)
  352. d = {
  353. "notification": {
  354. "id": "",
  355. "type": None,
  356. "sender": "",
  357. "counts": {"unread": badge},
  358. "devices": [
  359. {
  360. "app_id": self.app_id,
  361. "pushkey": self.pushkey,
  362. "pushkey_ts": int(self.pushkey_ts / 1000),
  363. "data": self.data_minus_url,
  364. }
  365. ],
  366. }
  367. }
  368. try:
  369. await self.http_client.post_json_get_json(self.url, d)
  370. http_badges_processed_counter.inc()
  371. except Exception as e:
  372. logger.warning(
  373. "Failed to send badge count to %s: %s %s", self.name, type(e), e
  374. )
  375. http_badges_failed_counter.inc()