__init__.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483
  1. # Copyright 2015, 2016 OpenMarket Ltd
  2. # Copyright 2022 The Matrix.org Foundation C.I.C.
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. import itertools
  16. import logging
  17. import os
  18. import platform
  19. import threading
  20. from typing import (
  21. Callable,
  22. Dict,
  23. Generic,
  24. Iterable,
  25. Mapping,
  26. Optional,
  27. Sequence,
  28. Set,
  29. Tuple,
  30. Type,
  31. TypeVar,
  32. Union,
  33. cast,
  34. )
  35. import attr
  36. from prometheus_client import CollectorRegistry, Counter, Gauge, Histogram, Metric
  37. from prometheus_client.core import (
  38. REGISTRY,
  39. GaugeHistogramMetricFamily,
  40. GaugeMetricFamily,
  41. )
  42. from twisted.python.threadpool import ThreadPool
  43. # This module is imported for its side effects; flake8 needn't warn that it's unused.
  44. import synapse.metrics._reactor_metrics # noqa: F401
  45. from synapse.metrics._exposition import (
  46. MetricsResource,
  47. generate_latest,
  48. start_http_server,
  49. )
  50. from synapse.metrics._gc import MIN_TIME_BETWEEN_GCS, install_gc_manager
  51. from synapse.metrics._types import Collector
  52. from synapse.util import SYNAPSE_VERSION
  # Logger for this module.
  logger = logging.getLogger(__name__)

  # Path prefix for the metrics endpoint (presumably consumed by the HTTP
  # resource setup elsewhere -- not visible in this file).
  METRICS_PREFIX = "/_synapse/metrics"

  # Whether /proc/self/stat existed when this module was imported. CPUMetrics
  # checks this flag before trying to read the file.
  HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat")

  # Collectors that registered themselves by name (LaterGauge and InFlightGauge
  # both use this to replace an earlier collector with the same name).
  all_gauges: Dict[str, Collector] = {}
  class _RegistryProxy:
      """Filtered view of the global ``REGISTRY``.

      Metrics whose name starts with a double underscore are left out.
      """

      @staticmethod
      def collect() -> Iterable[Metric]:
          """Yield every metric from ``REGISTRY`` whose name doesn't start with ``__``."""
          yield from (
              family
              for family in REGISTRY.collect()
              if not family.name.startswith("__")
          )


  # This is slightly unpleasant: because collect() is a staticmethod, a Protocol
  # cannot describe it. The _RegistryProxy class itself is close enough to a
  # CollectorRegistry instance for the places where we use it, so we cast.
  # TODO Do something nicer about this.
  RegistryProxy = cast(CollectorRegistry, _RegistryProxy)
  @attr.s(slots=True, hash=True, auto_attribs=True)
  class LaterGauge(Collector):
      """A gauge whose value is produced by a callback each time it is collected.

      Creating an instance registers it with the global ``REGISTRY``. If a
      collector is already recorded in ``all_gauges`` under the same name, that
      one is unregistered first and a warning is logged.

      Attributes are documented inline below.
      """

      # Name of the exported gauge.
      name: str
      # Help text passed to the gauge metric family.
      desc: str
      # Label names for the gauge, or None. Excluded from the attrs-generated hash.
      labels: Optional[Sequence[str]] = attr.ib(hash=False)
      # callback: should either return a value (if there are no labels for this metric),
      # or dict mapping from a label tuple to a value
      caller: Callable[
          [], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]]
      ]

      def collect(self) -> Iterable[Metric]:
          """Run the callback and yield a single gauge family with its result.

          If the callback raises, the exception is logged and the gauge family is
          still yielded, just without any samples.
          """
          g = GaugeMetricFamily(self.name, self.desc, labels=self.labels)

          try:
              calls = self.caller()
          except Exception:
              logger.exception("Exception running callback for LaterGauge(%s)", self.name)
              yield g
              return

          if isinstance(calls, (int, float)):
              # Unlabelled gauge: the callback returned a bare number.
              g.add_metric([], calls)
          else:
              # Labelled gauge: one sample per label tuple.
              for k, v in calls.items():
                  g.add_metric(k, v)

          yield g

      def __attrs_post_init__(self) -> None:
          # Called by attrs once the generated __init__ has set all attributes.
          self._register()

      def _register(self) -> None:
          """Register with ``REGISTRY``, replacing any same-named collector."""
          if self.name in all_gauges:
              # Pass the name as a logging argument so that formatting is deferred
              # to the logging framework.
              logger.warning("%s already registered, reregistering", self.name)
              REGISTRY.unregister(all_gauges.pop(self.name))

          REGISTRY.register(self)
          all_gauges[self.name] = self
  # `MetricsEntry` only makes sense when it is a `Protocol`,
  # but `Protocol` can't be used as a `TypeVar` bound.
  MetricsEntry = TypeVar("MetricsEntry")


  class InFlightGauge(Generic[MetricsEntry], Collector):
      """Tracks number of things (e.g. requests, Measure blocks, etc) in flight
      at any given time.

      Each InFlightGauge will create a metric called `<name>_total` that counts
      the number of in flight blocks, as well as a metric for each item in the
      given `sub_metrics` as `<name>_<sub_metric>` which will get updated by the
      callbacks.

      Args:
          name: Base name of the exported metrics.
          desc: Help text for the `<name>_total` metric.
          labels: Label names applied to all of the exported metrics.
          sub_metrics: A list of sub metrics that the callbacks will update.
      """

      def __init__(
          self,
          name: str,
          desc: str,
          labels: Sequence[str],
          sub_metrics: Sequence[str],
      ):
          self.name = name
          self.desc = desc
          self.labels = labels
          self.sub_metrics = sub_metrics

          # Create a class which have the sub_metrics values as attributes, which
          # default to 0 on initialization. Used to pass to registered callbacks.
          self._metrics_class: Type[MetricsEntry] = attr.make_class(
              "_MetricsEntry",
              attrs={x: attr.ib(default=0) for x in sub_metrics},
              slots=True,
          )

          # Counts number of in flight blocks for a given set of label values
          self._registrations: Dict[
              Tuple[str, ...], Set[Callable[[MetricsEntry], None]]
          ] = {}

          # Protects access to _registrations
          self._lock = threading.Lock()

          self._register_with_collector()

      def register(
          self,
          key: Tuple[str, ...],
          callback: Callable[[MetricsEntry], None],
      ) -> None:
          """Registers that we've entered a new block with labels `key`.

          `callback` gets called each time the metrics are collected. The same
          value must also be given to `unregister`.

          `callback` gets called with an object that has an attribute per
          sub_metric, which should be updated with the necessary values. Note that
          the metrics object is shared between all callbacks registered with the
          same key.

          Note that `callback` may be called on a separate thread.
          """
          with self._lock:
              self._registrations.setdefault(key, set()).add(callback)

      def unregister(
          self,
          key: Tuple[str, ...],
          callback: Callable[[MetricsEntry], None],
      ) -> None:
          """Registers that we've exited a block with labels `key`."""
          with self._lock:
              # The key itself is kept (possibly with an empty set), so `collect`
              # carries on reporting it, with a count of 0 once nothing is in
              # flight.
              self._registrations.setdefault(key, set()).discard(callback)

      def collect(self) -> Iterable[Metric]:
          """Called by prometheus client when it reads metrics.

          Yields the `<name>_total` gauge first, followed by one gauge per entry
          in `sub_metrics`.

          Note: may be called by a separate thread.
          """
          in_flight = GaugeMetricFamily(
              self.name + "_total", self.desc, labels=self.labels
          )

          metrics_by_key = {}

          # Snapshot the keys under the lock so that concurrent register() calls
          # can't change the dict while we iterate over it.
          with self._lock:
              keys = list(self._registrations)

          for key in keys:
              # Likewise copy the callbacks, so that they are invoked without the
              # lock held.
              with self._lock:
                  callbacks = set(self._registrations[key])

              in_flight.add_metric(key, len(callbacks))

              # One metrics object per key, shared by all of that key's callbacks.
              metrics = self._metrics_class()
              metrics_by_key[key] = metrics
              for callback in callbacks:
                  callback(metrics)

          yield in_flight

          for name in self.sub_metrics:
              gauge = GaugeMetricFamily(
                  "_".join([self.name, name]), "", labels=self.labels
              )
              for key, metrics in metrics_by_key.items():
                  gauge.add_metric(key, getattr(metrics, name))
              yield gauge

      def _register_with_collector(self) -> None:
          """Register with ``REGISTRY``, replacing any same-named collector."""
          if self.name in all_gauges:
              # Pass the name as a logging argument so that formatting is deferred
              # to the logging framework.
              logger.warning("%s already registered, reregistering", self.name)
              REGISTRY.unregister(all_gauges.pop(self.name))

          REGISTRY.register(self)
          all_gauges[self.name] = self
  class GaugeBucketCollector(Collector):
      """Like a Histogram, but the buckets are Gauges which are updated atomically.

      The data is updated by calling `update_data` with an iterable of measurements.

      We assume that the data is updated less frequently than it is reported to
      Prometheus, and optimise for that case.
      """

      __slots__ = (
          "_name",
          "_documentation",
          "_bucket_bounds",
          "_metric",
      )

      def __init__(
          self,
          name: str,
          documentation: str,
          buckets: Iterable[float],
          registry: CollectorRegistry = REGISTRY,
      ):
          """
          Args:
              name: base name of metric to be exported to Prometheus. (a _bucket suffix
                 will be added.)
              documentation: help text for the metric
              buckets: The top bounds of the buckets to report. May be empty, in
                 which case a single infinite bucket is used.
              registry: metric registry to register with

          Raises:
              ValueError: if `buckets` is not in ascending order.
          """
          self._name = name
          self._documentation = documentation

          # the tops of the buckets
          self._bucket_bounds = [float(b) for b in buckets]
          if self._bucket_bounds != sorted(self._bucket_bounds):
              raise ValueError("Buckets not in sorted order")

          # Make sure the last bucket is unbounded. The emptiness check stops an
          # empty `buckets` from blowing up with an IndexError on `[-1]`.
          if not self._bucket_bounds or self._bucket_bounds[-1] != float("inf"):
              self._bucket_bounds.append(float("inf"))

          # We initially set this to None. We won't report metrics until
          # this has been initialised after a successful data update
          self._metric: Optional[GaugeHistogramMetricFamily] = None

          registry.register(self)

      def collect(self) -> Iterable[Metric]:
          """Yield the most recently built metric, if there is one."""
          # Don't report metrics unless we've already collected some data
          if self._metric is not None:
              yield self._metric

      def update_data(self, values: Iterable[float]) -> None:
          """Update the data to be reported by the metric

          The existing data is cleared, and each measurement in the input is assigned
          to the relevant bucket.
          """
          # Build the replacement metric in full before swapping it in with a
          # single assignment.
          self._metric = self._values_to_metric(values)

      def _values_to_metric(self, values: Iterable[float]) -> GaugeHistogramMetricFamily:
          """Bucket `values` and wrap the cumulative counts in a metric family."""
          total = 0.0
          bucket_values = [0 for _ in self._bucket_bounds]

          for v in values:
              # assign each value to a bucket
              for i, bound in enumerate(self._bucket_bounds):
                  if v <= bound:
                      bucket_values[i] += 1
                      break

              # ... and increment the sum
              total += v

          # now, aggregate the bucket values so that they count the number of entries in
          # that bucket or below.
          accumulated_values = itertools.accumulate(bucket_values)

          return GaugeHistogramMetricFamily(
              self._name,
              self._documentation,
              buckets=list(
                  zip((str(b) for b in self._bucket_bounds), accumulated_values)
              ),
              gsum_value=total,
          )
  269. #
  270. # Detailed CPU metrics
  271. #
  class CPUMetrics(Collector):
      """Reports this process's user and system CPU time from /proc/self/stat."""

      def __init__(self) -> None:
          # Fallback used when the system configuration can't be queried.
          ticks_per_sec = 100
          try:
              # Try and get the system config
              ticks_per_sec = os.sysconf("SC_CLK_TCK")
          except (ValueError, TypeError, AttributeError):
              pass

          self.ticks_per_sec = ticks_per_sec

      def collect(self) -> Iterable[Metric]:
          """Yield the user and system CPU time gauges, in seconds.

          Yields nothing if /proc/self/stat did not exist at import time.
          """
          if not HAVE_PROC_SELF_STAT:
              return

          with open("/proc/self/stat") as s:
              line = s.read()

          # The second field is the command name wrapped in parentheses, and that
          # name may itself contain spaces and parentheses. Split on the *last*
          # ") " so that such a name can't shift the fields that follow it.
          raw_stats = line.rsplit(") ", 1)[1].split(" ")

          # raw_stats[0] is the field straight after the command name, which puts
          # utime and stime at indices 11 and 12. Both are in clock ticks.
          user = GaugeMetricFamily("process_cpu_user_seconds_total", "")
          user.add_metric([], float(raw_stats[11]) / self.ticks_per_sec)
          yield user

          system = GaugeMetricFamily("process_cpu_system_seconds_total", "")
          system.add_metric([], float(raw_stats[12]) / self.ticks_per_sec)
          yield system


  REGISTRY.register(CPUMetrics())
  294. #
  295. # Federation Metrics
  296. #
  sent_transactions_counter = Counter(
      name="synapse_federation_client_sent_transactions",
      documentation="",
  )

  events_processed_counter = Counter(
      name="synapse_federation_client_events_processed",
      documentation="",
  )

  event_processing_loop_counter = Counter(
      name="synapse_event_processing_loop_count",
      documentation="Event processing loop iterations",
      labelnames=["name"],
  )

  event_processing_loop_room_count = Counter(
      name="synapse_event_processing_loop_room_count",
      documentation="Rooms seen per event processing loop iteration",
      labelnames=["name"],
  )

  # Where each component (federation sending, appservice sending, and so on)
  # has got to in the event stream.
  event_processing_positions = Gauge(
      name="synapse_event_processing_positions",
      documentation="",
      labelnames=["name"],
  )

  # The current maximum position of the events stream.
  event_persisted_position = Gauge(
      name="synapse_event_persisted_position",
      documentation="",
  )

  # The received_ts of the last event each component processed.
  event_processing_last_ts = Gauge(
      name="synapse_event_processing_last_ts",
      documentation="",
      labelnames=["name"],
  )

  # Processing lag per component: the gap between the received_ts of the last
  # processed event and the time at which processing of it finished.
  event_processing_lag = Gauge(
      name="synapse_event_processing_lag",
      documentation="",
      labelnames=["name"],
  )

  event_processing_lag_by_event = Histogram(
      name="synapse_event_processing_lag_by_event",
      documentation="Time between an event being persisted and it being queued up to be sent to the relevant remote servers",
      labelnames=["name"],
  )
  # Build info of the running server, exported as a constant gauge (value 1)
  # whose labels carry the Python, Synapse and OS versions.
  build_info = Gauge(
      name="synapse_build_info",
      documentation="Build information",
      labelnames=["pythonversion", "version", "osversion"],
  )
  build_info.labels(
      f"{platform.python_implementation()} {platform.python_version()}",
      SYNAPSE_VERSION,
      f"{platform.system()} {platform.release()}",
  ).set(1)
  # 3PID token requests, bucketed by the try count of the request.
  threepid_send_requests = Histogram(
      name="synapse_threepid_send_requests_with_tries",
      documentation=(
          "Number of requests for a 3pid token by try count. Note if"
          " there is a request with try count of 4, then there would have been one"
          " each for 1, 2 and 3"
      ),
      labelnames=("type", "reason"),
      buckets=(1, 2, 3, 4, 5, 10),
  )
  # Per-threadpool gauges, labelled by threadpool name. They are populated by
  # register_threadpool().
  threadpool_total_threads = Gauge(
      name="synapse_threadpool_total_threads",
      documentation="Total number of threads currently in the threadpool",
      labelnames=["name"],
  )

  threadpool_total_working_threads = Gauge(
      name="synapse_threadpool_working_threads",
      documentation="Number of threads currently working in the threadpool",
      labelnames=["name"],
  )

  threadpool_total_min_threads = Gauge(
      name="synapse_threadpool_min_threads",
      documentation="Minimum number of threads configured in the threadpool",
      labelnames=["name"],
  )

  threadpool_total_max_threads = Gauge(
      name="synapse_threadpool_max_threads",
      documentation="Maximum number of threads configured in the threadpool",
      labelnames=["name"],
  )
  def register_threadpool(name: str, threadpool: ThreadPool) -> None:
      """Export the threadpool gauges for `threadpool` under the label `name`.

      The min and max sizes are read once, at registration time. The total and
      working thread counts are recomputed from the pool whenever the gauges are
      read.
      """

      def _total_threads() -> int:
          return len(threadpool.threads)

      def _working_threads() -> int:
          return len(threadpool.working)

      # Fixed configuration values: set them once.
      threadpool_total_min_threads.labels(name).set(threadpool.min)
      threadpool_total_max_threads.labels(name).set(threadpool.max)

      # Live values: hand the gauges a function to call on each read.
      threadpool_total_threads.labels(name).set_function(_total_threads)
      threadpool_total_working_threads.labels(name).set_function(_working_threads)
  # Names exported by `from synapse.metrics import *`.
  __all__ = [
      # Imported from synapse.metrics._types and synapse.metrics._exposition.
      "Collector",
      "MetricsResource",
      "generate_latest",
      "start_http_server",
      # Collectors defined in this module.
      "LaterGauge",
      "InFlightGauge",
      "GaugeBucketCollector",
      # Imported from synapse.metrics._gc.
      "MIN_TIME_BETWEEN_GCS",
      "install_gc_manager",
  ]