snapshot.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2014-2016 OpenMarket Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. from typing import Optional, Union
  16. from six import iteritems
  17. import attr
  18. from frozendict import frozendict
  19. from twisted.internet import defer
  20. from synapse.appservice import ApplicationService
  21. from synapse.logging.context import make_deferred_yieldable, run_in_background
  22. from synapse.types import StateMap
  23. @attr.s(slots=True)
  24. class EventContext:
  25. """
  26. Holds information relevant to persisting an event
  27. Attributes:
  28. rejected: A rejection reason if the event was rejected, else False
  29. _state_group: The ID of the state group for this event. Note that state events
  30. are persisted with a state group which includes the new event, so this is
  31. effectively the state *after* the event in question.
  32. For a *rejected* state event, where the state of the rejected event is
  33. ignored, this state_group should never make it into the
  34. event_to_state_groups table. Indeed, inspecting this value for a rejected
  35. state event is almost certainly incorrect.
  36. For an outlier, where we don't have the state at the event, this will be
  37. None.
  38. Note that this is a private attribute: it should be accessed via
  39. the ``state_group`` property.
  40. state_group_before_event: The ID of the state group representing the state
  41. of the room before this event.
  42. If this is a non-state event, this will be the same as ``state_group``. If
  43. it's a state event, it will be the same as ``prev_group``.
  44. If ``state_group`` is None (ie, the event is an outlier),
  45. ``state_group_before_event`` will always also be ``None``.
  46. prev_group: If it is known, ``state_group``'s prev_group. Note that this being
  47. None does not necessarily mean that ``state_group`` does not have
  48. a prev_group!
  49. If the event is a state event, this is normally the same as ``prev_group``.
  50. If ``state_group`` is None (ie, the event is an outlier), ``prev_group``
  51. will always also be ``None``.
  52. Note that this *not* (necessarily) the state group associated with
  53. ``_prev_state_ids``.
  54. delta_ids: If ``prev_group`` is not None, the state delta between ``prev_group``
  55. and ``state_group``.
  56. app_service: If this event is being sent by a (local) application service, that
  57. app service.
  58. _current_state_ids: The room state map, including this event - ie, the state
  59. in ``state_group``.
  60. (type, state_key) -> event_id
  61. FIXME: what is this for an outlier? it seems ill-defined. It seems like
  62. it could be either {}, or the state we were given by the remote
  63. server, depending on $THINGS
  64. Note that this is a private attribute: it should be accessed via
  65. ``get_current_state_ids``. _AsyncEventContext impl calculates this
  66. on-demand: it will be None until that happens.
  67. _prev_state_ids: The room state map, excluding this event - ie, the state
  68. in ``state_group_before_event``. For a non-state
  69. event, this will be the same as _current_state_events.
  70. Note that it is a completely different thing to prev_group!
  71. (type, state_key) -> event_id
  72. FIXME: again, what is this for an outlier?
  73. As with _current_state_ids, this is a private attribute. It should be
  74. accessed via get_prev_state_ids.
  75. """
  76. rejected = attr.ib(default=False, type=Union[bool, str])
  77. _state_group = attr.ib(default=None, type=Optional[int])
  78. state_group_before_event = attr.ib(default=None, type=Optional[int])
  79. prev_group = attr.ib(default=None, type=Optional[int])
  80. delta_ids = attr.ib(default=None, type=Optional[StateMap[str]])
  81. app_service = attr.ib(default=None, type=Optional[ApplicationService])
  82. _current_state_ids = attr.ib(default=None, type=Optional[StateMap[str]])
  83. _prev_state_ids = attr.ib(default=None, type=Optional[StateMap[str]])
  84. @staticmethod
  85. def with_state(
  86. state_group,
  87. state_group_before_event,
  88. current_state_ids,
  89. prev_state_ids,
  90. prev_group=None,
  91. delta_ids=None,
  92. ):
  93. return EventContext(
  94. current_state_ids=current_state_ids,
  95. prev_state_ids=prev_state_ids,
  96. state_group=state_group,
  97. state_group_before_event=state_group_before_event,
  98. prev_group=prev_group,
  99. delta_ids=delta_ids,
  100. )
  101. @defer.inlineCallbacks
  102. def serialize(self, event, store):
  103. """Converts self to a type that can be serialized as JSON, and then
  104. deserialized by `deserialize`
  105. Args:
  106. event (FrozenEvent): The event that this context relates to
  107. Returns:
  108. dict
  109. """
  110. # We don't serialize the full state dicts, instead they get pulled out
  111. # of the DB on the other side. However, the other side can't figure out
  112. # the prev_state_ids, so if we're a state event we include the event
  113. # id that we replaced in the state.
  114. if event.is_state():
  115. prev_state_ids = yield self.get_prev_state_ids()
  116. prev_state_id = prev_state_ids.get((event.type, event.state_key))
  117. else:
  118. prev_state_id = None
  119. return {
  120. "prev_state_id": prev_state_id,
  121. "event_type": event.type,
  122. "event_state_key": event.state_key if event.is_state() else None,
  123. "state_group": self._state_group,
  124. "state_group_before_event": self.state_group_before_event,
  125. "rejected": self.rejected,
  126. "prev_group": self.prev_group,
  127. "delta_ids": _encode_state_dict(self.delta_ids),
  128. "app_service_id": self.app_service.id if self.app_service else None,
  129. }
  130. @staticmethod
  131. def deserialize(storage, input):
  132. """Converts a dict that was produced by `serialize` back into a
  133. EventContext.
  134. Args:
  135. storage (Storage): Used to convert AS ID to AS object and fetch
  136. state.
  137. input (dict): A dict produced by `serialize`
  138. Returns:
  139. EventContext
  140. """
  141. context = _AsyncEventContextImpl(
  142. # We use the state_group and prev_state_id stuff to pull the
  143. # current_state_ids out of the DB and construct prev_state_ids.
  144. storage=storage,
  145. prev_state_id=input["prev_state_id"],
  146. event_type=input["event_type"],
  147. event_state_key=input["event_state_key"],
  148. state_group=input["state_group"],
  149. state_group_before_event=input["state_group_before_event"],
  150. prev_group=input["prev_group"],
  151. delta_ids=_decode_state_dict(input["delta_ids"]),
  152. rejected=input["rejected"],
  153. )
  154. app_service_id = input["app_service_id"]
  155. if app_service_id:
  156. context.app_service = storage.main.get_app_service_by_id(app_service_id)
  157. return context
  158. @property
  159. def state_group(self) -> Optional[int]:
  160. """The ID of the state group for this event.
  161. Note that state events are persisted with a state group which includes the new
  162. event, so this is effectively the state *after* the event in question.
  163. For an outlier, where we don't have the state at the event, this will be None.
  164. It is an error to access this for a rejected event, since rejected state should
  165. not make it into the room state. Accessing this property will raise an exception
  166. if ``rejected`` is set.
  167. """
  168. if self.rejected:
  169. raise RuntimeError("Attempt to access state_group of rejected event")
  170. return self._state_group
  171. @defer.inlineCallbacks
  172. def get_current_state_ids(self):
  173. """
  174. Gets the room state map, including this event - ie, the state in ``state_group``
  175. It is an error to access this for a rejected event, since rejected state should
  176. not make it into the room state. This method will raise an exception if
  177. ``rejected`` is set.
  178. Returns:
  179. Deferred[dict[(str, str), str]|None]: Returns None if state_group
  180. is None, which happens when the associated event is an outlier.
  181. Maps a (type, state_key) to the event ID of the state event matching
  182. this tuple.
  183. """
  184. if self.rejected:
  185. raise RuntimeError("Attempt to access state_ids of rejected event")
  186. yield self._ensure_fetched()
  187. return self._current_state_ids
  188. @defer.inlineCallbacks
  189. def get_prev_state_ids(self):
  190. """
  191. Gets the room state map, excluding this event.
  192. For a non-state event, this will be the same as get_current_state_ids().
  193. Returns:
  194. Deferred[dict[(str, str), str]|None]: Returns None if state_group
  195. is None, which happens when the associated event is an outlier.
  196. Maps a (type, state_key) to the event ID of the state event matching
  197. this tuple.
  198. """
  199. yield self._ensure_fetched()
  200. return self._prev_state_ids
  201. def get_cached_current_state_ids(self):
  202. """Gets the current state IDs if we have them already cached.
  203. It is an error to access this for a rejected event, since rejected state should
  204. not make it into the room state. This method will raise an exception if
  205. ``rejected`` is set.
  206. Returns:
  207. dict[(str, str), str]|None: Returns None if we haven't cached the
  208. state or if state_group is None, which happens when the associated
  209. event is an outlier.
  210. """
  211. if self.rejected:
  212. raise RuntimeError("Attempt to access state_ids of rejected event")
  213. return self._current_state_ids
  214. def _ensure_fetched(self):
  215. return defer.succeed(None)
  216. @attr.s(slots=True)
  217. class _AsyncEventContextImpl(EventContext):
  218. """
  219. An implementation of EventContext which fetches _current_state_ids and
  220. _prev_state_ids from the database on demand.
  221. Attributes:
  222. _storage (Storage)
  223. _fetching_state_deferred (Deferred|None): Resolves when *_state_ids have
  224. been calculated. None if we haven't started calculating yet
  225. _event_type (str): The type of the event the context is associated with.
  226. _event_state_key (str): The state_key of the event the context is
  227. associated with.
  228. _prev_state_id (str|None): If the event associated with the context is
  229. a state event, then `_prev_state_id` is the event_id of the state
  230. that was replaced.
  231. """
  232. # This needs to have a default as we're inheriting
  233. _storage = attr.ib(default=None)
  234. _prev_state_id = attr.ib(default=None)
  235. _event_type = attr.ib(default=None)
  236. _event_state_key = attr.ib(default=None)
  237. _fetching_state_deferred = attr.ib(default=None)
  238. def _ensure_fetched(self):
  239. if not self._fetching_state_deferred:
  240. self._fetching_state_deferred = run_in_background(self._fill_out_state)
  241. return make_deferred_yieldable(self._fetching_state_deferred)
  242. @defer.inlineCallbacks
  243. def _fill_out_state(self):
  244. """Called to populate the _current_state_ids and _prev_state_ids
  245. attributes by loading from the database.
  246. """
  247. if self.state_group is None:
  248. return
  249. self._current_state_ids = yield self._storage.state.get_state_ids_for_group(
  250. self.state_group
  251. )
  252. if self._prev_state_id and self._event_state_key is not None:
  253. self._prev_state_ids = dict(self._current_state_ids)
  254. key = (self._event_type, self._event_state_key)
  255. self._prev_state_ids[key] = self._prev_state_id
  256. else:
  257. self._prev_state_ids = self._current_state_ids
  258. def _encode_state_dict(state_dict):
  259. """Since dicts of (type, state_key) -> event_id cannot be serialized in
  260. JSON we need to convert them to a form that can.
  261. """
  262. if state_dict is None:
  263. return None
  264. return [(etype, state_key, v) for (etype, state_key), v in iteritems(state_dict)]
  265. def _decode_state_dict(input):
  266. """Decodes a state dict encoded using `_encode_state_dict` above
  267. """
  268. if input is None:
  269. return None
  270. return frozendict({(etype, state_key): v for etype, state_key, v in input})