logcontext.py 12 KB


  1. # Copyright 2014-2016 OpenMarket Ltd
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. from twisted.internet import defer
  15. import threading
  16. import logging
  17. logger = logging.getLogger(__name__)
  18. try:
  19. import resource
  20. # Python doesn't ship with a definition of RUSAGE_THREAD but it's defined
  21. # to be 1 on linux so we hard code it.
  22. RUSAGE_THREAD = 1
  23. # If the system doesn't support RUSAGE_THREAD then this should throw an
  24. # exception.
  25. resource.getrusage(RUSAGE_THREAD)
  26. def get_thread_resource_usage():
  27. return resource.getrusage(RUSAGE_THREAD)
  28. except:
  29. # If the system doesn't support resource.getrusage(RUSAGE_THREAD) then we
  30. # won't track resource usage by returning None.
  31. def get_thread_resource_usage():
  32. return None
  33. class LoggingContext(object):
  34. """Additional context for log formatting. Contexts are scoped within a
  35. "with" block.
  36. Args:
  37. name (str): Name for the context for debugging.
  38. """
  39. __slots__ = [
  40. "previous_context", "name", "usage_start", "usage_end", "main_thread",
  41. "__dict__", "tag", "alive",
  42. ]
  43. thread_local = threading.local()
  44. class Sentinel(object):
  45. """Sentinel to represent the root context"""
  46. __slots__ = []
  47. def __str__(self):
  48. return "sentinel"
  49. def copy_to(self, record):
  50. pass
  51. def start(self):
  52. pass
  53. def stop(self):
  54. pass
  55. def add_database_transaction(self, duration_ms):
  56. pass
  57. def __nonzero__(self):
  58. return False
  59. sentinel = Sentinel()
  60. def __init__(self, name=None):
  61. self.previous_context = LoggingContext.current_context()
  62. self.name = name
  63. self.ru_stime = 0.
  64. self.ru_utime = 0.
  65. self.db_txn_count = 0
  66. self.db_txn_duration = 0.
  67. self.usage_start = None
  68. self.main_thread = threading.current_thread()
  69. self.tag = ""
  70. self.alive = True
  71. def __str__(self):
  72. return "%s@%x" % (self.name, id(self))
  73. @classmethod
  74. def current_context(cls):
  75. """Get the current logging context from thread local storage"""
  76. return getattr(cls.thread_local, "current_context", cls.sentinel)
  77. @classmethod
  78. def set_current_context(cls, context):
  79. """Set the current logging context in thread local storage
  80. Args:
  81. context(LoggingContext): The context to activate.
  82. Returns:
  83. The context that was previously active
  84. """
  85. current = cls.current_context()
  86. if current is not context:
  87. current.stop()
  88. cls.thread_local.current_context = context
  89. context.start()
  90. return current
  91. def __enter__(self):
  92. """Enters this logging context into thread local storage"""
  93. old_context = self.set_current_context(self)
  94. if self.previous_context != old_context:
  95. logger.warn(
  96. "Expected previous context %r, found %r",
  97. self.previous_context, old_context
  98. )
  99. self.alive = True
  100. return self
  101. def __exit__(self, type, value, traceback):
  102. """Restore the logging context in thread local storage to the state it
  103. was before this context was entered.
  104. Returns:
  105. None to avoid suppressing any exeptions that were thrown.
  106. """
  107. current = self.set_current_context(self.previous_context)
  108. if current is not self:
  109. if current is self.sentinel:
  110. logger.debug("Expected logging context %s has been lost", self)
  111. else:
  112. logger.warn(
  113. "Current logging context %s is not expected context %s",
  114. current,
  115. self
  116. )
  117. self.previous_context = None
  118. self.alive = False
  119. def copy_to(self, record):
  120. """Copy fields from this context to the record"""
  121. for key, value in self.__dict__.items():
  122. setattr(record, key, value)
  123. record.ru_utime, record.ru_stime = self.get_resource_usage()
  124. def start(self):
  125. if threading.current_thread() is not self.main_thread:
  126. return
  127. if self.usage_start and self.usage_end:
  128. self.ru_utime += self.usage_end.ru_utime - self.usage_start.ru_utime
  129. self.ru_stime += self.usage_end.ru_stime - self.usage_start.ru_stime
  130. self.usage_start = None
  131. self.usage_end = None
  132. if not self.usage_start:
  133. self.usage_start = get_thread_resource_usage()
  134. def stop(self):
  135. if threading.current_thread() is not self.main_thread:
  136. return
  137. if self.usage_start:
  138. self.usage_end = get_thread_resource_usage()
  139. def get_resource_usage(self):
  140. ru_utime = self.ru_utime
  141. ru_stime = self.ru_stime
  142. if self.usage_start and threading.current_thread() is self.main_thread:
  143. current = get_thread_resource_usage()
  144. ru_utime += current.ru_utime - self.usage_start.ru_utime
  145. ru_stime += current.ru_stime - self.usage_start.ru_stime
  146. return ru_utime, ru_stime
  147. def add_database_transaction(self, duration_ms):
  148. self.db_txn_count += 1
  149. self.db_txn_duration += duration_ms / 1000.
  150. class LoggingContextFilter(logging.Filter):
  151. """Logging filter that adds values from the current logging context to each
  152. record.
  153. Args:
  154. **defaults: Default values to avoid formatters complaining about
  155. missing fields
  156. """
  157. def __init__(self, **defaults):
  158. self.defaults = defaults
  159. def filter(self, record):
  160. """Add each fields from the logging contexts to the record.
  161. Returns:
  162. True to include the record in the log output.
  163. """
  164. context = LoggingContext.current_context()
  165. for key, value in self.defaults.items():
  166. setattr(record, key, value)
  167. context.copy_to(record)
  168. return True
  169. class PreserveLoggingContext(object):
  170. """Captures the current logging context and restores it when the scope is
  171. exited. Used to restore the context after a function using
  172. @defer.inlineCallbacks is resumed by a callback from the reactor."""
  173. __slots__ = ["current_context", "new_context", "has_parent"]
  174. def __init__(self, new_context=LoggingContext.sentinel):
  175. self.new_context = new_context
  176. def __enter__(self):
  177. """Captures the current logging context"""
  178. self.current_context = LoggingContext.set_current_context(
  179. self.new_context
  180. )
  181. if self.current_context:
  182. self.has_parent = self.current_context.previous_context is not None
  183. if not self.current_context.alive:
  184. logger.debug(
  185. "Entering dead context: %s",
  186. self.current_context,
  187. )
  188. def __exit__(self, type, value, traceback):
  189. """Restores the current logging context"""
  190. context = LoggingContext.set_current_context(self.current_context)
  191. if context != self.new_context:
  192. logger.debug(
  193. "Unexpected logging context: %s is not %s",
  194. context, self.new_context,
  195. )
  196. if self.current_context is not LoggingContext.sentinel:
  197. if not self.current_context.alive:
  198. logger.debug(
  199. "Restoring dead context: %s",
  200. self.current_context,
  201. )
  202. class _PreservingContextDeferred(defer.Deferred):
  203. """A deferred that ensures that all callbacks and errbacks are called with
  204. the given logging context.
  205. """
  206. def __init__(self, context):
  207. self._log_context = context
  208. defer.Deferred.__init__(self)
  209. def addCallbacks(self, callback, errback=None,
  210. callbackArgs=None, callbackKeywords=None,
  211. errbackArgs=None, errbackKeywords=None):
  212. callback = self._wrap_callback(callback)
  213. errback = self._wrap_callback(errback)
  214. return defer.Deferred.addCallbacks(
  215. self, callback,
  216. errback=errback,
  217. callbackArgs=callbackArgs,
  218. callbackKeywords=callbackKeywords,
  219. errbackArgs=errbackArgs,
  220. errbackKeywords=errbackKeywords,
  221. )
  222. def _wrap_callback(self, f):
  223. def g(res, *args, **kwargs):
  224. with PreserveLoggingContext(self._log_context):
  225. res = f(res, *args, **kwargs)
  226. return res
  227. return g
  228. def preserve_context_over_fn(fn, *args, **kwargs):
  229. """Takes a function and invokes it with the given arguments, but removes
  230. and restores the current logging context while doing so.
  231. If the result is a deferred, call preserve_context_over_deferred before
  232. returning it.
  233. """
  234. with PreserveLoggingContext():
  235. res = fn(*args, **kwargs)
  236. if isinstance(res, defer.Deferred):
  237. return preserve_context_over_deferred(res)
  238. else:
  239. return res
  240. def preserve_context_over_deferred(deferred, context=None):
  241. """Given a deferred wrap it such that any callbacks added later to it will
  242. be invoked with the current context.
  243. """
  244. if context is None:
  245. context = LoggingContext.current_context()
  246. d = _PreservingContextDeferred(context)
  247. deferred.chainDeferred(d)
  248. return d
  249. def preserve_fn(f):
  250. """Ensures that function is called with correct context and that context is
  251. restored after return. Useful for wrapping functions that return a deferred
  252. which you don't yield on.
  253. """
  254. current = LoggingContext.current_context()
  255. def g(*args, **kwargs):
  256. with PreserveLoggingContext(current):
  257. res = f(*args, **kwargs)
  258. if isinstance(res, defer.Deferred):
  259. return preserve_context_over_deferred(
  260. res, context=LoggingContext.sentinel
  261. )
  262. else:
  263. return res
  264. return g
  265. # modules to ignore in `logcontext_tracer`
  266. _to_ignore = [
  267. "synapse.util.logcontext",
  268. "synapse.http.server",
  269. "synapse.storage._base",
  270. "synapse.util.async",
  271. ]
  272. def logcontext_tracer(frame, event, arg):
  273. """A tracer that logs whenever a logcontext "unexpectedly" changes within
  274. a function. Probably inaccurate.
  275. Use by calling `sys.settrace(logcontext_tracer)` in the main thread.
  276. """
  277. if event == 'call':
  278. name = frame.f_globals["__name__"]
  279. if name.startswith("synapse"):
  280. if name == "synapse.util.logcontext":
  281. if frame.f_code.co_name in ["__enter__", "__exit__"]:
  282. tracer = frame.f_back.f_trace
  283. if tracer:
  284. tracer.just_changed = True
  285. tracer = frame.f_trace
  286. if tracer:
  287. return tracer
  288. if not any(name.startswith(ig) for ig in _to_ignore):
  289. return LineTracer()
  290. class LineTracer(object):
  291. __slots__ = ["context", "just_changed"]
  292. def __init__(self):
  293. self.context = LoggingContext.current_context()
  294. self.just_changed = False
  295. def __call__(self, frame, event, arg):
  296. if event in 'line':
  297. if self.just_changed:
  298. self.context = LoggingContext.current_context()
  299. self.just_changed = False
  300. else:
  301. c = LoggingContext.current_context()
  302. if c != self.context:
  303. logger.info(
  304. "Context changed! %s -> %s, %s, %s",
  305. self.context, c,
  306. frame.f_code.co_filename, frame.f_lineno
  307. )
  308. self.context = c
  309. return self