# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Because otherwise 'resource' collides with synapse.metrics.resource
from __future__ import absolute_import

import functools
import logging
import os
import stat
import sys
import time
from resource import getrusage, RUSAGE_SELF

from twisted.internet import reactor

from .metric import (
    CounterMetric, CallbackMetric, DistributionMetric, CacheMetric
)
  27. logger = logging.getLogger(__name__)
  28. # We'll keep all the available metrics in a single toplevel dict, one shared
  29. # for the entire process. We don't currently support per-HomeServer instances
  30. # of metrics, because in practice any one python VM will host only one
  31. # HomeServer anyway. This makes a lot of implementation neater
  32. all_metrics = {}
  33. class Metrics(object):
  34. """ A single Metrics object gives a (mutable) slice view of the all_metrics
  35. dict, allowing callers to easily register new metrics that are namespaced
  36. nicely."""
  37. def __init__(self, name):
  38. self.name_prefix = name
  39. def _register(self, metric_class, name, *args, **kwargs):
  40. full_name = "%s_%s" % (self.name_prefix, name)
  41. metric = metric_class(full_name, *args, **kwargs)
  42. all_metrics[full_name] = metric
  43. return metric
  44. def register_counter(self, *args, **kwargs):
  45. return self._register(CounterMetric, *args, **kwargs)
  46. def register_callback(self, *args, **kwargs):
  47. return self._register(CallbackMetric, *args, **kwargs)
  48. def register_distribution(self, *args, **kwargs):
  49. return self._register(DistributionMetric, *args, **kwargs)
  50. def register_cache(self, *args, **kwargs):
  51. return self._register(CacheMetric, *args, **kwargs)
  52. def get_metrics_for(pkg_name):
  53. """ Returns a Metrics instance for conveniently creating metrics
  54. namespaced with the given name prefix. """
  55. # Convert a "package.name" to "package_name" because Prometheus doesn't
  56. # let us use . in metric names
  57. return Metrics(pkg_name.replace(".", "_"))
  58. def render_all():
  59. strs = []
  60. # TODO(paul): Internal hack
  61. update_resource_metrics()
  62. for name in sorted(all_metrics.keys()):
  63. try:
  64. strs += all_metrics[name].render()
  65. except Exception:
  66. strs += ["# FAILED to render %s" % name]
  67. logger.exception("Failed to render %s metric", name)
  68. strs.append("") # to generate a final CRLF
  69. return "\n".join(strs)
  70. # Now register some standard process-wide state metrics, to give indications of
  71. # process resource usage
  72. rusage = None
  73. def update_resource_metrics():
  74. global rusage
  75. rusage = getrusage(RUSAGE_SELF)
  76. resource_metrics = get_metrics_for("process.resource")
  77. # msecs
  78. resource_metrics.register_callback("utime", lambda: rusage.ru_utime * 1000)
  79. resource_metrics.register_callback("stime", lambda: rusage.ru_stime * 1000)
  80. # kilobytes
  81. resource_metrics.register_callback("maxrss", lambda: rusage.ru_maxrss * 1024)
  82. TYPES = {
  83. stat.S_IFSOCK: "SOCK",
  84. stat.S_IFLNK: "LNK",
  85. stat.S_IFREG: "REG",
  86. stat.S_IFBLK: "BLK",
  87. stat.S_IFDIR: "DIR",
  88. stat.S_IFCHR: "CHR",
  89. stat.S_IFIFO: "FIFO",
  90. }
  91. def _process_fds():
  92. counts = {(k,): 0 for k in TYPES.values()}
  93. counts[("other",)] = 0
  94. # Not every OS will have a /proc/self/fd directory
  95. if not os.path.exists("/proc/self/fd"):
  96. return counts
  97. for fd in os.listdir("/proc/self/fd"):
  98. try:
  99. s = os.stat("/proc/self/fd/%s" % (fd))
  100. fmt = stat.S_IFMT(s.st_mode)
  101. if fmt in TYPES:
  102. t = TYPES[fmt]
  103. else:
  104. t = "other"
  105. counts[(t,)] += 1
  106. except OSError:
  107. # the dirh itself used by listdir() is usually missing by now
  108. pass
  109. return counts
# Register _process_fds() as the "process_fds" callback metric with a single
# "type" label. Presumably CallbackMetric takes label values from the 1-tuple
# keys _process_fds() returns -- confirm in .metric.
get_metrics_for("process").register_callback("fds", _process_fds, labels=["type"])

# Reactor metrics, registered as "reactor_tick_time" and
# "reactor_pending_calls". The runUntilCurrentTimer wrapper below feeds both.
reactor_metrics = get_metrics_for("reactor")
tick_time = reactor_metrics.register_distribution("tick_time")
pending_calls_metric = reactor_metrics.register_distribution("pending_calls")
  114. def runUntilCurrentTimer(func):
  115. @functools.wraps(func)
  116. def f(*args, **kwargs):
  117. now = reactor.seconds()
  118. num_pending = 0
  119. # _newTimedCalls is one long list of *all* pending calls. Below loop
  120. # is based off of impl of reactor.runUntilCurrent
  121. for delayed_call in reactor._newTimedCalls:
  122. if delayed_call.time > now:
  123. break
  124. if delayed_call.delayed_time > 0:
  125. continue
  126. num_pending += 1
  127. num_pending += len(reactor.threadCallQueue)
  128. start = time.time() * 1000
  129. ret = func(*args, **kwargs)
  130. end = time.time() * 1000
  131. tick_time.inc_by(end - start)
  132. pending_calls_metric.inc_by(num_pending)
  133. return ret
  134. return f
  135. try:
  136. # Ensure the reactor has all the attributes we expect
  137. reactor.runUntilCurrent
  138. reactor._newTimedCalls
  139. reactor.threadCallQueue
  140. # runUntilCurrent is called when we have pending calls. It is called once
  141. # per iteratation after fd polling.
  142. reactor.runUntilCurrent = runUntilCurrentTimer(reactor.runUntilCurrent)
  143. except AttributeError:
  144. pass