emailpusher.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2016 OpenMarket Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. from twisted.internet import defer, reactor
  16. from twisted.internet.error import AlreadyCalled, AlreadyCancelled
  17. import logging
  18. from synapse.util.metrics import Measure
  19. from synapse.util.logcontext import LoggingContext
  20. from mailer import Mailer
  21. logger = logging.getLogger(__name__)
  22. # The amount of time we always wait before ever emailing about a notification
  23. # (to give the user a chance to respond to other push or notice the window)
  24. DELAY_BEFORE_MAIL_MS = 10 * 60 * 1000
  25. # THROTTLE is the minimum time between mail notifications sent for a given room.
  26. # Each room maintains its own throttle counter, but each new mail notification
  27. # sends the pending notifications for all rooms.
  28. THROTTLE_START_MS = 10 * 60 * 1000
  29. THROTTLE_MAX_MS = 24 * 60 * 60 * 1000 # 24h
  30. # THROTTLE_MULTIPLIER = 6 # 10 mins, 1 hour, 6 hours, 24 hours
  31. THROTTLE_MULTIPLIER = 144 # 10 mins, 24 hours - i.e. jump straight to 1 day
  32. # If no event triggers a notification for this long after the previous,
  33. # the throttle is released.
  34. # 12 hours - a gap of 12 hours in conversation is surely enough to merit a new
  35. # notification when things get going again...
  36. THROTTLE_RESET_AFTER_MS = (12 * 60 * 60 * 1000)
  37. # does each email include all unread notifs, or just the ones which have happened
  38. # since the last mail?
  39. # XXX: this is currently broken as it includes ones from parted rooms(!)
  40. INCLUDE_ALL_UNREAD_NOTIFS = False
  41. class EmailPusher(object):
  42. """
  43. A pusher that sends email notifications about events (approximately)
  44. when they happen.
  45. This shares quite a bit of code with httpusher: it would be good to
  46. factor out the common parts
  47. """
  48. def __init__(self, hs, pusherdict):
  49. self.hs = hs
  50. self.store = self.hs.get_datastore()
  51. self.clock = self.hs.get_clock()
  52. self.pusher_id = pusherdict['id']
  53. self.user_id = pusherdict['user_name']
  54. self.app_id = pusherdict['app_id']
  55. self.email = pusherdict['pushkey']
  56. self.last_stream_ordering = pusherdict['last_stream_ordering']
  57. self.timed_call = None
  58. self.throttle_params = None
  59. # See httppusher
  60. self.max_stream_ordering = None
  61. self.processing = False
  62. if self.hs.config.email_enable_notifs:
  63. if 'data' in pusherdict and 'brand' in pusherdict['data']:
  64. app_name = pusherdict['data']['brand']
  65. else:
  66. app_name = self.hs.config.email_app_name
  67. self.mailer = Mailer(self.hs, app_name)
  68. else:
  69. self.mailer = None
  70. @defer.inlineCallbacks
  71. def on_started(self):
  72. if self.mailer is not None:
  73. self.throttle_params = yield self.store.get_throttle_params_by_room(
  74. self.pusher_id
  75. )
  76. yield self._process()
  77. def on_stop(self):
  78. if self.timed_call:
  79. try:
  80. self.timed_call.cancel()
  81. except (AlreadyCalled, AlreadyCancelled):
  82. pass
  83. self.timed_call = None
  84. @defer.inlineCallbacks
  85. def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
  86. self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering)
  87. yield self._process()
  88. def on_new_receipts(self, min_stream_id, max_stream_id):
  89. # We could wake up and cancel the timer but there tend to be quite a
  90. # lot of read receipts so it's probably less work to just let the
  91. # timer fire
  92. return defer.succeed(None)
  93. @defer.inlineCallbacks
  94. def on_timer(self):
  95. self.timed_call = None
  96. yield self._process()
  97. @defer.inlineCallbacks
  98. def _process(self):
  99. if self.processing:
  100. return
  101. with LoggingContext("emailpush._process"):
  102. with Measure(self.clock, "emailpush._process"):
  103. try:
  104. self.processing = True
  105. # if the max ordering changes while we're running _unsafe_process,
  106. # call it again, and so on until we've caught up.
  107. while True:
  108. starting_max_ordering = self.max_stream_ordering
  109. try:
  110. yield self._unsafe_process()
  111. except:
  112. logger.exception("Exception processing notifs")
  113. if self.max_stream_ordering == starting_max_ordering:
  114. break
  115. finally:
  116. self.processing = False
  117. @defer.inlineCallbacks
  118. def _unsafe_process(self):
  119. """
  120. Main logic of the push loop without the wrapper function that sets
  121. up logging, measures and guards against multiple instances of it
  122. being run.
  123. """
  124. start = 0 if INCLUDE_ALL_UNREAD_NOTIFS else self.last_stream_ordering
  125. fn = self.store.get_unread_push_actions_for_user_in_range_for_email
  126. unprocessed = yield fn(self.user_id, start, self.max_stream_ordering)
  127. soonest_due_at = None
  128. if not unprocessed:
  129. yield self.save_last_stream_ordering_and_success(self.max_stream_ordering)
  130. return
  131. for push_action in unprocessed:
  132. received_at = push_action['received_ts']
  133. if received_at is None:
  134. received_at = 0
  135. notif_ready_at = received_at + DELAY_BEFORE_MAIL_MS
  136. room_ready_at = self.room_ready_to_notify_at(
  137. push_action['room_id']
  138. )
  139. should_notify_at = max(notif_ready_at, room_ready_at)
  140. if should_notify_at < self.clock.time_msec():
  141. # one of our notifications is ready for sending, so we send
  142. # *one* email updating the user on their notifications,
  143. # we then consider all previously outstanding notifications
  144. # to be delivered.
  145. reason = {
  146. 'room_id': push_action['room_id'],
  147. 'now': self.clock.time_msec(),
  148. 'received_at': received_at,
  149. 'delay_before_mail_ms': DELAY_BEFORE_MAIL_MS,
  150. 'last_sent_ts': self.get_room_last_sent_ts(push_action['room_id']),
  151. 'throttle_ms': self.get_room_throttle_ms(push_action['room_id']),
  152. }
  153. yield self.send_notification(unprocessed, reason)
  154. yield self.save_last_stream_ordering_and_success(max([
  155. ea['stream_ordering'] for ea in unprocessed
  156. ]))
  157. # we update the throttle on all the possible unprocessed push actions
  158. for ea in unprocessed:
  159. yield self.sent_notif_update_throttle(
  160. ea['room_id'], ea
  161. )
  162. break
  163. else:
  164. if soonest_due_at is None or should_notify_at < soonest_due_at:
  165. soonest_due_at = should_notify_at
  166. if self.timed_call is not None:
  167. try:
  168. self.timed_call.cancel()
  169. except (AlreadyCalled, AlreadyCancelled):
  170. pass
  171. self.timed_call = None
  172. if soonest_due_at is not None:
  173. self.timed_call = reactor.callLater(
  174. self.seconds_until(soonest_due_at), self.on_timer
  175. )
  176. @defer.inlineCallbacks
  177. def save_last_stream_ordering_and_success(self, last_stream_ordering):
  178. self.last_stream_ordering = last_stream_ordering
  179. yield self.store.update_pusher_last_stream_ordering_and_success(
  180. self.app_id, self.email, self.user_id,
  181. last_stream_ordering, self.clock.time_msec()
  182. )
  183. def seconds_until(self, ts_msec):
  184. return (ts_msec - self.clock.time_msec()) / 1000
  185. def get_room_throttle_ms(self, room_id):
  186. if room_id in self.throttle_params:
  187. return self.throttle_params[room_id]["throttle_ms"]
  188. else:
  189. return 0
  190. def get_room_last_sent_ts(self, room_id):
  191. if room_id in self.throttle_params:
  192. return self.throttle_params[room_id]["last_sent_ts"]
  193. else:
  194. return 0
  195. def room_ready_to_notify_at(self, room_id):
  196. """
  197. Determines whether throttling should prevent us from sending an email
  198. for the given room
  199. Returns: The timestamp when we are next allowed to send an email notif
  200. for this room
  201. """
  202. last_sent_ts = self.get_room_last_sent_ts(room_id)
  203. throttle_ms = self.get_room_throttle_ms(room_id)
  204. may_send_at = last_sent_ts + throttle_ms
  205. return may_send_at
  206. @defer.inlineCallbacks
  207. def sent_notif_update_throttle(self, room_id, notified_push_action):
  208. # We have sent a notification, so update the throttle accordingly.
  209. # If the event that triggered the notif happened more than
  210. # THROTTLE_RESET_AFTER_MS after the previous one that triggered a
  211. # notif, we release the throttle. Otherwise, the throttle is increased.
  212. time_of_previous_notifs = yield self.store.get_time_of_last_push_action_before(
  213. notified_push_action['stream_ordering']
  214. )
  215. time_of_this_notifs = notified_push_action['received_ts']
  216. if time_of_previous_notifs is not None and time_of_this_notifs is not None:
  217. gap = time_of_this_notifs - time_of_previous_notifs
  218. else:
  219. # if we don't know the arrival time of one of the notifs (it was not
  220. # stored prior to email notification code) then assume a gap of
  221. # zero which will just not reset the throttle
  222. gap = 0
  223. current_throttle_ms = self.get_room_throttle_ms(room_id)
  224. if gap > THROTTLE_RESET_AFTER_MS:
  225. new_throttle_ms = THROTTLE_START_MS
  226. else:
  227. if current_throttle_ms == 0:
  228. new_throttle_ms = THROTTLE_START_MS
  229. else:
  230. new_throttle_ms = min(
  231. current_throttle_ms * THROTTLE_MULTIPLIER,
  232. THROTTLE_MAX_MS
  233. )
  234. self.throttle_params[room_id] = {
  235. "last_sent_ts": self.clock.time_msec(),
  236. "throttle_ms": new_throttle_ms
  237. }
  238. yield self.store.set_throttle_params(
  239. self.pusher_id, room_id, self.throttle_params[room_id]
  240. )
  241. @defer.inlineCallbacks
  242. def send_notification(self, push_actions, reason):
  243. logger.info("Sending notif email for user %r", self.user_id)
  244. yield self.mailer.send_notification_mail(
  245. self.app_id, self.user_id, self.email, push_actions, reason
  246. )