_remote.py 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255
  1. # Copyright 2020 The Matrix.org Foundation C.I.C.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import logging
  15. import sys
  16. import traceback
  17. from collections import deque
  18. from ipaddress import IPv4Address, IPv6Address, ip_address
  19. from math import floor
  20. from typing import Callable, Optional
  21. import attr
  22. from typing_extensions import Deque
  23. from zope.interface import implementer
  24. from twisted.application.internet import ClientService
  25. from twisted.internet.defer import CancelledError, Deferred
  26. from twisted.internet.endpoints import (
  27. HostnameEndpoint,
  28. TCP4ClientEndpoint,
  29. TCP6ClientEndpoint,
  30. )
  31. from twisted.internet.interfaces import (
  32. IPushProducer,
  33. IReactorTCP,
  34. IStreamClientEndpoint,
  35. )
  36. from twisted.internet.protocol import Factory, Protocol
  37. from twisted.internet.tcp import Connection
  38. from twisted.python.failure import Failure
  39. logger = logging.getLogger(__name__)
  40. @attr.s(slots=True, auto_attribs=True)
  41. @implementer(IPushProducer)
  42. class LogProducer:
  43. """
  44. An IPushProducer that writes logs from its buffer to its transport when it
  45. is resumed.
  46. Args:
  47. buffer: Log buffer to read logs from.
  48. transport: Transport to write to.
  49. format: A callable to format the log record to a string.
  50. """
  51. # This is essentially ITCPTransport, but that is missing certain fields
  52. # (connected and registerProducer) which are part of the implementation.
  53. transport: Connection
  54. _format: Callable[[logging.LogRecord], str]
  55. _buffer: Deque[logging.LogRecord]
  56. _paused: bool = attr.ib(default=False, init=False)
  57. def pauseProducing(self) -> None:
  58. self._paused = True
  59. def stopProducing(self) -> None:
  60. self._paused = True
  61. self._buffer = deque()
  62. def resumeProducing(self) -> None:
  63. # If we're already producing, nothing to do.
  64. self._paused = False
  65. # Loop until paused.
  66. while self._paused is False and (self._buffer and self.transport.connected):
  67. try:
  68. # Request the next record and format it.
  69. record = self._buffer.popleft()
  70. msg = self._format(record)
  71. # Send it as a new line over the transport.
  72. self.transport.write(msg.encode("utf8"))
  73. self.transport.write(b"\n")
  74. except Exception:
  75. # Something has gone wrong writing to the transport -- log it
  76. # and break out of the while.
  77. traceback.print_exc(file=sys.__stderr__)
  78. break
  79. class RemoteHandler(logging.Handler):
  80. """
  81. An logging handler that writes logs to a TCP target.
  82. Args:
  83. host: The host of the logging target.
  84. port: The logging target's port.
  85. maximum_buffer: The maximum buffer size.
  86. """
  87. def __init__(
  88. self,
  89. host: str,
  90. port: int,
  91. maximum_buffer: int = 1000,
  92. level: int = logging.NOTSET,
  93. _reactor: Optional[IReactorTCP] = None,
  94. ):
  95. super().__init__(level=level)
  96. self.host = host
  97. self.port = port
  98. self.maximum_buffer = maximum_buffer
  99. self._buffer: Deque[logging.LogRecord] = deque()
  100. self._connection_waiter: Optional[Deferred] = None
  101. self._producer: Optional[LogProducer] = None
  102. # Connect without DNS lookups if it's a direct IP.
  103. if _reactor is None:
  104. from twisted.internet import reactor
  105. _reactor = reactor # type: ignore[assignment]
  106. try:
  107. ip = ip_address(self.host)
  108. if isinstance(ip, IPv4Address):
  109. endpoint: IStreamClientEndpoint = TCP4ClientEndpoint(
  110. _reactor, self.host, self.port
  111. )
  112. elif isinstance(ip, IPv6Address):
  113. endpoint = TCP6ClientEndpoint(_reactor, self.host, self.port)
  114. else:
  115. raise ValueError("Unknown IP address provided: %s" % (self.host,))
  116. except ValueError:
  117. endpoint = HostnameEndpoint(_reactor, self.host, self.port)
  118. factory = Factory.forProtocol(Protocol)
  119. self._service = ClientService(endpoint, factory, clock=_reactor)
  120. self._service.startService()
  121. self._stopping = False
  122. self._connect()
  123. def close(self) -> None:
  124. self._stopping = True
  125. self._service.stopService()
  126. def _connect(self) -> None:
  127. """
  128. Triggers an attempt to connect then write to the remote if not already writing.
  129. """
  130. # Do not attempt to open multiple connections.
  131. if self._connection_waiter:
  132. return
  133. def fail(failure: Failure) -> None:
  134. # If the Deferred was cancelled (e.g. during shutdown) do not try to
  135. # reconnect (this will cause an infinite loop of errors).
  136. if failure.check(CancelledError) and self._stopping:
  137. return
  138. # For a different error, print the traceback and re-connect.
  139. failure.printTraceback(file=sys.__stderr__)
  140. self._connection_waiter = None
  141. self._connect()
  142. def writer(result: Protocol) -> None:
  143. # Force recognising transport as a Connection and not the more
  144. # generic ITransport.
  145. transport: Connection = result.transport # type: ignore
  146. # We have a connection. If we already have a producer, and its
  147. # transport is the same, just trigger a resumeProducing.
  148. if self._producer and transport is self._producer.transport:
  149. self._producer.resumeProducing()
  150. self._connection_waiter = None
  151. return
  152. # If the producer is still producing, stop it.
  153. if self._producer:
  154. self._producer.stopProducing()
  155. # Make a new producer and start it.
  156. self._producer = LogProducer(
  157. buffer=self._buffer,
  158. transport=transport,
  159. format=self.format,
  160. )
  161. transport.registerProducer(self._producer, True)
  162. self._producer.resumeProducing()
  163. self._connection_waiter = None
  164. deferred: Deferred = self._service.whenConnected(failAfterFailures=1)
  165. deferred.addCallbacks(writer, fail)
  166. self._connection_waiter = deferred
  167. def _handle_pressure(self) -> None:
  168. """
  169. Handle backpressure by shedding records.
  170. The buffer will, in this order, until the buffer is below the maximum:
  171. - Shed DEBUG records.
  172. - Shed INFO records.
  173. - Shed the middle 50% of the records.
  174. """
  175. if len(self._buffer) <= self.maximum_buffer:
  176. return
  177. # Strip out DEBUGs
  178. self._buffer = deque(
  179. filter(lambda record: record.levelno > logging.DEBUG, self._buffer)
  180. )
  181. if len(self._buffer) <= self.maximum_buffer:
  182. return
  183. # Strip out INFOs
  184. self._buffer = deque(
  185. filter(lambda record: record.levelno > logging.INFO, self._buffer)
  186. )
  187. if len(self._buffer) <= self.maximum_buffer:
  188. return
  189. # Cut the middle entries out
  190. buffer_split = floor(self.maximum_buffer / 2)
  191. old_buffer = self._buffer
  192. self._buffer = deque()
  193. for _ in range(buffer_split):
  194. self._buffer.append(old_buffer.popleft())
  195. end_buffer = []
  196. for _ in range(buffer_split):
  197. end_buffer.append(old_buffer.pop())
  198. self._buffer.extend(reversed(end_buffer))
  199. def emit(self, record: logging.LogRecord) -> None:
  200. self._buffer.append(record)
  201. # Handle backpressure, if it exists.
  202. try:
  203. self._handle_pressure()
  204. except Exception:
  205. # If handling backpressure fails, clear the buffer and log the
  206. # exception.
  207. self._buffer.clear()
  208. logger.warning("Failed clearing backpressure")
  209. # Try and write immediately.
  210. self._connect()