123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401 |
- # -*- coding: utf-8 -*-
- # Copyright 2015, 2016 OpenMarket Ltd
- # Copyright 2017 New Vector Ltd
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- import logging
- import six
- from prometheus_client import Counter
- from twisted.internet import defer
- from twisted.internet.error import AlreadyCalled, AlreadyCancelled
- from synapse.metrics.background_process_metrics import run_as_background_process
- from synapse.push import PusherConfigException
- from . import push_rule_evaluator, push_tools
# Python 3 has no separate ``long`` type, so alias it to ``int``; the pusher
# uses ``long(...)`` when building the payloads it sends.
if six.PY3:
    long = int

logger = logging.getLogger(__name__)

# Prometheus counters: one success/failure pair for push notifications and
# one pair for badge-count updates.
http_push_processed_counter = Counter(
    name="synapse_http_httppusher_http_pushes_processed",
    documentation="Number of push notifications successfully sent",
)

http_push_failed_counter = Counter(
    name="synapse_http_httppusher_http_pushes_failed",
    documentation="Number of push notifications which failed",
)

http_badges_processed_counter = Counter(
    name="synapse_http_httppusher_badge_updates_processed",
    documentation="Number of badge updates successfully sent",
)

http_badges_failed_counter = Counter(
    name="synapse_http_httppusher_badge_updates_failed",
    documentation="Number of badge updates which failed",
)
class HttpPusher(object):
    """Pusher which POSTs notifications and badge counts to an HTTP endpoint.

    Unprocessed push actions are read from the datastore and sent, in order,
    as JSON to the URL given in the pusher's ``data``.  When a send fails the
    pusher retries after a delay that doubles on each failure (capped at
    ``MAX_BACKOFF_SEC``); once it has been failing for longer than
    ``GIVE_UP_AFTER_MS`` the notification is skipped.
    """

    INITIAL_BACKOFF_SEC = 1  # in seconds because that's what Twisted takes
    MAX_BACKOFF_SEC = 60 * 60

    # This one's in ms because we compare it against the clock
    GIVE_UP_AFTER_MS = 24 * 60 * 60 * 1000

    def __init__(self, hs, pusherdict):
        """
        Args:
            hs: the homeserver object; used to obtain the datastore, clock,
                state handler, reactor and HTTP client.
            pusherdict (dict): the pusher's stored configuration. Reads the
                keys 'user_name', 'app_id', 'app_display_name',
                'device_display_name', 'pushkey', 'ts',
                'last_stream_ordering', 'failing_since' and 'data'.

        Raises:
            PusherConfigException: if 'data' is missing from pusherdict, is
                None, or does not contain a 'url'.
        """
        self.hs = hs
        self.store = self.hs.get_datastore()
        self.clock = self.hs.get_clock()
        self.state_handler = self.hs.get_state_handler()
        self.user_id = pusherdict['user_name']
        self.app_id = pusherdict['app_id']
        self.app_display_name = pusherdict['app_display_name']
        self.device_display_name = pusherdict['device_display_name']
        self.pushkey = pusherdict['pushkey']
        self.pushkey_ts = pusherdict['ts']
        self.last_stream_ordering = pusherdict['last_stream_ordering']
        self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
        self.failing_since = pusherdict['failing_since']
        self.timed_call = None
        self._is_processing = False

        # This is the highest stream ordering we know it's safe to process.
        # When new events arrive, we'll be given a window of new events: we
        # should honour this rather than just looking for anything higher
        # because of potential out-of-order event serialisation. This starts
        # off as None though as we don't know any better.
        self.max_stream_ordering = None

        # Validate 'data' *before* reading it, so that a missing key is
        # reported as a PusherConfigException rather than a bare KeyError.
        if 'data' not in pusherdict:
            raise PusherConfigException(
                "No 'data' key for HTTP pusher"
            )
        self.data = pusherdict['data']

        self.name = "%s/%s/%s" % (
            pusherdict['user_name'],
            pusherdict['app_id'],
            pusherdict['pushkey'],
        )

        if self.data is None:
            raise PusherConfigException(
                "data can not be null for HTTP pusher"
            )

        if 'url' not in self.data:
            raise PusherConfigException(
                "'url' required in data for HTTP pusher"
            )
        self.url = self.data['url']
        self.http_client = hs.get_simple_http_client()

        # A copy of the pusher data without the URL: this is what gets
        # included in the payloads we send.
        self.data_minus_url = {}
        self.data_minus_url.update(self.data)
        del self.data_minus_url['url']

    def on_started(self):
        """Start processing any outstanding notifications."""
        self._start_processing()

    def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
        """Record the new upper bound of the processing window and process.

        Args:
            min_stream_ordering: unused.
            max_stream_ordering: highest stream ordering of the new window;
                the stored maximum only ever moves forwards.
        """
        self.max_stream_ordering = max(
            max_stream_ordering, self.max_stream_ordering or 0
        )
        self._start_processing()

    def on_new_receipts(self, min_stream_id, max_stream_id):
        """Send an updated badge count in the background.

        Both arguments are unused.
        """
        # Note that the min here shouldn't be relied upon to be accurate.

        # We could check the receipts are actually m.read receipts here,
        # but currently that's the only type of receipt anyway...
        run_as_background_process("http_pusher.on_new_receipts", self._update_badge)

    @defer.inlineCallbacks
    def _update_badge(self):
        """Look up the user's current badge count and send it."""
        badge = yield push_tools.get_badge_count(self.hs.get_datastore(), self.user_id)
        yield self._send_badge(badge)

    def on_timer(self):
        """Retry-timer callback: have another go at processing."""
        self._start_processing()

    def on_stop(self):
        """Cancel the pending retry timer, if there is one."""
        if self.timed_call:
            try:
                self.timed_call.cancel()
            except (AlreadyCalled, AlreadyCancelled):
                # the timer has already fired or been cancelled: nothing to do
                pass
            self.timed_call = None

    def _start_processing(self):
        """Kick off _process in the background unless it is already running."""
        if self._is_processing:
            return

        run_as_background_process("httppush.process", self._process)

    @defer.inlineCallbacks
    def _process(self):
        """Run _unsafe_process until max_stream_ordering stops changing.

        Exceptions raised by _unsafe_process are logged and swallowed.
        """
        # we should never get here if we are already processing
        assert not self._is_processing

        try:
            self._is_processing = True
            # if the max ordering changes while we're running _unsafe_process,
            # call it again, and so on until we've caught up.
            while True:
                starting_max_ordering = self.max_stream_ordering
                try:
                    yield self._unsafe_process()
                except Exception:
                    logger.exception("Exception processing notifs")
                if self.max_stream_ordering == starting_max_ordering:
                    break
        finally:
            self._is_processing = False

    @defer.inlineCallbacks
    def _unsafe_process(self):
        """
        Looks for unsent notifications and dispatches them, in order.

        Never call this directly: use _process which will only allow this to
        run once per pusher.
        """
        fn = self.store.get_unread_push_actions_for_user_in_range_for_http
        unprocessed = yield fn(
            self.user_id, self.last_stream_ordering, self.max_stream_ordering
        )

        logger.info(
            "Processing %i unprocessed push actions for %s starting at "
            "stream_ordering %s",
            len(unprocessed), self.name, self.last_stream_ordering,
        )

        for push_action in unprocessed:
            processed = yield self._process_one(push_action)
            if processed:
                http_push_processed_counter.inc()
                self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
                self.last_stream_ordering = push_action['stream_ordering']
                yield self.store.update_pusher_last_stream_ordering_and_success(
                    self.app_id, self.pushkey, self.user_id,
                    self.last_stream_ordering,
                    self.clock.time_msec()
                )
                if self.failing_since:
                    # we've recovered: clear the failure marker
                    self.failing_since = None
                    yield self.store.update_pusher_failing_since(
                        self.app_id, self.pushkey, self.user_id,
                        self.failing_since
                    )
            else:
                http_push_failed_counter.inc()
                if not self.failing_since:
                    # first failure of this run: remember when it started
                    self.failing_since = self.clock.time_msec()
                    yield self.store.update_pusher_failing_since(
                        self.app_id, self.pushkey, self.user_id,
                        self.failing_since
                    )

                if (
                    self.failing_since and
                    self.failing_since <
                    self.clock.time_msec() - HttpPusher.GIVE_UP_AFTER_MS
                ):
                    # we really only give up so that if the URL gets
                    # fixed, we don't suddenly deliver a load
                    # of old notifications.
                    logger.warning("Giving up on a notification to user %s, "
                                   "pushkey %s",
                                   self.user_id, self.pushkey)
                    self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
                    self.last_stream_ordering = push_action['stream_ordering']
                    yield self.store.update_pusher_last_stream_ordering(
                        self.app_id,
                        self.pushkey,
                        self.user_id,
                        self.last_stream_ordering
                    )

                    self.failing_since = None
                    yield self.store.update_pusher_failing_since(
                        self.app_id,
                        self.pushkey,
                        self.user_id,
                        self.failing_since
                    )
                else:
                    # Schedule a retry, double the delay for next time, and
                    # stop here so that later notifications stay in order.
                    logger.info("Push failed: delaying for %ds", self.backoff_delay)
                    self.timed_call = self.hs.get_reactor().callLater(
                        self.backoff_delay, self.on_timer
                    )
                    self.backoff_delay = min(
                        self.backoff_delay * 2, HttpPusher.MAX_BACKOFF_SEC
                    )
                    break

    @defer.inlineCallbacks
    def _process_one(self, push_action):
        """Send a single push action.

        Args:
            push_action (dict): reads the 'actions' and 'event_id' keys.

        Returns:
            Deferred[bool]: False if the push could not be sent (so should be
            retried); True otherwise, including when there was nothing to
            send.
        """
        if 'notify' not in push_action['actions']:
            defer.returnValue(True)

        tweaks = push_rule_evaluator.tweaks_for_actions(push_action['actions'])
        badge = yield push_tools.get_badge_count(self.hs.get_datastore(), self.user_id)

        event = yield self.store.get_event(push_action['event_id'], allow_none=True)
        if event is None:
            defer.returnValue(True)  # It's been redacted
        rejected = yield self.dispatch_push(event, tweaks, badge)
        if rejected is False:
            defer.returnValue(False)

        if isinstance(rejected, (list, tuple)):
            for pk in rejected:
                if pk != self.pushkey:
                    # for sanity, we only remove the pushkey if it
                    # was the one we actually sent...
                    logger.warning(
                        ("Ignoring rejected pushkey %s because we"
                         " didn't send it"), pk
                    )
                else:
                    logger.info(
                        "Pushkey %s was rejected: removing",
                        pk
                    )
                    yield self.hs.remove_pusher(
                        self.app_id, pk, self.user_id
                    )
        defer.returnValue(True)

    def _base_device_dict(self):
        """Build a fresh 'devices' entry with the fields every payload shares."""
        return {
            'app_id': self.app_id,
            'pushkey': self.pushkey,
            'pushkey_ts': long(self.pushkey_ts / 1000),
            'data': self.data_minus_url,
        }

    @defer.inlineCallbacks
    def _build_notification_dict(self, event, tweaks, badge):
        """Build the JSON body for a notification about `event`.

        Args:
            event: the event to notify about.
            tweaks: included in the device entry of the full format only.
            badge: value reported as the 'unread' count.

        Returns:
            Deferred[dict]: the body to POST.  If the pusher data has
            format 'event_id_only', only the event id, room id, counts and
            device are included.
        """
        if self.data.get('format') == 'event_id_only':
            d = {
                'notification': {
                    'event_id': event.event_id,
                    'room_id': event.room_id,
                    'counts': {
                        'unread': badge,
                    },
                    'devices': [self._base_device_dict()],
                }
            }
            defer.returnValue(d)

        ctx = yield push_tools.get_context_for_event(
            self.store, self.state_handler, event, self.user_id
        )

        device = self._base_device_dict()
        device['tweaks'] = tweaks

        d = {
            'notification': {
                'id': event.event_id,  # deprecated: remove soon
                'event_id': event.event_id,
                'room_id': event.room_id,
                'type': event.type,
                'sender': event.user_id,
                'counts': {
                    'unread': badge,
                },
                'devices': [device],
            }
        }
        if event.type == 'm.room.member' and event.is_state():
            d['notification']['membership'] = event.content['membership']
            d['notification']['user_is_target'] = event.state_key == self.user_id
        if self.hs.config.push_include_content and event.content:
            d['notification']['content'] = event.content

        # We no longer send aliases separately, instead, we send the human
        # readable name of the room, which may be an alias.
        if 'sender_display_name' in ctx and len(ctx['sender_display_name']) > 0:
            d['notification']['sender_display_name'] = ctx['sender_display_name']
        if 'name' in ctx and len(ctx['name']) > 0:
            d['notification']['room_name'] = ctx['name']

        defer.returnValue(d)

    @defer.inlineCallbacks
    def dispatch_push(self, event, tweaks, badge):
        """POST a notification for `event` to the pusher's URL.

        Returns:
            Deferred: False if the request raised an exception; otherwise the
            'rejected' entry of the response, or an empty list if the
            response has none (or there was nothing to send).
        """
        notification_dict = yield self._build_notification_dict(event, tweaks, badge)
        if not notification_dict:
            defer.returnValue([])
        try:
            resp = yield self.http_client.post_json_get_json(self.url, notification_dict)
        except Exception as e:
            logger.warning(
                "Failed to push event %s to %s: %s %s",
                event.event_id, self.name, type(e), e,
            )
            defer.returnValue(False)
        rejected = []
        if 'rejected' in resp:
            rejected = resp['rejected']
        defer.returnValue(rejected)

    @defer.inlineCallbacks
    def _send_badge(self, badge):
        """
        POST a badge-only update; failures are logged and counted, not raised.

        Args:
            badge (int): number of unread messages
        """
        logger.info("Sending updated badge count %d to %s", badge, self.name)
        d = {
            'notification': {
                'id': '',
                'type': None,
                'sender': '',
                'counts': {
                    'unread': badge
                },
                'devices': [self._base_device_dict()],
            }
        }
        try:
            yield self.http_client.post_json_get_json(self.url, d)
            http_badges_processed_counter.inc()
        except Exception as e:
            logger.warning(
                "Failed to send badge count to %s: %s %s",
                self.name, type(e), e,
            )
            http_badges_failed_counter.inc()
|