state.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521
  1. # Copyright 2022 The Matrix.org Foundation C.I.C.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import logging
  15. from typing import (
  16. TYPE_CHECKING,
  17. Any,
  18. Awaitable,
  19. Callable,
  20. Collection,
  21. Dict,
  22. Iterable,
  23. List,
  24. Mapping,
  25. Optional,
  26. Set,
  27. Tuple,
  28. )
  29. from synapse.api.constants import EventTypes
  30. from synapse.events import EventBase
  31. from synapse.logging.opentracing import trace
  32. from synapse.storage.roommember import ProfileInfo
  33. from synapse.storage.state import StateFilter
  34. from synapse.storage.util.partial_state_events_tracker import (
  35. PartialCurrentStateTracker,
  36. PartialStateEventsTracker,
  37. )
  38. from synapse.types import MutableStateMap, StateMap
  39. if TYPE_CHECKING:
  40. from synapse.server import HomeServer
  41. from synapse.storage.databases import Databases
  42. logger = logging.getLogger(__name__)
  43. class StateStorageController:
  44. """High level interface to fetching state for an event, or the current state
  45. in a room.
  46. """
  47. def __init__(self, hs: "HomeServer", stores: "Databases"):
  48. self._is_mine_id = hs.is_mine_id
  49. self.stores = stores
  50. self._partial_state_events_tracker = PartialStateEventsTracker(stores.main)
  51. self._partial_state_room_tracker = PartialCurrentStateTracker(stores.main)
  52. def notify_event_un_partial_stated(self, event_id: str) -> None:
  53. self._partial_state_events_tracker.notify_un_partial_stated(event_id)
  54. def notify_room_un_partial_stated(self, room_id: str) -> None:
  55. """Notify that the room no longer has any partial state.
  56. Must be called after `DataStore.clear_partial_state_room`
  57. """
  58. self._partial_state_room_tracker.notify_un_partial_stated(room_id)
  59. async def get_state_group_delta(
  60. self, state_group: int
  61. ) -> Tuple[Optional[int], Optional[StateMap[str]]]:
  62. """Given a state group try to return a previous group and a delta between
  63. the old and the new.
  64. Args:
  65. state_group: The state group used to retrieve state deltas.
  66. Returns:
  67. A tuple of the previous group and a state map of the event IDs which
  68. make up the delta between the old and new state groups.
  69. """
  70. state_group_delta = await self.stores.state.get_state_group_delta(state_group)
  71. return state_group_delta.prev_group, state_group_delta.delta_ids
  72. async def get_state_groups_ids(
  73. self, _room_id: str, event_ids: Collection[str], await_full_state: bool = True
  74. ) -> Dict[int, MutableStateMap[str]]:
  75. """Get the event IDs of all the state for the state groups for the given events
  76. Args:
  77. _room_id: id of the room for these events
  78. event_ids: ids of the events
  79. await_full_state: if `True`, will block if we do not yet have complete
  80. state at these events.
  81. Returns:
  82. dict of state_group_id -> (dict of (type, state_key) -> event id)
  83. Raises:
  84. RuntimeError if we don't have a state group for one or more of the events
  85. (ie they are outliers or unknown)
  86. """
  87. if not event_ids:
  88. return {}
  89. event_to_groups = await self.get_state_group_for_events(
  90. event_ids, await_full_state=await_full_state
  91. )
  92. groups = set(event_to_groups.values())
  93. group_to_state = await self.stores.state._get_state_for_groups(groups)
  94. return group_to_state
  95. async def get_state_ids_for_group(
  96. self, state_group: int, state_filter: Optional[StateFilter] = None
  97. ) -> StateMap[str]:
  98. """Get the event IDs of all the state in the given state group
  99. Args:
  100. state_group: A state group for which we want to get the state IDs.
  101. state_filter: specifies the type of state event to fetch from DB, example: EventTypes.JoinRules
  102. Returns:
  103. Resolves to a map of (type, state_key) -> event_id
  104. """
  105. group_to_state = await self.get_state_for_groups((state_group,), state_filter)
  106. return group_to_state[state_group]
  107. async def get_state_groups(
  108. self, room_id: str, event_ids: Collection[str]
  109. ) -> Dict[int, List[EventBase]]:
  110. """Get the state groups for the given list of event_ids
  111. Args:
  112. room_id: ID of the room for these events.
  113. event_ids: The event IDs to retrieve state for.
  114. Returns:
  115. dict of state_group_id -> list of state events.
  116. """
  117. if not event_ids:
  118. return {}
  119. group_to_ids = await self.get_state_groups_ids(room_id, event_ids)
  120. state_event_map = await self.stores.main.get_events(
  121. [
  122. ev_id
  123. for group_ids in group_to_ids.values()
  124. for ev_id in group_ids.values()
  125. ],
  126. get_prev_content=False,
  127. )
  128. return {
  129. group: [
  130. state_event_map[v]
  131. for v in event_id_map.values()
  132. if v in state_event_map
  133. ]
  134. for group, event_id_map in group_to_ids.items()
  135. }
  136. def _get_state_groups_from_groups(
  137. self, groups: List[int], state_filter: StateFilter
  138. ) -> Awaitable[Dict[int, StateMap[str]]]:
  139. """Returns the state groups for a given set of groups, filtering on
  140. types of state events.
  141. Args:
  142. groups: list of state group IDs to query
  143. state_filter: The state filter used to fetch state
  144. from the database.
  145. Returns:
  146. Dict of state group to state map.
  147. """
  148. return self.stores.state._get_state_groups_from_groups(groups, state_filter)
  149. @trace
  150. async def get_state_for_events(
  151. self, event_ids: Collection[str], state_filter: Optional[StateFilter] = None
  152. ) -> Dict[str, StateMap[EventBase]]:
  153. """Given a list of event_ids and type tuples, return a list of state
  154. dicts for each event.
  155. Args:
  156. event_ids: The events to fetch the state of.
  157. state_filter: The state filter used to fetch state.
  158. Returns:
  159. A dict of (event_id) -> (type, state_key) -> [state_events]
  160. Raises:
  161. RuntimeError if we don't have a state group for one or more of the events
  162. (ie they are outliers or unknown)
  163. """
  164. await_full_state = True
  165. if state_filter and not state_filter.must_await_full_state(self._is_mine_id):
  166. await_full_state = False
  167. event_to_groups = await self.get_state_group_for_events(
  168. event_ids, await_full_state=await_full_state
  169. )
  170. groups = set(event_to_groups.values())
  171. group_to_state = await self.stores.state._get_state_for_groups(
  172. groups, state_filter or StateFilter.all()
  173. )
  174. state_event_map = await self.stores.main.get_events(
  175. [ev_id for sd in group_to_state.values() for ev_id in sd.values()],
  176. get_prev_content=False,
  177. )
  178. event_to_state = {
  179. event_id: {
  180. k: state_event_map[v]
  181. for k, v in group_to_state[group].items()
  182. if v in state_event_map
  183. }
  184. for event_id, group in event_to_groups.items()
  185. }
  186. return {event: event_to_state[event] for event in event_ids}
  187. @trace
  188. async def get_state_ids_for_events(
  189. self,
  190. event_ids: Collection[str],
  191. state_filter: Optional[StateFilter] = None,
  192. ) -> Dict[str, StateMap[str]]:
  193. """
  194. Get the state dicts corresponding to a list of events, containing the event_ids
  195. of the state events (as opposed to the events themselves)
  196. Args:
  197. event_ids: events whose state should be returned
  198. state_filter: The state filter used to fetch state from the database.
  199. Returns:
  200. A dict from event_id -> (type, state_key) -> event_id
  201. Raises:
  202. RuntimeError if we don't have a state group for one or more of the events
  203. (ie they are outliers or unknown)
  204. """
  205. await_full_state = True
  206. if state_filter and not state_filter.must_await_full_state(self._is_mine_id):
  207. await_full_state = False
  208. event_to_groups = await self.get_state_group_for_events(
  209. event_ids, await_full_state=await_full_state
  210. )
  211. groups = set(event_to_groups.values())
  212. group_to_state = await self.stores.state._get_state_for_groups(
  213. groups, state_filter or StateFilter.all()
  214. )
  215. event_to_state = {
  216. event_id: group_to_state[group]
  217. for event_id, group in event_to_groups.items()
  218. }
  219. return {event: event_to_state[event] for event in event_ids}
  220. async def get_state_for_event(
  221. self, event_id: str, state_filter: Optional[StateFilter] = None
  222. ) -> StateMap[EventBase]:
  223. """
  224. Get the state dict corresponding to a particular event
  225. Args:
  226. event_id: event whose state should be returned
  227. state_filter: The state filter used to fetch state from the database.
  228. Returns:
  229. A dict from (type, state_key) -> state_event
  230. Raises:
  231. RuntimeError if we don't have a state group for the event (ie it is an
  232. outlier or is unknown)
  233. """
  234. state_map = await self.get_state_for_events(
  235. [event_id], state_filter or StateFilter.all()
  236. )
  237. return state_map[event_id]
  238. @trace
  239. async def get_state_ids_for_event(
  240. self, event_id: str, state_filter: Optional[StateFilter] = None
  241. ) -> StateMap[str]:
  242. """
  243. Get the state dict corresponding to a particular event
  244. Args:
  245. event_id: event whose state should be returned
  246. state_filter: The state filter used to fetch state from the database.
  247. Returns:
  248. A dict from (type, state_key) -> state_event_id
  249. Raises:
  250. RuntimeError if we don't have a state group for the event (ie it is an
  251. outlier or is unknown)
  252. """
  253. state_map = await self.get_state_ids_for_events(
  254. [event_id], state_filter or StateFilter.all()
  255. )
  256. return state_map[event_id]
  257. def get_state_for_groups(
  258. self, groups: Iterable[int], state_filter: Optional[StateFilter] = None
  259. ) -> Awaitable[Dict[int, MutableStateMap[str]]]:
  260. """Gets the state at each of a list of state groups, optionally
  261. filtering by type/state_key
  262. Args:
  263. groups: list of state groups for which we want to get the state.
  264. state_filter: The state filter used to fetch state.
  265. from the database.
  266. Returns:
  267. Dict of state group to state map.
  268. """
  269. return self.stores.state._get_state_for_groups(
  270. groups, state_filter or StateFilter.all()
  271. )
  272. @trace
  273. async def get_state_group_for_events(
  274. self,
  275. event_ids: Collection[str],
  276. await_full_state: bool = True,
  277. ) -> Mapping[str, int]:
  278. """Returns mapping event_id -> state_group
  279. Args:
  280. event_ids: events to get state groups for
  281. await_full_state: if true, will block if we do not yet have complete
  282. state at these events.
  283. Raises:
  284. RuntimeError if we don't have a state group for one or more of the events
  285. (ie. they are outliers or unknown)
  286. """
  287. if await_full_state:
  288. await self._partial_state_events_tracker.await_full_state(event_ids)
  289. return await self.stores.main._get_state_group_for_events(event_ids)
  290. async def store_state_group(
  291. self,
  292. event_id: str,
  293. room_id: str,
  294. prev_group: Optional[int],
  295. delta_ids: Optional[StateMap[str]],
  296. current_state_ids: Optional[StateMap[str]],
  297. ) -> int:
  298. """Store a new set of state, returning a newly assigned state group.
  299. Args:
  300. event_id: The event ID for which the state was calculated.
  301. room_id: ID of the room for which the state was calculated.
  302. prev_group: A previous state group for the room, optional.
  303. delta_ids: The delta between state at `prev_group` and
  304. `current_state_ids`, if `prev_group` was given. Same format as
  305. `current_state_ids`.
  306. current_state_ids: The state to store. Map of (type, state_key)
  307. to event_id.
  308. Returns:
  309. The state group ID
  310. """
  311. return await self.stores.state.store_state_group(
  312. event_id, room_id, prev_group, delta_ids, current_state_ids
  313. )
  314. async def get_current_state_ids(
  315. self,
  316. room_id: str,
  317. state_filter: Optional[StateFilter] = None,
  318. on_invalidate: Optional[Callable[[], None]] = None,
  319. ) -> StateMap[str]:
  320. """Get the current state event ids for a room based on the
  321. current_state_events table.
  322. If a state filter is given (that is not `StateFilter.all()`) the query
  323. result is *not* cached.
  324. Args:
  325. room_id: The room to get the state IDs of. state_filter: The state
  326. filter used to fetch state from the
  327. database.
  328. on_invalidate: Callback for when the `get_current_state_ids` cache
  329. for the room gets invalidated.
  330. Returns:
  331. The current state of the room.
  332. """
  333. if not state_filter or state_filter.must_await_full_state(self._is_mine_id):
  334. await self._partial_state_room_tracker.await_full_state(room_id)
  335. if state_filter and not state_filter.is_full():
  336. return await self.stores.main.get_partial_filtered_current_state_ids(
  337. room_id, state_filter
  338. )
  339. else:
  340. return await self.stores.main.get_partial_current_state_ids(
  341. room_id, on_invalidate=on_invalidate
  342. )
  343. async def get_canonical_alias_for_room(self, room_id: str) -> Optional[str]:
  344. """Get canonical alias for room, if any
  345. Args:
  346. room_id: The room ID
  347. Returns:
  348. The canonical alias, if any
  349. """
  350. state = await self.get_current_state_ids(
  351. room_id, StateFilter.from_types([(EventTypes.CanonicalAlias, "")])
  352. )
  353. event_id = state.get((EventTypes.CanonicalAlias, ""))
  354. if not event_id:
  355. return None
  356. event = await self.stores.main.get_event(event_id, allow_none=True)
  357. if not event:
  358. return None
  359. return event.content.get("canonical_alias")
  360. async def get_current_state_deltas(
  361. self, prev_stream_id: int, max_stream_id: int
  362. ) -> Tuple[int, List[Dict[str, Any]]]:
  363. """Fetch a list of room state changes since the given stream id
  364. Each entry in the result contains the following fields:
  365. - stream_id (int)
  366. - room_id (str)
  367. - type (str): event type
  368. - state_key (str):
  369. - event_id (str|None): new event_id for this state key. None if the
  370. state has been deleted.
  371. - prev_event_id (str|None): previous event_id for this state key. None
  372. if it's new state.
  373. Args:
  374. prev_stream_id: point to get changes since (exclusive)
  375. max_stream_id: the point that we know has been correctly persisted
  376. - ie, an upper limit to return changes from.
  377. Returns:
  378. A tuple consisting of:
  379. - the stream id which these results go up to
  380. - list of current_state_delta_stream rows. If it is empty, we are
  381. up to date.
  382. """
  383. # FIXME(faster_joins): what do we do here?
  384. # https://github.com/matrix-org/synapse/issues/12814
  385. # https://github.com/matrix-org/synapse/issues/12815
  386. # https://github.com/matrix-org/synapse/issues/13008
  387. return await self.stores.main.get_partial_current_state_deltas(
  388. prev_stream_id, max_stream_id
  389. )
  390. async def get_current_state(
  391. self, room_id: str, state_filter: Optional[StateFilter] = None
  392. ) -> StateMap[EventBase]:
  393. """Same as `get_current_state_ids` but also fetches the events"""
  394. state_map_ids = await self.get_current_state_ids(room_id, state_filter)
  395. event_map = await self.stores.main.get_events(list(state_map_ids.values()))
  396. state_map = {}
  397. for key, event_id in state_map_ids.items():
  398. event = event_map.get(event_id)
  399. if event:
  400. state_map[key] = event
  401. return state_map
  402. async def get_current_state_event(
  403. self, room_id: str, event_type: str, state_key: str
  404. ) -> Optional[EventBase]:
  405. """Get the current state event for the given type/state_key."""
  406. key = (event_type, state_key)
  407. state_map = await self.get_current_state(
  408. room_id, StateFilter.from_types((key,))
  409. )
  410. return state_map.get(key)
  411. async def get_current_hosts_in_room(self, room_id: str) -> Set[str]:
  412. """Get current hosts in room based on current state."""
  413. await self._partial_state_room_tracker.await_full_state(room_id)
  414. return await self.stores.main.get_current_hosts_in_room(room_id)
  415. async def get_users_in_room_with_profiles(
  416. self, room_id: str
  417. ) -> Dict[str, ProfileInfo]:
  418. """
  419. Get the current users in the room with their profiles.
  420. If the room is currently partial-stated, this will block until the room has
  421. full state.
  422. """
  423. await self._partial_state_room_tracker.await_full_state(room_id)
  424. return await self.stores.main.get_users_in_room_with_profiles(room_id)