123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386 |
- # -*- coding: utf-8 -*-
- # Copyright 2019 The Matrix.org Foundation C.I.C.
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- import logging
- import os.path
- import sys
- import typing
- import warnings
- from typing import List
- import attr
- from constantly import NamedConstant, Names, ValueConstant, Values
- from zope.interface import implementer
- from twisted.logger import (
- FileLogObserver,
- FilteringLogObserver,
- ILogObserver,
- LogBeginner,
- Logger,
- LogLevel,
- LogLevelFilterPredicate,
- LogPublisher,
- eventAsText,
- jsonFileLogObserver,
- )
- from synapse.config._base import ConfigError
- from synapse.logging._terse_json import (
- TerseJSONToConsoleLogObserver,
- TerseJSONToTCPLogObserver,
- )
- from synapse.logging.context import current_context
def stdlib_log_level_to_twisted(level: str) -> LogLevel:
    """
    Convert a stdlib log level name to Twisted's log level.

    Args:
        level: A stdlib level name such as "INFO" or "WARNING".

    Returns:
        The corresponding twisted.logger.LogLevel.
    """
    # Twisted names the WARNING level "warn"; everything else matches the
    # stdlib name once lowercased.
    return LogLevel.levelWithName(level.lower().replace("warning", "warn"))
@attr.s
@implementer(ILogObserver)
class LogContextObserver:
    """
    An ILogObserver which adds Synapse-specific log context information.

    Attributes:
        observer (ILogObserver): The target parent observer.
    """

    observer = attr.ib()

    def __call__(self, event: dict) -> None:
        """
        Consume a log event and emit it to the parent observer after filtering
        and adding log context information.

        Args:
            event (dict)
        """
        # Drop a handful of noisy Twisted-internal events before they reach
        # the drains.
        if "log_text" in event:
            text = event["log_text"]
            if text.startswith("DNSDatagramProtocol starting on "):
                return
            if text.startswith("(UDP Port "):
                return
            if text.startswith("Timing out client") or event[
                "log_format"
            ].startswith("Timing out client"):
                return

        ctx = current_context()
        if ctx is None:
            # No logging context, not even the root one: we might be starting
            # up, or the event came from non-Synapse code. Attribute it to the
            # root logger.
            event["request"] = None
            event["scope"] = None
        else:
            # Copy the context information onto the log event.
            ctx.copy_to_twisted_log_entry(event)

        self.observer(event)
class PythonStdlibToTwistedLogger(logging.Handler):
    """
    A logging.Handler that transforms Python stdlib log records into Twisted
    log events and forwards them to an observer.
    """

    def __init__(self, observer, *args, **kwargs):
        """
        Args:
            observer (ILogObserver): A Twisted logging observer.
            *args, **kwargs: Args/kwargs to be passed to logging.Handler.
        """
        self.observer = observer
        super().__init__(*args, **kwargs)

    def emit(self, record: logging.LogRecord) -> None:
        """
        Translate ``record`` into a Twisted log event and hand it to the
        observer.

        Args:
            record (logging.LogRecord)
        """
        event = {
            "log_time": record.created,
            "log_text": record.getMessage(),
            "log_format": "{log_text}",
            "log_namespace": record.name,
            "log_level": stdlib_log_level_to_twisted(record.levelname),
        }
        self.observer(event)
def SynapseFileLogObserver(outFile: typing.IO[str]) -> FileLogObserver:
    """
    A log observer that formats events like the traditional log formatter and
    sends them to `outFile`.

    Args:
        outFile (file object): The file object to write to.
    """

    def formatEvent(_event: dict) -> str:
        # Work on a copy so the original event dict is left untouched for any
        # other observers.
        event = dict(_event)
        event["log_level"] = event["log_level"].name.upper()
        # Prefix with namespace/level/request; fall back to {log_text} when
        # the event has no explicit format string.
        fmt = event.get("log_format", "{log_text}") or "{log_text}"
        event["log_format"] = "- {log_namespace} - {log_level} - {request} - " + fmt
        return eventAsText(event, includeSystem=False) + "\n"

    return FileLogObserver(outFile, formatEvent)
class DrainType(Names):
    # The kinds of output "drain" a structured logging configuration may
    # declare: three console variants, two file variants, and a TCP network
    # drain emitting terse JSON.
    CONSOLE = NamedConstant()
    CONSOLE_JSON = NamedConstant()
    CONSOLE_JSON_TERSE = NamedConstant()
    FILE = NamedConstant()
    FILE_JSON = NamedConstant()
    NETWORK_JSON_TERSE = NamedConstant()
class OutputPipeType(Values):
    # Maps a console drain's "location" config value ("stdout"/"stderr") to
    # the real pipe. The dunder forms (sys.__stdout__/__stderr__) are the
    # original process pipes, unaffected by any later redirection of
    # sys.stdout/sys.stderr.
    stdout = ValueConstant(sys.__stdout__)
    stderr = ValueConstant(sys.__stderr__)
@attr.s
class DrainConfiguration:
    # Parsed form of a single drain entry from the logging configuration.
    name = attr.ib()  # the key the drain was declared under in the config
    type = attr.ib()  # a DrainType constant
    # Where the drain writes: a pipe object for console drains, an absolute
    # file path for file drains, or a (host, port) tuple for network drains.
    location = attr.ib()
    options = attr.ib(default=None)  # extra per-type options, e.g. NetworkJSONTerseOptions
@attr.s
class NetworkJSONTerseOptions:
    # Buffer size passed through to TerseJSONToTCPLogObserver as
    # maximum_buffer (presumably the number of events held while the
    # connection is unavailable — confirm against that observer).
    maximum_buffer = attr.ib(type=int)
# Logger levels applied when the logging config has no "loggers" section.
DEFAULT_LOGGERS = {"synapse": {"level": "INFO"}}
def parse_drain_configs(
    drains: dict,
) -> typing.Generator[DrainConfiguration, None, None]:
    """
    Parse the drain configurations.

    Args:
        drains (dict): A list of drain configurations.

    Yields:
        DrainConfiguration instances.

    Raises:
        ConfigError: If any of the drain configuration items are invalid.
    """
    console_types = (
        DrainType.CONSOLE,
        DrainType.CONSOLE_JSON,
        DrainType.CONSOLE_JSON_TERSE,
    )
    file_types = (DrainType.FILE, DrainType.FILE_JSON)

    for name, config in drains.items():
        if "type" not in config:
            raise ConfigError("Logging drains require a 'type' key.")

        try:
            logging_type = DrainType.lookupByName(config["type"].upper())
        except ValueError:
            raise ConfigError(
                "%s is not a known logging drain type." % (config["type"],)
            )

        if logging_type in console_types:
            # Console drains write to one of the two standard pipes.
            location = config.get("location")
            if location not in ("stdout", "stderr"):
                raise ConfigError(
                    (
                        "The %s drain needs the 'location' key set to "
                        "either 'stdout' or 'stderr'."
                    )
                    % (logging_type,)
                )
            yield DrainConfiguration(
                name=name,
                type=logging_type,
                location=OutputPipeType.lookupByName(location).value,
            )
        elif logging_type in file_types:
            if "location" not in config:
                raise ConfigError(
                    "The %s drain needs the 'location' key set." % (logging_type,)
                )
            location = config["location"]
            # Relative paths are rejected so log files always land in a
            # predictable place.
            if os.path.abspath(location) != location:
                raise ConfigError(
                    "File paths need to be absolute, '%s' is a relative path"
                    % (location,)
                )
            yield DrainConfiguration(name=name, type=logging_type, location=location)
        elif logging_type == DrainType.NETWORK_JSON_TERSE:
            yield DrainConfiguration(
                name=name,
                type=logging_type,
                location=(config.get("host"), config.get("port")),
                options=NetworkJSONTerseOptions(
                    maximum_buffer=config.get("maximum_buffer", 1000)
                ),
            )
        else:
            raise ConfigError(
                "The %s drain type is currently not implemented."
                % (config["type"].upper(),)
            )
class StoppableLogPublisher(LogPublisher):
    """
    A log publisher that can tell its observers to shut down any external
    communications.
    """

    def stop(self):
        # Offer every observer that supports it (e.g. the TCP drain) a chance
        # to close its external resources.
        for target in self._observers:
            if hasattr(target, "stop"):
                target.stop()
def setup_structured_logging(
    hs,
    config,
    log_config: dict,
    logBeginner: LogBeginner,
    redirect_stdlib_logging: bool = True,
) -> LogPublisher:
    """
    Set up Twisted's structured logging system.

    Args:
        hs: The homeserver to use.
        config (HomeserverConfig): The configuration of the Synapse homeserver.
        log_config (dict): The log configuration to use.
        logBeginner (LogBeginner): The Twisted log beginner the final observer
            is registered with.
        redirect_stdlib_logging (bool): Whether to also route Python stdlib
            logging through this system.

    Returns:
        The LogPublisher all the configured drains are attached to.

    Raises:
        ConfigError: If the configuration is invalid (e.g. stdio redirection
            disabled, or no drains defined).
    """
    # Structured logging redirects stdio itself (see beginLoggingTo below), so
    # the no_redirect_stdio option cannot be honoured.
    if config.no_redirect_stdio:
        raise ConfigError(
            "no_redirect_stdio cannot be defined using structured logging."
        )

    logger = Logger()

    if "drains" not in log_config:
        raise ConfigError("The logging configuration requires a list of drains.")

    observers = []  # type: List[ILogObserver]

    for observer in parse_drain_configs(log_config["drains"]):
        # Pipe drains
        if observer.type == DrainType.CONSOLE:
            logger.debug(
                "Starting up the {name} console logger drain", name=observer.name
            )
            observers.append(SynapseFileLogObserver(observer.location))
        elif observer.type == DrainType.CONSOLE_JSON:
            logger.debug(
                "Starting up the {name} JSON console logger drain", name=observer.name
            )
            observers.append(jsonFileLogObserver(observer.location))
        elif observer.type == DrainType.CONSOLE_JSON_TERSE:
            logger.debug(
                "Starting up the {name} terse JSON console logger drain",
                name=observer.name,
            )
            observers.append(
                TerseJSONToConsoleLogObserver(observer.location, metadata={})
            )

        # File drains
        elif observer.type == DrainType.FILE:
            logger.debug("Starting up the {name} file logger drain", name=observer.name)
            # Line-buffered (buffering=1) append, so events reach disk promptly.
            log_file = open(observer.location, "at", buffering=1, encoding="utf8")
            observers.append(SynapseFileLogObserver(log_file))
        elif observer.type == DrainType.FILE_JSON:
            logger.debug(
                "Starting up the {name} JSON file logger drain", name=observer.name
            )
            log_file = open(observer.location, "at", buffering=1, encoding="utf8")
            observers.append(jsonFileLogObserver(log_file))
        elif observer.type == DrainType.NETWORK_JSON_TERSE:
            metadata = {"server_name": hs.config.server_name}
            log_observer = TerseJSONToTCPLogObserver(
                hs=hs,
                host=observer.location[0],
                port=observer.location[1],
                metadata=metadata,
                maximum_buffer=observer.options.maximum_buffer,
            )
            # The TCP observer needs an explicit start() before it can be used.
            log_observer.start()
            observers.append(log_observer)
        else:
            # We should never get here, but, just in case, throw an error.
            raise ConfigError("%s drain type cannot be configured" % (observer.type,))

    publisher = StoppableLogPublisher(*observers)
    log_filter = LogLevelFilterPredicate()

    for namespace, namespace_config in log_config.get(
        "loggers", DEFAULT_LOGGERS
    ).items():
        # Set the log level for twisted.logger.Logger namespaces
        log_filter.setLogLevelForNamespace(
            namespace,
            stdlib_log_level_to_twisted(namespace_config.get("level", "INFO")),
        )

        # Also set the log levels for the stdlib logger namespaces, to prevent
        # them getting to PythonStdlibToTwistedLogger and having to be formatted
        if "level" in namespace_config:
            logging.getLogger(namespace).setLevel(namespace_config.get("level"))

    # Pipeline: drains <- publisher <- level filter <- log-context decorator.
    f = FilteringLogObserver(publisher, [log_filter])
    lco = LogContextObserver(f)

    if redirect_stdlib_logging:
        stuff_into_twisted = PythonStdlibToTwistedLogger(lco)
        stdliblogger = logging.getLogger()
        stdliblogger.addHandler(stuff_into_twisted)

    # Always redirect standard I/O, otherwise other logging outputs might miss
    # it.
    logBeginner.beginLoggingTo([lco], redirectStandardIO=True)

    return publisher
def reload_structured_logging(*args, log_config=None) -> None:
    """
    Handle a config-reload request for structured logging.

    Reloading is not supported, so this emits a warning (so operators notice
    the reload did not take effect) and changes nothing.
    """
    warnings.warn(
        "Currently the structured logging system can not be reloaded, doing nothing"
    )
|