# -*- coding: utf-8 -*-
# Copyright 2015 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
  15. from ._base import SQLBaseStore
  16. from twisted.internet import defer
  17. from synapse.util.caches.descriptors import cachedInlineCallbacks
  18. from synapse.types import RoomStreamToken
  19. from .stream import lower_bound
  20. import logging
  21. import ujson as json
  22. logger = logging.getLogger(__name__)
  23. class EventPushActionsStore(SQLBaseStore):
  24. EPA_HIGHLIGHT_INDEX = "epa_highlight_index"
  25. def __init__(self, hs):
  26. self.stream_ordering_month_ago = None
  27. super(EventPushActionsStore, self).__init__(hs)
  28. self.register_background_index_update(
  29. self.EPA_HIGHLIGHT_INDEX,
  30. index_name="event_push_actions_u_highlight",
  31. table="event_push_actions",
  32. columns=["user_id", "stream_ordering"],
  33. )
  34. self.register_background_index_update(
  35. "event_push_actions_highlights_index",
  36. index_name="event_push_actions_highlights_index",
  37. table="event_push_actions",
  38. columns=["user_id", "room_id", "topological_ordering", "stream_ordering"],
  39. where_clause="highlight=1"
  40. )
  41. def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples):
  42. """
  43. Args:
  44. event: the event set actions for
  45. tuples: list of tuples of (user_id, actions)
  46. """
  47. values = []
  48. for uid, actions in tuples:
  49. values.append({
  50. 'room_id': event.room_id,
  51. 'event_id': event.event_id,
  52. 'user_id': uid,
  53. 'actions': json.dumps(actions),
  54. 'stream_ordering': event.internal_metadata.stream_ordering,
  55. 'topological_ordering': event.depth,
  56. 'notif': 1,
  57. 'highlight': 1 if _action_has_highlight(actions) else 0,
  58. })
  59. for uid, __ in tuples:
  60. txn.call_after(
  61. self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
  62. (event.room_id, uid)
  63. )
  64. self._simple_insert_many_txn(txn, "event_push_actions", values)
  65. @cachedInlineCallbacks(num_args=3, tree=True, max_entries=5000)
  66. def get_unread_event_push_actions_by_room_for_user(
  67. self, room_id, user_id, last_read_event_id
  68. ):
  69. def _get_unread_event_push_actions_by_room(txn):
  70. sql = (
  71. "SELECT stream_ordering, topological_ordering"
  72. " FROM events"
  73. " WHERE room_id = ? AND event_id = ?"
  74. )
  75. txn.execute(
  76. sql, (room_id, last_read_event_id)
  77. )
  78. results = txn.fetchall()
  79. if len(results) == 0:
  80. return {"notify_count": 0, "highlight_count": 0}
  81. stream_ordering = results[0][0]
  82. topological_ordering = results[0][1]
  83. token = RoomStreamToken(
  84. topological_ordering, stream_ordering
  85. )
  86. # First get number of notifications.
  87. # We don't need to put a notif=1 clause as all rows always have
  88. # notif=1
  89. sql = (
  90. "SELECT count(*)"
  91. " FROM event_push_actions ea"
  92. " WHERE"
  93. " user_id = ?"
  94. " AND room_id = ?"
  95. " AND %s"
  96. ) % (lower_bound(token, self.database_engine, inclusive=False),)
  97. txn.execute(sql, (user_id, room_id))
  98. row = txn.fetchone()
  99. notify_count = row[0] if row else 0
  100. # Now get the number of highlights
  101. sql = (
  102. "SELECT count(*)"
  103. " FROM event_push_actions ea"
  104. " WHERE"
  105. " highlight = 1"
  106. " AND user_id = ?"
  107. " AND room_id = ?"
  108. " AND %s"
  109. ) % (lower_bound(token, self.database_engine, inclusive=False),)
  110. txn.execute(sql, (user_id, room_id))
  111. row = txn.fetchone()
  112. highlight_count = row[0] if row else 0
  113. return {
  114. "notify_count": notify_count,
  115. "highlight_count": highlight_count,
  116. }
  117. ret = yield self.runInteraction(
  118. "get_unread_event_push_actions_by_room",
  119. _get_unread_event_push_actions_by_room
  120. )
  121. defer.returnValue(ret)
  122. @defer.inlineCallbacks
  123. def get_push_action_users_in_range(self, min_stream_ordering, max_stream_ordering):
  124. def f(txn):
  125. sql = (
  126. "SELECT DISTINCT(user_id) FROM event_push_actions WHERE"
  127. " stream_ordering >= ? AND stream_ordering <= ?"
  128. )
  129. txn.execute(sql, (min_stream_ordering, max_stream_ordering))
  130. return [r[0] for r in txn.fetchall()]
  131. ret = yield self.runInteraction("get_push_action_users_in_range", f)
  132. defer.returnValue(ret)
  133. @defer.inlineCallbacks
  134. def get_unread_push_actions_for_user_in_range_for_http(
  135. self, user_id, min_stream_ordering, max_stream_ordering, limit=20
  136. ):
  137. """Get a list of the most recent unread push actions for a given user,
  138. within the given stream ordering range. Called by the httppusher.
  139. Args:
  140. user_id (str): The user to fetch push actions for.
  141. min_stream_ordering(int): The exclusive lower bound on the
  142. stream ordering of event push actions to fetch.
  143. max_stream_ordering(int): The inclusive upper bound on the
  144. stream ordering of event push actions to fetch.
  145. limit (int): The maximum number of rows to return.
  146. Returns:
  147. A promise which resolves to a list of dicts with the keys "event_id",
  148. "room_id", "stream_ordering", "actions".
  149. The list will be ordered by ascending stream_ordering.
  150. The list will have between 0~limit entries.
  151. """
  152. # find rooms that have a read receipt in them and return the next
  153. # push actions
  154. def get_after_receipt(txn):
  155. # find rooms that have a read receipt in them and return the next
  156. # push actions
  157. sql = (
  158. "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions"
  159. " FROM ("
  160. " SELECT room_id,"
  161. " MAX(topological_ordering) as topological_ordering,"
  162. " MAX(stream_ordering) as stream_ordering"
  163. " FROM events"
  164. " INNER JOIN receipts_linearized USING (room_id, event_id)"
  165. " WHERE receipt_type = 'm.read' AND user_id = ?"
  166. " GROUP BY room_id"
  167. ") AS rl,"
  168. " event_push_actions AS ep"
  169. " WHERE"
  170. " ep.room_id = rl.room_id"
  171. " AND ("
  172. " ep.topological_ordering > rl.topological_ordering"
  173. " OR ("
  174. " ep.topological_ordering = rl.topological_ordering"
  175. " AND ep.stream_ordering > rl.stream_ordering"
  176. " )"
  177. " )"
  178. " AND ep.user_id = ?"
  179. " AND ep.stream_ordering > ?"
  180. " AND ep.stream_ordering <= ?"
  181. " ORDER BY ep.stream_ordering ASC LIMIT ?"
  182. )
  183. args = [
  184. user_id, user_id,
  185. min_stream_ordering, max_stream_ordering, limit,
  186. ]
  187. txn.execute(sql, args)
  188. return txn.fetchall()
  189. after_read_receipt = yield self.runInteraction(
  190. "get_unread_push_actions_for_user_in_range_http_arr", get_after_receipt
  191. )
  192. # There are rooms with push actions in them but you don't have a read receipt in
  193. # them e.g. rooms you've been invited to, so get push actions for rooms which do
  194. # not have read receipts in them too.
  195. def get_no_receipt(txn):
  196. sql = (
  197. "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,"
  198. " e.received_ts"
  199. " FROM event_push_actions AS ep"
  200. " INNER JOIN events AS e USING (room_id, event_id)"
  201. " WHERE"
  202. " ep.room_id NOT IN ("
  203. " SELECT room_id FROM receipts_linearized"
  204. " WHERE receipt_type = 'm.read' AND user_id = ?"
  205. " GROUP BY room_id"
  206. " )"
  207. " AND ep.user_id = ?"
  208. " AND ep.stream_ordering > ?"
  209. " AND ep.stream_ordering <= ?"
  210. " ORDER BY ep.stream_ordering ASC LIMIT ?"
  211. )
  212. args = [
  213. user_id, user_id,
  214. min_stream_ordering, max_stream_ordering, limit,
  215. ]
  216. txn.execute(sql, args)
  217. return txn.fetchall()
  218. no_read_receipt = yield self.runInteraction(
  219. "get_unread_push_actions_for_user_in_range_http_nrr", get_no_receipt
  220. )
  221. notifs = [
  222. {
  223. "event_id": row[0],
  224. "room_id": row[1],
  225. "stream_ordering": row[2],
  226. "actions": json.loads(row[3]),
  227. } for row in after_read_receipt + no_read_receipt
  228. ]
  229. # Now sort it so it's ordered correctly, since currently it will
  230. # contain results from the first query, correctly ordered, followed
  231. # by results from the second query, but we want them all ordered
  232. # by stream_ordering, oldest first.
  233. notifs.sort(key=lambda r: r['stream_ordering'])
  234. # Take only up to the limit. We have to stop at the limit because
  235. # one of the subqueries may have hit the limit.
  236. defer.returnValue(notifs[:limit])
  237. @defer.inlineCallbacks
  238. def get_unread_push_actions_for_user_in_range_for_email(
  239. self, user_id, min_stream_ordering, max_stream_ordering, limit=20
  240. ):
  241. """Get a list of the most recent unread push actions for a given user,
  242. within the given stream ordering range. Called by the emailpusher
  243. Args:
  244. user_id (str): The user to fetch push actions for.
  245. min_stream_ordering(int): The exclusive lower bound on the
  246. stream ordering of event push actions to fetch.
  247. max_stream_ordering(int): The inclusive upper bound on the
  248. stream ordering of event push actions to fetch.
  249. limit (int): The maximum number of rows to return.
  250. Returns:
  251. A promise which resolves to a list of dicts with the keys "event_id",
  252. "room_id", "stream_ordering", "actions", "received_ts".
  253. The list will be ordered by descending received_ts.
  254. The list will have between 0~limit entries.
  255. """
  256. # find rooms that have a read receipt in them and return the most recent
  257. # push actions
  258. def get_after_receipt(txn):
  259. sql = (
  260. "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,"
  261. " e.received_ts"
  262. " FROM ("
  263. " SELECT room_id,"
  264. " MAX(topological_ordering) as topological_ordering,"
  265. " MAX(stream_ordering) as stream_ordering"
  266. " FROM events"
  267. " INNER JOIN receipts_linearized USING (room_id, event_id)"
  268. " WHERE receipt_type = 'm.read' AND user_id = ?"
  269. " GROUP BY room_id"
  270. ") AS rl,"
  271. " event_push_actions AS ep"
  272. " INNER JOIN events AS e USING (room_id, event_id)"
  273. " WHERE"
  274. " ep.room_id = rl.room_id"
  275. " AND ("
  276. " ep.topological_ordering > rl.topological_ordering"
  277. " OR ("
  278. " ep.topological_ordering = rl.topological_ordering"
  279. " AND ep.stream_ordering > rl.stream_ordering"
  280. " )"
  281. " )"
  282. " AND ep.user_id = ?"
  283. " AND ep.stream_ordering > ?"
  284. " AND ep.stream_ordering <= ?"
  285. " ORDER BY ep.stream_ordering DESC LIMIT ?"
  286. )
  287. args = [
  288. user_id, user_id,
  289. min_stream_ordering, max_stream_ordering, limit,
  290. ]
  291. txn.execute(sql, args)
  292. return txn.fetchall()
  293. after_read_receipt = yield self.runInteraction(
  294. "get_unread_push_actions_for_user_in_range_email_arr", get_after_receipt
  295. )
  296. # There are rooms with push actions in them but you don't have a read receipt in
  297. # them e.g. rooms you've been invited to, so get push actions for rooms which do
  298. # not have read receipts in them too.
  299. def get_no_receipt(txn):
  300. sql = (
  301. "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,"
  302. " e.received_ts"
  303. " FROM event_push_actions AS ep"
  304. " INNER JOIN events AS e USING (room_id, event_id)"
  305. " WHERE"
  306. " ep.room_id NOT IN ("
  307. " SELECT room_id FROM receipts_linearized"
  308. " WHERE receipt_type = 'm.read' AND user_id = ?"
  309. " GROUP BY room_id"
  310. " )"
  311. " AND ep.user_id = ?"
  312. " AND ep.stream_ordering > ?"
  313. " AND ep.stream_ordering <= ?"
  314. " ORDER BY ep.stream_ordering DESC LIMIT ?"
  315. )
  316. args = [
  317. user_id, user_id,
  318. min_stream_ordering, max_stream_ordering, limit,
  319. ]
  320. txn.execute(sql, args)
  321. return txn.fetchall()
  322. no_read_receipt = yield self.runInteraction(
  323. "get_unread_push_actions_for_user_in_range_email_nrr", get_no_receipt
  324. )
  325. # Make a list of dicts from the two sets of results.
  326. notifs = [
  327. {
  328. "event_id": row[0],
  329. "room_id": row[1],
  330. "stream_ordering": row[2],
  331. "actions": json.loads(row[3]),
  332. "received_ts": row[4],
  333. } for row in after_read_receipt + no_read_receipt
  334. ]
  335. # Now sort it so it's ordered correctly, since currently it will
  336. # contain results from the first query, correctly ordered, followed
  337. # by results from the second query, but we want them all ordered
  338. # by received_ts (most recent first)
  339. notifs.sort(key=lambda r: -(r['received_ts'] or 0))
  340. # Now return the first `limit`
  341. defer.returnValue(notifs[:limit])
  342. @defer.inlineCallbacks
  343. def get_push_actions_for_user(self, user_id, before=None, limit=50,
  344. only_highlight=False):
  345. def f(txn):
  346. before_clause = ""
  347. if before:
  348. before_clause = "AND epa.stream_ordering < ?"
  349. args = [user_id, before, limit]
  350. else:
  351. args = [user_id, limit]
  352. if only_highlight:
  353. if len(before_clause) > 0:
  354. before_clause += " "
  355. before_clause += "AND epa.highlight = 1"
  356. # NB. This assumes event_ids are globally unique since
  357. # it makes the query easier to index
  358. sql = (
  359. "SELECT epa.event_id, epa.room_id,"
  360. " epa.stream_ordering, epa.topological_ordering,"
  361. " epa.actions, epa.profile_tag, e.received_ts"
  362. " FROM event_push_actions epa, events e"
  363. " WHERE epa.event_id = e.event_id"
  364. " AND epa.user_id = ? %s"
  365. " ORDER BY epa.stream_ordering DESC"
  366. " LIMIT ?"
  367. % (before_clause,)
  368. )
  369. txn.execute(sql, args)
  370. return self.cursor_to_dict(txn)
  371. push_actions = yield self.runInteraction(
  372. "get_push_actions_for_user", f
  373. )
  374. for pa in push_actions:
  375. pa["actions"] = json.loads(pa["actions"])
  376. defer.returnValue(push_actions)
  377. @defer.inlineCallbacks
  378. def get_time_of_last_push_action_before(self, stream_ordering):
  379. def f(txn):
  380. sql = (
  381. "SELECT e.received_ts"
  382. " FROM event_push_actions AS ep"
  383. " JOIN events e ON ep.room_id = e.room_id AND ep.event_id = e.event_id"
  384. " WHERE ep.stream_ordering > ?"
  385. " ORDER BY ep.stream_ordering ASC"
  386. " LIMIT 1"
  387. )
  388. txn.execute(sql, (stream_ordering,))
  389. return txn.fetchone()
  390. result = yield self.runInteraction("get_time_of_last_push_action_before", f)
  391. defer.returnValue(result[0] if result else None)
  392. @defer.inlineCallbacks
  393. def get_latest_push_action_stream_ordering(self):
  394. def f(txn):
  395. txn.execute("SELECT MAX(stream_ordering) FROM event_push_actions")
  396. return txn.fetchone()
  397. result = yield self.runInteraction(
  398. "get_latest_push_action_stream_ordering", f
  399. )
  400. defer.returnValue(result[0] or 0)
  401. def _remove_push_actions_for_event_id_txn(self, txn, room_id, event_id):
  402. # Sad that we have to blow away the cache for the whole room here
  403. txn.call_after(
  404. self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
  405. (room_id,)
  406. )
  407. txn.execute(
  408. "DELETE FROM event_push_actions WHERE room_id = ? AND event_id = ?",
  409. (room_id, event_id)
  410. )
  411. def _remove_old_push_actions_before_txn(self, txn, room_id, user_id,
  412. topological_ordering):
  413. """
  414. Purges old, stale push actions for a user and room before a given
  415. topological_ordering
  416. Args:
  417. txn: The transcation
  418. room_id: Room ID to delete from
  419. user_id: user ID to delete for
  420. topological_ordering: The lowest topological ordering which will
  421. not be deleted.
  422. """
  423. txn.call_after(
  424. self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
  425. (room_id, user_id, )
  426. )
  427. # We need to join on the events table to get the received_ts for
  428. # event_push_actions and sqlite won't let us use a join in a delete so
  429. # we can't just delete where received_ts < x. Furthermore we can
  430. # only identify event_push_actions by a tuple of room_id, event_id
  431. # we we can't use a subquery.
  432. # Instead, we look up the stream ordering for the last event in that
  433. # room received before the threshold time and delete event_push_actions
  434. # in the room with a stream_odering before that.
  435. txn.execute(
  436. "DELETE FROM event_push_actions "
  437. " WHERE user_id = ? AND room_id = ? AND "
  438. " topological_ordering < ? AND stream_ordering < ?",
  439. (user_id, room_id, topological_ordering, self.stream_ordering_month_ago)
  440. )
  441. @defer.inlineCallbacks
  442. def _find_stream_orderings_for_times(self):
  443. yield self.runInteraction(
  444. "_find_stream_orderings_for_times",
  445. self._find_stream_orderings_for_times_txn
  446. )
  447. def _find_stream_orderings_for_times_txn(self, txn):
  448. logger.info("Searching for stream ordering 1 month ago")
  449. self.stream_ordering_month_ago = self._find_first_stream_ordering_after_ts_txn(
  450. txn, self._clock.time_msec() - 30 * 24 * 60 * 60 * 1000
  451. )
  452. logger.info(
  453. "Found stream ordering 1 month ago: it's %d",
  454. self.stream_ordering_month_ago
  455. )
  456. def _find_first_stream_ordering_after_ts_txn(self, txn, ts):
  457. """
  458. Find the stream_ordering of the first event that was received after
  459. a given timestamp. This is relatively slow as there is no index on
  460. received_ts but we can then use this to delete push actions before
  461. this.
  462. received_ts must necessarily be in the same order as stream_ordering
  463. and stream_ordering is indexed, so we manually binary search using
  464. stream_ordering
  465. """
  466. txn.execute("SELECT MAX(stream_ordering) FROM events")
  467. max_stream_ordering = txn.fetchone()[0]
  468. if max_stream_ordering is None:
  469. return 0
  470. range_start = 0
  471. range_end = max_stream_ordering
  472. sql = (
  473. "SELECT received_ts FROM events"
  474. " WHERE stream_ordering > ?"
  475. " ORDER BY stream_ordering"
  476. " LIMIT 1"
  477. )
  478. while range_end - range_start > 1:
  479. middle = int((range_end + range_start) / 2)
  480. txn.execute(sql, (middle,))
  481. middle_ts = txn.fetchone()[0]
  482. if ts > middle_ts:
  483. range_start = middle
  484. else:
  485. range_end = middle
  486. return range_end
  487. def _action_has_highlight(actions):
  488. for action in actions:
  489. try:
  490. if action.get("set_tweak", None) == "highlight":
  491. return action.get("value", True)
  492. except AttributeError:
  493. pass
  494. return False