# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
# Copyright 2017 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
  16. import logging
  17. import six
  18. from prometheus_client import Counter
  19. from twisted.internet import defer
  20. from twisted.internet.error import AlreadyCalled, AlreadyCancelled
  21. from synapse.metrics.background_process_metrics import run_as_background_process
  22. from synapse.push import PusherConfigException
  23. from . import push_rule_evaluator, push_tools
  24. if six.PY3:
  25. long = int
  26. logger = logging.getLogger(__name__)
  27. http_push_processed_counter = Counter(
  28. "synapse_http_httppusher_http_pushes_processed",
  29. "Number of push notifications successfully sent",
  30. )
  31. http_push_failed_counter = Counter(
  32. "synapse_http_httppusher_http_pushes_failed",
  33. "Number of push notifications which failed",
  34. )
  35. http_badges_processed_counter = Counter(
  36. "synapse_http_httppusher_badge_updates_processed",
  37. "Number of badge updates successfully sent",
  38. )
  39. http_badges_failed_counter = Counter(
  40. "synapse_http_httppusher_badge_updates_failed",
  41. "Number of badge updates which failed",
  42. )
  43. class HttpPusher(object):
  44. INITIAL_BACKOFF_SEC = 1 # in seconds because that's what Twisted takes
  45. MAX_BACKOFF_SEC = 60 * 60
  46. # This one's in ms because we compare it against the clock
  47. GIVE_UP_AFTER_MS = 24 * 60 * 60 * 1000
  48. def __init__(self, hs, pusherdict):
  49. self.hs = hs
  50. self.store = self.hs.get_datastore()
  51. self.clock = self.hs.get_clock()
  52. self.state_handler = self.hs.get_state_handler()
  53. self.user_id = pusherdict['user_name']
  54. self.app_id = pusherdict['app_id']
  55. self.app_display_name = pusherdict['app_display_name']
  56. self.device_display_name = pusherdict['device_display_name']
  57. self.pushkey = pusherdict['pushkey']
  58. self.pushkey_ts = pusherdict['ts']
  59. self.data = pusherdict['data']
  60. self.last_stream_ordering = pusherdict['last_stream_ordering']
  61. self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
  62. self.failing_since = pusherdict['failing_since']
  63. self.timed_call = None
  64. self._is_processing = False
  65. # This is the highest stream ordering we know it's safe to process.
  66. # When new events arrive, we'll be given a window of new events: we
  67. # should honour this rather than just looking for anything higher
  68. # because of potential out-of-order event serialisation. This starts
  69. # off as None though as we don't know any better.
  70. self.max_stream_ordering = None
  71. if 'data' not in pusherdict:
  72. raise PusherConfigException(
  73. "No 'data' key for HTTP pusher"
  74. )
  75. self.data = pusherdict['data']
  76. self.name = "%s/%s/%s" % (
  77. pusherdict['user_name'],
  78. pusherdict['app_id'],
  79. pusherdict['pushkey'],
  80. )
  81. if self.data is None:
  82. raise PusherConfigException(
  83. "data can not be null for HTTP pusher"
  84. )
  85. if 'url' not in self.data:
  86. raise PusherConfigException(
  87. "'url' required in data for HTTP pusher"
  88. )
  89. self.url = self.data['url']
  90. self.http_client = hs.get_simple_http_client()
  91. self.data_minus_url = {}
  92. self.data_minus_url.update(self.data)
  93. del self.data_minus_url['url']
  94. def on_started(self):
  95. self._start_processing()
  96. def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
  97. self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering or 0)
  98. self._start_processing()
  99. def on_new_receipts(self, min_stream_id, max_stream_id):
  100. # Note that the min here shouldn't be relied upon to be accurate.
  101. # We could check the receipts are actually m.read receipts here,
  102. # but currently that's the only type of receipt anyway...
  103. run_as_background_process("http_pusher.on_new_receipts", self._update_badge)
  104. @defer.inlineCallbacks
  105. def _update_badge(self):
  106. badge = yield push_tools.get_badge_count(self.hs.get_datastore(), self.user_id)
  107. yield self._send_badge(badge)
  108. def on_timer(self):
  109. self._start_processing()
  110. def on_stop(self):
  111. if self.timed_call:
  112. try:
  113. self.timed_call.cancel()
  114. except (AlreadyCalled, AlreadyCancelled):
  115. pass
  116. self.timed_call = None
  117. def _start_processing(self):
  118. if self._is_processing:
  119. return
  120. run_as_background_process("httppush.process", self._process)
  121. @defer.inlineCallbacks
  122. def _process(self):
  123. # we should never get here if we are already processing
  124. assert not self._is_processing
  125. try:
  126. self._is_processing = True
  127. # if the max ordering changes while we're running _unsafe_process,
  128. # call it again, and so on until we've caught up.
  129. while True:
  130. starting_max_ordering = self.max_stream_ordering
  131. try:
  132. yield self._unsafe_process()
  133. except Exception:
  134. logger.exception("Exception processing notifs")
  135. if self.max_stream_ordering == starting_max_ordering:
  136. break
  137. finally:
  138. self._is_processing = False
  139. @defer.inlineCallbacks
  140. def _unsafe_process(self):
  141. """
  142. Looks for unset notifications and dispatch them, in order
  143. Never call this directly: use _process which will only allow this to
  144. run once per pusher.
  145. """
  146. fn = self.store.get_unread_push_actions_for_user_in_range_for_http
  147. unprocessed = yield fn(
  148. self.user_id, self.last_stream_ordering, self.max_stream_ordering
  149. )
  150. logger.info(
  151. "Processing %i unprocessed push actions for %s starting at "
  152. "stream_ordering %s",
  153. len(unprocessed), self.name, self.last_stream_ordering,
  154. )
  155. for push_action in unprocessed:
  156. processed = yield self._process_one(push_action)
  157. if processed:
  158. http_push_processed_counter.inc()
  159. self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
  160. self.last_stream_ordering = push_action['stream_ordering']
  161. yield self.store.update_pusher_last_stream_ordering_and_success(
  162. self.app_id, self.pushkey, self.user_id,
  163. self.last_stream_ordering,
  164. self.clock.time_msec()
  165. )
  166. if self.failing_since:
  167. self.failing_since = None
  168. yield self.store.update_pusher_failing_since(
  169. self.app_id, self.pushkey, self.user_id,
  170. self.failing_since
  171. )
  172. else:
  173. http_push_failed_counter.inc()
  174. if not self.failing_since:
  175. self.failing_since = self.clock.time_msec()
  176. yield self.store.update_pusher_failing_since(
  177. self.app_id, self.pushkey, self.user_id,
  178. self.failing_since
  179. )
  180. if (
  181. self.failing_since and
  182. self.failing_since <
  183. self.clock.time_msec() - HttpPusher.GIVE_UP_AFTER_MS
  184. ):
  185. # we really only give up so that if the URL gets
  186. # fixed, we don't suddenly deliver a load
  187. # of old notifications.
  188. logger.warn("Giving up on a notification to user %s, "
  189. "pushkey %s",
  190. self.user_id, self.pushkey)
  191. self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
  192. self.last_stream_ordering = push_action['stream_ordering']
  193. yield self.store.update_pusher_last_stream_ordering(
  194. self.app_id,
  195. self.pushkey,
  196. self.user_id,
  197. self.last_stream_ordering
  198. )
  199. self.failing_since = None
  200. yield self.store.update_pusher_failing_since(
  201. self.app_id,
  202. self.pushkey,
  203. self.user_id,
  204. self.failing_since
  205. )
  206. else:
  207. logger.info("Push failed: delaying for %ds", self.backoff_delay)
  208. self.timed_call = self.hs.get_reactor().callLater(
  209. self.backoff_delay, self.on_timer
  210. )
  211. self.backoff_delay = min(self.backoff_delay * 2, self.MAX_BACKOFF_SEC)
  212. break
  213. @defer.inlineCallbacks
  214. def _process_one(self, push_action):
  215. if 'notify' not in push_action['actions']:
  216. defer.returnValue(True)
  217. tweaks = push_rule_evaluator.tweaks_for_actions(push_action['actions'])
  218. badge = yield push_tools.get_badge_count(self.hs.get_datastore(), self.user_id)
  219. event = yield self.store.get_event(push_action['event_id'], allow_none=True)
  220. if event is None:
  221. defer.returnValue(True) # It's been redacted
  222. rejected = yield self.dispatch_push(event, tweaks, badge)
  223. if rejected is False:
  224. defer.returnValue(False)
  225. if isinstance(rejected, list) or isinstance(rejected, tuple):
  226. for pk in rejected:
  227. if pk != self.pushkey:
  228. # for sanity, we only remove the pushkey if it
  229. # was the one we actually sent...
  230. logger.warn(
  231. ("Ignoring rejected pushkey %s because we"
  232. " didn't send it"), pk
  233. )
  234. else:
  235. logger.info(
  236. "Pushkey %s was rejected: removing",
  237. pk
  238. )
  239. yield self.hs.remove_pusher(
  240. self.app_id, pk, self.user_id
  241. )
  242. defer.returnValue(True)
  243. @defer.inlineCallbacks
  244. def _build_notification_dict(self, event, tweaks, badge):
  245. if self.data.get('format') == 'event_id_only':
  246. d = {
  247. 'notification': {
  248. 'event_id': event.event_id,
  249. 'room_id': event.room_id,
  250. 'counts': {
  251. 'unread': badge,
  252. },
  253. 'devices': [
  254. {
  255. 'app_id': self.app_id,
  256. 'pushkey': self.pushkey,
  257. 'pushkey_ts': long(self.pushkey_ts / 1000),
  258. 'data': self.data_minus_url,
  259. }
  260. ]
  261. }
  262. }
  263. defer.returnValue(d)
  264. ctx = yield push_tools.get_context_for_event(
  265. self.store, self.state_handler, event, self.user_id
  266. )
  267. d = {
  268. 'notification': {
  269. 'id': event.event_id, # deprecated: remove soon
  270. 'event_id': event.event_id,
  271. 'room_id': event.room_id,
  272. 'type': event.type,
  273. 'sender': event.user_id,
  274. 'counts': { # -- we don't mark messages as read yet so
  275. # we have no way of knowing
  276. # Just set the badge to 1 until we have read receipts
  277. 'unread': badge,
  278. # 'missed_calls': 2
  279. },
  280. 'devices': [
  281. {
  282. 'app_id': self.app_id,
  283. 'pushkey': self.pushkey,
  284. 'pushkey_ts': long(self.pushkey_ts / 1000),
  285. 'data': self.data_minus_url,
  286. 'tweaks': tweaks
  287. }
  288. ]
  289. }
  290. }
  291. if event.type == 'm.room.member' and event.is_state():
  292. d['notification']['membership'] = event.content['membership']
  293. d['notification']['user_is_target'] = event.state_key == self.user_id
  294. if self.hs.config.push_include_content and event.content:
  295. d['notification']['content'] = event.content
  296. # We no longer send aliases separately, instead, we send the human
  297. # readable name of the room, which may be an alias.
  298. if 'sender_display_name' in ctx and len(ctx['sender_display_name']) > 0:
  299. d['notification']['sender_display_name'] = ctx['sender_display_name']
  300. if 'name' in ctx and len(ctx['name']) > 0:
  301. d['notification']['room_name'] = ctx['name']
  302. defer.returnValue(d)
  303. @defer.inlineCallbacks
  304. def dispatch_push(self, event, tweaks, badge):
  305. notification_dict = yield self._build_notification_dict(event, tweaks, badge)
  306. if not notification_dict:
  307. defer.returnValue([])
  308. try:
  309. resp = yield self.http_client.post_json_get_json(self.url, notification_dict)
  310. except Exception as e:
  311. logger.warning(
  312. "Failed to push event %s to %s: %s %s",
  313. event.event_id, self.name, type(e), e,
  314. )
  315. defer.returnValue(False)
  316. rejected = []
  317. if 'rejected' in resp:
  318. rejected = resp['rejected']
  319. defer.returnValue(rejected)
  320. @defer.inlineCallbacks
  321. def _send_badge(self, badge):
  322. """
  323. Args:
  324. badge (int): number of unread messages
  325. """
  326. logger.info("Sending updated badge count %d to %s", badge, self.name)
  327. d = {
  328. 'notification': {
  329. 'id': '',
  330. 'type': None,
  331. 'sender': '',
  332. 'counts': {
  333. 'unread': badge
  334. },
  335. 'devices': [
  336. {
  337. 'app_id': self.app_id,
  338. 'pushkey': self.pushkey,
  339. 'pushkey_ts': long(self.pushkey_ts / 1000),
  340. 'data': self.data_minus_url,
  341. }
  342. ]
  343. }
  344. }
  345. try:
  346. yield self.http_client.post_json_get_json(self.url, d)
  347. http_badges_processed_counter.inc()
  348. except Exception as e:
  349. logger.warning(
  350. "Failed to send badge count to %s: %s %s",
  351. self.name, type(e), e,
  352. )
  353. http_badges_failed_counter.inc()