__init__.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2015, 2016 OpenMarket Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. import functools
  16. import gc
  17. import logging
  18. import os
  19. import platform
  20. import threading
  21. import time
  22. from typing import Callable, Dict, Iterable, Optional, Tuple, Union
  23. import six
  24. import attr
  25. from prometheus_client import Counter, Gauge, Histogram
  26. from prometheus_client.core import REGISTRY, GaugeMetricFamily, HistogramMetricFamily
  27. from twisted.internet import reactor
  28. import synapse
  29. from synapse.metrics._exposition import (
  30. MetricsResource,
  31. generate_latest,
  32. start_http_server,
  33. )
  34. from synapse.util.versionstring import get_version_string
  35. logger = logging.getLogger(__name__)
  36. METRICS_PREFIX = "/_synapse/metrics"
  37. running_on_pypy = platform.python_implementation() == "PyPy"
  38. all_gauges = {} # type: Dict[str, Union[LaterGauge, InFlightGauge, BucketCollector]]
  39. HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat")
  40. class RegistryProxy(object):
  41. @staticmethod
  42. def collect():
  43. for metric in REGISTRY.collect():
  44. if not metric.name.startswith("__"):
  45. yield metric
  46. @attr.s(hash=True)
  47. class LaterGauge(object):
  48. name = attr.ib(type=str)
  49. desc = attr.ib(type=str)
  50. labels = attr.ib(hash=False, type=Optional[Iterable[str]])
  51. # callback: should either return a value (if there are no labels for this metric),
  52. # or dict mapping from a label tuple to a value
  53. caller = attr.ib(type=Callable[[], Union[Dict[Tuple[str, ...], float], float]])
  54. def collect(self):
  55. g = GaugeMetricFamily(self.name, self.desc, labels=self.labels)
  56. try:
  57. calls = self.caller()
  58. except Exception:
  59. logger.exception("Exception running callback for LaterGauge(%s)", self.name)
  60. yield g
  61. return
  62. if isinstance(calls, dict):
  63. for k, v in six.iteritems(calls):
  64. g.add_metric(k, v)
  65. else:
  66. g.add_metric([], calls)
  67. yield g
  68. def __attrs_post_init__(self):
  69. self._register()
  70. def _register(self):
  71. if self.name in all_gauges.keys():
  72. logger.warning("%s already registered, reregistering" % (self.name,))
  73. REGISTRY.unregister(all_gauges.pop(self.name))
  74. REGISTRY.register(self)
  75. all_gauges[self.name] = self
  76. class InFlightGauge(object):
  77. """Tracks number of things (e.g. requests, Measure blocks, etc) in flight
  78. at any given time.
  79. Each InFlightGauge will create a metric called `<name>_total` that counts
  80. the number of in flight blocks, as well as a metrics for each item in the
  81. given `sub_metrics` as `<name>_<sub_metric>` which will get updated by the
  82. callbacks.
  83. Args:
  84. name (str)
  85. desc (str)
  86. labels (list[str])
  87. sub_metrics (list[str]): A list of sub metrics that the callbacks
  88. will update.
  89. """
  90. def __init__(self, name, desc, labels, sub_metrics):
  91. self.name = name
  92. self.desc = desc
  93. self.labels = labels
  94. self.sub_metrics = sub_metrics
  95. # Create a class which have the sub_metrics values as attributes, which
  96. # default to 0 on initialization. Used to pass to registered callbacks.
  97. self._metrics_class = attr.make_class(
  98. "_MetricsEntry", attrs={x: attr.ib(0) for x in sub_metrics}, slots=True
  99. )
  100. # Counts number of in flight blocks for a given set of label values
  101. self._registrations = {} # type: Dict
  102. # Protects access to _registrations
  103. self._lock = threading.Lock()
  104. self._register_with_collector()
  105. def register(self, key, callback):
  106. """Registers that we've entered a new block with labels `key`.
  107. `callback` gets called each time the metrics are collected. The same
  108. value must also be given to `unregister`.
  109. `callback` gets called with an object that has an attribute per
  110. sub_metric, which should be updated with the necessary values. Note that
  111. the metrics object is shared between all callbacks registered with the
  112. same key.
  113. Note that `callback` may be called on a separate thread.
  114. """
  115. with self._lock:
  116. self._registrations.setdefault(key, set()).add(callback)
  117. def unregister(self, key, callback):
  118. """Registers that we've exited a block with labels `key`.
  119. """
  120. with self._lock:
  121. self._registrations.setdefault(key, set()).discard(callback)
  122. def collect(self):
  123. """Called by prometheus client when it reads metrics.
  124. Note: may be called by a separate thread.
  125. """
  126. in_flight = GaugeMetricFamily(
  127. self.name + "_total", self.desc, labels=self.labels
  128. )
  129. metrics_by_key = {}
  130. # We copy so that we don't mutate the list while iterating
  131. with self._lock:
  132. keys = list(self._registrations)
  133. for key in keys:
  134. with self._lock:
  135. callbacks = set(self._registrations[key])
  136. in_flight.add_metric(key, len(callbacks))
  137. metrics = self._metrics_class()
  138. metrics_by_key[key] = metrics
  139. for callback in callbacks:
  140. callback(metrics)
  141. yield in_flight
  142. for name in self.sub_metrics:
  143. gauge = GaugeMetricFamily(
  144. "_".join([self.name, name]), "", labels=self.labels
  145. )
  146. for key, metrics in six.iteritems(metrics_by_key):
  147. gauge.add_metric(key, getattr(metrics, name))
  148. yield gauge
  149. def _register_with_collector(self):
  150. if self.name in all_gauges.keys():
  151. logger.warning("%s already registered, reregistering" % (self.name,))
  152. REGISTRY.unregister(all_gauges.pop(self.name))
  153. REGISTRY.register(self)
  154. all_gauges[self.name] = self
  155. @attr.s(hash=True)
  156. class BucketCollector(object):
  157. """
  158. Like a Histogram, but allows buckets to be point-in-time instead of
  159. incrementally added to.
  160. Args:
  161. name (str): Base name of metric to be exported to Prometheus.
  162. data_collector (callable -> dict): A synchronous callable that
  163. returns a dict mapping bucket to number of items in the
  164. bucket. If these buckets are not the same as the buckets
  165. given to this class, they will be remapped into them.
  166. buckets (list[float]): List of floats/ints of the buckets to
  167. give to Prometheus. +Inf is ignored, if given.
  168. """
  169. name = attr.ib()
  170. data_collector = attr.ib()
  171. buckets = attr.ib()
  172. def collect(self):
  173. # Fetch the data -- this must be synchronous!
  174. data = self.data_collector()
  175. buckets = {} # type: Dict[float, int]
  176. res = []
  177. for x in data.keys():
  178. for i, bound in enumerate(self.buckets):
  179. if x <= bound:
  180. buckets[bound] = buckets.get(bound, 0) + data[x]
  181. for i in self.buckets:
  182. res.append([str(i), buckets.get(i, 0)])
  183. res.append(["+Inf", sum(data.values())])
  184. metric = HistogramMetricFamily(
  185. self.name, "", buckets=res, sum_value=sum(x * y for x, y in data.items())
  186. )
  187. yield metric
  188. def __attrs_post_init__(self):
  189. self.buckets = [float(x) for x in self.buckets if x != "+Inf"]
  190. if self.buckets != sorted(self.buckets):
  191. raise ValueError("Buckets not sorted")
  192. self.buckets = tuple(self.buckets)
  193. if self.name in all_gauges.keys():
  194. logger.warning("%s already registered, reregistering" % (self.name,))
  195. REGISTRY.unregister(all_gauges.pop(self.name))
  196. REGISTRY.register(self)
  197. all_gauges[self.name] = self
  198. #
  199. # Detailed CPU metrics
  200. #
  201. class CPUMetrics(object):
  202. def __init__(self):
  203. ticks_per_sec = 100
  204. try:
  205. # Try and get the system config
  206. ticks_per_sec = os.sysconf("SC_CLK_TCK")
  207. except (ValueError, TypeError, AttributeError):
  208. pass
  209. self.ticks_per_sec = ticks_per_sec
  210. def collect(self):
  211. if not HAVE_PROC_SELF_STAT:
  212. return
  213. with open("/proc/self/stat") as s:
  214. line = s.read()
  215. raw_stats = line.split(") ", 1)[1].split(" ")
  216. user = GaugeMetricFamily("process_cpu_user_seconds_total", "")
  217. user.add_metric([], float(raw_stats[11]) / self.ticks_per_sec)
  218. yield user
  219. sys = GaugeMetricFamily("process_cpu_system_seconds_total", "")
  220. sys.add_metric([], float(raw_stats[12]) / self.ticks_per_sec)
  221. yield sys
  222. REGISTRY.register(CPUMetrics())
  223. #
  224. # Python GC metrics
  225. #
  226. gc_unreachable = Gauge("python_gc_unreachable_total", "Unreachable GC objects", ["gen"])
  227. gc_time = Histogram(
  228. "python_gc_time",
  229. "Time taken to GC (sec)",
  230. ["gen"],
  231. buckets=[
  232. 0.0025,
  233. 0.005,
  234. 0.01,
  235. 0.025,
  236. 0.05,
  237. 0.10,
  238. 0.25,
  239. 0.50,
  240. 1.00,
  241. 2.50,
  242. 5.00,
  243. 7.50,
  244. 15.00,
  245. 30.00,
  246. 45.00,
  247. 60.00,
  248. ],
  249. )
  250. class GCCounts(object):
  251. def collect(self):
  252. cm = GaugeMetricFamily("python_gc_counts", "GC object counts", labels=["gen"])
  253. for n, m in enumerate(gc.get_count()):
  254. cm.add_metric([str(n)], m)
  255. yield cm
  256. if not running_on_pypy:
  257. REGISTRY.register(GCCounts())
  258. #
  259. # Twisted reactor metrics
  260. #
  261. tick_time = Histogram(
  262. "python_twisted_reactor_tick_time",
  263. "Tick time of the Twisted reactor (sec)",
  264. buckets=[0.001, 0.002, 0.005, 0.01, 0.025, 0.05, 0.1, 0.2, 0.5, 1, 2, 5],
  265. )
  266. pending_calls_metric = Histogram(
  267. "python_twisted_reactor_pending_calls",
  268. "Pending calls",
  269. buckets=[1, 2, 5, 10, 25, 50, 100, 250, 500, 1000],
  270. )
  271. #
  272. # Federation Metrics
  273. #
  274. sent_transactions_counter = Counter("synapse_federation_client_sent_transactions", "")
  275. events_processed_counter = Counter("synapse_federation_client_events_processed", "")
  276. event_processing_loop_counter = Counter(
  277. "synapse_event_processing_loop_count", "Event processing loop iterations", ["name"]
  278. )
  279. event_processing_loop_room_count = Counter(
  280. "synapse_event_processing_loop_room_count",
  281. "Rooms seen per event processing loop iteration",
  282. ["name"],
  283. )
  284. # Used to track where various components have processed in the event stream,
  285. # e.g. federation sending, appservice sending, etc.
  286. event_processing_positions = Gauge("synapse_event_processing_positions", "", ["name"])
  287. # Used to track the current max events stream position
  288. event_persisted_position = Gauge("synapse_event_persisted_position", "")
  289. # Used to track the received_ts of the last event processed by various
  290. # components
  291. event_processing_last_ts = Gauge("synapse_event_processing_last_ts", "", ["name"])
  292. # Used to track the lag processing events. This is the time difference
  293. # between the last processed event's received_ts and the time it was
  294. # finished being processed.
  295. event_processing_lag = Gauge("synapse_event_processing_lag", "", ["name"])
  296. # Build info of the running server.
  297. build_info = Gauge(
  298. "synapse_build_info", "Build information", ["pythonversion", "version", "osversion"]
  299. )
  300. build_info.labels(
  301. " ".join([platform.python_implementation(), platform.python_version()]),
  302. get_version_string(synapse),
  303. " ".join([platform.system(), platform.release()]),
  304. ).set(1)
  305. last_ticked = time.time()
  306. class ReactorLastSeenMetric(object):
  307. def collect(self):
  308. cm = GaugeMetricFamily(
  309. "python_twisted_reactor_last_seen",
  310. "Seconds since the Twisted reactor was last seen",
  311. )
  312. cm.add_metric([], time.time() - last_ticked)
  313. yield cm
  314. REGISTRY.register(ReactorLastSeenMetric())
  315. def runUntilCurrentTimer(func):
  316. @functools.wraps(func)
  317. def f(*args, **kwargs):
  318. now = reactor.seconds()
  319. num_pending = 0
  320. # _newTimedCalls is one long list of *all* pending calls. Below loop
  321. # is based off of impl of reactor.runUntilCurrent
  322. for delayed_call in reactor._newTimedCalls:
  323. if delayed_call.time > now:
  324. break
  325. if delayed_call.delayed_time > 0:
  326. continue
  327. num_pending += 1
  328. num_pending += len(reactor.threadCallQueue)
  329. start = time.time()
  330. ret = func(*args, **kwargs)
  331. end = time.time()
  332. # record the amount of wallclock time spent running pending calls.
  333. # This is a proxy for the actual amount of time between reactor polls,
  334. # since about 25% of time is actually spent running things triggered by
  335. # I/O events, but that is harder to capture without rewriting half the
  336. # reactor.
  337. tick_time.observe(end - start)
  338. pending_calls_metric.observe(num_pending)
  339. # Update the time we last ticked, for the metric to test whether
  340. # Synapse's reactor has frozen
  341. global last_ticked
  342. last_ticked = end
  343. if running_on_pypy:
  344. return ret
  345. # Check if we need to do a manual GC (since its been disabled), and do
  346. # one if necessary.
  347. threshold = gc.get_threshold()
  348. counts = gc.get_count()
  349. for i in (2, 1, 0):
  350. if threshold[i] < counts[i]:
  351. if i == 0:
  352. logger.debug("Collecting gc %d", i)
  353. else:
  354. logger.info("Collecting gc %d", i)
  355. start = time.time()
  356. unreachable = gc.collect(i)
  357. end = time.time()
  358. gc_time.labels(i).observe(end - start)
  359. gc_unreachable.labels(i).set(unreachable)
  360. return ret
  361. return f
  362. try:
  363. # Ensure the reactor has all the attributes we expect
  364. reactor.runUntilCurrent
  365. reactor._newTimedCalls
  366. reactor.threadCallQueue
  367. # runUntilCurrent is called when we have pending calls. It is called once
  368. # per iteratation after fd polling.
  369. reactor.runUntilCurrent = runUntilCurrentTimer(reactor.runUntilCurrent)
  370. # We manually run the GC each reactor tick so that we can get some metrics
  371. # about time spent doing GC,
  372. if not running_on_pypy:
  373. gc.disable()
  374. except AttributeError:
  375. pass
  376. __all__ = [
  377. "MetricsResource",
  378. "generate_latest",
  379. "start_http_server",
  380. "LaterGauge",
  381. "InFlightGauge",
  382. "BucketCollector",
  383. ]