test_file_consumer.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2018 New Vector Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. import threading
  16. from io import StringIO
  17. from mock import NonCallableMock
  18. from twisted.internet import defer, reactor
  19. from synapse.util.file_consumer import BackgroundFileConsumer
  20. from tests import unittest
  21. class FileConsumerTests(unittest.TestCase):
  22. @defer.inlineCallbacks
  23. def test_pull_consumer(self):
  24. string_file = StringIO()
  25. consumer = BackgroundFileConsumer(string_file, reactor=reactor)
  26. try:
  27. producer = DummyPullProducer()
  28. yield producer.register_with_consumer(consumer)
  29. yield producer.write_and_wait("Foo")
  30. self.assertEqual(string_file.getvalue(), "Foo")
  31. yield producer.write_and_wait("Bar")
  32. self.assertEqual(string_file.getvalue(), "FooBar")
  33. finally:
  34. consumer.unregisterProducer()
  35. yield consumer.wait()
  36. self.assertTrue(string_file.closed)
  37. @defer.inlineCallbacks
  38. def test_push_consumer(self):
  39. string_file = BlockingStringWrite()
  40. consumer = BackgroundFileConsumer(string_file, reactor=reactor)
  41. try:
  42. producer = NonCallableMock(spec_set=[])
  43. consumer.registerProducer(producer, True)
  44. consumer.write("Foo")
  45. yield string_file.wait_for_n_writes(1)
  46. self.assertEqual(string_file.buffer, "Foo")
  47. consumer.write("Bar")
  48. yield string_file.wait_for_n_writes(2)
  49. self.assertEqual(string_file.buffer, "FooBar")
  50. finally:
  51. consumer.unregisterProducer()
  52. yield consumer.wait()
  53. self.assertTrue(string_file.closed)
  54. @defer.inlineCallbacks
  55. def test_push_producer_feedback(self):
  56. string_file = BlockingStringWrite()
  57. consumer = BackgroundFileConsumer(string_file, reactor=reactor)
  58. try:
  59. producer = NonCallableMock(spec_set=["pauseProducing", "resumeProducing"])
  60. resume_deferred = defer.Deferred()
  61. producer.resumeProducing.side_effect = lambda: resume_deferred.callback(
  62. None
  63. )
  64. consumer.registerProducer(producer, True)
  65. number_writes = 0
  66. with string_file.write_lock:
  67. for _ in range(consumer._PAUSE_ON_QUEUE_SIZE):
  68. consumer.write("Foo")
  69. number_writes += 1
  70. producer.pauseProducing.assert_called_once()
  71. yield string_file.wait_for_n_writes(number_writes)
  72. yield resume_deferred
  73. producer.resumeProducing.assert_called_once()
  74. finally:
  75. consumer.unregisterProducer()
  76. yield consumer.wait()
  77. self.assertTrue(string_file.closed)
  78. class DummyPullProducer:
  79. def __init__(self):
  80. self.consumer = None
  81. self.deferred = defer.Deferred()
  82. def resumeProducing(self):
  83. d = self.deferred
  84. self.deferred = defer.Deferred()
  85. d.callback(None)
  86. def write_and_wait(self, bytes):
  87. d = self.deferred
  88. self.consumer.write(bytes)
  89. return d
  90. def register_with_consumer(self, consumer):
  91. d = self.deferred
  92. self.consumer = consumer
  93. self.consumer.registerProducer(self, False)
  94. return d
  95. class BlockingStringWrite:
  96. def __init__(self):
  97. self.buffer = ""
  98. self.closed = False
  99. self.write_lock = threading.Lock()
  100. self._notify_write_deferred = None
  101. self._number_of_writes = 0
  102. def write(self, bytes):
  103. with self.write_lock:
  104. self.buffer += bytes
  105. self._number_of_writes += 1
  106. reactor.callFromThread(self._notify_write)
  107. def close(self):
  108. self.closed = True
  109. def _notify_write(self):
  110. "Called by write to indicate a write happened"
  111. with self.write_lock:
  112. if not self._notify_write_deferred:
  113. return
  114. d = self._notify_write_deferred
  115. self._notify_write_deferred = None
  116. d.callback(None)
  117. @defer.inlineCallbacks
  118. def wait_for_n_writes(self, n):
  119. "Wait for n writes to have happened"
  120. while True:
  121. with self.write_lock:
  122. if n <= self._number_of_writes:
  123. return
  124. if not self._notify_write_deferred:
  125. self._notify_write_deferred = defer.Deferred()
  126. d = self._notify_write_deferred
  127. yield d