test_file_consumer.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. # Copyright 2018 New Vector Ltd
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import threading
  15. from io import StringIO
  16. from unittest.mock import NonCallableMock
  17. from twisted.internet import defer, reactor
  18. from synapse.util.file_consumer import BackgroundFileConsumer
  19. from tests import unittest
  20. class FileConsumerTests(unittest.TestCase):
  21. @defer.inlineCallbacks
  22. def test_pull_consumer(self):
  23. string_file = StringIO()
  24. consumer = BackgroundFileConsumer(string_file, reactor=reactor)
  25. try:
  26. producer = DummyPullProducer()
  27. yield producer.register_with_consumer(consumer)
  28. yield producer.write_and_wait("Foo")
  29. self.assertEqual(string_file.getvalue(), "Foo")
  30. yield producer.write_and_wait("Bar")
  31. self.assertEqual(string_file.getvalue(), "FooBar")
  32. finally:
  33. consumer.unregisterProducer()
  34. yield consumer.wait()
  35. self.assertTrue(string_file.closed)
  36. @defer.inlineCallbacks
  37. def test_push_consumer(self):
  38. string_file = BlockingStringWrite()
  39. consumer = BackgroundFileConsumer(string_file, reactor=reactor)
  40. try:
  41. producer = NonCallableMock(spec_set=[])
  42. consumer.registerProducer(producer, True)
  43. consumer.write("Foo")
  44. yield string_file.wait_for_n_writes(1)
  45. self.assertEqual(string_file.buffer, "Foo")
  46. consumer.write("Bar")
  47. yield string_file.wait_for_n_writes(2)
  48. self.assertEqual(string_file.buffer, "FooBar")
  49. finally:
  50. consumer.unregisterProducer()
  51. yield consumer.wait()
  52. self.assertTrue(string_file.closed)
  53. @defer.inlineCallbacks
  54. def test_push_producer_feedback(self):
  55. string_file = BlockingStringWrite()
  56. consumer = BackgroundFileConsumer(string_file, reactor=reactor)
  57. try:
  58. producer = NonCallableMock(spec_set=["pauseProducing", "resumeProducing"])
  59. resume_deferred = defer.Deferred()
  60. producer.resumeProducing.side_effect = lambda: resume_deferred.callback(
  61. None
  62. )
  63. consumer.registerProducer(producer, True)
  64. number_writes = 0
  65. with string_file.write_lock:
  66. for _ in range(consumer._PAUSE_ON_QUEUE_SIZE):
  67. consumer.write("Foo")
  68. number_writes += 1
  69. producer.pauseProducing.assert_called_once()
  70. yield string_file.wait_for_n_writes(number_writes)
  71. yield resume_deferred
  72. producer.resumeProducing.assert_called_once()
  73. finally:
  74. consumer.unregisterProducer()
  75. yield consumer.wait()
  76. self.assertTrue(string_file.closed)
  77. class DummyPullProducer:
  78. def __init__(self):
  79. self.consumer = None
  80. self.deferred = defer.Deferred()
  81. def resumeProducing(self):
  82. d = self.deferred
  83. self.deferred = defer.Deferred()
  84. d.callback(None)
  85. def write_and_wait(self, bytes):
  86. d = self.deferred
  87. self.consumer.write(bytes)
  88. return d
  89. def register_with_consumer(self, consumer):
  90. d = self.deferred
  91. self.consumer = consumer
  92. self.consumer.registerProducer(self, False)
  93. return d
  94. class BlockingStringWrite:
  95. def __init__(self):
  96. self.buffer = ""
  97. self.closed = False
  98. self.write_lock = threading.Lock()
  99. self._notify_write_deferred = None
  100. self._number_of_writes = 0
  101. def write(self, bytes):
  102. with self.write_lock:
  103. self.buffer += bytes
  104. self._number_of_writes += 1
  105. reactor.callFromThread(self._notify_write)
  106. def close(self):
  107. self.closed = True
  108. def _notify_write(self):
  109. "Called by write to indicate a write happened"
  110. with self.write_lock:
  111. if not self._notify_write_deferred:
  112. return
  113. d = self._notify_write_deferred
  114. self._notify_write_deferred = None
  115. d.callback(None)
  116. @defer.inlineCallbacks
  117. def wait_for_n_writes(self, n):
  118. "Wait for n writes to have happened"
  119. while True:
  120. with self.write_lock:
  121. if n <= self._number_of_writes:
  122. return
  123. if not self._notify_write_deferred:
  124. self._notify_write_deferred = defer.Deferred()
  125. d = self._notify_write_deferred
  126. yield d