# -*- coding: utf-8 -*-
# Copyright 2015 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
  15. import logging
  16. import ujson as json
  17. from twisted.internet import defer
  18. from .push_rule_evaluator import PushRuleEvaluatorForEvent
  19. from synapse.api.constants import EventTypes, Membership
  20. from synapse.visibility import filter_events_for_clients
  21. from synapse.util.logutils import log_duration
  22. logger = logging.getLogger(__name__)
  23. def decode_rule_json(rule):
  24. rule = dict(rule)
  25. rule['conditions'] = json.loads(rule['conditions'])
  26. rule['actions'] = json.loads(rule['actions'])
  27. return rule
  28. @defer.inlineCallbacks
  29. def _get_rules(room_id, user_ids, store):
  30. with log_duration("bulk_get_push_rules"):
  31. rules_by_user = yield store.bulk_get_push_rules(user_ids)
  32. rules_by_user = {k: v for k, v in rules_by_user.items() if v is not None}
  33. defer.returnValue(rules_by_user)
  34. @defer.inlineCallbacks
  35. def evaluator_for_event(event, hs, store, current_state):
  36. room_id = event.room_id
  37. # We also will want to generate notifs for other people in the room so
  38. # their unread countss are correct in the event stream, but to avoid
  39. # generating them for bot / AS users etc, we only do so for people who've
  40. # sent a read receipt into the room.
  41. with log_duration("get_users_in_room"):
  42. local_users_in_room = set(
  43. e.state_key for e in current_state.values()
  44. if e.type == EventTypes.Member and e.membership == Membership.JOIN
  45. and hs.is_mine_id(e.state_key)
  46. )
  47. # users in the room who have pushers need to get push rules run because
  48. # that's how their pushers work
  49. with log_duration("get_users_with_pushers_in_room"):
  50. if_users_with_pushers = yield store.get_if_users_have_pushers(
  51. local_users_in_room
  52. )
  53. user_ids = set(
  54. uid for uid, have_pusher in if_users_with_pushers.items() if have_pusher
  55. )
  56. users_with_receipts = yield store.get_users_with_read_receipts_in_room(room_id)
  57. # any users with pushers must be ours: they have pushers
  58. for uid in users_with_receipts:
  59. if uid in local_users_in_room:
  60. user_ids.add(uid)
  61. # if this event is an invite event, we may need to run rules for the user
  62. # who's been invited, otherwise they won't get told they've been invited
  63. if event.type == 'm.room.member' and event.content['membership'] == 'invite':
  64. invited_user = event.state_key
  65. if invited_user and hs.is_mine_id(invited_user):
  66. has_pusher = yield store.user_has_pusher(invited_user)
  67. if has_pusher:
  68. user_ids.add(invited_user)
  69. rules_by_user = yield _get_rules(room_id, user_ids, store)
  70. defer.returnValue(BulkPushRuleEvaluator(
  71. room_id, rules_by_user, user_ids, store
  72. ))
  73. class BulkPushRuleEvaluator:
  74. """
  75. Runs push rules for all users in a room.
  76. This is faster than running PushRuleEvaluator for each user because it
  77. fetches all the rules for all the users in one (batched) db query
  78. rather than doing multiple queries per-user. It currently uses
  79. the same logic to run the actual rules, but could be optimised further
  80. (see https://matrix.org/jira/browse/SYN-562)
  81. """
  82. def __init__(self, room_id, rules_by_user, users_in_room, store):
  83. self.room_id = room_id
  84. self.rules_by_user = rules_by_user
  85. self.users_in_room = users_in_room
  86. self.store = store
  87. @defer.inlineCallbacks
  88. def action_for_event_by_user(self, event, current_state):
  89. actions_by_user = {}
  90. # None of these users can be peeking since this list of users comes
  91. # from the set of users in the room, so we know for sure they're all
  92. # actually in the room.
  93. user_tuples = [
  94. (u, False) for u in self.rules_by_user.keys()
  95. ]
  96. filtered_by_user = yield filter_events_for_clients(
  97. self.store, user_tuples, [event], {event.event_id: current_state}
  98. )
  99. room_members = set(
  100. e.state_key for e in current_state.values()
  101. if e.type == EventTypes.Member and e.membership == Membership.JOIN
  102. )
  103. evaluator = PushRuleEvaluatorForEvent(event, len(room_members))
  104. condition_cache = {}
  105. display_names = {}
  106. for ev in current_state.values():
  107. nm = ev.content.get("displayname", None)
  108. if nm and ev.type == EventTypes.Member:
  109. display_names[ev.state_key] = nm
  110. for uid, rules in self.rules_by_user.items():
  111. display_name = display_names.get(uid, None)
  112. filtered = filtered_by_user[uid]
  113. if len(filtered) == 0:
  114. continue
  115. if filtered[0].sender == uid:
  116. continue
  117. for rule in rules:
  118. if 'enabled' in rule and not rule['enabled']:
  119. continue
  120. matches = _condition_checker(
  121. evaluator, rule['conditions'], uid, display_name, condition_cache
  122. )
  123. if matches:
  124. actions = [x for x in rule['actions'] if x != 'dont_notify']
  125. if actions and 'notify' in actions:
  126. actions_by_user[uid] = actions
  127. break
  128. defer.returnValue(actions_by_user)
  129. def _condition_checker(evaluator, conditions, uid, display_name, cache):
  130. for cond in conditions:
  131. _id = cond.get("_id", None)
  132. if _id:
  133. res = cache.get(_id, None)
  134. if res is False:
  135. return False
  136. elif res is True:
  137. continue
  138. res = evaluator.matches(cond, uid, display_name)
  139. if _id:
  140. cache[_id] = bool(res)
  141. if not res:
  142. return False
  143. return True