123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128 |
- # -*- coding: utf-8 -*-
- # Copyright 2014-2016 OpenMarket Ltd
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- import logging
- import twisted
- import twisted.logger
- from twisted.trial import unittest
- from synapse.util.logcontext import LoggingContextFilter
- # Set up putting Synapse's logs into Trial's.
- rootLogger = logging.getLogger()
- log_format = (
- "%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s - %(message)s"
- )
- class ToTwistedHandler(logging.Handler):
- tx_log = twisted.logger.Logger()
- def emit(self, record):
- log_entry = self.format(record)
- log_level = record.levelname.lower().replace('warning', 'warn')
- self.tx_log.emit(
- twisted.logger.LogLevel.levelWithName(log_level),
- log_entry.replace("{", r"(").replace("}", r")"),
- )
- handler = ToTwistedHandler()
- formatter = logging.Formatter(log_format)
- handler.setFormatter(formatter)
- handler.addFilter(LoggingContextFilter(request=""))
- rootLogger.addHandler(handler)
- def around(target):
- """A CLOS-style 'around' modifier, which wraps the original method of the
- given instance with another piece of code.
- @around(self)
- def method_name(orig, *args, **kwargs):
- return orig(*args, **kwargs)
- """
- def _around(code):
- name = code.__name__
- orig = getattr(target, name)
- def new(*args, **kwargs):
- return code(orig, *args, **kwargs)
- setattr(target, name, new)
- return _around
- class TestCase(unittest.TestCase):
- """A subclass of twisted.trial's TestCase which looks for 'loglevel'
- attributes on both itself and its individual test methods, to override the
- root logger's logging level while that test (case|method) runs."""
- def __init__(self, methodName, *args, **kwargs):
- super(TestCase, self).__init__(methodName, *args, **kwargs)
- method = getattr(self, methodName)
- level = getattr(method, "loglevel", getattr(self, "loglevel", logging.ERROR))
- @around(self)
- def setUp(orig):
- # enable debugging of delayed calls - this means that we get a
- # traceback when a unit test exits leaving things on the reactor.
- twisted.internet.base.DelayedCall.debug = True
- old_level = logging.getLogger().level
- if old_level != level:
- @around(self)
- def tearDown(orig):
- ret = orig()
- logging.getLogger().setLevel(old_level)
- return ret
- logging.getLogger().setLevel(level)
- return orig()
- def assertObjectHasAttributes(self, attrs, obj):
- """Asserts that the given object has each of the attributes given, and
- that the value of each matches according to assertEquals."""
- for (key, value) in attrs.items():
- if not hasattr(obj, key):
- raise AssertionError("Expected obj to have a '.%s'" % key)
- try:
- self.assertEquals(attrs[key], getattr(obj, key))
- except AssertionError as e:
- raise (type(e))(e.message + " for '.%s'" % key)
- def assert_dict(self, required, actual):
- """Does a partial assert of a dict.
- Args:
- required (dict): The keys and value which MUST be in 'actual'.
- actual (dict): The test result. Extra keys will not be checked.
- """
- for key in required:
- self.assertEquals(required[key], actual[key],
- msg="%s mismatch. %s" % (key, actual))
- def DEBUG(target):
- """A decorator to set the .loglevel attribute to logging.DEBUG.
- Can apply to either a TestCase or an individual test method."""
- target.loglevel = logging.DEBUG
- return target
|