123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200 |
- # -*- coding: utf-8 -*-
- # Copyright 2014-2016 OpenMarket Ltd
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- import inspect
- import logging
- import time
- from functools import wraps
- from inspect import getcallargs
# Running counter used by time_function: each call through a wrapped function
# takes the current value as its id and then increments it, so the START and
# END log lines of one call can be matched up.
_TIME_FUNC_ID = 0
def _log_debug_as_f(f, msg, msg_args):
    """Emit a DEBUG record that appears to originate from ``f``.

    The record is handed to the logger named after ``f.__module__`` and
    carries the file name and first line number of ``f``'s code object, so it
    is attributed to ``f`` rather than to this helper. Nothing is built or
    emitted unless DEBUG is enabled for that logger.

    Args:
        f: The function the record should be attributed to.
        msg: A %-style format string.
        msg_args: The arguments interpolated into ``msg`` when the record is
            formatted.
    """
    module_name = f.__module__
    target = logging.getLogger(module_name)

    # Guard clause: skip all record construction when DEBUG is switched off.
    if not target.isEnabledFor(logging.DEBUG):
        return

    code = f.__code__
    target.handle(
        logging.LogRecord(
            name=module_name,
            level=logging.DEBUG,
            pathname=code.co_filename,
            lineno=code.co_firstlineno,
            msg=msg,
            args=msg_args,
            exc_info=None,
        )
    )
def log_function(f):
    """Function decorator that logs every call to the decorated function.

    When DEBUG is enabled for the logger named after ``f.__module__``, the
    call's arguments are bound to ``f``'s parameter names and logged as
    ``name=value`` pairs. Each value is rendered with ``str()`` and cut to 50
    characters followed by "..." when longer. ``f`` is then called and its
    result returned unchanged.
    """
    func_name = f.__name__

    def _abbreviate(value):
        # Keep log lines short: at most 50 characters of each value.
        text = str(value)
        return text if len(text) <= 50 else text[:50] + "..."

    @wraps(f)
    def wrapped(*args, **kwargs):
        if logging.getLogger(f.__module__).isEnabledFor(logging.DEBUG):
            call_args = getcallargs(f, *args, **kwargs)
            rendered = ", ".join(
                "%s=%s" % (key, _abbreviate(val)) for key, val in call_args.items()
            )
            _log_debug_as_f(
                f,
                "Invoked '%(func_name)s' with args: %(args)s",
                {"func_name": func_name, "args": rendered},
            )

        return f(*args, **kwargs)

    wrapped.__name__ = func_name
    return wrapped
def time_function(f):
    """Function decorator that logs how long each call to ``f`` takes.

    Every call is given an id taken from the module-level ``_TIME_FUNC_ID``
    counter (which is then incremented). A "[FUNC START]" line is logged
    before ``f`` runs and a "[FUNC END]" line with the elapsed seconds is
    logged afterwards, even when ``f`` raises. Both lines go through
    ``_log_debug_as_f``.

    Args:
        f: The function to time.

    Returns:
        A wrapper with the same call signature that returns whatever ``f``
        returns and propagates whatever ``f`` raises.
    """
    func_name = f.__name__

    # time.clock() was removed in Python 3.8. Prefer perf_counter() and only
    # fall back to clock() on interpreters that do not provide it.
    # NOTE: perf_counter() measures elapsed (wall-clock) time.
    try:
        timer = time.perf_counter
    except AttributeError:
        timer = time.clock

    @wraps(f)
    def wrapped(*args, **kwargs):
        global _TIME_FUNC_ID
        call_id = _TIME_FUNC_ID
        _TIME_FUNC_ID += 1

        start = timer()
        try:
            _log_debug_as_f(f, "[FUNC START] {%s-%d}", (func_name, call_id))
            return f(*args, **kwargs)
        finally:
            # Runs on both normal return and exception, so every START line
            # has a matching END line.
            elapsed = timer() - start
            _log_debug_as_f(
                f, "[FUNC END] {%s-%d} %.3f sec", (func_name, call_id, elapsed)
            )

    return wrapped
def trace_function(f):
    """Function decorator that logs the call stack leading to each call.

    Before every call a single DEBUG record is handed to the logger named
    after ``f.__module__``. Its message lists ``f`` itself (with the
    positional and keyword arguments of the call), followed by one line per
    calling frame, innermost first, giving that frame's file, line, function
    name and argument values. The record is attributed to the file and first
    line of ``f``.

    NOTE: there is no ``isEnabledFor`` check here, so the stack is walked and
    formatted on every call.

    Args:
        f: The function to trace.

    Returns:
        A wrapper that logs the trace and then returns ``f(*args, **kwargs)``.

    Raises:
        Exception: from the wrapper, if the interpreter cannot provide the
            current frame.
    """
    func_name = f.__name__
    # ``__code__`` works on Python 2.6+ and 3; ``func_code`` is Python 2 only.
    code = f.__code__
    linenum = code.co_firstlineno
    pathname = code.co_filename

    @wraps(f)
    def wrapped(*args, **kwargs):
        name = f.__module__
        logger = logging.getLogger(name)

        frame = inspect.currentframe()
        if frame is None:
            raise Exception("Can't get current frame!")

        to_print = [
            "\t%s:%s %s. Args: args=%s, kwargs=%s"
            % (pathname, linenum, func_name, args, kwargs)
        ]

        # Every calling frame is reported; no module-name filter is applied.
        caller = frame.f_back
        while caller:
            filename, lineno, function, _, _ = inspect.getframeinfo(caller)
            args_string = inspect.formatargvalues(*inspect.getargvalues(caller))
            to_print.append(
                "\t%s:%d %s. Args: %s" % (filename, lineno, function, args_string)
            )
            caller = caller.f_back

        msg = "\nTraceback for %s:\n" % (func_name,) + "\n".join(to_print)

        # Attribute the record to f's own first line. The loop variable
        # ``lineno`` belongs to the outermost caller's file, not ``pathname``.
        record = logging.LogRecord(
            name=name,
            level=logging.DEBUG,
            pathname=pathname,
            lineno=linenum,
            msg=msg,
            args=(),
            exc_info=None,
        )
        logger.handle(record)

        return f(*args, **kwargs)

    wrapped.__name__ = func_name
    return wrapped
def get_previous_frames():
    """Describe the "synapse" frames above the caller of this function.

    The walk starts at the frame that called this function's caller and
    proceeds outwards. Only frames whose module ``__name__`` starts with
    "synapse" are reported.

    Returns:
        str: Entries of the form ``{{ file:line function - Args: (...) }}``
        joined by ", ", innermost frame first. Empty when no frame matches.

    Raises:
        Exception: If the interpreter cannot provide the current frame.
    """
    frame = inspect.currentframe()
    if frame is None:
        raise Exception("Can't get current frame!")

    # Skip this function's own frame and its direct caller; tolerate a
    # missing caller frame instead of failing on ``None.f_back``.
    caller = frame.f_back
    s = caller.f_back if caller is not None else None

    to_return = []
    while s:
        # Code run via exec()/eval() with custom globals may have no
        # "__name__"; treat such frames as non-matching rather than crashing.
        module_name = s.f_globals.get("__name__") or ""
        if module_name.startswith("synapse"):
            filename, lineno, function, _, _ = inspect.getframeinfo(s)
            args_string = inspect.formatargvalues(*inspect.getargvalues(s))
            to_return.append(
                "{{ %s:%d %s - Args: %s }}" % (filename, lineno, function, args_string)
            )
        s = s.f_back

    return ", ".join(to_return)
def get_previous_frame(ignore=()):
    """Describe the nearest "synapse" frame above the caller of this function.

    The walk starts at the frame that called this function's caller and
    proceeds outwards. It stops at the first frame whose module ``__name__``
    starts with "synapse" and does not start with any prefix in ``ignore``.

    Args:
        ignore: Iterable of module-name prefixes to skip. Defaults to none.

    Returns:
        str or None: ``{{ file:line function - Args: (...) }}`` for the first
        matching frame, or None when no frame matches.

    Raises:
        Exception: If the interpreter cannot provide the current frame.
    """
    frame = inspect.currentframe()
    if frame is None:
        raise Exception("Can't get current frame!")

    # Materialise once so one-shot iterables still apply to every frame;
    # str.startswith accepts a tuple and returns False for an empty one.
    ignored_prefixes = tuple(ignore)

    # Skip this function's own frame and its direct caller; tolerate a
    # missing caller frame instead of failing on ``None.f_back``.
    caller = frame.f_back
    s = caller.f_back if caller is not None else None

    while s:
        # Code run via exec()/eval() with custom globals may have no
        # "__name__"; treat such frames as non-matching rather than crashing.
        module_name = s.f_globals.get("__name__") or ""
        if module_name.startswith("synapse") and not module_name.startswith(
            ignored_prefixes
        ):
            filename, lineno, function, _, _ = inspect.getframeinfo(s)
            args_string = inspect.formatargvalues(*inspect.getargvalues(s))
            return "{{ %s:%d %s - Args: %s }}" % (
                filename,
                lineno,
                function,
                args_string,
            )
        s = s.f_back

    return None
|