state.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2014, 2015 OpenMarket Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. from ._base import SQLBaseStore
  16. from synapse.util.caches.descriptors import (
  17. cached, cachedInlineCallbacks, cachedList
  18. )
  19. from twisted.internet import defer
  20. import logging
  21. logger = logging.getLogger(__name__)
  22. class StateStore(SQLBaseStore):
  23. """ Keeps track of the state at a given event.
  24. This is done by the concept of `state groups`. Every event is assigned
  25. a state group (identified by an arbitrary string), which references a
  26. collection of state events. The current state of an event is then the
  27. collection of state events referenced by the event's state group.
  28. Hence, every change in the current state causes a new state group to be
  29. generated. However, if no change happens (e.g., if we get a message event
  30. with only one parent), the event inherits the state group from its parent.
  31. There are three tables:
  32. * `state_groups`: Stores group name, first event within the group and
  33. room id.
  34. * `event_to_state_groups`: Maps events to state groups.
  35. * `state_groups_state`: Maps state group to state events.
  36. """
  @defer.inlineCallbacks
  def get_state_groups(self, room_id, event_ids):
      """Fetch the state of every state group the given events belong to.

      Args:
          room_id: Not consulted by this method.
          event_ids (list): Event ids whose state groups should be loaded.

      Returns:
          Deferred: resolves to a dict mapping each state group to the
          values of that group's state map (its state events), or to an
          empty dict when `event_ids` is empty.
      """
      if event_ids:
          group_by_event = yield self._get_state_group_for_events(event_ids)
          wanted_groups = set(group_by_event.values())
          state_by_group = yield self._get_state_for_groups(wanted_groups)

          events_by_group = {}
          for group, state_map in state_by_group.items():
              events_by_group[group] = state_map.values()
      else:
          # Nothing was asked for, so no lookups are made at all.
          events_by_group = {}

      defer.returnValue(events_by_group)
  53. def _store_state_groups_txn(self, txn, event, context):
  54. return self._store_mult_state_groups_txn(txn, [(event, context)])
  def _store_mult_state_groups_txn(self, txn, events_and_contexts):
      """Record which state group each of the given events belongs to.

      Pairs whose context has no `current_state` are ignored. A context
      that already carries a `state_group` is reused as is; otherwise a new
      group id is allocated and the group, together with one row per state
      event, is inserted. Finally the event -> group mapping rows are
      written in one batch.

      Args:
          txn: Transaction handed through to the insert helpers.
          events_and_contexts: (event, context) pairs. Iterated twice, so
              it must be re-iterable (e.g. a list).
      """
      group_by_event_id = {}

      for event, context in events_and_contexts:
          if context.current_state is None:
              continue

          if context.state_group is None:
              # No group yet: snapshot the state (plus the event itself
              # when it is a state event) under a freshly allocated id.
              snapshot = dict(context.current_state)
              if event.is_state():
                  snapshot[(event.type, event.state_key)] = event

              new_group = self._state_groups_id_gen.get_next_txn(txn)

              group_row = {
                  "id": new_group,
                  "room_id": event.room_id,
                  "event_id": event.event_id,
              }
              self._simple_insert_txn(
                  txn,
                  table="state_groups",
                  values=group_row,
              )

              state_rows = []
              for state in snapshot.values():
                  state_rows.append({
                      "state_group": new_group,
                      "room_id": state.room_id,
                      "type": state.type,
                      "state_key": state.state_key,
                      "event_id": state.event_id,
                  })
              self._simple_insert_many_txn(
                  txn,
                  table="state_groups_state",
                  values=state_rows,
              )

              group_by_event_id[event.event_id] = new_group
          else:
              group_by_event_id[event.event_id] = context.state_group

      # Second pass: one mapping row per event that had state to record.
      mapping_rows = []
      for event, context in events_and_contexts:
          if context.current_state is not None:
              mapping_rows.append({
                  "state_group": group_by_event_id[event.event_id],
                  "event_id": event.event_id,
              })

      self._simple_insert_many_txn(
          txn,
          table="event_to_state_groups",
          values=mapping_rows,
      )
  @defer.inlineCallbacks
  def get_current_state(self, room_id, event_type=None, state_key=""):
      """Fetch current state events of a room, optionally filtered.

      Args:
          room_id: Room whose `current_state_events` rows are read.
          event_type: If truthy, only state of this type is returned.
          state_key: Together with a truthy `event_type`, a non-None value
              selects the single (event_type, state_key) entry by
              delegating to `get_current_state_for_key`; None selects
              every state key of that type. Ignored when `event_type` is
              falsy.

      Returns:
          Deferred: resolves to the events loaded for the matching rows.
      """
      if event_type and state_key is not None:
          result = yield self.get_current_state_for_key(
              room_id, event_type, state_key
          )
          defer.returnValue(result)

      # From here on no exact (type, state_key) pair was requested, so the
      # query only ever filters on the room and, optionally, the type.
      def f(txn):
          sql = (
              "SELECT event_id FROM current_state_events"
              " WHERE room_id = ? "
          )

          if event_type:
              sql += " AND type = ?"
              args = (room_id, event_type)
          else:
              args = (room_id, )

          txn.execute(sql, args)
          return [r[0] for r in txn.fetchall()]

      event_ids = yield self.runInteraction("get_current_state", f)
      events = yield self._get_events(event_ids, get_prev_content=False)
      defer.returnValue(events)
  @cachedInlineCallbacks(num_args=3)
  def get_current_state_for_key(self, room_id, event_type, state_key):
      """Fetch the current state events for one (type, state_key) in a room.

      Selects the matching event ids from `current_state_events` and
      resolves to the events loaded for them via `_get_events`.
      """
      query = (
          "SELECT event_id FROM current_state_events"
          " WHERE room_id = ? AND type = ? AND state_key = ?"
      )
      params = (room_id, event_type, state_key)

      def select_event_ids(txn):
          txn.execute(query, params)
          return [row[0] for row in txn.fetchall()]

      matching_ids = yield self.runInteraction(
          "get_current_state_for_key", select_event_ids
      )
      matching_events = yield self._get_events(
          matching_ids, get_prev_content=False
      )
      defer.returnValue(matching_events)
  143. def _get_state_groups_from_groups(self, groups_and_types):
  144. """Returns dictionary state_group -> state event ids
  145. Args:
  146. groups_and_types (list): list of 2-tuple (`group`, `types`)
  147. """
  148. def f(txn):
  149. results = {}
  150. for group, types in groups_and_types:
  151. if types is not None:
  152. where_clause = "AND (%s)" % (
  153. " OR ".join(["(type = ? AND state_key = ?)"] * len(types)),
  154. )
  155. else:
  156. where_clause = ""
  157. sql = (
  158. "SELECT event_id FROM state_groups_state WHERE"
  159. " state_group = ? %s"
  160. ) % (where_clause,)
  161. args = [group]
  162. if types is not None:
  163. args.extend([i for typ in types for i in typ])
  164. txn.execute(sql, args)
  165. results[group] = [r[0] for r in txn.fetchall()]
  166. return results
  167. return self.runInteraction(
  168. "_get_state_groups_from_groups",
  169. f,
  170. )
  @defer.inlineCallbacks
  def get_state_for_events(self, event_ids, types):
      """Load, for each given event, the state of its state group.

      The state dicts only contain the type/state_keys selected by the
      `types` list.

      Args:
          event_ids (list)
          types (list): List of (type, state_key) tuples which are used to
              filter the state fetched. `state_key` may be None, which
              matches any `state_key`

      Returns:
          deferred: A dict mapping each of the given event_ids to its
          state dict. The state dicts are mappings from
          (type, state_key) -> state_events
      """
      group_by_event = yield self._get_state_group_for_events(event_ids)
      distinct_groups = set(group_by_event.values())
      state_by_group = yield self._get_state_for_groups(distinct_groups, types)

      state_by_event = {}
      for event_id, group in group_by_event.items():
          state_by_event[event_id] = state_by_group[group]

      # Re-key by the ids the caller actually passed in.
      requested = {}
      for event_id in event_ids:
          requested[event_id] = state_by_event[event_id]

      defer.returnValue(requested)
  @cached(num_args=2, lru=True, max_entries=10000)
  def _get_state_group_for_event(self, room_id, event_id):
      """Look up the state group recorded for a single event.

      Only `event_id` is used to select the row in `event_to_state_groups`;
      `room_id` does not appear in the query. Returns the result of
      `_simple_select_one_onecol`, called with `allow_none=True`.
      """
      row_filter = {"event_id": event_id}
      return self._simple_select_one_onecol(
          table="event_to_state_groups",
          keyvalues=row_filter,
          retcol="state_group",
          allow_none=True,
          desc="_get_state_group_for_event",
      )
  # NOTE(review): the shared cache is declared with num_args=2 on
  # `_get_state_group_for_event` (room_id, event_id) while this wrapper
  # declares num_args=1 - confirm both produce compatible cache keys.
  @cachedList(cache=_get_state_group_for_event.cache, list_name="event_ids",
              num_args=1)
  def _get_state_group_for_events(self, event_ids):
      """Returns mapping event_id -> state_group

      Each event id is looked up in `event_to_state_groups` inside a single
      transaction; lookups pass `allow_none=True` (presumably yielding None
      for events without a row - verify against the helper).
      """
      def lookup_all(txn):
          return {
              event_id: self._simple_select_one_onecol_txn(
                  txn,
                  table="event_to_state_groups",
                  keyvalues={
                      "event_id": event_id,
                  },
                  retcol="state_group",
                  allow_none=True,
              )
              for event_id in event_ids
          }

      return self.runInteraction("_get_state_group_for_events", lookup_all)
  225. def _get_some_state_from_cache(self, group, types):
  226. """Checks if group is in cache. See `_get_state_for_groups`
  227. Returns 3-tuple (`state_dict`, `missing_types`, `got_all`).
  228. `missing_types` is the list of types that aren't in the cache for that
  229. group. `got_all` is a bool indicating if we successfully retrieved all
  230. requests state from the cache, if False we need to query the DB for the
  231. missing state.
  232. Args:
  233. group: The state group to lookup
  234. types (list): List of 2-tuples of the form (`type`, `state_key`),
  235. where a `state_key` of `None` matches all state_keys for the
  236. `type`.
  237. """
  238. is_all, state_dict = self._state_group_cache.get(group)
  239. type_to_key = {}
  240. missing_types = set()
  241. for typ, state_key in types:
  242. if state_key is None:
  243. type_to_key[typ] = None
  244. missing_types.add((typ, state_key))
  245. else:
  246. if type_to_key.get(typ, object()) is not None:
  247. type_to_key.setdefault(typ, set()).add(state_key)
  248. if (typ, state_key) not in state_dict:
  249. missing_types.add((typ, state_key))
  250. sentinel = object()
  251. def include(typ, state_key):
  252. valid_state_keys = type_to_key.get(typ, sentinel)
  253. if valid_state_keys is sentinel:
  254. return False
  255. if valid_state_keys is None:
  256. return True
  257. if state_key in valid_state_keys:
  258. return True
  259. return False
  260. got_all = not (missing_types or types is None)
  261. return {
  262. k: v for k, v in state_dict.items()
  263. if include(k[0], k[1])
  264. }, missing_types, got_all
  265. def _get_all_state_from_cache(self, group):
  266. """Checks if group is in cache. See `_get_state_for_groups`
  267. Returns 2-tuple (`state_dict`, `got_all`). `got_all` is a bool
  268. indicating if we successfully retrieved all requests state from the
  269. cache, if False we need to query the DB for the missing state.
  270. Args:
  271. group: The state group to lookup
  272. """
  273. is_all, state_dict = self._state_group_cache.get(group)
  274. return state_dict, is_all
  @defer.inlineCallbacks
  def _get_state_for_groups(self, groups, types=None):
      """Load the state of each given state group, preferring the cache.

      Args:
          groups: Iterable of state groups; duplicates are collapsed.
          types (list|None): List of `(type, state_key)` pairs to restrict
              the state to, where a `state_key` of None matches all
              state_keys. If `types` is None then all state is returned.

      Returns:
          Deferred: resolves to a dict group -> dict mapping
          (type, state_key) -> state event. Entries with a falsy value
          (the None placeholders recording a cached absence) are left out.
      """
      results = {}
      missing_groups_and_types = []

      for group in set(groups):
          if types is not None:
              state_dict, missing, got_all = self._get_some_state_from_cache(
                  group, types
              )
          else:
              state_dict, got_all = self._get_all_state_from_cache(group)
              missing = None

          results[group] = state_dict
          if not got_all:
              missing_groups_and_types.append((group, missing))

      if missing_groups_and_types:
          # Okay, so we have some missing_types, lets fetch them.
          # The sequence number is read before the fetch and handed back
          # with each cache update (presumably so the cache can discard
          # writes that raced with an invalidation - confirm in the cache).
          cache_seq_num = self._state_group_cache.sequence

          group_state_dict = yield self._get_state_groups_from_groups(
              missing_groups_and_types
          )

          state_events = yield self._get_events(
              [e_id for l in group_state_dict.values() for e_id in l],
              get_prev_content=False
          )
          state_events = {e.event_id: e for e in state_events}

          # Now we want to update the cache with all the things we fetched
          # from the database.
          for group, state_ids in group_state_dict.items():
              if types is not None:
                  # We deliberately put key -> None mappings into the cache
                  # to cache absence of the key, on the assumption that if
                  # we've explicitly asked for some types then we will
                  # probably ask for them again.
                  state_dict = {key: None for key in types}
                  state_dict.update(results[group])
                  results[group] = state_dict
              else:
                  state_dict = results[group]

              for event_id in state_ids:
                  try:
                      state_event = state_events[event_id]
                      state_dict[(state_event.type, state_event.state_key)] = state_event
                  except KeyError:
                      # Hmm. So we don't have that state event? Interesting.
                      logger.warning(
                          "Can't find state event %r for state group %r",
                          event_id, group,
                      )

              self._state_group_cache.update(
                  cache_seq_num,
                  key=group,
                  value=state_dict,
                  full=(types is None),
              )

      # Remove all the entries with None values. The None values were just
      # used for bookkeeping in the cache.
      defer.returnValue({
          group: {
              key: event for key, event in state_dict.items() if event
          }
          for group, state_dict in results.items()
      })