background_process_metrics.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354
  1. # Copyright 2018 New Vector Ltd
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import logging
  15. import threading
  16. from contextlib import nullcontext
  17. from functools import wraps
  18. from types import TracebackType
  19. from typing import (
  20. TYPE_CHECKING,
  21. Any,
  22. Awaitable,
  23. Callable,
  24. Dict,
  25. Iterable,
  26. Optional,
  27. Set,
  28. Type,
  29. TypeVar,
  30. Union,
  31. )
  32. from prometheus_client import Metric
  33. from prometheus_client.core import REGISTRY, Counter, Gauge
  34. from typing_extensions import ParamSpec
  35. from twisted.internet import defer
  36. from synapse.logging.context import (
  37. ContextResourceUsage,
  38. LoggingContext,
  39. PreserveLoggingContext,
  40. )
  41. from synapse.logging.opentracing import SynapseTags, start_active_span
  42. from synapse.metrics._types import Collector
  43. if TYPE_CHECKING:
  44. import resource
  45. logger = logging.getLogger(__name__)
  46. _background_process_start_count = Counter(
  47. "synapse_background_process_start_count",
  48. "Number of background processes started",
  49. ["name"],
  50. )
  51. _background_process_in_flight_count = Gauge(
  52. "synapse_background_process_in_flight_count",
  53. "Number of background processes in flight",
  54. labelnames=["name"],
  55. )
  56. # we set registry=None in all of these to stop them getting registered with
  57. # the default registry. Instead we collect them all via the CustomCollector,
  58. # which ensures that we can update them before they are collected.
  59. #
  60. _background_process_ru_utime = Counter(
  61. "synapse_background_process_ru_utime_seconds",
  62. "User CPU time used by background processes, in seconds",
  63. ["name"],
  64. registry=None,
  65. )
  66. _background_process_ru_stime = Counter(
  67. "synapse_background_process_ru_stime_seconds",
  68. "System CPU time used by background processes, in seconds",
  69. ["name"],
  70. registry=None,
  71. )
  72. _background_process_db_txn_count = Counter(
  73. "synapse_background_process_db_txn_count",
  74. "Number of database transactions done by background processes",
  75. ["name"],
  76. registry=None,
  77. )
  78. _background_process_db_txn_duration = Counter(
  79. "synapse_background_process_db_txn_duration_seconds",
  80. (
  81. "Seconds spent by background processes waiting for database "
  82. "transactions, excluding scheduling time"
  83. ),
  84. ["name"],
  85. registry=None,
  86. )
  87. _background_process_db_sched_duration = Counter(
  88. "synapse_background_process_db_sched_duration_seconds",
  89. "Seconds spent by background processes waiting for database connections",
  90. ["name"],
  91. registry=None,
  92. )
# Map from description to a counter, so that we can name our logcontexts
# incrementally. (It actually duplicates _background_process_start_count, but
# it's much simpler to do so than to try to combine them.)
_background_process_counts: Dict[str, int] = {}

# Set of all running background processes that became active since the last
# time metrics were scraped (i.e. background processes that performed some
# work since the last scrape).
#
# We do it like this to handle the case where we have a large number of
# background processes stacking up behind a lock or linearizer: we then only
# need to iterate over and update metrics for the processes that have actually
# been active, and can ignore the idle ones.
_background_processes_active_since_last_scrape: "Set[_BackgroundProcess]" = set()

# A lock that covers the above set and dict.
_bg_metrics_lock = threading.Lock()
  108. class _Collector(Collector):
  109. """A custom metrics collector for the background process metrics.
  110. Ensures that all of the metrics are up-to-date with any in-flight processes
  111. before they are returned.
  112. """
  113. def collect(self) -> Iterable[Metric]:
  114. global _background_processes_active_since_last_scrape
  115. # We swap out the _background_processes set with an empty one so that
  116. # we can safely iterate over the set without holding the lock.
  117. with _bg_metrics_lock:
  118. _background_processes_copy = _background_processes_active_since_last_scrape
  119. _background_processes_active_since_last_scrape = set()
  120. for process in _background_processes_copy:
  121. process.update_metrics()
  122. # now we need to run collect() over each of the static Counters, and
  123. # yield each metric they return.
  124. for m in (
  125. _background_process_ru_utime,
  126. _background_process_ru_stime,
  127. _background_process_db_txn_count,
  128. _background_process_db_txn_duration,
  129. _background_process_db_sched_duration,
  130. ):
  131. yield from m.collect()
  132. REGISTRY.register(_Collector())
  133. class _BackgroundProcess:
  134. def __init__(self, desc: str, ctx: LoggingContext):
  135. self.desc = desc
  136. self._context = ctx
  137. self._reported_stats: Optional[ContextResourceUsage] = None
  138. def update_metrics(self) -> None:
  139. """Updates the metrics with values from this process."""
  140. new_stats = self._context.get_resource_usage()
  141. if self._reported_stats is None:
  142. diff = new_stats
  143. else:
  144. diff = new_stats - self._reported_stats
  145. self._reported_stats = new_stats
  146. # For unknown reasons, the difference in times can be negative. See comment in
  147. # synapse.http.request_metrics.RequestMetrics.update_metrics.
  148. _background_process_ru_utime.labels(self.desc).inc(max(diff.ru_utime, 0))
  149. _background_process_ru_stime.labels(self.desc).inc(max(diff.ru_stime, 0))
  150. _background_process_db_txn_count.labels(self.desc).inc(diff.db_txn_count)
  151. _background_process_db_txn_duration.labels(self.desc).inc(
  152. diff.db_txn_duration_sec
  153. )
  154. _background_process_db_sched_duration.labels(self.desc).inc(
  155. diff.db_sched_duration_sec
  156. )
  157. R = TypeVar("R")
  158. def run_as_background_process(
  159. desc: str,
  160. func: Callable[..., Awaitable[Optional[R]]],
  161. *args: Any,
  162. bg_start_span: bool = True,
  163. **kwargs: Any,
  164. ) -> "defer.Deferred[Optional[R]]":
  165. """Run the given function in its own logcontext, with resource metrics
  166. This should be used to wrap processes which are fired off to run in the
  167. background, instead of being associated with a particular request.
  168. It returns a Deferred which completes when the function completes, but it doesn't
  169. follow the synapse logcontext rules, which makes it appropriate for passing to
  170. clock.looping_call and friends (or for firing-and-forgetting in the middle of a
  171. normal synapse async function).
  172. Args:
  173. desc: a description for this background process type
  174. func: a function, which may return a Deferred or a coroutine
  175. bg_start_span: Whether to start an opentracing span. Defaults to True.
  176. Should only be disabled for processes that will not log to or tag
  177. a span.
  178. args: positional args for func
  179. kwargs: keyword args for func
  180. Returns:
  181. Deferred which returns the result of func, or `None` if func raises.
  182. Note that the returned Deferred does not follow the synapse logcontext
  183. rules.
  184. """
  185. async def run() -> Optional[R]:
  186. with _bg_metrics_lock:
  187. count = _background_process_counts.get(desc, 0)
  188. _background_process_counts[desc] = count + 1
  189. _background_process_start_count.labels(desc).inc()
  190. _background_process_in_flight_count.labels(desc).inc()
  191. with BackgroundProcessLoggingContext(desc, count) as context:
  192. try:
  193. if bg_start_span:
  194. ctx = start_active_span(
  195. f"bgproc.{desc}", tags={SynapseTags.REQUEST_ID: str(context)}
  196. )
  197. else:
  198. ctx = nullcontext() # type: ignore[assignment]
  199. with ctx:
  200. return await func(*args, **kwargs)
  201. except Exception:
  202. logger.exception(
  203. "Background process '%s' threw an exception",
  204. desc,
  205. )
  206. return None
  207. finally:
  208. _background_process_in_flight_count.labels(desc).dec()
  209. with PreserveLoggingContext():
  210. # Note that we return a Deferred here so that it can be used in a
  211. # looping_call and other places that expect a Deferred.
  212. return defer.ensureDeferred(run())
  213. P = ParamSpec("P")
  214. def wrap_as_background_process(
  215. desc: str,
  216. ) -> Callable[
  217. [Callable[P, Awaitable[Optional[R]]]],
  218. Callable[P, "defer.Deferred[Optional[R]]"],
  219. ]:
  220. """Decorator that wraps an asynchronous function `func`, returning a synchronous
  221. decorated function. Calling the decorated version runs `func` as a background
  222. process, forwarding all arguments verbatim.
  223. That is,
  224. @wrap_as_background_process
  225. def func(*args): ...
  226. func(1, 2, third=3)
  227. is equivalent to:
  228. def func(*args): ...
  229. run_as_background_process(func, 1, 2, third=3)
  230. The former can be convenient if `func` needs to be run as a background process in
  231. multiple places.
  232. """
  233. def wrap_as_background_process_inner(
  234. func: Callable[P, Awaitable[Optional[R]]]
  235. ) -> Callable[P, "defer.Deferred[Optional[R]]"]:
  236. @wraps(func)
  237. def wrap_as_background_process_inner_2(
  238. *args: P.args, **kwargs: P.kwargs
  239. ) -> "defer.Deferred[Optional[R]]":
  240. # type-ignore: mypy is confusing kwargs with the bg_start_span kwarg.
  241. # Argument 4 to "run_as_background_process" has incompatible type
  242. # "**P.kwargs"; expected "bool"
  243. # See https://github.com/python/mypy/issues/8862
  244. return run_as_background_process(desc, func, *args, **kwargs) # type: ignore[arg-type]
  245. return wrap_as_background_process_inner_2
  246. return wrap_as_background_process_inner
  247. class BackgroundProcessLoggingContext(LoggingContext):
  248. """A logging context that tracks in flight metrics for background
  249. processes.
  250. """
  251. __slots__ = ["_proc"]
  252. def __init__(self, name: str, instance_id: Optional[Union[int, str]] = None):
  253. """
  254. Args:
  255. name: The name of the background process. Each distinct `name` gets a
  256. separate prometheus time series.
  257. instance_id: an identifer to add to `name` to distinguish this instance of
  258. the named background process in the logs. If this is `None`, one is
  259. made up based on id(self).
  260. """
  261. if instance_id is None:
  262. instance_id = id(self)
  263. super().__init__("%s-%s" % (name, instance_id))
  264. self._proc = _BackgroundProcess(name, self)
  265. def start(self, rusage: "Optional[resource.struct_rusage]") -> None:
  266. """Log context has started running (again)."""
  267. super().start(rusage)
  268. # We've become active again so we make sure we're in the list of active
  269. # procs. (Note that "start" here means we've become active, as opposed
  270. # to starting for the first time.)
  271. with _bg_metrics_lock:
  272. _background_processes_active_since_last_scrape.add(self._proc)
  273. def __exit__(
  274. self,
  275. type: Optional[Type[BaseException]],
  276. value: Optional[BaseException],
  277. traceback: Optional[TracebackType],
  278. ) -> None:
  279. """Log context has finished."""
  280. super().__exit__(type, value, traceback)
  281. # The background process has finished. We explicitly remove and manually
  282. # update the metrics here so that if nothing is scraping metrics the set
  283. # doesn't infinitely grow.
  284. with _bg_metrics_lock:
  285. _background_processes_active_since_last_scrape.discard(self._proc)
  286. self._proc.update_metrics()