# httppusher.py
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2015, 2016 OpenMarket Ltd
  3. # Copyright 2017 New Vector Ltd
  4. #
  5. # Licensed under the Apache License, Version 2.0 (the "License");
  6. # you may not use this file except in compliance with the License.
  7. # You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing, software
  12. # distributed under the License is distributed on an "AS IS" BASIS,
  13. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. # See the License for the specific language governing permissions and
  15. # limitations under the License.
  16. import logging
  17. from twisted.internet import defer, reactor
  18. from twisted.internet.error import AlreadyCalled, AlreadyCancelled
  19. from . import push_rule_evaluator
  20. from . import push_tools
  21. from synapse.push import PusherConfigException
  22. from synapse.util.logcontext import LoggingContext
  23. from synapse.util.metrics import Measure
  24. from prometheus_client import Counter
# Module-level logger for this file.
logger = logging.getLogger(__name__)

# Prometheus counters: pushes that were successfully processed vs. those
# whose HTTP dispatch was rejected or failed (see _unsafe_process).
http_push_processed_counter = Counter("synapse_http_httppusher_http_pushes_processed", "")
http_push_failed_counter = Counter("synapse_http_httppusher_http_pushes_failed", "")
  28. class HttpPusher(object):
  29. INITIAL_BACKOFF_SEC = 1 # in seconds because that's what Twisted takes
  30. MAX_BACKOFF_SEC = 60 * 60
  31. # This one's in ms because we compare it against the clock
  32. GIVE_UP_AFTER_MS = 24 * 60 * 60 * 1000
  33. def __init__(self, hs, pusherdict):
  34. self.hs = hs
  35. self.store = self.hs.get_datastore()
  36. self.clock = self.hs.get_clock()
  37. self.state_handler = self.hs.get_state_handler()
  38. self.user_id = pusherdict['user_name']
  39. self.app_id = pusherdict['app_id']
  40. self.app_display_name = pusherdict['app_display_name']
  41. self.device_display_name = pusherdict['device_display_name']
  42. self.pushkey = pusherdict['pushkey']
  43. self.pushkey_ts = pusherdict['ts']
  44. self.data = pusherdict['data']
  45. self.last_stream_ordering = pusherdict['last_stream_ordering']
  46. self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
  47. self.failing_since = pusherdict['failing_since']
  48. self.timed_call = None
  49. self.processing = False
  50. # This is the highest stream ordering we know it's safe to process.
  51. # When new events arrive, we'll be given a window of new events: we
  52. # should honour this rather than just looking for anything higher
  53. # because of potential out-of-order event serialisation. This starts
  54. # off as None though as we don't know any better.
  55. self.max_stream_ordering = None
  56. if 'data' not in pusherdict:
  57. raise PusherConfigException(
  58. "No 'data' key for HTTP pusher"
  59. )
  60. self.data = pusherdict['data']
  61. self.name = "%s/%s/%s" % (
  62. pusherdict['user_name'],
  63. pusherdict['app_id'],
  64. pusherdict['pushkey'],
  65. )
  66. if 'url' not in self.data:
  67. raise PusherConfigException(
  68. "'url' required in data for HTTP pusher"
  69. )
  70. self.url = self.data['url']
  71. self.http_client = hs.get_simple_http_client()
  72. self.data_minus_url = {}
  73. self.data_minus_url.update(self.data)
  74. del self.data_minus_url['url']
  75. @defer.inlineCallbacks
  76. def on_started(self):
  77. try:
  78. yield self._process()
  79. except Exception:
  80. logger.exception("Error starting http pusher")
  81. @defer.inlineCallbacks
  82. def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
  83. self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering)
  84. yield self._process()
  85. @defer.inlineCallbacks
  86. def on_new_receipts(self, min_stream_id, max_stream_id):
  87. # Note that the min here shouldn't be relied upon to be accurate.
  88. # We could check the receipts are actually m.read receipts here,
  89. # but currently that's the only type of receipt anyway...
  90. with LoggingContext("push.on_new_receipts"):
  91. with Measure(self.clock, "push.on_new_receipts"):
  92. badge = yield push_tools.get_badge_count(
  93. self.hs.get_datastore(), self.user_id
  94. )
  95. yield self._send_badge(badge)
  96. @defer.inlineCallbacks
  97. def on_timer(self):
  98. yield self._process()
  99. def on_stop(self):
  100. if self.timed_call:
  101. try:
  102. self.timed_call.cancel()
  103. except (AlreadyCalled, AlreadyCancelled):
  104. pass
  105. self.timed_call = None
  106. @defer.inlineCallbacks
  107. def _process(self):
  108. if self.processing:
  109. return
  110. with LoggingContext("push._process"):
  111. with Measure(self.clock, "push._process"):
  112. try:
  113. self.processing = True
  114. # if the max ordering changes while we're running _unsafe_process,
  115. # call it again, and so on until we've caught up.
  116. while True:
  117. starting_max_ordering = self.max_stream_ordering
  118. try:
  119. yield self._unsafe_process()
  120. except Exception:
  121. logger.exception("Exception processing notifs")
  122. if self.max_stream_ordering == starting_max_ordering:
  123. break
  124. finally:
  125. self.processing = False
  126. @defer.inlineCallbacks
  127. def _unsafe_process(self):
  128. """
  129. Looks for unset notifications and dispatch them, in order
  130. Never call this directly: use _process which will only allow this to
  131. run once per pusher.
  132. """
  133. fn = self.store.get_unread_push_actions_for_user_in_range_for_http
  134. unprocessed = yield fn(
  135. self.user_id, self.last_stream_ordering, self.max_stream_ordering
  136. )
  137. logger.info(
  138. "Processing %i unprocessed push actions for %s starting at "
  139. "stream_ordering %s",
  140. len(unprocessed), self.name, self.last_stream_ordering,
  141. )
  142. for push_action in unprocessed:
  143. processed = yield self._process_one(push_action)
  144. if processed:
  145. http_push_processed_counter.inc()
  146. self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
  147. self.last_stream_ordering = push_action['stream_ordering']
  148. yield self.store.update_pusher_last_stream_ordering_and_success(
  149. self.app_id, self.pushkey, self.user_id,
  150. self.last_stream_ordering,
  151. self.clock.time_msec()
  152. )
  153. if self.failing_since:
  154. self.failing_since = None
  155. yield self.store.update_pusher_failing_since(
  156. self.app_id, self.pushkey, self.user_id,
  157. self.failing_since
  158. )
  159. else:
  160. http_push_failed_counter.inc()
  161. if not self.failing_since:
  162. self.failing_since = self.clock.time_msec()
  163. yield self.store.update_pusher_failing_since(
  164. self.app_id, self.pushkey, self.user_id,
  165. self.failing_since
  166. )
  167. if (
  168. self.failing_since and
  169. self.failing_since <
  170. self.clock.time_msec() - HttpPusher.GIVE_UP_AFTER_MS
  171. ):
  172. # we really only give up so that if the URL gets
  173. # fixed, we don't suddenly deliver a load
  174. # of old notifications.
  175. logger.warn("Giving up on a notification to user %s, "
  176. "pushkey %s",
  177. self.user_id, self.pushkey)
  178. self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
  179. self.last_stream_ordering = push_action['stream_ordering']
  180. yield self.store.update_pusher_last_stream_ordering(
  181. self.app_id,
  182. self.pushkey,
  183. self.user_id,
  184. self.last_stream_ordering
  185. )
  186. self.failing_since = None
  187. yield self.store.update_pusher_failing_since(
  188. self.app_id,
  189. self.pushkey,
  190. self.user_id,
  191. self.failing_since
  192. )
  193. else:
  194. logger.info("Push failed: delaying for %ds", self.backoff_delay)
  195. self.timed_call = reactor.callLater(self.backoff_delay, self.on_timer)
  196. self.backoff_delay = min(self.backoff_delay * 2, self.MAX_BACKOFF_SEC)
  197. break
  198. @defer.inlineCallbacks
  199. def _process_one(self, push_action):
  200. if 'notify' not in push_action['actions']:
  201. defer.returnValue(True)
  202. tweaks = push_rule_evaluator.tweaks_for_actions(push_action['actions'])
  203. badge = yield push_tools.get_badge_count(self.hs.get_datastore(), self.user_id)
  204. event = yield self.store.get_event(push_action['event_id'], allow_none=True)
  205. if event is None:
  206. defer.returnValue(True) # It's been redacted
  207. rejected = yield self.dispatch_push(event, tweaks, badge)
  208. if rejected is False:
  209. defer.returnValue(False)
  210. if isinstance(rejected, list) or isinstance(rejected, tuple):
  211. for pk in rejected:
  212. if pk != self.pushkey:
  213. # for sanity, we only remove the pushkey if it
  214. # was the one we actually sent...
  215. logger.warn(
  216. ("Ignoring rejected pushkey %s because we"
  217. " didn't send it"), pk
  218. )
  219. else:
  220. logger.info(
  221. "Pushkey %s was rejected: removing",
  222. pk
  223. )
  224. yield self.hs.remove_pusher(
  225. self.app_id, pk, self.user_id
  226. )
  227. defer.returnValue(True)
  228. @defer.inlineCallbacks
  229. def _build_notification_dict(self, event, tweaks, badge):
  230. if self.data.get('format') == 'event_id_only':
  231. d = {
  232. 'notification': {
  233. 'event_id': event.event_id,
  234. 'room_id': event.room_id,
  235. 'counts': {
  236. 'unread': badge,
  237. },
  238. 'devices': [
  239. {
  240. 'app_id': self.app_id,
  241. 'pushkey': self.pushkey,
  242. 'pushkey_ts': long(self.pushkey_ts / 1000),
  243. 'data': self.data_minus_url,
  244. }
  245. ]
  246. }
  247. }
  248. defer.returnValue(d)
  249. ctx = yield push_tools.get_context_for_event(
  250. self.store, self.state_handler, event, self.user_id
  251. )
  252. d = {
  253. 'notification': {
  254. 'id': event.event_id, # deprecated: remove soon
  255. 'event_id': event.event_id,
  256. 'room_id': event.room_id,
  257. 'type': event.type,
  258. 'sender': event.user_id,
  259. 'counts': { # -- we don't mark messages as read yet so
  260. # we have no way of knowing
  261. # Just set the badge to 1 until we have read receipts
  262. 'unread': badge,
  263. # 'missed_calls': 2
  264. },
  265. 'devices': [
  266. {
  267. 'app_id': self.app_id,
  268. 'pushkey': self.pushkey,
  269. 'pushkey_ts': long(self.pushkey_ts / 1000),
  270. 'data': self.data_minus_url,
  271. 'tweaks': tweaks
  272. }
  273. ]
  274. }
  275. }
  276. if event.type == 'm.room.member':
  277. d['notification']['membership'] = event.content['membership']
  278. d['notification']['user_is_target'] = event.state_key == self.user_id
  279. if self.hs.config.push_include_content and 'content' in event:
  280. d['notification']['content'] = event.content
  281. # We no longer send aliases separately, instead, we send the human
  282. # readable name of the room, which may be an alias.
  283. if 'sender_display_name' in ctx and len(ctx['sender_display_name']) > 0:
  284. d['notification']['sender_display_name'] = ctx['sender_display_name']
  285. if 'name' in ctx and len(ctx['name']) > 0:
  286. d['notification']['room_name'] = ctx['name']
  287. defer.returnValue(d)
  288. @defer.inlineCallbacks
  289. def dispatch_push(self, event, tweaks, badge):
  290. notification_dict = yield self._build_notification_dict(event, tweaks, badge)
  291. if not notification_dict:
  292. defer.returnValue([])
  293. try:
  294. resp = yield self.http_client.post_json_get_json(self.url, notification_dict)
  295. except Exception:
  296. logger.warn(
  297. "Failed to push event %s to %s",
  298. event.event_id, self.name, exc_info=True,
  299. )
  300. defer.returnValue(False)
  301. rejected = []
  302. if 'rejected' in resp:
  303. rejected = resp['rejected']
  304. defer.returnValue(rejected)
  305. @defer.inlineCallbacks
  306. def _send_badge(self, badge):
  307. logger.info("Sending updated badge count %d to %s", badge, self.name)
  308. d = {
  309. 'notification': {
  310. 'id': '',
  311. 'type': None,
  312. 'sender': '',
  313. 'counts': {
  314. 'unread': badge
  315. },
  316. 'devices': [
  317. {
  318. 'app_id': self.app_id,
  319. 'pushkey': self.pushkey,
  320. 'pushkey_ts': long(self.pushkey_ts / 1000),
  321. 'data': self.data_minus_url,
  322. }
  323. ]
  324. }
  325. }
  326. try:
  327. resp = yield self.http_client.post_json_get_json(self.url, d)
  328. except Exception:
  329. logger.warn(
  330. "Failed to send badge count to %s",
  331. self.name, exc_info=True,
  332. )
  333. defer.returnValue(False)
  334. rejected = []
  335. if 'rejected' in resp:
  336. rejected = resp['rejected']
  337. defer.returnValue(rejected)