context.py 24 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
""" Thread-local-alike tracking of log contexts within synapse

This module provides objects and utilities for tracking contexts through
synapse code, so that log lines can include a request identifier, and so that
CPU and database activity can be accounted for against the request that caused
them.

See doc/log_contexts.rst for details on how this works.
"""
import functools
import logging
import threading
import types
from typing import Any, List

from twisted.internet import defer, threads
  27. logger = logging.getLogger(__name__)
  28. try:
  29. import resource
  30. # Python doesn't ship with a definition of RUSAGE_THREAD but it's defined
  31. # to be 1 on linux so we hard code it.
  32. RUSAGE_THREAD = 1
  33. # If the system doesn't support RUSAGE_THREAD then this should throw an
  34. # exception.
  35. resource.getrusage(RUSAGE_THREAD)
  36. is_thread_resource_usage_supported = True
  37. def get_thread_resource_usage():
  38. return resource.getrusage(RUSAGE_THREAD)
  39. except Exception:
  40. # If the system doesn't support resource.getrusage(RUSAGE_THREAD) then we
  41. # won't track resource usage.
  42. is_thread_resource_usage_supported = False
  43. def get_thread_resource_usage():
  44. return None
  45. # get an id for the current thread.
  46. #
  47. # threading.get_ident doesn't actually return an OS-level tid, and annoyingly,
  48. # on Linux it actually returns the same value either side of a fork() call. However
  49. # we only fork in one place, so it's not worth the hoop-jumping to get a real tid.
  50. #
  51. get_thread_id = threading.get_ident
  52. class ContextResourceUsage(object):
  53. """Object for tracking the resources used by a log context
  54. Attributes:
  55. ru_utime (float): user CPU time (in seconds)
  56. ru_stime (float): system CPU time (in seconds)
  57. db_txn_count (int): number of database transactions done
  58. db_sched_duration_sec (float): amount of time spent waiting for a
  59. database connection
  60. db_txn_duration_sec (float): amount of time spent doing database
  61. transactions (excluding scheduling time)
  62. evt_db_fetch_count (int): number of events requested from the database
  63. """
  64. __slots__ = [
  65. "ru_stime",
  66. "ru_utime",
  67. "db_txn_count",
  68. "db_txn_duration_sec",
  69. "db_sched_duration_sec",
  70. "evt_db_fetch_count",
  71. ]
  72. def __init__(self, copy_from=None):
  73. """Create a new ContextResourceUsage
  74. Args:
  75. copy_from (ContextResourceUsage|None): if not None, an object to
  76. copy stats from
  77. """
  78. if copy_from is None:
  79. self.reset()
  80. else:
  81. self.ru_utime = copy_from.ru_utime
  82. self.ru_stime = copy_from.ru_stime
  83. self.db_txn_count = copy_from.db_txn_count
  84. self.db_txn_duration_sec = copy_from.db_txn_duration_sec
  85. self.db_sched_duration_sec = copy_from.db_sched_duration_sec
  86. self.evt_db_fetch_count = copy_from.evt_db_fetch_count
  87. def copy(self):
  88. return ContextResourceUsage(copy_from=self)
  89. def reset(self):
  90. self.ru_stime = 0.0
  91. self.ru_utime = 0.0
  92. self.db_txn_count = 0
  93. self.db_txn_duration_sec = 0
  94. self.db_sched_duration_sec = 0
  95. self.evt_db_fetch_count = 0
  96. def __repr__(self):
  97. return (
  98. "<ContextResourceUsage ru_stime='%r', ru_utime='%r', "
  99. "db_txn_count='%r', db_txn_duration_sec='%r', "
  100. "db_sched_duration_sec='%r', evt_db_fetch_count='%r'>"
  101. ) % (
  102. self.ru_stime,
  103. self.ru_utime,
  104. self.db_txn_count,
  105. self.db_txn_duration_sec,
  106. self.db_sched_duration_sec,
  107. self.evt_db_fetch_count,
  108. )
  109. def __iadd__(self, other):
  110. """Add another ContextResourceUsage's stats to this one's.
  111. Args:
  112. other (ContextResourceUsage): the other resource usage object
  113. """
  114. self.ru_utime += other.ru_utime
  115. self.ru_stime += other.ru_stime
  116. self.db_txn_count += other.db_txn_count
  117. self.db_txn_duration_sec += other.db_txn_duration_sec
  118. self.db_sched_duration_sec += other.db_sched_duration_sec
  119. self.evt_db_fetch_count += other.evt_db_fetch_count
  120. return self
  121. def __isub__(self, other):
  122. self.ru_utime -= other.ru_utime
  123. self.ru_stime -= other.ru_stime
  124. self.db_txn_count -= other.db_txn_count
  125. self.db_txn_duration_sec -= other.db_txn_duration_sec
  126. self.db_sched_duration_sec -= other.db_sched_duration_sec
  127. self.evt_db_fetch_count -= other.evt_db_fetch_count
  128. return self
  129. def __add__(self, other):
  130. res = ContextResourceUsage(copy_from=self)
  131. res += other
  132. return res
  133. def __sub__(self, other):
  134. res = ContextResourceUsage(copy_from=self)
  135. res -= other
  136. return res
  137. class LoggingContext(object):
  138. """Additional context for log formatting. Contexts are scoped within a
  139. "with" block.
  140. If a parent is given when creating a new context, then:
  141. - logging fields are copied from the parent to the new context on entry
  142. - when the new context exits, the cpu usage stats are copied from the
  143. child to the parent
  144. Args:
  145. name (str): Name for the context for debugging.
  146. parent_context (LoggingContext|None): The parent of the new context
  147. """
  148. __slots__ = [
  149. "previous_context",
  150. "name",
  151. "parent_context",
  152. "_resource_usage",
  153. "usage_start",
  154. "main_thread",
  155. "alive",
  156. "request",
  157. "tag",
  158. "scope",
  159. ]
  160. thread_local = threading.local()
  161. class Sentinel(object):
  162. """Sentinel to represent the root context"""
  163. __slots__ = [] # type: List[Any]
  164. def __str__(self):
  165. return "sentinel"
  166. def copy_to(self, record):
  167. pass
  168. def copy_to_twisted_log_entry(self, record):
  169. record["request"] = None
  170. record["scope"] = None
  171. def start(self):
  172. pass
  173. def stop(self):
  174. pass
  175. def add_database_transaction(self, duration_sec):
  176. pass
  177. def add_database_scheduled(self, sched_sec):
  178. pass
  179. def record_event_fetch(self, event_count):
  180. pass
  181. def __nonzero__(self):
  182. return False
  183. __bool__ = __nonzero__ # python3
  184. sentinel = Sentinel()
  185. def __init__(self, name=None, parent_context=None, request=None):
  186. self.previous_context = LoggingContext.current_context()
  187. self.name = name
  188. # track the resources used by this context so far
  189. self._resource_usage = ContextResourceUsage()
  190. # If alive has the thread resource usage when the logcontext last
  191. # became active.
  192. self.usage_start = None
  193. self.main_thread = get_thread_id()
  194. self.request = None
  195. self.tag = ""
  196. self.alive = True
  197. self.scope = None
  198. self.parent_context = parent_context
  199. if self.parent_context is not None:
  200. self.parent_context.copy_to(self)
  201. if request is not None:
  202. # the request param overrides the request from the parent context
  203. self.request = request
  204. def __str__(self):
  205. if self.request:
  206. return str(self.request)
  207. return "%s@%x" % (self.name, id(self))
  208. @classmethod
  209. def current_context(cls):
  210. """Get the current logging context from thread local storage
  211. Returns:
  212. LoggingContext: the current logging context
  213. """
  214. return getattr(cls.thread_local, "current_context", cls.sentinel)
  215. @classmethod
  216. def set_current_context(cls, context):
  217. """Set the current logging context in thread local storage
  218. Args:
  219. context(LoggingContext): The context to activate.
  220. Returns:
  221. The context that was previously active
  222. """
  223. current = cls.current_context()
  224. if current is not context:
  225. current.stop()
  226. cls.thread_local.current_context = context
  227. context.start()
  228. return current
  229. def __enter__(self):
  230. """Enters this logging context into thread local storage"""
  231. old_context = self.set_current_context(self)
  232. if self.previous_context != old_context:
  233. logger.warning(
  234. "Expected previous context %r, found %r",
  235. self.previous_context,
  236. old_context,
  237. )
  238. self.alive = True
  239. return self
  240. def __exit__(self, type, value, traceback):
  241. """Restore the logging context in thread local storage to the state it
  242. was before this context was entered.
  243. Returns:
  244. None to avoid suppressing any exceptions that were thrown.
  245. """
  246. current = self.set_current_context(self.previous_context)
  247. if current is not self:
  248. if current is self.sentinel:
  249. logger.warning("Expected logging context %s was lost", self)
  250. else:
  251. logger.warning(
  252. "Expected logging context %s but found %s", self, current
  253. )
  254. self.previous_context = None
  255. self.alive = False
  256. # if we have a parent, pass our CPU usage stats on
  257. if self.parent_context is not None and hasattr(
  258. self.parent_context, "_resource_usage"
  259. ):
  260. self.parent_context._resource_usage += self._resource_usage
  261. # reset them in case we get entered again
  262. self._resource_usage.reset()
  263. def copy_to(self, record):
  264. """Copy logging fields from this context to a log record or
  265. another LoggingContext
  266. """
  267. # we track the current request
  268. record.request = self.request
  269. # we also track the current scope:
  270. record.scope = self.scope
  271. def copy_to_twisted_log_entry(self, record):
  272. """
  273. Copy logging fields from this context to a Twisted log record.
  274. """
  275. record["request"] = self.request
  276. record["scope"] = self.scope
  277. def start(self):
  278. if get_thread_id() != self.main_thread:
  279. logger.warning("Started logcontext %s on different thread", self)
  280. return
  281. # If we haven't already started record the thread resource usage so
  282. # far
  283. if not self.usage_start:
  284. self.usage_start = get_thread_resource_usage()
  285. def stop(self):
  286. if get_thread_id() != self.main_thread:
  287. logger.warning("Stopped logcontext %s on different thread", self)
  288. return
  289. # When we stop, let's record the cpu used since we started
  290. if not self.usage_start:
  291. # Log a warning on platforms that support thread usage tracking
  292. if is_thread_resource_usage_supported:
  293. logger.warning(
  294. "Called stop on logcontext %s without calling start", self
  295. )
  296. return
  297. utime_delta, stime_delta = self._get_cputime()
  298. self._resource_usage.ru_utime += utime_delta
  299. self._resource_usage.ru_stime += stime_delta
  300. self.usage_start = None
  301. def get_resource_usage(self):
  302. """Get resources used by this logcontext so far.
  303. Returns:
  304. ContextResourceUsage: a *copy* of the object tracking resource
  305. usage so far
  306. """
  307. # we always return a copy, for consistency
  308. res = self._resource_usage.copy()
  309. # If we are on the correct thread and we're currently running then we
  310. # can include resource usage so far.
  311. is_main_thread = get_thread_id() == self.main_thread
  312. if self.alive and self.usage_start and is_main_thread:
  313. utime_delta, stime_delta = self._get_cputime()
  314. res.ru_utime += utime_delta
  315. res.ru_stime += stime_delta
  316. return res
  317. def _get_cputime(self):
  318. """Get the cpu usage time so far
  319. Returns: Tuple[float, float]: seconds in user mode, seconds in system mode
  320. """
  321. current = get_thread_resource_usage()
  322. utime_delta = current.ru_utime - self.usage_start.ru_utime
  323. stime_delta = current.ru_stime - self.usage_start.ru_stime
  324. # sanity check
  325. if utime_delta < 0:
  326. logger.error(
  327. "utime went backwards! %f < %f",
  328. current.ru_utime,
  329. self.usage_start.ru_utime,
  330. )
  331. utime_delta = 0
  332. if stime_delta < 0:
  333. logger.error(
  334. "stime went backwards! %f < %f",
  335. current.ru_stime,
  336. self.usage_start.ru_stime,
  337. )
  338. stime_delta = 0
  339. return utime_delta, stime_delta
  340. def add_database_transaction(self, duration_sec):
  341. if duration_sec < 0:
  342. raise ValueError("DB txn time can only be non-negative")
  343. self._resource_usage.db_txn_count += 1
  344. self._resource_usage.db_txn_duration_sec += duration_sec
  345. def add_database_scheduled(self, sched_sec):
  346. """Record a use of the database pool
  347. Args:
  348. sched_sec (float): number of seconds it took us to get a
  349. connection
  350. """
  351. if sched_sec < 0:
  352. raise ValueError("DB scheduling time can only be non-negative")
  353. self._resource_usage.db_sched_duration_sec += sched_sec
  354. def record_event_fetch(self, event_count):
  355. """Record a number of events being fetched from the db
  356. Args:
  357. event_count (int): number of events being fetched
  358. """
  359. self._resource_usage.evt_db_fetch_count += event_count
  360. class LoggingContextFilter(logging.Filter):
  361. """Logging filter that adds values from the current logging context to each
  362. record.
  363. Args:
  364. **defaults: Default values to avoid formatters complaining about
  365. missing fields
  366. """
  367. def __init__(self, **defaults):
  368. self.defaults = defaults
  369. def filter(self, record):
  370. """Add each fields from the logging contexts to the record.
  371. Returns:
  372. True to include the record in the log output.
  373. """
  374. context = LoggingContext.current_context()
  375. for key, value in self.defaults.items():
  376. setattr(record, key, value)
  377. # context should never be None, but if it somehow ends up being, then
  378. # we end up in a death spiral of infinite loops, so let's check, for
  379. # robustness' sake.
  380. if context is not None:
  381. context.copy_to(record)
  382. return True
  383. class PreserveLoggingContext(object):
  384. """Captures the current logging context and restores it when the scope is
  385. exited. Used to restore the context after a function using
  386. @defer.inlineCallbacks is resumed by a callback from the reactor."""
  387. __slots__ = ["current_context", "new_context", "has_parent"]
  388. def __init__(self, new_context=None):
  389. if new_context is None:
  390. new_context = LoggingContext.sentinel
  391. self.new_context = new_context
  392. def __enter__(self):
  393. """Captures the current logging context"""
  394. self.current_context = LoggingContext.set_current_context(self.new_context)
  395. if self.current_context:
  396. self.has_parent = self.current_context.previous_context is not None
  397. if not self.current_context.alive:
  398. logger.debug("Entering dead context: %s", self.current_context)
  399. def __exit__(self, type, value, traceback):
  400. """Restores the current logging context"""
  401. context = LoggingContext.set_current_context(self.current_context)
  402. if context != self.new_context:
  403. if context is LoggingContext.sentinel:
  404. logger.warning("Expected logging context %s was lost", self.new_context)
  405. else:
  406. logger.warning(
  407. "Expected logging context %s but found %s",
  408. self.new_context,
  409. context,
  410. )
  411. if self.current_context is not LoggingContext.sentinel:
  412. if not self.current_context.alive:
  413. logger.debug("Restoring dead context: %s", self.current_context)
  414. def nested_logging_context(suffix, parent_context=None):
  415. """Creates a new logging context as a child of another.
  416. The nested logging context will have a 'request' made up of the parent context's
  417. request, plus the given suffix.
  418. CPU/db usage stats will be added to the parent context's on exit.
  419. Normal usage looks like:
  420. with nested_logging_context(suffix):
  421. # ... do stuff
  422. Args:
  423. suffix (str): suffix to add to the parent context's 'request'.
  424. parent_context (LoggingContext|None): parent context. Will use the current context
  425. if None.
  426. Returns:
  427. LoggingContext: new logging context.
  428. """
  429. if parent_context is None:
  430. parent_context = LoggingContext.current_context()
  431. return LoggingContext(
  432. parent_context=parent_context, request=parent_context.request + "-" + suffix
  433. )
  434. def preserve_fn(f):
  435. """Function decorator which wraps the function with run_in_background"""
  436. def g(*args, **kwargs):
  437. return run_in_background(f, *args, **kwargs)
  438. return g
  439. def run_in_background(f, *args, **kwargs):
  440. """Calls a function, ensuring that the current context is restored after
  441. return from the function, and that the sentinel context is set once the
  442. deferred returned by the function completes.
  443. Useful for wrapping functions that return a deferred or coroutine, which you don't
  444. yield or await on (for instance because you want to pass it to
  445. deferred.gatherResults()).
  446. Note that if you completely discard the result, you should make sure that
  447. `f` doesn't raise any deferred exceptions, otherwise a scary-looking
  448. CRITICAL error about an unhandled error will be logged without much
  449. indication about where it came from.
  450. """
  451. current = LoggingContext.current_context()
  452. try:
  453. res = f(*args, **kwargs)
  454. except: # noqa: E722
  455. # the assumption here is that the caller doesn't want to be disturbed
  456. # by synchronous exceptions, so let's turn them into Failures.
  457. return defer.fail()
  458. if isinstance(res, types.CoroutineType):
  459. res = defer.ensureDeferred(res)
  460. if not isinstance(res, defer.Deferred):
  461. return res
  462. if res.called and not res.paused:
  463. # The function should have maintained the logcontext, so we can
  464. # optimise out the messing about
  465. return res
  466. # The function may have reset the context before returning, so
  467. # we need to restore it now.
  468. ctx = LoggingContext.set_current_context(current)
  469. # The original context will be restored when the deferred
  470. # completes, but there is nothing waiting for it, so it will
  471. # get leaked into the reactor or some other function which
  472. # wasn't expecting it. We therefore need to reset the context
  473. # here.
  474. #
  475. # (If this feels asymmetric, consider it this way: we are
  476. # effectively forking a new thread of execution. We are
  477. # probably currently within a ``with LoggingContext()`` block,
  478. # which is supposed to have a single entry and exit point. But
  479. # by spawning off another deferred, we are effectively
  480. # adding a new exit point.)
  481. res.addBoth(_set_context_cb, ctx)
  482. return res
  483. def make_deferred_yieldable(deferred):
  484. """Given a deferred, make it follow the Synapse logcontext rules:
  485. If the deferred has completed (or is not actually a Deferred), essentially
  486. does nothing (just returns another completed deferred with the
  487. result/failure).
  488. If the deferred has not yet completed, resets the logcontext before
  489. returning a deferred. Then, when the deferred completes, restores the
  490. current logcontext before running callbacks/errbacks.
  491. (This is more-or-less the opposite operation to run_in_background.)
  492. """
  493. if not isinstance(deferred, defer.Deferred):
  494. return deferred
  495. if deferred.called and not deferred.paused:
  496. # it looks like this deferred is ready to run any callbacks we give it
  497. # immediately. We may as well optimise out the logcontext faffery.
  498. return deferred
  499. # ok, we can't be sure that a yield won't block, so let's reset the
  500. # logcontext, and add a callback to the deferred to restore it.
  501. prev_context = LoggingContext.set_current_context(LoggingContext.sentinel)
  502. deferred.addBoth(_set_context_cb, prev_context)
  503. return deferred
  504. def _set_context_cb(result, context):
  505. """A callback function which just sets the logging context"""
  506. LoggingContext.set_current_context(context)
  507. return result
  508. def defer_to_thread(reactor, f, *args, **kwargs):
  509. """
  510. Calls the function `f` using a thread from the reactor's default threadpool and
  511. returns the result as a Deferred.
  512. Creates a new logcontext for `f`, which is created as a child of the current
  513. logcontext (so its CPU usage metrics will get attributed to the current
  514. logcontext). `f` should preserve the logcontext it is given.
  515. The result deferred follows the Synapse logcontext rules: you should `yield`
  516. on it.
  517. Args:
  518. reactor (twisted.internet.base.ReactorBase): The reactor in whose main thread
  519. the Deferred will be invoked, and whose threadpool we should use for the
  520. function.
  521. Normally this will be hs.get_reactor().
  522. f (callable): The function to call.
  523. args: positional arguments to pass to f.
  524. kwargs: keyword arguments to pass to f.
  525. Returns:
  526. Deferred: A Deferred which fires a callback with the result of `f`, or an
  527. errback if `f` throws an exception.
  528. """
  529. return defer_to_threadpool(reactor, reactor.getThreadPool(), f, *args, **kwargs)
  530. def defer_to_threadpool(reactor, threadpool, f, *args, **kwargs):
  531. """
  532. A wrapper for twisted.internet.threads.deferToThreadpool, which handles
  533. logcontexts correctly.
  534. Calls the function `f` using a thread from the given threadpool and returns
  535. the result as a Deferred.
  536. Creates a new logcontext for `f`, which is created as a child of the current
  537. logcontext (so its CPU usage metrics will get attributed to the current
  538. logcontext). `f` should preserve the logcontext it is given.
  539. The result deferred follows the Synapse logcontext rules: you should `yield`
  540. on it.
  541. Args:
  542. reactor (twisted.internet.base.ReactorBase): The reactor in whose main thread
  543. the Deferred will be invoked. Normally this will be hs.get_reactor().
  544. threadpool (twisted.python.threadpool.ThreadPool): The threadpool to use for
  545. running `f`. Normally this will be hs.get_reactor().getThreadPool().
  546. f (callable): The function to call.
  547. args: positional arguments to pass to f.
  548. kwargs: keyword arguments to pass to f.
  549. Returns:
  550. Deferred: A Deferred which fires a callback with the result of `f`, or an
  551. errback if `f` throws an exception.
  552. """
  553. logcontext = LoggingContext.current_context()
  554. def g():
  555. with LoggingContext(parent_context=logcontext):
  556. return f(*args, **kwargs)
  557. return make_deferred_yieldable(threads.deferToThreadPool(reactor, threadpool, g))