test_remote_handler.py 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. # Copyright 2019 The Matrix.org Foundation C.I.C.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. from typing import Tuple
  15. from twisted.internet.protocol import Protocol
  16. from twisted.test.proto_helpers import AccumulatingProtocol, MemoryReactorClock
  17. from synapse.logging import RemoteHandler
  18. from tests.logging import LoggerCleanupMixin
  19. from tests.server import FakeTransport, get_clock
  20. from tests.unittest import TestCase
  21. from tests.utils import checked_cast
  22. def connect_logging_client(
  23. reactor: MemoryReactorClock, client_id: int
  24. ) -> Tuple[Protocol, AccumulatingProtocol]:
  25. # This is essentially tests.server.connect_client, but disabling autoflush on
  26. # the client transport. This is necessary to avoid an infinite loop due to
  27. # sending of data via the logging transport causing additional logs to be
  28. # written.
  29. factory = reactor.tcpClients.pop(client_id)[2]
  30. client = factory.buildProtocol(None)
  31. server = AccumulatingProtocol()
  32. server.makeConnection(FakeTransport(client, reactor))
  33. client.makeConnection(FakeTransport(server, reactor, autoflush=False))
  34. return client, server
  35. class RemoteHandlerTestCase(LoggerCleanupMixin, TestCase):
  36. def setUp(self) -> None:
  37. self.reactor, _ = get_clock()
  38. def test_log_output(self) -> None:
  39. """
  40. The remote handler delivers logs over TCP.
  41. """
  42. handler = RemoteHandler("127.0.0.1", 9000, _reactor=self.reactor)
  43. logger = self.get_logger(handler)
  44. logger.info("Hello there, %s!", "wally")
  45. # Trigger the connection
  46. client, server = connect_logging_client(self.reactor, 0)
  47. # Trigger data being sent
  48. client_transport = checked_cast(FakeTransport, client.transport)
  49. client_transport.flush()
  50. # One log message, with a single trailing newline
  51. logs = server.data.decode("utf8").splitlines()
  52. self.assertEqual(len(logs), 1)
  53. self.assertEqual(server.data.count(b"\n"), 1)
  54. # Ensure the data passed through properly.
  55. self.assertEqual(logs[0], "Hello there, wally!")
  56. def test_log_backpressure_debug(self) -> None:
  57. """
  58. When backpressure is hit, DEBUG logs will be shed.
  59. """
  60. handler = RemoteHandler(
  61. "127.0.0.1", 9000, maximum_buffer=10, _reactor=self.reactor
  62. )
  63. logger = self.get_logger(handler)
  64. # Send some debug messages
  65. for i in range(3):
  66. logger.debug("debug %s" % (i,))
  67. # Send a bunch of useful messages
  68. for i in range(7):
  69. logger.info("info %s" % (i,))
  70. # The last debug message pushes it past the maximum buffer
  71. logger.debug("too much debug")
  72. # Allow the reconnection
  73. client, server = connect_logging_client(self.reactor, 0)
  74. client_transport = checked_cast(FakeTransport, client.transport)
  75. client_transport.flush()
  76. # Only the 7 infos made it through, the debugs were elided
  77. logs = server.data.splitlines()
  78. self.assertEqual(len(logs), 7)
  79. self.assertNotIn(b"debug", server.data)
  80. def test_log_backpressure_info(self) -> None:
  81. """
  82. When backpressure is hit, DEBUG and INFO logs will be shed.
  83. """
  84. handler = RemoteHandler(
  85. "127.0.0.1", 9000, maximum_buffer=10, _reactor=self.reactor
  86. )
  87. logger = self.get_logger(handler)
  88. # Send some debug messages
  89. for i in range(3):
  90. logger.debug("debug %s" % (i,))
  91. # Send a bunch of useful messages
  92. for i in range(10):
  93. logger.warning("warn %s" % (i,))
  94. # Send a bunch of info messages
  95. for i in range(3):
  96. logger.info("info %s" % (i,))
  97. # The last debug message pushes it past the maximum buffer
  98. logger.debug("too much debug")
  99. # Allow the reconnection
  100. client, server = connect_logging_client(self.reactor, 0)
  101. client_transport = checked_cast(FakeTransport, client.transport)
  102. client_transport.flush()
  103. # The 10 warnings made it through, the debugs and infos were elided
  104. logs = server.data.splitlines()
  105. self.assertEqual(len(logs), 10)
  106. self.assertNotIn(b"debug", server.data)
  107. self.assertNotIn(b"info", server.data)
  108. def test_log_backpressure_cut_middle(self) -> None:
  109. """
  110. When backpressure is hit, and no more DEBUG and INFOs cannot be culled,
  111. it will cut the middle messages out.
  112. """
  113. handler = RemoteHandler(
  114. "127.0.0.1", 9000, maximum_buffer=10, _reactor=self.reactor
  115. )
  116. logger = self.get_logger(handler)
  117. # Send a bunch of useful messages
  118. for i in range(20):
  119. logger.warning("warn %s" % (i,))
  120. # Allow the reconnection
  121. client, server = connect_logging_client(self.reactor, 0)
  122. client_transport = checked_cast(FakeTransport, client.transport)
  123. client_transport.flush()
  124. # The first five and last five warnings made it through, the debugs and
  125. # infos were elided
  126. logs = server.data.decode("utf8").splitlines()
  127. self.assertEqual(
  128. ["warn %s" % (i,) for i in range(5)]
  129. + ["warn %s" % (i,) for i in range(15, 20)],
  130. logs,
  131. )
  132. def test_cancel_connection(self) -> None:
  133. """
  134. Gracefully handle the connection being cancelled.
  135. """
  136. handler = RemoteHandler(
  137. "127.0.0.1", 9000, maximum_buffer=10, _reactor=self.reactor
  138. )
  139. logger = self.get_logger(handler)
  140. # Send a message.
  141. logger.info("Hello there, %s!", "wally")
  142. # Do not accept the connection and shutdown. This causes the pending
  143. # connection to be cancelled (and should not raise any exceptions).
  144. handler.close()