stream_change_cache.py 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2016 OpenMarket Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. from synapse.util.caches import register_cache, CACHE_SIZE_FACTOR
  16. from blist import sorteddict
  17. import logging
  18. logger = logging.getLogger(__name__)
  19. class StreamChangeCache(object):
  20. """Keeps track of the stream positions of the latest change in a set of entities.
  21. Typically the entity will be a room or user id.
  22. Given a list of entities and a stream position, it will give a subset of
  23. entities that may have changed since that position. If position key is too
  24. old then the cache will simply return all given entities.
  25. """
  26. def __init__(self, name, current_stream_pos, max_size=10000, prefilled_cache={}):
  27. self._max_size = int(max_size * CACHE_SIZE_FACTOR)
  28. self._entity_to_key = {}
  29. self._cache = sorteddict()
  30. self._earliest_known_stream_pos = current_stream_pos
  31. self.name = name
  32. self.metrics = register_cache("cache", self.name, self._cache)
  33. for entity, stream_pos in prefilled_cache.items():
  34. self.entity_has_changed(entity, stream_pos)
  35. def has_entity_changed(self, entity, stream_pos):
  36. """Returns True if the entity may have been updated since stream_pos
  37. """
  38. assert type(stream_pos) is int or type(stream_pos) is long
  39. if stream_pos < self._earliest_known_stream_pos:
  40. self.metrics.inc_misses()
  41. return True
  42. latest_entity_change_pos = self._entity_to_key.get(entity, None)
  43. if latest_entity_change_pos is None:
  44. self.metrics.inc_hits()
  45. return False
  46. if stream_pos < latest_entity_change_pos:
  47. self.metrics.inc_misses()
  48. return True
  49. self.metrics.inc_hits()
  50. return False
  51. def get_entities_changed(self, entities, stream_pos):
  52. """Returns subset of entities that have had new things since the
  53. given position. If the position is too old it will just return the given list.
  54. """
  55. assert type(stream_pos) is int
  56. if stream_pos >= self._earliest_known_stream_pos:
  57. keys = self._cache.keys()
  58. i = keys.bisect_right(stream_pos)
  59. result = set(
  60. self._cache[k] for k in keys[i:]
  61. ).intersection(entities)
  62. self.metrics.inc_hits()
  63. else:
  64. result = entities
  65. self.metrics.inc_misses()
  66. return result
  67. def has_any_entity_changed(self, stream_pos):
  68. """Returns if any entity has changed
  69. """
  70. assert type(stream_pos) is int
  71. if stream_pos >= self._earliest_known_stream_pos:
  72. self.metrics.inc_hits()
  73. keys = self._cache.keys()
  74. i = keys.bisect_right(stream_pos)
  75. return i < len(keys)
  76. else:
  77. self.metrics.inc_misses()
  78. return True
  79. def get_all_entities_changed(self, stream_pos):
  80. """Returns all entites that have had new things since the given
  81. position. If the position is too old it will return None.
  82. """
  83. assert type(stream_pos) is int
  84. if stream_pos >= self._earliest_known_stream_pos:
  85. keys = self._cache.keys()
  86. i = keys.bisect_right(stream_pos)
  87. return [self._cache[k] for k in keys[i:]]
  88. else:
  89. return None
  90. def entity_has_changed(self, entity, stream_pos):
  91. """Informs the cache that the entity has been changed at the given
  92. position.
  93. """
  94. assert type(stream_pos) is int
  95. if stream_pos > self._earliest_known_stream_pos:
  96. old_pos = self._entity_to_key.get(entity, None)
  97. if old_pos is not None:
  98. stream_pos = max(stream_pos, old_pos)
  99. self._cache.pop(old_pos, None)
  100. self._cache[stream_pos] = entity
  101. self._entity_to_key[entity] = stream_pos
  102. while len(self._cache) > self._max_size:
  103. k, r = self._cache.popitem()
  104. self._earliest_known_stream_pos = max(k, self._earliest_known_stream_pos)
  105. self._entity_to_key.pop(r, None)
  106. def get_max_pos_of_last_change(self, entity):
  107. """Returns an upper bound of the stream id of the last change to an
  108. entity.
  109. """
  110. return self._entity_to_key.get(entity, self._earliest_known_stream_pos)