bulk_push_rule_evaluator.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341
  1. # Copyright 2015 OpenMarket Ltd
  2. # Copyright 2017 New Vector Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. import logging
  16. from typing import (
  17. TYPE_CHECKING,
  18. Any,
  19. Collection,
  20. Dict,
  21. List,
  22. Mapping,
  23. Optional,
  24. Tuple,
  25. Union,
  26. )
  27. from prometheus_client import Counter
  28. from synapse.api.constants import MAIN_TIMELINE, EventTypes, Membership, RelationTypes
  29. from synapse.event_auth import auth_types_for_event, get_user_power_level
  30. from synapse.events import EventBase, relation_from_event
  31. from synapse.events.snapshot import EventContext
  32. from synapse.state import POWER_KEY
  33. from synapse.storage.databases.main.roommember import EventIdMembership
  34. from synapse.storage.state import StateFilter
  35. from synapse.synapse_rust.push import FilteredPushRules, PushRuleEvaluator
  36. from synapse.util.caches import register_cache
  37. from synapse.util.metrics import measure_func
  38. from synapse.visibility import filter_event_for_clients_with_state
  39. if TYPE_CHECKING:
  40. from synapse.server import HomeServer
  41. logger = logging.getLogger(__name__)
  42. push_rules_invalidation_counter = Counter(
  43. "synapse_push_bulk_push_rule_evaluator_push_rules_invalidation_counter", ""
  44. )
  45. push_rules_state_size_counter = Counter(
  46. "synapse_push_bulk_push_rule_evaluator_push_rules_state_size_counter", ""
  47. )
  48. STATE_EVENT_TYPES_TO_MARK_UNREAD = {
  49. EventTypes.Topic,
  50. EventTypes.Name,
  51. EventTypes.RoomAvatar,
  52. EventTypes.Tombstone,
  53. }
  54. def _should_count_as_unread(event: EventBase, context: EventContext) -> bool:
  55. # Exclude rejected and soft-failed events.
  56. if context.rejected or event.internal_metadata.is_soft_failed():
  57. return False
  58. # Exclude notices.
  59. if (
  60. not event.is_state()
  61. and event.type == EventTypes.Message
  62. and event.content.get("msgtype") == "m.notice"
  63. ):
  64. return False
  65. # Exclude edits.
  66. relates_to = relation_from_event(event)
  67. if relates_to and relates_to.rel_type == RelationTypes.REPLACE:
  68. return False
  69. # Mark events that have a non-empty string body as unread.
  70. body = event.content.get("body")
  71. if isinstance(body, str) and body:
  72. return True
  73. # Mark some state events as unread.
  74. if event.is_state() and event.type in STATE_EVENT_TYPES_TO_MARK_UNREAD:
  75. return True
  76. # Mark encrypted events as unread.
  77. if not event.is_state() and event.type == EventTypes.Encrypted:
  78. return True
  79. return False
  80. class BulkPushRuleEvaluator:
  81. """Calculates the outcome of push rules for an event for all users in the
  82. room at once.
  83. """
  84. def __init__(self, hs: "HomeServer"):
  85. self.hs = hs
  86. self.store = hs.get_datastores().main
  87. self.clock = hs.get_clock()
  88. self._event_auth_handler = hs.get_event_auth_handler()
  89. self.room_push_rule_cache_metrics = register_cache(
  90. "cache",
  91. "room_push_rule_cache",
  92. cache=[], # Meaningless size, as this isn't a cache that stores values,
  93. resizable=False,
  94. )
  95. async def _get_rules_for_event(
  96. self,
  97. event: EventBase,
  98. ) -> Dict[str, FilteredPushRules]:
  99. """Get the push rules for all users who may need to be notified about
  100. the event.
  101. Note: this does not check if the user is allowed to see the event.
  102. Returns:
  103. Mapping of user ID to their push rules.
  104. """
  105. # We get the users who may need to be notified by first fetching the
  106. # local users currently in the room, finding those that have push rules,
  107. # and *then* checking which users are actually allowed to see the event.
  108. #
  109. # The alternative is to first fetch all users that were joined at the
  110. # event, but that requires fetching the full state at the event, which
  111. # may be expensive for large rooms with few local users.
  112. local_users = await self.store.get_local_users_in_room(event.room_id)
  113. # Filter out appservice users.
  114. local_users = [
  115. u
  116. for u in local_users
  117. if not self.store.get_if_app_services_interested_in_user(u)
  118. ]
  119. # if this event is an invite event, we may need to run rules for the user
  120. # who's been invited, otherwise they won't get told they've been invited
  121. if event.type == EventTypes.Member and event.membership == Membership.INVITE:
  122. invited = event.state_key
  123. if invited and self.hs.is_mine_id(invited) and invited not in local_users:
  124. local_users = list(local_users)
  125. local_users.append(invited)
  126. rules_by_user = await self.store.bulk_get_push_rules(local_users)
  127. logger.debug("Users in room: %s", local_users)
  128. if logger.isEnabledFor(logging.DEBUG):
  129. logger.debug(
  130. "Returning push rules for %r %r",
  131. event.room_id,
  132. list(rules_by_user.keys()),
  133. )
  134. return rules_by_user
  135. async def _get_power_levels_and_sender_level(
  136. self, event: EventBase, context: EventContext
  137. ) -> Tuple[dict, Optional[int]]:
  138. # There are no power levels and sender levels possible to get from outlier
  139. if event.internal_metadata.is_outlier():
  140. return {}, None
  141. event_types = auth_types_for_event(event.room_version, event)
  142. prev_state_ids = await context.get_prev_state_ids(
  143. StateFilter.from_types(event_types)
  144. )
  145. pl_event_id = prev_state_ids.get(POWER_KEY)
  146. if pl_event_id:
  147. # fastpath: if there's a power level event, that's all we need, and
  148. # not having a power level event is an extreme edge case
  149. auth_events = {POWER_KEY: await self.store.get_event(pl_event_id)}
  150. else:
  151. auth_events_ids = self._event_auth_handler.compute_auth_events(
  152. event, prev_state_ids, for_verification=False
  153. )
  154. auth_events_dict = await self.store.get_events(auth_events_ids)
  155. auth_events = {(e.type, e.state_key): e for e in auth_events_dict.values()}
  156. sender_level = get_user_power_level(event.sender, auth_events)
  157. pl_event = auth_events.get(POWER_KEY)
  158. return pl_event.content if pl_event else {}, sender_level
  159. @measure_func("action_for_event_by_user")
  160. async def action_for_event_by_user(
  161. self, event: EventBase, context: EventContext
  162. ) -> None:
  163. """Given an event and context, evaluate the push rules, check if the message
  164. should increment the unread count, and insert the results into the
  165. event_push_actions_staging table.
  166. """
  167. if not event.internal_metadata.is_notifiable():
  168. # Push rules for events that aren't notifiable can't be processed by this
  169. return
  170. # Disable counting as unread unless the experimental configuration is
  171. # enabled, as it can cause additional (unwanted) rows to be added to the
  172. # event_push_actions table.
  173. count_as_unread = False
  174. if self.hs.config.experimental.msc2654_enabled:
  175. count_as_unread = _should_count_as_unread(event, context)
  176. rules_by_user = await self._get_rules_for_event(event)
  177. actions_by_user: Dict[str, Collection[Union[Mapping, str]]] = {}
  178. room_member_count = await self.store.get_number_joined_users_in_room(
  179. event.room_id
  180. )
  181. (
  182. power_levels,
  183. sender_power_level,
  184. ) = await self._get_power_levels_and_sender_level(event, context)
  185. # Find the event's thread ID.
  186. relation = relation_from_event(event)
  187. # If the event does not have a relation, then it cannot have a thread ID.
  188. thread_id = MAIN_TIMELINE
  189. if relation:
  190. # Recursively attempt to find the thread this event relates to.
  191. if relation.rel_type == RelationTypes.THREAD:
  192. thread_id = relation.parent_id
  193. else:
  194. # Since the event has not yet been persisted we check whether
  195. # the parent is part of a thread.
  196. thread_id = await self.store.get_thread_id(relation.parent_id) or "main"
  197. # It's possible that old room versions have non-integer power levels (floats or
  198. # strings). Workaround this by explicitly converting to int.
  199. notification_levels = power_levels.get("notifications", {})
  200. if not event.room_version.msc3667_int_only_power_levels:
  201. for user_id, level in notification_levels.items():
  202. notification_levels[user_id] = int(level)
  203. evaluator = PushRuleEvaluator(
  204. _flatten_dict(event),
  205. room_member_count,
  206. sender_power_level,
  207. notification_levels,
  208. )
  209. users = rules_by_user.keys()
  210. profiles = await self.store.get_subset_users_in_room_with_profiles(
  211. event.room_id, users
  212. )
  213. for uid, rules in rules_by_user.items():
  214. if event.sender == uid:
  215. continue
  216. display_name = None
  217. profile = profiles.get(uid)
  218. if profile:
  219. display_name = profile.display_name
  220. if not display_name:
  221. # Handle the case where we are pushing a membership event to
  222. # that user, as they might not be already joined.
  223. if event.type == EventTypes.Member and event.state_key == uid:
  224. display_name = event.content.get("displayname", None)
  225. if not isinstance(display_name, str):
  226. display_name = None
  227. if count_as_unread:
  228. # Add an element for the current user if the event needs to be marked as
  229. # unread, so that add_push_actions_to_staging iterates over it.
  230. # If the event shouldn't be marked as unread but should notify the
  231. # current user, it'll be added to the dict later.
  232. actions_by_user[uid] = []
  233. actions = evaluator.run(rules, uid, display_name)
  234. if "notify" in actions:
  235. # Push rules say we should notify the user of this event
  236. actions_by_user[uid] = actions
  237. # If there aren't any actions then we can skip the rest of the
  238. # processing.
  239. if not actions_by_user:
  240. return
  241. # This is a check for the case where user joins a room without being
  242. # allowed to see history, and then the server receives a delayed event
  243. # from before the user joined, which they should not be pushed for
  244. #
  245. # We do this *after* calculating the push actions as a) its unlikely
  246. # that we'll filter anyone out and b) for large rooms its likely that
  247. # most users will have push disabled and so the set of users to check is
  248. # much smaller.
  249. uids_with_visibility = await filter_event_for_clients_with_state(
  250. self.store, actions_by_user.keys(), event, context
  251. )
  252. for user_id in set(actions_by_user).difference(uids_with_visibility):
  253. actions_by_user.pop(user_id, None)
  254. # Mark in the DB staging area the push actions for users who should be
  255. # notified for this event. (This will then get handled when we persist
  256. # the event)
  257. await self.store.add_push_actions_to_staging(
  258. event.event_id,
  259. actions_by_user,
  260. count_as_unread,
  261. thread_id,
  262. )
  263. MemberMap = Dict[str, Optional[EventIdMembership]]
  264. Rule = Dict[str, dict]
  265. RulesByUser = Dict[str, List[Rule]]
  266. StateGroup = Union[object, int]
  267. def _flatten_dict(
  268. d: Union[EventBase, Mapping[str, Any]],
  269. prefix: Optional[List[str]] = None,
  270. result: Optional[Dict[str, str]] = None,
  271. ) -> Dict[str, str]:
  272. if prefix is None:
  273. prefix = []
  274. if result is None:
  275. result = {}
  276. for key, value in d.items():
  277. if isinstance(value, str):
  278. result[".".join(prefix + [key])] = value.lower()
  279. elif isinstance(value, Mapping):
  280. _flatten_dict(value, prefix=(prefix + [key]), result=result)
  281. return result