file_consumer.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2018 New Vector Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. import queue
  16. from twisted.internet import threads
  17. from synapse.logging.context import make_deferred_yieldable, run_in_background
  18. class BackgroundFileConsumer(object):
  19. """A consumer that writes to a file like object. Supports both push
  20. and pull producers
  21. Args:
  22. file_obj (file): The file like object to write to. Closed when
  23. finished.
  24. reactor (twisted.internet.reactor): the Twisted reactor to use
  25. """
  26. # For PushProducers pause if we have this many unwritten slices
  27. _PAUSE_ON_QUEUE_SIZE = 5
  28. # And resume once the size of the queue is less than this
  29. _RESUME_ON_QUEUE_SIZE = 2
  30. def __init__(self, file_obj, reactor):
  31. self._file_obj = file_obj
  32. self._reactor = reactor
  33. # Producer we're registered with
  34. self._producer = None
  35. # True if PushProducer, false if PullProducer
  36. self.streaming = False
  37. # For PushProducers, indicates whether we've paused the producer and
  38. # need to call resumeProducing before we get more data.
  39. self._paused_producer = False
  40. # Queue of slices of bytes to be written. When producer calls
  41. # unregister a final None is sent.
  42. self._bytes_queue = queue.Queue()
  43. # Deferred that is resolved when finished writing
  44. self._finished_deferred = None
  45. # If the _writer thread throws an exception it gets stored here.
  46. self._write_exception = None
  47. def registerProducer(self, producer, streaming):
  48. """Part of IConsumer interface
  49. Args:
  50. producer (IProducer)
  51. streaming (bool): True if push based producer, False if pull
  52. based.
  53. """
  54. if self._producer:
  55. raise Exception("registerProducer called twice")
  56. self._producer = producer
  57. self.streaming = streaming
  58. self._finished_deferred = run_in_background(
  59. threads.deferToThreadPool,
  60. self._reactor,
  61. self._reactor.getThreadPool(),
  62. self._writer,
  63. )
  64. if not streaming:
  65. self._producer.resumeProducing()
  66. def unregisterProducer(self):
  67. """Part of IProducer interface
  68. """
  69. self._producer = None
  70. if not self._finished_deferred.called:
  71. self._bytes_queue.put_nowait(None)
  72. def write(self, bytes):
  73. """Part of IProducer interface
  74. """
  75. if self._write_exception:
  76. raise self._write_exception
  77. if self._finished_deferred.called:
  78. raise Exception("consumer has closed")
  79. self._bytes_queue.put_nowait(bytes)
  80. # If this is a PushProducer and the queue is getting behind
  81. # then we pause the producer.
  82. if self.streaming and self._bytes_queue.qsize() >= self._PAUSE_ON_QUEUE_SIZE:
  83. self._paused_producer = True
  84. self._producer.pauseProducing()
  85. def _writer(self):
  86. """This is run in a background thread to write to the file.
  87. """
  88. try:
  89. while self._producer or not self._bytes_queue.empty():
  90. # If we've paused the producer check if we should resume the
  91. # producer.
  92. if self._producer and self._paused_producer:
  93. if self._bytes_queue.qsize() <= self._RESUME_ON_QUEUE_SIZE:
  94. self._reactor.callFromThread(self._resume_paused_producer)
  95. bytes = self._bytes_queue.get()
  96. # If we get a None (or empty list) then that's a signal used
  97. # to indicate we should check if we should stop.
  98. if bytes:
  99. self._file_obj.write(bytes)
  100. # If its a pull producer then we need to explicitly ask for
  101. # more stuff.
  102. if not self.streaming and self._producer:
  103. self._reactor.callFromThread(self._producer.resumeProducing)
  104. except Exception as e:
  105. self._write_exception = e
  106. raise
  107. finally:
  108. self._file_obj.close()
  109. def wait(self):
  110. """Returns a deferred that resolves when finished writing to file
  111. """
  112. return make_deferred_yieldable(self._finished_deferred)
  113. def _resume_paused_producer(self):
  114. """Gets called if we should resume producing after being paused
  115. """
  116. if self._paused_producer and self._producer:
  117. self._paused_producer = False
  118. self._producer.resumeProducing()