emailpusher.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2016 OpenMarket Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. from twisted.internet import defer, reactor
  16. import logging
  17. from synapse.util.metrics import Measure
  18. from synapse.util.logcontext import LoggingContext
  19. from mailer import Mailer
  20. logger = logging.getLogger(__name__)
  21. # The amount of time we always wait before ever emailing about a notification
  22. # (to give the user a chance to respond to other push or notice the window)
  23. DELAY_BEFORE_MAIL_MS = 10 * 60 * 1000
  24. # THROTTLE is the minimum time between mail notifications sent for a given room.
  25. # Each room maintains its own throttle counter, but each new mail notification
  26. # sends the pending notifications for all rooms.
  27. THROTTLE_START_MS = 10 * 60 * 1000
  28. THROTTLE_MAX_MS = 24 * 60 * 60 * 1000 # 24h
  29. # THROTTLE_MULTIPLIER = 6 # 10 mins, 1 hour, 6 hours, 24 hours
  30. THROTTLE_MULTIPLIER = 144 # 10 mins, 24 hours - i.e. jump straight to 1 day
  31. # If no event triggers a notification for this long after the previous,
  32. # the throttle is released.
  33. # 12 hours - a gap of 12 hours in conversation is surely enough to merit a new
  34. # notification when things get going again...
  35. THROTTLE_RESET_AFTER_MS = (12 * 60 * 60 * 1000)
  36. # does each email include all unread notifs, or just the ones which have happened
  37. # since the last mail?
  38. # XXX: this is currently broken as it includes ones from parted rooms(!)
  39. INCLUDE_ALL_UNREAD_NOTIFS = False
  40. class EmailPusher(object):
  41. """
  42. A pusher that sends email notifications about events (approximately)
  43. when they happen.
  44. This shares quite a bit of code with httpusher: it would be good to
  45. factor out the common parts
  46. """
  47. def __init__(self, hs, pusherdict):
  48. self.hs = hs
  49. self.store = self.hs.get_datastore()
  50. self.clock = self.hs.get_clock()
  51. self.pusher_id = pusherdict['id']
  52. self.user_id = pusherdict['user_name']
  53. self.app_id = pusherdict['app_id']
  54. self.email = pusherdict['pushkey']
  55. self.last_stream_ordering = pusherdict['last_stream_ordering']
  56. self.timed_call = None
  57. self.throttle_params = None
  58. # See httppusher
  59. self.max_stream_ordering = None
  60. self.processing = False
  61. if self.hs.config.email_enable_notifs:
  62. if 'data' in pusherdict and 'brand' in pusherdict['data']:
  63. app_name = pusherdict['data']['brand']
  64. else:
  65. app_name = self.hs.config.email_app_name
  66. self.mailer = Mailer(self.hs, app_name)
  67. else:
  68. self.mailer = None
  69. @defer.inlineCallbacks
  70. def on_started(self):
  71. if self.mailer is not None:
  72. self.throttle_params = yield self.store.get_throttle_params_by_room(
  73. self.pusher_id
  74. )
  75. yield self._process()
  76. def on_stop(self):
  77. if self.timed_call:
  78. self.timed_call.cancel()
  79. @defer.inlineCallbacks
  80. def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
  81. self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering)
  82. yield self._process()
  83. def on_new_receipts(self, min_stream_id, max_stream_id):
  84. # We could wake up and cancel the timer but there tend to be quite a
  85. # lot of read receipts so it's probably less work to just let the
  86. # timer fire
  87. return defer.succeed(None)
  88. @defer.inlineCallbacks
  89. def on_timer(self):
  90. self.timed_call = None
  91. yield self._process()
  92. @defer.inlineCallbacks
  93. def _process(self):
  94. if self.processing:
  95. return
  96. with LoggingContext("emailpush._process"):
  97. with Measure(self.clock, "emailpush._process"):
  98. try:
  99. self.processing = True
  100. # if the max ordering changes while we're running _unsafe_process,
  101. # call it again, and so on until we've caught up.
  102. while True:
  103. starting_max_ordering = self.max_stream_ordering
  104. try:
  105. yield self._unsafe_process()
  106. except:
  107. logger.exception("Exception processing notifs")
  108. if self.max_stream_ordering == starting_max_ordering:
  109. break
  110. finally:
  111. self.processing = False
  112. @defer.inlineCallbacks
  113. def _unsafe_process(self):
  114. """
  115. Main logic of the push loop without the wrapper function that sets
  116. up logging, measures and guards against multiple instances of it
  117. being run.
  118. """
  119. start = 0 if INCLUDE_ALL_UNREAD_NOTIFS else self.last_stream_ordering
  120. unprocessed = yield self.store.get_unread_push_actions_for_user_in_range(
  121. self.user_id, start, self.max_stream_ordering
  122. )
  123. soonest_due_at = None
  124. for push_action in unprocessed:
  125. received_at = push_action['received_ts']
  126. if received_at is None:
  127. received_at = 0
  128. notif_ready_at = received_at + DELAY_BEFORE_MAIL_MS
  129. room_ready_at = self.room_ready_to_notify_at(
  130. push_action['room_id']
  131. )
  132. should_notify_at = max(notif_ready_at, room_ready_at)
  133. if should_notify_at < self.clock.time_msec():
  134. # one of our notifications is ready for sending, so we send
  135. # *one* email updating the user on their notifications,
  136. # we then consider all previously outstanding notifications
  137. # to be delivered.
  138. reason = {
  139. 'room_id': push_action['room_id'],
  140. 'now': self.clock.time_msec(),
  141. 'received_at': received_at,
  142. 'delay_before_mail_ms': DELAY_BEFORE_MAIL_MS,
  143. 'last_sent_ts': self.get_room_last_sent_ts(push_action['room_id']),
  144. 'throttle_ms': self.get_room_throttle_ms(push_action['room_id']),
  145. }
  146. yield self.send_notification(unprocessed, reason)
  147. yield self.save_last_stream_ordering_and_success(max([
  148. ea['stream_ordering'] for ea in unprocessed
  149. ]))
  150. # we update the throttle on all the possible unprocessed push actions
  151. for ea in unprocessed:
  152. yield self.sent_notif_update_throttle(
  153. ea['room_id'], ea
  154. )
  155. break
  156. else:
  157. if soonest_due_at is None or should_notify_at < soonest_due_at:
  158. soonest_due_at = should_notify_at
  159. if self.timed_call is not None:
  160. self.timed_call.cancel()
  161. self.timed_call = None
  162. if soonest_due_at is not None:
  163. self.timed_call = reactor.callLater(
  164. self.seconds_until(soonest_due_at), self.on_timer
  165. )
  166. @defer.inlineCallbacks
  167. def save_last_stream_ordering_and_success(self, last_stream_ordering):
  168. self.last_stream_ordering = last_stream_ordering
  169. yield self.store.update_pusher_last_stream_ordering_and_success(
  170. self.app_id, self.email, self.user_id,
  171. last_stream_ordering, self.clock.time_msec()
  172. )
  173. def seconds_until(self, ts_msec):
  174. return (ts_msec - self.clock.time_msec()) / 1000
  175. def get_room_throttle_ms(self, room_id):
  176. if room_id in self.throttle_params:
  177. return self.throttle_params[room_id]["throttle_ms"]
  178. else:
  179. return 0
  180. def get_room_last_sent_ts(self, room_id):
  181. if room_id in self.throttle_params:
  182. return self.throttle_params[room_id]["last_sent_ts"]
  183. else:
  184. return 0
  185. def room_ready_to_notify_at(self, room_id):
  186. """
  187. Determines whether throttling should prevent us from sending an email
  188. for the given room
  189. Returns: The timestamp when we are next allowed to send an email notif
  190. for this room
  191. """
  192. last_sent_ts = self.get_room_last_sent_ts(room_id)
  193. throttle_ms = self.get_room_throttle_ms(room_id)
  194. may_send_at = last_sent_ts + throttle_ms
  195. return may_send_at
  196. @defer.inlineCallbacks
  197. def sent_notif_update_throttle(self, room_id, notified_push_action):
  198. # We have sent a notification, so update the throttle accordingly.
  199. # If the event that triggered the notif happened more than
  200. # THROTTLE_RESET_AFTER_MS after the previous one that triggered a
  201. # notif, we release the throttle. Otherwise, the throttle is increased.
  202. time_of_previous_notifs = yield self.store.get_time_of_last_push_action_before(
  203. notified_push_action['stream_ordering']
  204. )
  205. time_of_this_notifs = notified_push_action['received_ts']
  206. if time_of_previous_notifs is not None and time_of_this_notifs is not None:
  207. gap = time_of_this_notifs - time_of_previous_notifs
  208. else:
  209. # if we don't know the arrival time of one of the notifs (it was not
  210. # stored prior to email notification code) then assume a gap of
  211. # zero which will just not reset the throttle
  212. gap = 0
  213. current_throttle_ms = self.get_room_throttle_ms(room_id)
  214. if gap > THROTTLE_RESET_AFTER_MS:
  215. new_throttle_ms = THROTTLE_START_MS
  216. else:
  217. if current_throttle_ms == 0:
  218. new_throttle_ms = THROTTLE_START_MS
  219. else:
  220. new_throttle_ms = min(
  221. current_throttle_ms * THROTTLE_MULTIPLIER,
  222. THROTTLE_MAX_MS
  223. )
  224. self.throttle_params[room_id] = {
  225. "last_sent_ts": self.clock.time_msec(),
  226. "throttle_ms": new_throttle_ms
  227. }
  228. yield self.store.set_throttle_params(
  229. self.pusher_id, room_id, self.throttle_params[room_id]
  230. )
  231. @defer.inlineCallbacks
  232. def send_notification(self, push_actions, reason):
  233. logger.info("Sending notif email for user %r", self.user_id)
  234. yield self.mailer.send_notification_mail(
  235. self.app_id, self.user_id, self.email, push_actions, reason
  236. )