# stream_change_cache.py
# -*- coding: utf-8 -*-
# Copyright 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import math
from typing import Dict, FrozenSet, List, Mapping, Optional, Set, Union

from sortedcontainers import SortedDict

from synapse.types import Collection
from synapse.util import caches

  21. logger = logging.getLogger(__name__)
  22. # for now, assume all entities in the cache are strings
  23. EntityType = str
  24. class StreamChangeCache:
  25. """Keeps track of the stream positions of the latest change in a set of entities.
  26. Typically the entity will be a room or user id.
  27. Given a list of entities and a stream position, it will give a subset of
  28. entities that may have changed since that position. If position key is too
  29. old then the cache will simply return all given entities.
  30. """
  31. def __init__(
  32. self,
  33. name: str,
  34. current_stream_pos: int,
  35. max_size=10000,
  36. prefilled_cache: Optional[Mapping[EntityType, int]] = None,
  37. ):
  38. self._original_max_size = max_size
  39. self._max_size = math.floor(max_size)
  40. self._entity_to_key = {} # type: Dict[EntityType, int]
  41. # map from stream id to the a set of entities which changed at that stream id.
  42. self._cache = SortedDict() # type: SortedDict[int, Set[EntityType]]
  43. # the earliest stream_pos for which we can reliably answer
  44. # get_all_entities_changed. In other words, one less than the earliest
  45. # stream_pos for which we know _cache is valid.
  46. #
  47. self._earliest_known_stream_pos = current_stream_pos
  48. self.name = name
  49. self.metrics = caches.register_cache(
  50. "cache", self.name, self._cache, resize_callback=self.set_cache_factor
  51. )
  52. if prefilled_cache:
  53. for entity, stream_pos in prefilled_cache.items():
  54. self.entity_has_changed(entity, stream_pos)
  55. def set_cache_factor(self, factor: float) -> bool:
  56. """
  57. Set the cache factor for this individual cache.
  58. This will trigger a resize if it changes, which may require evicting
  59. items from the cache.
  60. Returns:
  61. bool: Whether the cache changed size or not.
  62. """
  63. new_size = math.floor(self._original_max_size * factor)
  64. if new_size != self._max_size:
  65. self.max_size = new_size
  66. self._evict()
  67. return True
  68. return False
  69. def has_entity_changed(self, entity: EntityType, stream_pos: int) -> bool:
  70. """Returns True if the entity may have been updated since stream_pos
  71. """
  72. assert isinstance(stream_pos, int)
  73. if stream_pos < self._earliest_known_stream_pos:
  74. self.metrics.inc_misses()
  75. return True
  76. latest_entity_change_pos = self._entity_to_key.get(entity, None)
  77. if latest_entity_change_pos is None:
  78. self.metrics.inc_hits()
  79. return False
  80. if stream_pos < latest_entity_change_pos:
  81. self.metrics.inc_misses()
  82. return True
  83. self.metrics.inc_hits()
  84. return False
  85. def get_entities_changed(
  86. self, entities: Collection[EntityType], stream_pos: int
  87. ) -> Union[Set[EntityType], FrozenSet[EntityType]]:
  88. """
  89. Returns subset of entities that have had new things since the given
  90. position. Entities unknown to the cache will be returned. If the
  91. position is too old it will just return the given list.
  92. """
  93. changed_entities = self.get_all_entities_changed(stream_pos)
  94. if changed_entities is not None:
  95. # We now do an intersection, trying to do so in the most efficient
  96. # way possible (some of these sets are *large*). First check in the
  97. # given iterable is already set that we can reuse, otherwise we
  98. # create a set of the *smallest* of the two iterables and call
  99. # `intersection(..)` on it (this can be twice as fast as the reverse).
  100. if isinstance(entities, (set, frozenset)):
  101. result = entities.intersection(changed_entities)
  102. elif len(changed_entities) < len(entities):
  103. result = set(changed_entities).intersection(entities)
  104. else:
  105. result = set(entities).intersection(changed_entities)
  106. self.metrics.inc_hits()
  107. else:
  108. result = set(entities)
  109. self.metrics.inc_misses()
  110. return result
  111. def has_any_entity_changed(self, stream_pos: int) -> bool:
  112. """Returns if any entity has changed
  113. """
  114. assert type(stream_pos) is int
  115. if not self._cache:
  116. # If the cache is empty, nothing can have changed.
  117. return False
  118. if stream_pos >= self._earliest_known_stream_pos:
  119. self.metrics.inc_hits()
  120. return self._cache.bisect_right(stream_pos) < len(self._cache)
  121. else:
  122. self.metrics.inc_misses()
  123. return True
  124. def get_all_entities_changed(self, stream_pos: int) -> Optional[List[EntityType]]:
  125. """Returns all entities that have had new things since the given
  126. position. If the position is too old it will return None.
  127. Returns the entities in the order that they were changed.
  128. """
  129. assert type(stream_pos) is int
  130. if stream_pos < self._earliest_known_stream_pos:
  131. return None
  132. changed_entities = [] # type: List[EntityType]
  133. for k in self._cache.islice(start=self._cache.bisect_right(stream_pos)):
  134. changed_entities.extend(self._cache[k])
  135. return changed_entities
  136. def entity_has_changed(self, entity: EntityType, stream_pos: int) -> None:
  137. """Informs the cache that the entity has been changed at the given
  138. position.
  139. """
  140. assert type(stream_pos) is int
  141. if stream_pos <= self._earliest_known_stream_pos:
  142. return
  143. old_pos = self._entity_to_key.get(entity, None)
  144. if old_pos is not None:
  145. if old_pos >= stream_pos:
  146. # nothing to do
  147. return
  148. e = self._cache[old_pos]
  149. e.remove(entity)
  150. if not e:
  151. # cache at this point is now empty
  152. del self._cache[old_pos]
  153. e1 = self._cache.get(stream_pos)
  154. if e1 is None:
  155. e1 = self._cache[stream_pos] = set()
  156. e1.add(entity)
  157. self._entity_to_key[entity] = stream_pos
  158. self._evict()
  159. # if the cache is too big, remove entries
  160. while len(self._cache) > self._max_size:
  161. k, r = self._cache.popitem(0)
  162. self._earliest_known_stream_pos = max(k, self._earliest_known_stream_pos)
  163. for entity in r:
  164. del self._entity_to_key[entity]
  165. def _evict(self):
  166. while len(self._cache) > self._max_size:
  167. k, r = self._cache.popitem(0)
  168. self._earliest_known_stream_pos = max(k, self._earliest_known_stream_pos)
  169. for entity in r:
  170. self._entity_to_key.pop(entity, None)
  171. def get_max_pos_of_last_change(self, entity: EntityType) -> int:
  172. """Returns an upper bound of the stream id of the last change to an
  173. entity.
  174. """
  175. return self._entity_to_key.get(entity, self._earliest_known_stream_pos)