logging.py 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2019 The Matrix.org Foundation C.I.C.
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. import warnings
  16. from io import StringIO
  17. from mock import Mock
  18. from pyperf import perf_counter
  19. from synmark import make_homeserver
  20. from twisted.internet.defer import Deferred
  21. from twisted.internet.protocol import ServerFactory
  22. from twisted.logger import LogBeginner, Logger, LogPublisher
  23. from twisted.protocols.basic import LineOnlyReceiver
  24. from synapse.logging._structured import setup_structured_logging
  25. class LineCounter(LineOnlyReceiver):
  26. delimiter = b"\n"
  27. def __init__(self, *args, **kwargs):
  28. self.count = 0
  29. super().__init__(*args, **kwargs)
  30. def lineReceived(self, line):
  31. self.count += 1
  32. if self.count >= self.factory.wait_for and self.factory.on_done:
  33. on_done = self.factory.on_done
  34. self.factory.on_done = None
  35. on_done.callback(True)
  36. async def main(reactor, loops):
  37. """
  38. Benchmark how long it takes to send `loops` messages.
  39. """
  40. servers = []
  41. def protocol():
  42. p = LineCounter()
  43. servers.append(p)
  44. return p
  45. logger_factory = ServerFactory.forProtocol(protocol)
  46. logger_factory.wait_for = loops
  47. logger_factory.on_done = Deferred()
  48. port = reactor.listenTCP(0, logger_factory, interface="127.0.0.1")
  49. hs, wait, cleanup = await make_homeserver(reactor)
  50. errors = StringIO()
  51. publisher = LogPublisher()
  52. mock_sys = Mock()
  53. beginner = LogBeginner(
  54. publisher, errors, mock_sys, warnings, initialBufferSize=loops
  55. )
  56. log_config = {
  57. "loggers": {"synapse": {"level": "DEBUG"}},
  58. "drains": {
  59. "tersejson": {
  60. "type": "network_json_terse",
  61. "host": "127.0.0.1",
  62. "port": port.getHost().port,
  63. "maximum_buffer": 100,
  64. }
  65. },
  66. }
  67. logger = Logger(namespace="synapse.logging.test_terse_json", observer=publisher)
  68. logging_system = setup_structured_logging(
  69. hs, hs.config, log_config, logBeginner=beginner, redirect_stdlib_logging=False
  70. )
  71. # Wait for it to connect...
  72. await logging_system._observers[0]._service.whenConnected()
  73. start = perf_counter()
  74. # Send a bunch of useful messages
  75. for i in range(0, loops):
  76. logger.info("test message %s" % (i,))
  77. if (
  78. len(logging_system._observers[0]._buffer)
  79. == logging_system._observers[0].maximum_buffer
  80. ):
  81. while (
  82. len(logging_system._observers[0]._buffer)
  83. > logging_system._observers[0].maximum_buffer / 2
  84. ):
  85. await wait(0.01)
  86. await logger_factory.on_done
  87. end = perf_counter() - start
  88. logging_system.stop()
  89. port.stopListening()
  90. cleanup()
  91. return end