__init__.py 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2015, 2016 OpenMarket Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. import logging
  16. import functools
  17. import time
  18. import gc
  19. import os
  20. import platform
  21. import attr
  22. from prometheus_client import Gauge, Histogram, Counter
  23. from prometheus_client.core import GaugeMetricFamily, REGISTRY
  24. from twisted.internet import reactor
  25. logger = logging.getLogger(__name__)
  26. running_on_pypy = platform.python_implementation() == "PyPy"
  27. all_metrics = []
  28. all_collectors = []
  29. all_gauges = {}
  30. HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat")
  31. class RegistryProxy(object):
  32. @staticmethod
  33. def collect():
  34. for metric in REGISTRY.collect():
  35. if not metric.name.startswith("__"):
  36. yield metric
  37. @attr.s(hash=True)
  38. class LaterGauge(object):
  39. name = attr.ib()
  40. desc = attr.ib()
  41. labels = attr.ib(hash=False)
  42. caller = attr.ib()
  43. def collect(self):
  44. g = GaugeMetricFamily(self.name, self.desc, labels=self.labels)
  45. try:
  46. calls = self.caller()
  47. except Exception:
  48. logger.exception(
  49. "Exception running callback for LaterGuage(%s)",
  50. self.name,
  51. )
  52. yield g
  53. return
  54. if isinstance(calls, dict):
  55. for k, v in calls.items():
  56. g.add_metric(k, v)
  57. else:
  58. g.add_metric([], calls)
  59. yield g
  60. def __attrs_post_init__(self):
  61. self._register()
  62. def _register(self):
  63. if self.name in all_gauges.keys():
  64. logger.warning("%s already registered, reregistering" % (self.name,))
  65. REGISTRY.unregister(all_gauges.pop(self.name))
  66. REGISTRY.register(self)
  67. all_gauges[self.name] = self
  68. #
  69. # Detailed CPU metrics
  70. #
  71. class CPUMetrics(object):
  72. def __init__(self):
  73. ticks_per_sec = 100
  74. try:
  75. # Try and get the system config
  76. ticks_per_sec = os.sysconf('SC_CLK_TCK')
  77. except (ValueError, TypeError, AttributeError):
  78. pass
  79. self.ticks_per_sec = ticks_per_sec
  80. def collect(self):
  81. if not HAVE_PROC_SELF_STAT:
  82. return
  83. with open("/proc/self/stat") as s:
  84. line = s.read()
  85. raw_stats = line.split(") ", 1)[1].split(" ")
  86. user = GaugeMetricFamily("process_cpu_user_seconds_total", "")
  87. user.add_metric([], float(raw_stats[11]) / self.ticks_per_sec)
  88. yield user
  89. sys = GaugeMetricFamily("process_cpu_system_seconds_total", "")
  90. sys.add_metric([], float(raw_stats[12]) / self.ticks_per_sec)
  91. yield sys
  92. REGISTRY.register(CPUMetrics())
  93. #
  94. # Python GC metrics
  95. #
  96. gc_unreachable = Gauge("python_gc_unreachable_total", "Unreachable GC objects", ["gen"])
  97. gc_time = Histogram(
  98. "python_gc_time",
  99. "Time taken to GC (sec)",
  100. ["gen"],
  101. buckets=[0.0025, 0.005, 0.01, 0.025, 0.05, 0.10, 0.25, 0.50, 1.00, 2.50,
  102. 5.00, 7.50, 15.00, 30.00, 45.00, 60.00],
  103. )
  104. class GCCounts(object):
  105. def collect(self):
  106. cm = GaugeMetricFamily("python_gc_counts", "GC cycle counts", labels=["gen"])
  107. for n, m in enumerate(gc.get_count()):
  108. cm.add_metric([str(n)], m)
  109. yield cm
  110. REGISTRY.register(GCCounts())
  111. #
  112. # Twisted reactor metrics
  113. #
  114. tick_time = Histogram(
  115. "python_twisted_reactor_tick_time",
  116. "Tick time of the Twisted reactor (sec)",
  117. buckets=[0.001, 0.002, 0.005, 0.01, 0.025, 0.05, 0.1, 0.2, 0.5, 1, 2, 5],
  118. )
  119. pending_calls_metric = Histogram(
  120. "python_twisted_reactor_pending_calls",
  121. "Pending calls",
  122. buckets=[1, 2, 5, 10, 25, 50, 100, 250, 500, 1000],
  123. )
  124. #
  125. # Federation Metrics
  126. #
  127. sent_edus_counter = Counter("synapse_federation_client_sent_edus", "")
  128. sent_transactions_counter = Counter("synapse_federation_client_sent_transactions", "")
  129. events_processed_counter = Counter("synapse_federation_client_events_processed", "")
  130. # Used to track where various components have processed in the event stream,
  131. # e.g. federation sending, appservice sending, etc.
  132. event_processing_positions = Gauge("synapse_event_processing_positions", "", ["name"])
  133. # Used to track the current max events stream position
  134. event_persisted_position = Gauge("synapse_event_persisted_position", "")
  135. # Used to track the received_ts of the last event processed by various
  136. # components
  137. event_processing_last_ts = Gauge("synapse_event_processing_last_ts", "", ["name"])
  138. # Used to track the lag processing events. This is the time difference
  139. # between the last processed event's received_ts and the time it was
  140. # finished being processed.
  141. event_processing_lag = Gauge("synapse_event_processing_lag", "", ["name"])
  142. def runUntilCurrentTimer(func):
  143. @functools.wraps(func)
  144. def f(*args, **kwargs):
  145. now = reactor.seconds()
  146. num_pending = 0
  147. # _newTimedCalls is one long list of *all* pending calls. Below loop
  148. # is based off of impl of reactor.runUntilCurrent
  149. for delayed_call in reactor._newTimedCalls:
  150. if delayed_call.time > now:
  151. break
  152. if delayed_call.delayed_time > 0:
  153. continue
  154. num_pending += 1
  155. num_pending += len(reactor.threadCallQueue)
  156. start = time.time()
  157. ret = func(*args, **kwargs)
  158. end = time.time()
  159. # record the amount of wallclock time spent running pending calls.
  160. # This is a proxy for the actual amount of time between reactor polls,
  161. # since about 25% of time is actually spent running things triggered by
  162. # I/O events, but that is harder to capture without rewriting half the
  163. # reactor.
  164. tick_time.observe(end - start)
  165. pending_calls_metric.observe(num_pending)
  166. if running_on_pypy:
  167. return ret
  168. # Check if we need to do a manual GC (since its been disabled), and do
  169. # one if necessary.
  170. threshold = gc.get_threshold()
  171. counts = gc.get_count()
  172. for i in (2, 1, 0):
  173. if threshold[i] < counts[i]:
  174. logger.info("Collecting gc %d", i)
  175. start = time.time()
  176. unreachable = gc.collect(i)
  177. end = time.time()
  178. gc_time.labels(i).observe(end - start)
  179. gc_unreachable.labels(i).set(unreachable)
  180. return ret
  181. return f
  182. try:
  183. # Ensure the reactor has all the attributes we expect
  184. reactor.runUntilCurrent
  185. reactor._newTimedCalls
  186. reactor.threadCallQueue
  187. # runUntilCurrent is called when we have pending calls. It is called once
  188. # per iteratation after fd polling.
  189. reactor.runUntilCurrent = runUntilCurrentTimer(reactor.runUntilCurrent)
  190. # We manually run the GC each reactor tick so that we can get some metrics
  191. # about time spent doing GC,
  192. if not running_on_pypy:
  193. gc.disable()
  194. except AttributeError:
  195. pass