event_push_actions.py 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2015 OpenMarket Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. from ._base import SQLBaseStore
  16. from twisted.internet import defer
  17. from synapse.util.caches.descriptors import cachedInlineCallbacks
  18. import logging
  19. import ujson as json
  20. logger = logging.getLogger(__name__)
  21. class EventPushActionsStore(SQLBaseStore):
  22. def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples):
  23. """
  24. Args:
  25. event: the event set actions for
  26. tuples: list of tuples of (user_id, actions)
  27. """
  28. values = []
  29. for uid, actions in tuples:
  30. values.append({
  31. 'room_id': event.room_id,
  32. 'event_id': event.event_id,
  33. 'user_id': uid,
  34. 'actions': json.dumps(actions),
  35. 'stream_ordering': event.internal_metadata.stream_ordering,
  36. 'topological_ordering': event.depth,
  37. 'notif': 1,
  38. 'highlight': 1 if _action_has_highlight(actions) else 0,
  39. })
  40. for uid, __ in tuples:
  41. txn.call_after(
  42. self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
  43. (event.room_id, uid)
  44. )
  45. self._simple_insert_many_txn(txn, "event_push_actions", values)
  46. @cachedInlineCallbacks(num_args=3, lru=True, tree=True, max_entries=5000)
  47. def get_unread_event_push_actions_by_room_for_user(
  48. self, room_id, user_id, last_read_event_id
  49. ):
  50. def _get_unread_event_push_actions_by_room(txn):
  51. sql = (
  52. "SELECT stream_ordering, topological_ordering"
  53. " FROM events"
  54. " WHERE room_id = ? AND event_id = ?"
  55. )
  56. txn.execute(
  57. sql, (room_id, last_read_event_id)
  58. )
  59. results = txn.fetchall()
  60. if len(results) == 0:
  61. return {"notify_count": 0, "highlight_count": 0}
  62. stream_ordering = results[0][0]
  63. topological_ordering = results[0][1]
  64. sql = (
  65. "SELECT sum(notif), sum(highlight)"
  66. " FROM event_push_actions ea"
  67. " WHERE"
  68. " user_id = ?"
  69. " AND room_id = ?"
  70. " AND ("
  71. " topological_ordering > ?"
  72. " OR (topological_ordering = ? AND stream_ordering > ?)"
  73. ")"
  74. )
  75. txn.execute(sql, (
  76. user_id, room_id,
  77. topological_ordering, topological_ordering, stream_ordering
  78. ))
  79. row = txn.fetchone()
  80. if row:
  81. return {
  82. "notify_count": row[0] or 0,
  83. "highlight_count": row[1] or 0,
  84. }
  85. else:
  86. return {"notify_count": 0, "highlight_count": 0}
  87. ret = yield self.runInteraction(
  88. "get_unread_event_push_actions_by_room",
  89. _get_unread_event_push_actions_by_room
  90. )
  91. defer.returnValue(ret)
  92. @defer.inlineCallbacks
  93. def get_push_action_users_in_range(self, min_stream_ordering, max_stream_ordering):
  94. def f(txn):
  95. sql = (
  96. "SELECT DISTINCT(user_id) FROM event_push_actions WHERE"
  97. " stream_ordering >= ? AND stream_ordering <= ?"
  98. )
  99. txn.execute(sql, (min_stream_ordering, max_stream_ordering))
  100. return [r[0] for r in txn.fetchall()]
  101. ret = yield self.runInteraction("get_push_action_users_in_range", f)
  102. defer.returnValue(ret)
  103. @defer.inlineCallbacks
  104. def get_unread_push_actions_for_user_in_range(self, user_id,
  105. min_stream_ordering,
  106. max_stream_ordering=None):
  107. def get_after_receipt(txn):
  108. sql = (
  109. "SELECT ep.event_id, ep.stream_ordering, ep.actions "
  110. "FROM event_push_actions AS ep, ("
  111. " SELECT room_id, user_id,"
  112. " max(topological_ordering) as topological_ordering,"
  113. " max(stream_ordering) as stream_ordering"
  114. " FROM events"
  115. " NATURAL JOIN receipts_linearized WHERE receipt_type = 'm.read'"
  116. " GROUP BY room_id, user_id"
  117. ") AS rl "
  118. "WHERE"
  119. " ep.room_id = rl.room_id"
  120. " AND ("
  121. " ep.topological_ordering > rl.topological_ordering"
  122. " OR ("
  123. " ep.topological_ordering = rl.topological_ordering"
  124. " AND ep.stream_ordering > rl.stream_ordering"
  125. " )"
  126. " )"
  127. " AND ep.stream_ordering > ?"
  128. " AND ep.user_id = ?"
  129. " AND ep.user_id = rl.user_id"
  130. )
  131. args = [min_stream_ordering, user_id]
  132. if max_stream_ordering is not None:
  133. sql += " AND ep.stream_ordering <= ?"
  134. args.append(max_stream_ordering)
  135. sql += " ORDER BY ep.stream_ordering ASC"
  136. txn.execute(sql, args)
  137. return txn.fetchall()
  138. after_read_receipt = yield self.runInteraction(
  139. "get_unread_push_actions_for_user_in_range", get_after_receipt
  140. )
  141. def get_no_receipt(txn):
  142. sql = (
  143. "SELECT ep.event_id, ep.stream_ordering, ep.actions "
  144. "FROM event_push_actions AS ep "
  145. "WHERE ep.room_id not in ("
  146. " SELECT room_id FROM events NATURAL JOIN receipts_linearized"
  147. " WHERE receipt_type = 'm.read' AND user_id = ? "
  148. " GROUP BY room_id"
  149. ") AND ep.user_id = ? AND ep.stream_ordering > ?"
  150. )
  151. args = [user_id, user_id, min_stream_ordering]
  152. if max_stream_ordering is not None:
  153. sql += " AND ep.stream_ordering <= ?"
  154. args.append(max_stream_ordering)
  155. sql += " ORDER BY ep.stream_ordering ASC"
  156. txn.execute(sql, args)
  157. return txn.fetchall()
  158. no_read_receipt = yield self.runInteraction(
  159. "get_unread_push_actions_for_user_in_range", get_no_receipt
  160. )
  161. defer.returnValue([
  162. {
  163. "event_id": row[0],
  164. "stream_ordering": row[1],
  165. "actions": json.loads(row[2]),
  166. } for row in after_read_receipt + no_read_receipt
  167. ])
  168. @defer.inlineCallbacks
  169. def get_latest_push_action_stream_ordering(self):
  170. def f(txn):
  171. txn.execute("SELECT MAX(stream_ordering) FROM event_push_actions")
  172. return txn.fetchone()
  173. result = yield self.runInteraction(
  174. "get_latest_push_action_stream_ordering", f
  175. )
  176. defer.returnValue(result[0] or 0)
  177. def _remove_push_actions_for_event_id_txn(self, txn, room_id, event_id):
  178. # Sad that we have to blow away the cache for the whole room here
  179. txn.call_after(
  180. self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
  181. (room_id,)
  182. )
  183. txn.execute(
  184. "DELETE FROM event_push_actions WHERE room_id = ? AND event_id = ?",
  185. (room_id, event_id)
  186. )
  187. def _action_has_highlight(actions):
  188. for action in actions:
  189. try:
  190. if action.get("set_tweak", None) == "highlight":
  191. return action.get("value", True)
  192. except AttributeError:
  193. pass
  194. return False