test_linearizer.py 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257
  1. # Copyright 2016 OpenMarket Ltd
  2. # Copyright 2018 New Vector Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. from typing import Hashable, Tuple
  16. from typing_extensions import Protocol
  17. from twisted.internet import defer, reactor
  18. from twisted.internet.base import ReactorBase
  19. from twisted.internet.defer import CancelledError, Deferred
  20. from synapse.logging.context import LoggingContext, current_context
  21. from synapse.util.async_helpers import Linearizer
  22. from tests import unittest
  class UnblockFunction(Protocol):
      """Structural type of the "unblock" callable handed out by `_start_task`.

      Describes a callable that accepts an optional `pump_reactor` flag
      (defaulting to `True`) and returns nothing.
      """

      def __call__(self, pump_reactor: bool = True) -> None:
          """Unblock the associated task.

          Args:
              pump_reactor: Flag accepted by the callable; defaults to `True`.
          """
          ...
  class LinearizerTestCase(unittest.TestCase):
      """Tests for `Linearizer`: queueing, concurrency limits and cancellation."""

      def _start_task(
          self, linearizer: Linearizer, key: Hashable
      ) -> Tuple["Deferred[None]", "Deferred[None]", UnblockFunction]:
          """Starts a task which acquires the linearizer lock, blocks, then completes.

          Args:
              linearizer: The `Linearizer`.
              key: The `Linearizer` key.

          Returns:
              A tuple containing:
               * A cancellable `Deferred` for the entire task.
               * A `Deferred` that resolves once the task acquires the lock.
               * A function that unblocks the task. Must be called by the caller
                 to allow the task to release the lock and complete.
          """
          acquired_d: "Deferred[None]" = Deferred()
          unblock_d: "Deferred[None]" = Deferred()

          async def task() -> None:
              async with linearizer.queue(key):
                  # Signal the caller that the lock is held, then hold it until
                  # the caller fires `unblock_d`.
                  acquired_d.callback(None)
                  await unblock_d

          d = defer.ensureDeferred(task())

          def unblock(pump_reactor: bool = True) -> None:
              unblock_d.callback(None)
              # The next task, if it exists, will acquire the lock and require a
              # kick of the reactor to advance.
              if pump_reactor:
                  self._pump()

          return d, acquired_d, unblock

      def _pump(self) -> None:
          """Pump the reactor to advance `Linearizer`s.

          Keeps calling `reactor.runUntilCurrent()` for as long as the reactor
          reports outstanding delayed calls.
          """
          # Narrows the type of the global reactor so that `getDelayedCalls` and
          # `runUntilCurrent` are known to exist.
          assert isinstance(reactor, ReactorBase)
          while reactor.getDelayedCalls():
              reactor.runUntilCurrent()

      def test_linearizer(self) -> None:
          """Tests that a task is queued up behind an earlier task."""
          linearizer = Linearizer()

          key = object()

          _, acquired_d1, unblock1 = self._start_task(linearizer, key)
          self.assertTrue(acquired_d1.called)

          _, acquired_d2, unblock2 = self._start_task(linearizer, key)
          self.assertFalse(acquired_d2.called)

          # Once the first task is done, the second task can continue.
          unblock1()
          self.assertTrue(acquired_d2.called)

          unblock2()

      def test_linearizer_is_queued(self) -> None:
          """Tests `Linearizer.is_queued`.

          Runs through the same scenario as `test_linearizer`.
          """
          linearizer = Linearizer()

          key = object()

          _, acquired_d1, unblock1 = self._start_task(linearizer, key)
          self.assertTrue(acquired_d1.called)

          # Since the first task acquires the lock immediately, "is_queued" should
          # return false.
          self.assertFalse(linearizer.is_queued(key))

          _, acquired_d2, unblock2 = self._start_task(linearizer, key)
          self.assertFalse(acquired_d2.called)

          # Now the second task is queued up behind the first.
          self.assertTrue(linearizer.is_queued(key))

          unblock1()

          # And now the second task acquires the lock and nothing is in the queue
          # again.
          self.assertTrue(acquired_d2.called)
          self.assertFalse(linearizer.is_queued(key))

          unblock2()
          self.assertFalse(linearizer.is_queued(key))

      def test_lots_of_queued_things(self) -> None:
          """Tests lots of fast things queued up behind a slow thing.

          The stack should *not* explode when the slow thing completes.
          """
          linearizer = Linearizer()
          key = ""

          async def func(i: int) -> None:
              with LoggingContext("func(%s)" % i) as lc:
                  async with linearizer.queue(key):
                      # The logging context must be intact while holding the lock...
                      self.assertEqual(current_context(), lc)

                  # ...and still intact after releasing it.
                  self.assertEqual(current_context(), lc)

          # The "slow thing": holds the lock until `unblock` is called.
          _, _, unblock = self._start_task(linearizer, key)
          for i in range(1, 100):
              defer.ensureDeferred(func(i))  # type: ignore[unused-awaitable]

          # The last task queued; its completion is what the test asserts on.
          d = defer.ensureDeferred(func(1000))
          unblock()
          self.successResultOf(d)

      def test_multiple_entries(self) -> None:
          """Tests a `Linearizer` with a concurrency above 1."""
          limiter = Linearizer(max_count=3)

          key = object()

          _, acquired_d1, unblock1 = self._start_task(limiter, key)
          self.assertTrue(acquired_d1.called)

          _, acquired_d2, unblock2 = self._start_task(limiter, key)
          self.assertTrue(acquired_d2.called)

          _, acquired_d3, unblock3 = self._start_task(limiter, key)
          self.assertTrue(acquired_d3.called)

          # These next two tasks have to wait.
          _, acquired_d4, unblock4 = self._start_task(limiter, key)
          self.assertFalse(acquired_d4.called)

          _, acquired_d5, unblock5 = self._start_task(limiter, key)
          self.assertFalse(acquired_d5.called)

          # Once the first task completes, the fourth task can continue.
          unblock1()
          self.assertTrue(acquired_d4.called)
          self.assertFalse(acquired_d5.called)

          # Once the third task completes, the fifth task can continue.
          unblock3()
          self.assertTrue(acquired_d5.called)

          # Make all tasks finish.
          unblock2()
          unblock4()
          unblock5()

          # The next task shouldn't have to wait.
          _, acquired_d6, unblock6 = self._start_task(limiter, key)
          # Check `.called`, not the `Deferred` itself: a `Deferred` object is
          # always truthy, so asserting on the object could never fail.
          self.assertTrue(acquired_d6.called)

          unblock6()

      def test_cancellation(self) -> None:
          """Tests cancellation while waiting for a `Linearizer`."""
          linearizer = Linearizer()

          key = object()

          d1, acquired_d1, unblock1 = self._start_task(linearizer, key)
          self.assertTrue(acquired_d1.called)

          # Create a second task, waiting for the first task.
          d2, acquired_d2, _ = self._start_task(linearizer, key)
          self.assertFalse(acquired_d2.called)

          # Create a third task, waiting for the second task.
          d3, acquired_d3, unblock3 = self._start_task(linearizer, key)
          self.assertFalse(acquired_d3.called)

          # Cancel the waiting second task.
          d2.cancel()

          unblock1()
          self.successResultOf(d1)

          self.assertTrue(d2.called)
          self.failureResultOf(d2, CancelledError)

          # The third task should continue running.
          self.assertTrue(
              acquired_d3.called,
              "Third task did not get the lock after the second task was cancelled",
          )
          unblock3()
          self.successResultOf(d3)

      def test_cancellation_during_sleep(self) -> None:
          """Tests cancellation during the sleep just after waiting for a `Linearizer`."""
          linearizer = Linearizer()

          key = object()

          d1, acquired_d1, unblock1 = self._start_task(linearizer, key)
          self.assertTrue(acquired_d1.called)

          # Create a second task, waiting for the first task.
          d2, acquired_d2, _ = self._start_task(linearizer, key)
          self.assertFalse(acquired_d2.called)

          # Create a third task, waiting for the second task.
          d3, acquired_d3, unblock3 = self._start_task(linearizer, key)
          self.assertFalse(acquired_d3.called)

          # Once the first task completes, cancel the waiting second task while it
          # is sleeping just after acquiring the lock. The reactor is deliberately
          # not pumped here so that the cancellation lands before the second task
          # advances.
          unblock1(pump_reactor=False)
          self.successResultOf(d1)
          d2.cancel()
          self._pump()

          self.assertTrue(d2.called)
          self.failureResultOf(d2, CancelledError)

          # The third task should continue running.
          self.assertTrue(
              acquired_d3.called,
              "Third task did not get the lock after the second task was cancelled",
          )
          unblock3()
          self.successResultOf(d3)