response_cache.py 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277
  1. # Copyright 2016 OpenMarket Ltd
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import logging
  15. from typing import (
  16. TYPE_CHECKING,
  17. Any,
  18. Awaitable,
  19. Callable,
  20. Dict,
  21. Generic,
  22. Iterable,
  23. Optional,
  24. TypeVar,
  25. )
  26. import attr
  27. from twisted.internet import defer
  28. from synapse.logging.context import make_deferred_yieldable, run_in_background
  29. from synapse.logging.opentracing import (
  30. active_span,
  31. start_active_span,
  32. start_active_span_follows_from,
  33. )
  34. from synapse.util import Clock
  35. from synapse.util.async_helpers import AbstractObservableDeferred, ObservableDeferred
  36. from synapse.util.caches import EvictionReason, register_cache
  # Module-level logger, named after this module.
  logger = logging.getLogger(__name__)

  if TYPE_CHECKING:
      # Only imported for the benefit of type checkers: the name is used solely
      # inside (string or local-variable) annotations in this module.
      import opentracing

  # the type of the key in the cache
  KV = TypeVar("KV")

  # the type of the result from the operation
  RV = TypeVar("RV")
  @attr.s(auto_attribs=True)
  class ResponseCacheContext(Generic[KV]):
      """Details of a ResponseCache miss, optionally handed to the callback.

      The callback that computes the missing value may receive this object so
      that it can feed information back to the cache.

      Attributes:
          cache_key: The key whose lookup missed. Treat this as read-only.
              TODO: in attrs 20.1, make it frozen with an on_setattr.
          should_cache: Whether the result should be kept in the cache once the
              request completes. The callback may set this to False if it
              decides that its result should not be cached.
      """

      cache_key: KV = attr.ib()
      should_cache: bool = attr.ib(default=True)
  @attr.s(auto_attribs=True)
  class ResponseCacheEntry:
      """A single slot in a ResponseCache.

      Attributes:
          result: The (possibly still incomplete) outcome of the operation.
              An ObservableDeferred is kept here even once the operation has
              finished, instead of being swapped for the plain value, because
              that makes Failure results easier to cache.
          opentracing_span_context: The opentracing span which generated (or is
              still generating) the result, if there is one.
      """

      result: AbstractObservableDeferred = attr.ib()
      opentracing_span_context: "Optional[opentracing.SpanContext]" = attr.ib()
  class ResponseCache(Generic[KV]):
      """
      This caches a deferred response. Until the deferred completes it will be
      returned from the cache. This means that if the client retries the request
      while the response is still being computed, that original response will be
      used rather than trying to compute a new response.

      If the cache was created with a non-zero timeout, a completed result is
      additionally kept for that long after completion (unless the callback
      cleared `should_cache` on its ResponseCacheContext).
      """

      def __init__(self, clock: Clock, name: str, timeout_ms: float = 0):
          """
          Args:
              clock: used to schedule the removal of completed entries.
              name: name of this cache, used in log lines, span names and when
                  registering the cache metrics.
              timeout_ms: how long (in milliseconds) to keep a result after it
                  completes. Zero means results are dropped as soon as they
                  complete.
          """
          # NB: this must exist before `register_cache` is handed `self`.
          self._result_cache: Dict[KV, ResponseCacheEntry] = {}

          self.clock = clock
          self.timeout_sec = timeout_ms / 1000.0

          self._name = name
          self._metrics = register_cache("response_cache", name, self, resizable=False)

      def size(self) -> int:
          """Return the number of entries (complete or not) currently held."""
          return len(self._result_cache)

      def __len__(self) -> int:
          return self.size()

      def keys(self) -> Iterable[KV]:
          """Get the keys currently in the result cache

          Returns both incomplete entries, and (if the timeout on this cache is non-zero),
          complete entries which are still in the cache.

          Note that the returned iterator is not safe in the face of concurrent execution:
          behaviour is undefined if `wrap` is called during iteration.
          """
          return self._result_cache.keys()

      def _get(self, key: KV) -> Optional[ResponseCacheEntry]:
          """Look up the given key, recording a cache hit or miss.

          Args:
              key: key to get in the cache

          Returns:
              The entry for this key, if any; else None.
          """
          entry = self._result_cache.get(key)
          if entry is not None:
              self._metrics.inc_hits()
              return entry
          else:
              self._metrics.inc_misses()
              return None

      def _set(
          self,
          context: ResponseCacheContext[KV],
          deferred: "defer.Deferred[RV]",
          opentracing_span_context: "Optional[opentracing.SpanContext]",
      ) -> ResponseCacheEntry:
          """Set the entry for the given key to the given deferred.

          *deferred* should run its callbacks in the sentinel logcontext (ie,
          you should wrap normal synapse deferreds with
          synapse.logging.context.run_in_background).

          Args:
              context: Information about the cache miss
              deferred: The deferred which resolves to the result.
              opentracing_span_context: An opentracing span wrapping the calculation

          Returns:
              The cache entry object.
          """
          result = ObservableDeferred(deferred, consumeErrors=True)
          key = context.cache_key
          entry = ResponseCacheEntry(result, opentracing_span_context)
          self._result_cache[key] = entry

          def on_complete(r: RV) -> RV:
              # if this cache has a non-zero timeout, and the callback has not cleared
              # the should_cache bit, we leave it in the cache for now and schedule
              # its removal later.
              if self.timeout_sec and context.should_cache:
                  # pass the entry along so that the timer only ever removes *this*
                  # entry, not a newer one stored under the same key in the meantime.
                  self.clock.call_later(
                      self.timeout_sec, self._entry_timeout, key, entry
                  )
              else:
                  # otherwise, remove the result immediately (again, only if it is
                  # still the entry stored for this key).
                  self._evict(key, entry, EvictionReason.invalidation)
              return r

          # make sure we do this *after* adding the entry to result_cache,
          # in case the result is already complete (in which case flipping the order would
          # leave us with a stuck entry in the cache).
          result.addBoth(on_complete)
          return entry

      def _evict(
          self, key: KV, entry: ResponseCacheEntry, reason: EvictionReason
      ) -> None:
          """Remove `entry` from the cache if it is still the entry stored for `key`.

          Does nothing (and records no eviction) if the key is absent or now maps
          to a different entry.

          Args:
              key: key the entry was stored under
              entry: the exact entry object to remove
              reason: eviction reason to record in the metrics
          """
          if self._result_cache.get(key) is entry:
              del self._result_cache[key]
              self._metrics.inc_evictions(reason)

      def unset(self, key: KV) -> None:
          """Remove the cached value for this key from the cache, if any.

          An eviction is only recorded in the metrics if an entry was actually
          removed.

          Args:
              key: key used to remove the cached value
          """
          if self._result_cache.pop(key, None) is not None:
              self._metrics.inc_evictions(EvictionReason.invalidation)

      def _entry_timeout(
          self, key: KV, entry: Optional[ResponseCacheEntry] = None
      ) -> None:
          """For the call_later to remove from the cache

          Args:
              key: key whose entry has timed out
              entry: if given, the specific entry the timer was scheduled for; the
                  key is then only removed if it still maps to that entry. If
                  omitted, whatever entry is stored for the key is removed.
          """
          if entry is not None:
              self._evict(key, entry, EvictionReason.time)
          elif self._result_cache.pop(key, None) is not None:
              self._metrics.inc_evictions(EvictionReason.time)

      async def wrap(
          self,
          key: KV,
          callback: Callable[..., Awaitable[RV]],
          *args: Any,
          cache_context: bool = False,
          **kwargs: Any,
      ) -> RV:
          """Wrap together a *get* and *set* call, taking care of logcontexts

          First looks up the key in the cache, and if it is present makes it
          follow the synapse logcontext rules and returns it.

          Otherwise, makes a call to *callback(*args, **kwargs)*, which should
          follow the synapse logcontext rules, and adds the result to the cache.

          Example usage:

              async def handle_request(request):
                  # etc
                  return result

              result = await response_cache.wrap(
                  key,
                  handle_request,
                  request,
              )

          Args:
              key: key to get/set in the cache

              callback: function to call if the key is not found in
                  the cache

              *args: positional parameters to pass to the callback, if it is used

              cache_context: if set, the callback will be given a `cache_context` kw arg,
                  which will be a ResponseCacheContext object.

              **kwargs: named parameters to pass to the callback, if it is used

          Returns:
              The result of the callback (from the cache, or otherwise)
          """
          entry = self._get(key)
          if entry is None:
              logger.debug(
                  "[%s]: no cached result for [%s], calculating new one", self._name, key
              )
              context = ResponseCacheContext(cache_key=key)
              if cache_context:
                  kwargs["cache_context"] = context

              span_context: "Optional[opentracing.SpanContext]" = None

              async def cb() -> RV:
                  # NB it is important that we do not `await` before setting span_context!
                  # (its value is read immediately after `run_in_background` returns,
                  # when the cache entry is created below.)
                  nonlocal span_context
                  with start_active_span(f"ResponseCache[{self._name}].calculate"):
                      span = active_span()
                      if span:
                          span_context = span.context
                      return await callback(*args, **kwargs)

              d = run_in_background(cb)
              entry = self._set(context, d, span_context)
              return await make_deferred_yieldable(entry.result.observe())

          # cache hit: wait on (or immediately return) the existing result.
          result = entry.result.observe()
          if result.called:
              logger.info("[%s]: using completed cached result for [%s]", self._name, key)
          else:
              logger.info(
                  "[%s]: using incomplete cached result for [%s]", self._name, key
              )

          # link our wait span to the span which is (or was) computing the result,
          # if we recorded one.
          span_context = entry.opentracing_span_context
          with start_active_span_follows_from(
              f"ResponseCache[{self._name}].wait",
              contexts=(span_context,) if span_context else (),
          ):
              return await make_deferred_yieldable(result)