emailpusher.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2016 OpenMarket Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. from twisted.internet import defer
  16. from twisted.internet.error import AlreadyCalled, AlreadyCancelled
  17. import logging
  18. from synapse.util.metrics import Measure
  19. from synapse.util.logcontext import LoggingContext
  20. logger = logging.getLogger(__name__)
  21. # The amount of time we always wait before ever emailing about a notification
  22. # (to give the user a chance to respond to other push or notice the window)
  23. DELAY_BEFORE_MAIL_MS = 10 * 60 * 1000
  24. # THROTTLE is the minimum time between mail notifications sent for a given room.
  25. # Each room maintains its own throttle counter, but each new mail notification
  26. # sends the pending notifications for all rooms.
  27. THROTTLE_START_MS = 10 * 60 * 1000
  28. THROTTLE_MAX_MS = 24 * 60 * 60 * 1000 # 24h
  29. # THROTTLE_MULTIPLIER = 6 # 10 mins, 1 hour, 6 hours, 24 hours
  30. THROTTLE_MULTIPLIER = 144 # 10 mins, 24 hours - i.e. jump straight to 1 day
  31. # If no event triggers a notification for this long after the previous,
  32. # the throttle is released.
  33. # 12 hours - a gap of 12 hours in conversation is surely enough to merit a new
  34. # notification when things get going again...
  35. THROTTLE_RESET_AFTER_MS = (12 * 60 * 60 * 1000)
  36. # does each email include all unread notifs, or just the ones which have happened
  37. # since the last mail?
  38. # XXX: this is currently broken as it includes ones from parted rooms(!)
  39. INCLUDE_ALL_UNREAD_NOTIFS = False
  40. class EmailPusher(object):
  41. """
  42. A pusher that sends email notifications about events (approximately)
  43. when they happen.
  44. This shares quite a bit of code with httpusher: it would be good to
  45. factor out the common parts
  46. """
  47. def __init__(self, hs, pusherdict, mailer):
  48. self.hs = hs
  49. self.mailer = mailer
  50. self.store = self.hs.get_datastore()
  51. self.clock = self.hs.get_clock()
  52. self.pusher_id = pusherdict['id']
  53. self.user_id = pusherdict['user_name']
  54. self.app_id = pusherdict['app_id']
  55. self.email = pusherdict['pushkey']
  56. self.last_stream_ordering = pusherdict['last_stream_ordering']
  57. self.timed_call = None
  58. self.throttle_params = None
  59. # See httppusher
  60. self.max_stream_ordering = None
  61. self.processing = False
  62. @defer.inlineCallbacks
  63. def on_started(self):
  64. if self.mailer is not None:
  65. try:
  66. self.throttle_params = yield self.store.get_throttle_params_by_room(
  67. self.pusher_id
  68. )
  69. yield self._process()
  70. except Exception:
  71. logger.exception("Error starting email pusher")
  72. def on_stop(self):
  73. if self.timed_call:
  74. try:
  75. self.timed_call.cancel()
  76. except (AlreadyCalled, AlreadyCancelled):
  77. pass
  78. self.timed_call = None
  79. @defer.inlineCallbacks
  80. def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
  81. self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering)
  82. yield self._process()
  83. def on_new_receipts(self, min_stream_id, max_stream_id):
  84. # We could wake up and cancel the timer but there tend to be quite a
  85. # lot of read receipts so it's probably less work to just let the
  86. # timer fire
  87. return defer.succeed(None)
  88. @defer.inlineCallbacks
  89. def on_timer(self):
  90. self.timed_call = None
  91. yield self._process()
  92. @defer.inlineCallbacks
  93. def _process(self):
  94. if self.processing:
  95. return
  96. with LoggingContext("emailpush._process"):
  97. with Measure(self.clock, "emailpush._process"):
  98. try:
  99. self.processing = True
  100. # if the max ordering changes while we're running _unsafe_process,
  101. # call it again, and so on until we've caught up.
  102. while True:
  103. starting_max_ordering = self.max_stream_ordering
  104. try:
  105. yield self._unsafe_process()
  106. except Exception:
  107. logger.exception("Exception processing notifs")
  108. if self.max_stream_ordering == starting_max_ordering:
  109. break
  110. finally:
  111. self.processing = False
  112. @defer.inlineCallbacks
  113. def _unsafe_process(self):
  114. """
  115. Main logic of the push loop without the wrapper function that sets
  116. up logging, measures and guards against multiple instances of it
  117. being run.
  118. """
  119. start = 0 if INCLUDE_ALL_UNREAD_NOTIFS else self.last_stream_ordering
  120. fn = self.store.get_unread_push_actions_for_user_in_range_for_email
  121. unprocessed = yield fn(self.user_id, start, self.max_stream_ordering)
  122. soonest_due_at = None
  123. if not unprocessed:
  124. yield self.save_last_stream_ordering_and_success(self.max_stream_ordering)
  125. return
  126. for push_action in unprocessed:
  127. received_at = push_action['received_ts']
  128. if received_at is None:
  129. received_at = 0
  130. notif_ready_at = received_at + DELAY_BEFORE_MAIL_MS
  131. room_ready_at = self.room_ready_to_notify_at(
  132. push_action['room_id']
  133. )
  134. should_notify_at = max(notif_ready_at, room_ready_at)
  135. if should_notify_at < self.clock.time_msec():
  136. # one of our notifications is ready for sending, so we send
  137. # *one* email updating the user on their notifications,
  138. # we then consider all previously outstanding notifications
  139. # to be delivered.
  140. reason = {
  141. 'room_id': push_action['room_id'],
  142. 'now': self.clock.time_msec(),
  143. 'received_at': received_at,
  144. 'delay_before_mail_ms': DELAY_BEFORE_MAIL_MS,
  145. 'last_sent_ts': self.get_room_last_sent_ts(push_action['room_id']),
  146. 'throttle_ms': self.get_room_throttle_ms(push_action['room_id']),
  147. }
  148. yield self.send_notification(unprocessed, reason)
  149. yield self.save_last_stream_ordering_and_success(max([
  150. ea['stream_ordering'] for ea in unprocessed
  151. ]))
  152. # we update the throttle on all the possible unprocessed push actions
  153. for ea in unprocessed:
  154. yield self.sent_notif_update_throttle(
  155. ea['room_id'], ea
  156. )
  157. break
  158. else:
  159. if soonest_due_at is None or should_notify_at < soonest_due_at:
  160. soonest_due_at = should_notify_at
  161. if self.timed_call is not None:
  162. try:
  163. self.timed_call.cancel()
  164. except (AlreadyCalled, AlreadyCancelled):
  165. pass
  166. self.timed_call = None
  167. if soonest_due_at is not None:
  168. self.timed_call = self.hs.get_reactor().callLater(
  169. self.seconds_until(soonest_due_at), self.on_timer
  170. )
  171. @defer.inlineCallbacks
  172. def save_last_stream_ordering_and_success(self, last_stream_ordering):
  173. self.last_stream_ordering = last_stream_ordering
  174. yield self.store.update_pusher_last_stream_ordering_and_success(
  175. self.app_id, self.email, self.user_id,
  176. last_stream_ordering, self.clock.time_msec()
  177. )
  178. def seconds_until(self, ts_msec):
  179. secs = (ts_msec - self.clock.time_msec()) / 1000
  180. return max(secs, 0)
  181. def get_room_throttle_ms(self, room_id):
  182. if room_id in self.throttle_params:
  183. return self.throttle_params[room_id]["throttle_ms"]
  184. else:
  185. return 0
  186. def get_room_last_sent_ts(self, room_id):
  187. if room_id in self.throttle_params:
  188. return self.throttle_params[room_id]["last_sent_ts"]
  189. else:
  190. return 0
  191. def room_ready_to_notify_at(self, room_id):
  192. """
  193. Determines whether throttling should prevent us from sending an email
  194. for the given room
  195. Returns: The timestamp when we are next allowed to send an email notif
  196. for this room
  197. """
  198. last_sent_ts = self.get_room_last_sent_ts(room_id)
  199. throttle_ms = self.get_room_throttle_ms(room_id)
  200. may_send_at = last_sent_ts + throttle_ms
  201. return may_send_at
  202. @defer.inlineCallbacks
  203. def sent_notif_update_throttle(self, room_id, notified_push_action):
  204. # We have sent a notification, so update the throttle accordingly.
  205. # If the event that triggered the notif happened more than
  206. # THROTTLE_RESET_AFTER_MS after the previous one that triggered a
  207. # notif, we release the throttle. Otherwise, the throttle is increased.
  208. time_of_previous_notifs = yield self.store.get_time_of_last_push_action_before(
  209. notified_push_action['stream_ordering']
  210. )
  211. time_of_this_notifs = notified_push_action['received_ts']
  212. if time_of_previous_notifs is not None and time_of_this_notifs is not None:
  213. gap = time_of_this_notifs - time_of_previous_notifs
  214. else:
  215. # if we don't know the arrival time of one of the notifs (it was not
  216. # stored prior to email notification code) then assume a gap of
  217. # zero which will just not reset the throttle
  218. gap = 0
  219. current_throttle_ms = self.get_room_throttle_ms(room_id)
  220. if gap > THROTTLE_RESET_AFTER_MS:
  221. new_throttle_ms = THROTTLE_START_MS
  222. else:
  223. if current_throttle_ms == 0:
  224. new_throttle_ms = THROTTLE_START_MS
  225. else:
  226. new_throttle_ms = min(
  227. current_throttle_ms * THROTTLE_MULTIPLIER,
  228. THROTTLE_MAX_MS
  229. )
  230. self.throttle_params[room_id] = {
  231. "last_sent_ts": self.clock.time_msec(),
  232. "throttle_ms": new_throttle_ms
  233. }
  234. yield self.store.set_throttle_params(
  235. self.pusher_id, room_id, self.throttle_params[room_id]
  236. )
  237. @defer.inlineCallbacks
  238. def send_notification(self, push_actions, reason):
  239. logger.info("Sending notif email for user %r", self.user_id)
  240. yield self.mailer.send_notification_mail(
  241. self.app_id, self.user_id, self.email, push_actions, reason
  242. )