
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import threading
from asyncio import iscoroutine
from functools import wraps

import six

from prometheus_client.core import REGISTRY, Counter, GaugeMetricFamily

from twisted.internet import defer

from synapse.logging.context import LoggingContext, PreserveLoggingContext

logger = logging.getLogger(__name__)

_background_process_start_count = Counter(
    "synapse_background_process_start_count",
    "Number of background processes started",
    ["name"],
)

# We set registry=None in all of these to stop them getting registered with
# the default registry. Instead we collect them all via the _Collector below,
# which ensures that we can update them before they are collected.
#
_background_process_ru_utime = Counter(
    "synapse_background_process_ru_utime_seconds",
    "User CPU time used by background processes, in seconds",
    ["name"],
    registry=None,
)

_background_process_ru_stime = Counter(
    "synapse_background_process_ru_stime_seconds",
    "System CPU time used by background processes, in seconds",
    ["name"],
    registry=None,
)

_background_process_db_txn_count = Counter(
    "synapse_background_process_db_txn_count",
    "Number of database transactions done by background processes",
    ["name"],
    registry=None,
)

_background_process_db_txn_duration = Counter(
    "synapse_background_process_db_txn_duration_seconds",
    (
        "Seconds spent by background processes waiting for database "
        "transactions, excluding scheduling time"
    ),
    ["name"],
    registry=None,
)

_background_process_db_sched_duration = Counter(
    "synapse_background_process_db_sched_duration_seconds",
    "Seconds spent by background processes waiting for database connections",
    ["name"],
    registry=None,
)

# Map from description to a counter, so that we can name our logcontexts
# incrementally. (It actually duplicates _background_process_start_count, but
# it's much simpler to do so than to try to combine them.)
_background_process_counts = dict()  # type: dict[str, int]

# Map from description to the currently running background processes.
#
# It's kept as a dict of sets rather than a big set so that we can keep track
# of process descriptions that no longer have any active processes.
_background_processes = dict()  # type: dict[str, set[_BackgroundProcess]]

# A lock that covers the two dicts above.
_bg_metrics_lock = threading.Lock()

class _Collector(object):
    """A custom metrics collector for the background process metrics.

    Ensures that all of the metrics are up-to-date with any in-flight processes
    before they are returned.
    """

    def collect(self):
        background_process_in_flight_count = GaugeMetricFamily(
            "synapse_background_process_in_flight_count",
            "Number of background processes in flight",
            labels=["name"],
        )

        # We copy the dict so that it doesn't change from underneath us.
        # We also copy the process lists, as those can also change.
        with _bg_metrics_lock:
            _background_processes_copy = {
                k: list(v) for k, v in six.iteritems(_background_processes)
            }

        for desc, processes in six.iteritems(_background_processes_copy):
            background_process_in_flight_count.add_metric((desc,), len(processes))
            for process in processes:
                process.update_metrics()

        yield background_process_in_flight_count

        # Now we need to run collect() over each of the static Counters, and
        # yield each metric they return.
        for m in (
            _background_process_ru_utime,
            _background_process_ru_stime,
            _background_process_db_txn_count,
            _background_process_db_txn_duration,
            _background_process_db_sched_duration,
        ):
            for r in m.collect():
                yield r


REGISTRY.register(_Collector())
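
# Registering _Collector means that whenever the registry is scraped (e.g. by
# the /metrics endpoint), collect() above refreshes the per-process counters
# via update_metrics() before yielding them. A minimal sketch of triggering a
# collection by hand, assuming the standard prometheus_client API:
#
#     from prometheus_client import generate_latest
#     print(generate_latest(REGISTRY).decode("ascii"))
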
class _BackgroundProcess(object):
    def __init__(self, desc, ctx):
        self.desc = desc
        self._context = ctx
        self._reported_stats = None

    def update_metrics(self):
        """Updates the metrics with values from this process."""
        new_stats = self._context.get_resource_usage()
        if self._reported_stats is None:
            diff = new_stats
        else:
            diff = new_stats - self._reported_stats
        self._reported_stats = new_stats

        _background_process_ru_utime.labels(self.desc).inc(diff.ru_utime)
        _background_process_ru_stime.labels(self.desc).inc(diff.ru_stime)
        _background_process_db_txn_count.labels(self.desc).inc(diff.db_txn_count)
        _background_process_db_txn_duration.labels(self.desc).inc(
            diff.db_txn_duration_sec
        )
        _background_process_db_sched_duration.labels(self.desc).inc(
            diff.db_sched_duration_sec
        )
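
# Note that the diffing in update_metrics() means each collection only adds
# the usage accrued since the previous report. With hypothetical numbers: if a
# process has used 5s of user CPU at the first collection and 7s at the
# second, the ru_utime counter is incremented by 5 and then by 2, so its
# running total stays at 7 rather than double-counting to 12.
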
def run_as_background_process(desc, func, *args, **kwargs):
    """Run the given function in its own logcontext, with resource metrics

    This should be used to wrap processes which are fired off to run in the
    background, instead of being associated with a particular request.

    It returns a Deferred which completes when the function completes, but it doesn't
    follow the synapse logcontext rules, which makes it appropriate for passing to
    clock.looping_call and friends (or for firing-and-forgetting in the middle of a
    normal synapse inlineCallbacks function).

    Args:
        desc (str): a description for this background process type
        func: a function, which may return a Deferred or a coroutine
        args: positional args for func
        kwargs: keyword args for func

    Returns: Deferred which returns the result of func, but note that it does not
        follow the synapse logcontext rules.
    """

    @defer.inlineCallbacks
    def run():
        with _bg_metrics_lock:
            count = _background_process_counts.get(desc, 0)
            _background_process_counts[desc] = count + 1

        _background_process_start_count.labels(desc).inc()

        with LoggingContext(desc) as context:
            context.request = "%s-%i" % (desc, count)
            proc = _BackgroundProcess(desc, context)

            with _bg_metrics_lock:
                _background_processes.setdefault(desc, set()).add(proc)

            try:
                result = func(*args, **kwargs)

                # We probably don't have an ensureDeferred in our call stack to handle
                # coroutine results, so we need to ensureDeferred here.
                #
                # But we need this check because ensureDeferred doesn't like being
                # called on immediate values (as opposed to Deferreds or coroutines).
                if iscoroutine(result):
                    result = defer.ensureDeferred(result)

                return (yield result)
            except Exception:
                logger.exception("Background process '%s' threw an exception", desc)
            finally:
                proc.update_metrics()

                with _bg_metrics_lock:
                    _background_processes[desc].remove(proc)

    with PreserveLoggingContext():
        return run()
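
# Illustrative usage (a sketch; `clean_up_old_entries` is a hypothetical
# helper, not part of this module):
#
#     async def clean_up_old_entries():
#         ...
#
#     run_as_background_process("clean_up_old_entries", clean_up_old_entries)
#
# Since the returned Deferred does not follow the logcontext rules, it can
# also be handed straight to clock.looping_call for periodic work.
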
def wrap_as_background_process(desc):
    """Decorator that wraps a function that gets called as a background
    process.

    Equivalent to calling the function with `run_as_background_process`.
    """

    def wrap_as_background_process_inner(func):
        @wraps(func)
        def wrap_as_background_process_inner_2(*args, **kwargs):
            return run_as_background_process(desc, func, *args, **kwargs)

        return wrap_as_background_process_inner_2

    return wrap_as_background_process_inner
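
# Illustrative usage of the decorator (a sketch; `Cleaner` and `prune` are
# hypothetical, not part of this module):
#
#     class Cleaner(object):
#         @wrap_as_background_process("prune")
#         def prune(self):
#             ...
#
# Calling Cleaner().prune() then runs the method via run_as_background_process
# and returns its Deferred.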