test_file_consumer.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2018 New Vector Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. from twisted.internet import defer, reactor
  16. from mock import NonCallableMock
  17. from synapse.util.file_consumer import BackgroundFileConsumer
  18. from tests import unittest
  19. from six import StringIO
  20. import threading
  21. class FileConsumerTests(unittest.TestCase):
  22. @defer.inlineCallbacks
  23. def test_pull_consumer(self):
  24. string_file = StringIO()
  25. consumer = BackgroundFileConsumer(string_file)
  26. try:
  27. producer = DummyPullProducer()
  28. yield producer.register_with_consumer(consumer)
  29. yield producer.write_and_wait("Foo")
  30. self.assertEqual(string_file.getvalue(), "Foo")
  31. yield producer.write_and_wait("Bar")
  32. self.assertEqual(string_file.getvalue(), "FooBar")
  33. finally:
  34. consumer.unregisterProducer()
  35. yield consumer.wait()
  36. self.assertTrue(string_file.closed)
  37. @defer.inlineCallbacks
  38. def test_push_consumer(self):
  39. string_file = BlockingStringWrite()
  40. consumer = BackgroundFileConsumer(string_file)
  41. try:
  42. producer = NonCallableMock(spec_set=[])
  43. consumer.registerProducer(producer, True)
  44. consumer.write("Foo")
  45. yield string_file.wait_for_n_writes(1)
  46. self.assertEqual(string_file.buffer, "Foo")
  47. consumer.write("Bar")
  48. yield string_file.wait_for_n_writes(2)
  49. self.assertEqual(string_file.buffer, "FooBar")
  50. finally:
  51. consumer.unregisterProducer()
  52. yield consumer.wait()
  53. self.assertTrue(string_file.closed)
  54. @defer.inlineCallbacks
  55. def test_push_producer_feedback(self):
  56. string_file = BlockingStringWrite()
  57. consumer = BackgroundFileConsumer(string_file)
  58. try:
  59. producer = NonCallableMock(spec_set=["pauseProducing", "resumeProducing"])
  60. resume_deferred = defer.Deferred()
  61. producer.resumeProducing.side_effect = lambda: resume_deferred.callback(None)
  62. consumer.registerProducer(producer, True)
  63. number_writes = 0
  64. with string_file.write_lock:
  65. for _ in range(consumer._PAUSE_ON_QUEUE_SIZE):
  66. consumer.write("Foo")
  67. number_writes += 1
  68. producer.pauseProducing.assert_called_once()
  69. yield string_file.wait_for_n_writes(number_writes)
  70. yield resume_deferred
  71. producer.resumeProducing.assert_called_once()
  72. finally:
  73. consumer.unregisterProducer()
  74. yield consumer.wait()
  75. self.assertTrue(string_file.closed)
  76. class DummyPullProducer(object):
  77. def __init__(self):
  78. self.consumer = None
  79. self.deferred = defer.Deferred()
  80. def resumeProducing(self):
  81. d = self.deferred
  82. self.deferred = defer.Deferred()
  83. d.callback(None)
  84. def write_and_wait(self, bytes):
  85. d = self.deferred
  86. self.consumer.write(bytes)
  87. return d
  88. def register_with_consumer(self, consumer):
  89. d = self.deferred
  90. self.consumer = consumer
  91. self.consumer.registerProducer(self, False)
  92. return d
  93. class BlockingStringWrite(object):
  94. def __init__(self):
  95. self.buffer = ""
  96. self.closed = False
  97. self.write_lock = threading.Lock()
  98. self._notify_write_deferred = None
  99. self._number_of_writes = 0
  100. def write(self, bytes):
  101. with self.write_lock:
  102. self.buffer += bytes
  103. self._number_of_writes += 1
  104. reactor.callFromThread(self._notify_write)
  105. def close(self):
  106. self.closed = True
  107. def _notify_write(self):
  108. "Called by write to indicate a write happened"
  109. with self.write_lock:
  110. if not self._notify_write_deferred:
  111. return
  112. d = self._notify_write_deferred
  113. self._notify_write_deferred = None
  114. d.callback(None)
  115. @defer.inlineCallbacks
  116. def wait_for_n_writes(self, n):
  117. "Wait for n writes to have happened"
  118. while True:
  119. with self.write_lock:
  120. if n <= self._number_of_writes:
  121. return
  122. if not self._notify_write_deferred:
  123. self._notify_write_deferred = defer.Deferred()
  124. d = self._notify_write_deferred
  125. yield d