123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263 |
- # -*- coding: utf-8 -*-
- # Copyright 2015, 2016 OpenMarket Ltd
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- import logging
- import functools
- import time
- import gc
- import os
- import platform
- import attr
- from prometheus_client import Gauge, Histogram, Counter
- from prometheus_client.core import GaugeMetricFamily, REGISTRY
- from twisted.internet import reactor
- logger = logging.getLogger(__name__)
- running_on_pypy = platform.python_implementation() == "PyPy"
- all_metrics = []
- all_collectors = []
- all_gauges = {}
- HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat")
- class RegistryProxy(object):
- @staticmethod
- def collect():
- for metric in REGISTRY.collect():
- if not metric.name.startswith("__"):
- yield metric
- @attr.s(hash=True)
- class LaterGauge(object):
- name = attr.ib()
- desc = attr.ib()
- labels = attr.ib(hash=False)
- caller = attr.ib()
- def collect(self):
- g = GaugeMetricFamily(self.name, self.desc, labels=self.labels)
- try:
- calls = self.caller()
- except Exception:
- logger.exception(
- "Exception running callback for LaterGuage(%s)",
- self.name,
- )
- yield g
- return
- if isinstance(calls, dict):
- for k, v in calls.items():
- g.add_metric(k, v)
- else:
- g.add_metric([], calls)
- yield g
- def __attrs_post_init__(self):
- self._register()
- def _register(self):
- if self.name in all_gauges.keys():
- logger.warning("%s already registered, reregistering" % (self.name,))
- REGISTRY.unregister(all_gauges.pop(self.name))
- REGISTRY.register(self)
- all_gauges[self.name] = self
- #
- # Detailed CPU metrics
- #
- class CPUMetrics(object):
- def __init__(self):
- ticks_per_sec = 100
- try:
- # Try and get the system config
- ticks_per_sec = os.sysconf('SC_CLK_TCK')
- except (ValueError, TypeError, AttributeError):
- pass
- self.ticks_per_sec = ticks_per_sec
- def collect(self):
- if not HAVE_PROC_SELF_STAT:
- return
- with open("/proc/self/stat") as s:
- line = s.read()
- raw_stats = line.split(") ", 1)[1].split(" ")
- user = GaugeMetricFamily("process_cpu_user_seconds_total", "")
- user.add_metric([], float(raw_stats[11]) / self.ticks_per_sec)
- yield user
- sys = GaugeMetricFamily("process_cpu_system_seconds_total", "")
- sys.add_metric([], float(raw_stats[12]) / self.ticks_per_sec)
- yield sys
- REGISTRY.register(CPUMetrics())
- #
- # Python GC metrics
- #
- gc_unreachable = Gauge("python_gc_unreachable_total", "Unreachable GC objects", ["gen"])
- gc_time = Histogram(
- "python_gc_time",
- "Time taken to GC (sec)",
- ["gen"],
- buckets=[0.0025, 0.005, 0.01, 0.025, 0.05, 0.10, 0.25, 0.50, 1.00, 2.50,
- 5.00, 7.50, 15.00, 30.00, 45.00, 60.00],
- )
- class GCCounts(object):
- def collect(self):
- cm = GaugeMetricFamily("python_gc_counts", "GC cycle counts", labels=["gen"])
- for n, m in enumerate(gc.get_count()):
- cm.add_metric([str(n)], m)
- yield cm
- REGISTRY.register(GCCounts())
- #
- # Twisted reactor metrics
- #
- tick_time = Histogram(
- "python_twisted_reactor_tick_time",
- "Tick time of the Twisted reactor (sec)",
- buckets=[0.001, 0.002, 0.005, 0.01, 0.025, 0.05, 0.1, 0.2, 0.5, 1, 2, 5],
- )
- pending_calls_metric = Histogram(
- "python_twisted_reactor_pending_calls",
- "Pending calls",
- buckets=[1, 2, 5, 10, 25, 50, 100, 250, 500, 1000],
- )
- #
- # Federation Metrics
- #
- sent_edus_counter = Counter("synapse_federation_client_sent_edus", "")
- sent_transactions_counter = Counter("synapse_federation_client_sent_transactions", "")
- events_processed_counter = Counter("synapse_federation_client_events_processed", "")
- # Used to track where various components have processed in the event stream,
- # e.g. federation sending, appservice sending, etc.
- event_processing_positions = Gauge("synapse_event_processing_positions", "", ["name"])
- # Used to track the current max events stream position
- event_persisted_position = Gauge("synapse_event_persisted_position", "")
- # Used to track the received_ts of the last event processed by various
- # components
- event_processing_last_ts = Gauge("synapse_event_processing_last_ts", "", ["name"])
- # Used to track the lag processing events. This is the time difference
- # between the last processed event's received_ts and the time it was
- # finished being processed.
- event_processing_lag = Gauge("synapse_event_processing_lag", "", ["name"])
- def runUntilCurrentTimer(func):
- @functools.wraps(func)
- def f(*args, **kwargs):
- now = reactor.seconds()
- num_pending = 0
- # _newTimedCalls is one long list of *all* pending calls. Below loop
- # is based off of impl of reactor.runUntilCurrent
- for delayed_call in reactor._newTimedCalls:
- if delayed_call.time > now:
- break
- if delayed_call.delayed_time > 0:
- continue
- num_pending += 1
- num_pending += len(reactor.threadCallQueue)
- start = time.time()
- ret = func(*args, **kwargs)
- end = time.time()
- # record the amount of wallclock time spent running pending calls.
- # This is a proxy for the actual amount of time between reactor polls,
- # since about 25% of time is actually spent running things triggered by
- # I/O events, but that is harder to capture without rewriting half the
- # reactor.
- tick_time.observe(end - start)
- pending_calls_metric.observe(num_pending)
- if running_on_pypy:
- return ret
- # Check if we need to do a manual GC (since its been disabled), and do
- # one if necessary.
- threshold = gc.get_threshold()
- counts = gc.get_count()
- for i in (2, 1, 0):
- if threshold[i] < counts[i]:
- logger.info("Collecting gc %d", i)
- start = time.time()
- unreachable = gc.collect(i)
- end = time.time()
- gc_time.labels(i).observe(end - start)
- gc_unreachable.labels(i).set(unreachable)
- return ret
- return f
- try:
- # Ensure the reactor has all the attributes we expect
- reactor.runUntilCurrent
- reactor._newTimedCalls
- reactor.threadCallQueue
- # runUntilCurrent is called when we have pending calls. It is called once
- # per iteratation after fd polling.
- reactor.runUntilCurrent = runUntilCurrentTimer(reactor.runUntilCurrent)
- # We manually run the GC each reactor tick so that we can get some metrics
- # about time spent doing GC,
- if not running_on_pypy:
- gc.disable()
- except AttributeError:
- pass
|