test_rwlock.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394
  1. # Copyright 2016 OpenMarket Ltd
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. from typing import AsyncContextManager, Callable, Sequence, Tuple
  15. from twisted.internet import defer
  16. from twisted.internet.defer import CancelledError, Deferred
  17. from synapse.util.async_helpers import ReadWriteLock
  18. from tests import unittest
  19. class ReadWriteLockTestCase(unittest.TestCase):
  20. def _start_reader_or_writer(
  21. self,
  22. read_or_write: Callable[[str], AsyncContextManager],
  23. key: str,
  24. return_value: str,
  25. ) -> Tuple["Deferred[str]", "Deferred[None]", "Deferred[None]"]:
  26. """Starts a reader or writer which acquires the lock, blocks, then completes.
  27. Args:
  28. read_or_write: A function returning a context manager for a lock.
  29. Either a bound `ReadWriteLock.read` or `ReadWriteLock.write`.
  30. key: The key to read or write.
  31. return_value: A string that the reader or writer will resolve with when
  32. done.
  33. Returns:
  34. A tuple of three `Deferred`s:
  35. * A cancellable `Deferred` for the entire read or write operation that
  36. resolves with `return_value` on successful completion.
  37. * A `Deferred` that resolves once the reader or writer acquires the lock.
  38. * A `Deferred` that blocks the reader or writer. Must be resolved by the
  39. caller to allow the reader or writer to release the lock and complete.
  40. """
  41. acquired_d: "Deferred[None]" = Deferred()
  42. unblock_d: "Deferred[None]" = Deferred()
  43. async def reader_or_writer() -> str:
  44. async with read_or_write(key):
  45. acquired_d.callback(None)
  46. await unblock_d
  47. return return_value
  48. d = defer.ensureDeferred(reader_or_writer())
  49. return d, acquired_d, unblock_d
  50. def _start_blocking_reader(
  51. self, rwlock: ReadWriteLock, key: str, return_value: str
  52. ) -> Tuple["Deferred[str]", "Deferred[None]", "Deferred[None]"]:
  53. """Starts a reader which acquires the lock, blocks, then releases the lock.
  54. See the docstring for `_start_reader_or_writer` for details about the arguments
  55. and return values.
  56. """
  57. return self._start_reader_or_writer(rwlock.read, key, return_value)
  58. def _start_blocking_writer(
  59. self, rwlock: ReadWriteLock, key: str, return_value: str
  60. ) -> Tuple["Deferred[str]", "Deferred[None]", "Deferred[None]"]:
  61. """Starts a writer which acquires the lock, blocks, then releases the lock.
  62. See the docstring for `_start_reader_or_writer` for details about the arguments
  63. and return values.
  64. """
  65. return self._start_reader_or_writer(rwlock.write, key, return_value)
  66. def _start_nonblocking_reader(
  67. self, rwlock: ReadWriteLock, key: str, return_value: str
  68. ) -> Tuple["Deferred[str]", "Deferred[None]"]:
  69. """Starts a reader which acquires the lock, then releases it immediately.
  70. See the docstring for `_start_reader_or_writer` for details about the arguments.
  71. Returns:
  72. A tuple of two `Deferred`s:
  73. * A cancellable `Deferred` for the entire read operation that resolves with
  74. `return_value` on successful completion.
  75. * A `Deferred` that resolves once the reader acquires the lock.
  76. """
  77. d, acquired_d, unblock_d = self._start_reader_or_writer(
  78. rwlock.read, key, return_value
  79. )
  80. unblock_d.callback(None)
  81. return d, acquired_d
  82. def _start_nonblocking_writer(
  83. self, rwlock: ReadWriteLock, key: str, return_value: str
  84. ) -> Tuple["Deferred[str]", "Deferred[None]"]:
  85. """Starts a writer which acquires the lock, then releases it immediately.
  86. See the docstring for `_start_reader_or_writer` for details about the arguments.
  87. Returns:
  88. A tuple of two `Deferred`s:
  89. * A cancellable `Deferred` for the entire write operation that resolves
  90. with `return_value` on successful completion.
  91. * A `Deferred` that resolves once the writer acquires the lock.
  92. """
  93. d, acquired_d, unblock_d = self._start_reader_or_writer(
  94. rwlock.write, key, return_value
  95. )
  96. unblock_d.callback(None)
  97. return d, acquired_d
  98. def _assert_first_n_resolved(
  99. self, deferreds: Sequence["defer.Deferred[None]"], n: int
  100. ) -> None:
  101. """Assert that exactly the first n `Deferred`s in the given list are resolved.
  102. Args:
  103. deferreds: The list of `Deferred`s to be checked.
  104. n: The number of `Deferred`s at the start of `deferreds` that should be
  105. resolved.
  106. """
  107. for i, d in enumerate(deferreds[:n]):
  108. self.assertTrue(d.called, msg="deferred %d was unexpectedly unresolved" % i)
  109. for i, d in enumerate(deferreds[n:]):
  110. self.assertFalse(
  111. d.called, msg="deferred %d was unexpectedly resolved" % (i + n)
  112. )
  113. def test_rwlock(self) -> None:
  114. rwlock = ReadWriteLock()
  115. key = "key"
  116. ds = [
  117. self._start_blocking_reader(rwlock, key, "0"),
  118. self._start_blocking_reader(rwlock, key, "1"),
  119. self._start_blocking_writer(rwlock, key, "2"),
  120. self._start_blocking_writer(rwlock, key, "3"),
  121. self._start_blocking_reader(rwlock, key, "4"),
  122. self._start_blocking_reader(rwlock, key, "5"),
  123. self._start_blocking_writer(rwlock, key, "6"),
  124. ]
  125. # `Deferred`s that resolve when each reader or writer acquires the lock.
  126. acquired_ds = [acquired_d for _, acquired_d, _ in ds]
  127. # `Deferred`s that will trigger the release of locks when resolved.
  128. release_ds = [release_d for _, _, release_d in ds]
  129. # The first two readers should acquire their locks.
  130. self._assert_first_n_resolved(acquired_ds, 2)
  131. # Release one of the read locks. The next writer should not acquire the lock,
  132. # because there is another reader holding the lock.
  133. self._assert_first_n_resolved(acquired_ds, 2)
  134. release_ds[0].callback(None)
  135. self._assert_first_n_resolved(acquired_ds, 2)
  136. # Release the other read lock. The next writer should acquire the lock.
  137. self._assert_first_n_resolved(acquired_ds, 2)
  138. release_ds[1].callback(None)
  139. self._assert_first_n_resolved(acquired_ds, 3)
  140. # Release the write lock. The next writer should acquire the lock.
  141. self._assert_first_n_resolved(acquired_ds, 3)
  142. release_ds[2].callback(None)
  143. self._assert_first_n_resolved(acquired_ds, 4)
  144. # Release the write lock. The next two readers should acquire locks.
  145. self._assert_first_n_resolved(acquired_ds, 4)
  146. release_ds[3].callback(None)
  147. self._assert_first_n_resolved(acquired_ds, 6)
  148. # Release one of the read locks. The next writer should not acquire the lock,
  149. # because there is another reader holding the lock.
  150. self._assert_first_n_resolved(acquired_ds, 6)
  151. release_ds[5].callback(None)
  152. self._assert_first_n_resolved(acquired_ds, 6)
  153. # Release the other read lock. The next writer should acquire the lock.
  154. self._assert_first_n_resolved(acquired_ds, 6)
  155. release_ds[4].callback(None)
  156. self._assert_first_n_resolved(acquired_ds, 7)
  157. # Release the write lock.
  158. release_ds[6].callback(None)
  159. # Acquire and release the write and read locks one last time for good measure.
  160. _, acquired_d = self._start_nonblocking_writer(rwlock, key, "last writer")
  161. self.assertTrue(acquired_d.called)
  162. _, acquired_d = self._start_nonblocking_reader(rwlock, key, "last reader")
  163. self.assertTrue(acquired_d.called)
  164. def test_lock_handoff_to_nonblocking_writer(self) -> None:
  165. """Test a writer handing the lock to another writer that completes instantly."""
  166. rwlock = ReadWriteLock()
  167. key = "key"
  168. d1, _, unblock = self._start_blocking_writer(rwlock, key, "write 1 completed")
  169. d2, _ = self._start_nonblocking_writer(rwlock, key, "write 2 completed")
  170. self.assertFalse(d1.called)
  171. self.assertFalse(d2.called)
  172. # Unblock the first writer. The second writer will complete without blocking.
  173. unblock.callback(None)
  174. self.assertTrue(d1.called)
  175. self.assertTrue(d2.called)
  176. # The `ReadWriteLock` should operate as normal.
  177. d3, _ = self._start_nonblocking_writer(rwlock, key, "write 3 completed")
  178. self.assertTrue(d3.called)
  179. def test_cancellation_while_holding_read_lock(self) -> None:
  180. """Test cancellation while holding a read lock.
  181. A waiting writer should be given the lock when the reader holding the lock is
  182. cancelled.
  183. """
  184. rwlock = ReadWriteLock()
  185. key = "key"
  186. # 1. A reader takes the lock and blocks.
  187. reader_d, _, _ = self._start_blocking_reader(rwlock, key, "read completed")
  188. # 2. A writer waits for the reader to complete.
  189. writer_d, _ = self._start_nonblocking_writer(rwlock, key, "write completed")
  190. self.assertFalse(writer_d.called)
  191. # 3. The reader is cancelled.
  192. reader_d.cancel()
  193. self.failureResultOf(reader_d, CancelledError)
  194. # 4. The writer should take the lock and complete.
  195. self.assertTrue(
  196. writer_d.called, "Writer is stuck waiting for a cancelled reader"
  197. )
  198. self.assertEqual("write completed", self.successResultOf(writer_d))
  199. def test_cancellation_while_holding_write_lock(self) -> None:
  200. """Test cancellation while holding a write lock.
  201. A waiting reader should be given the lock when the writer holding the lock is
  202. cancelled.
  203. """
  204. rwlock = ReadWriteLock()
  205. key = "key"
  206. # 1. A writer takes the lock and blocks.
  207. writer_d, _, _ = self._start_blocking_writer(rwlock, key, "write completed")
  208. # 2. A reader waits for the writer to complete.
  209. reader_d, _ = self._start_nonblocking_reader(rwlock, key, "read completed")
  210. self.assertFalse(reader_d.called)
  211. # 3. The writer is cancelled.
  212. writer_d.cancel()
  213. self.failureResultOf(writer_d, CancelledError)
  214. # 4. The reader should take the lock and complete.
  215. self.assertTrue(
  216. reader_d.called, "Reader is stuck waiting for a cancelled writer"
  217. )
  218. self.assertEqual("read completed", self.successResultOf(reader_d))
  219. def test_cancellation_while_waiting_for_read_lock(self) -> None:
  220. """Test cancellation while waiting for a read lock.
  221. Tests that cancelling a waiting reader:
  222. * does not cancel the writer it is waiting on
  223. * does not cancel the next writer waiting on it
  224. * does not allow the next writer to acquire the lock before an earlier writer
  225. has finished
  226. * does not keep the next writer waiting indefinitely
  227. These correspond to the asserts with explicit messages.
  228. """
  229. rwlock = ReadWriteLock()
  230. key = "key"
  231. # 1. A writer takes the lock and blocks.
  232. writer1_d, _, unblock_writer1 = self._start_blocking_writer(
  233. rwlock, key, "write 1 completed"
  234. )
  235. # 2. A reader waits for the first writer to complete.
  236. # This reader will be cancelled later.
  237. reader_d, _ = self._start_nonblocking_reader(rwlock, key, "read completed")
  238. self.assertFalse(reader_d.called)
  239. # 3. A second writer waits for both the first writer and the reader to complete.
  240. writer2_d, _ = self._start_nonblocking_writer(rwlock, key, "write 2 completed")
  241. self.assertFalse(writer2_d.called)
  242. # 4. The waiting reader is cancelled.
  243. # Neither of the writers should be cancelled.
  244. # The second writer should still be waiting, but only on the first writer.
  245. reader_d.cancel()
  246. self.failureResultOf(reader_d, CancelledError)
  247. self.assertFalse(writer1_d.called, "First writer was unexpectedly cancelled")
  248. self.assertFalse(
  249. writer2_d.called,
  250. "Second writer was unexpectedly cancelled or given the lock before the "
  251. "first writer finished",
  252. )
  253. # 5. Unblock the first writer, which should complete.
  254. unblock_writer1.callback(None)
  255. self.assertEqual("write 1 completed", self.successResultOf(writer1_d))
  256. # 6. The second writer should take the lock and complete.
  257. self.assertTrue(
  258. writer2_d.called, "Second writer is stuck waiting for a cancelled reader"
  259. )
  260. self.assertEqual("write 2 completed", self.successResultOf(writer2_d))
  261. def test_cancellation_while_waiting_for_write_lock(self) -> None:
  262. """Test cancellation while waiting for a write lock.
  263. Tests that cancelling a waiting writer:
  264. * does not cancel the reader or writer it is waiting on
  265. * does not cancel the next writer waiting on it
  266. * does not allow the next writer to acquire the lock before an earlier reader
  267. and writer have finished
  268. * does not keep the next writer waiting indefinitely
  269. These correspond to the asserts with explicit messages.
  270. """
  271. rwlock = ReadWriteLock()
  272. key = "key"
  273. # 1. A reader takes the lock and blocks.
  274. reader_d, _, unblock_reader = self._start_blocking_reader(
  275. rwlock, key, "read completed"
  276. )
  277. # 2. A writer waits for the reader to complete.
  278. writer1_d, _, unblock_writer1 = self._start_blocking_writer(
  279. rwlock, key, "write 1 completed"
  280. )
  281. # 3. A second writer waits for both the reader and first writer to complete.
  282. # This writer will be cancelled later.
  283. writer2_d, _ = self._start_nonblocking_writer(rwlock, key, "write 2 completed")
  284. self.assertFalse(writer2_d.called)
  285. # 4. A third writer waits for the second writer to complete.
  286. writer3_d, _ = self._start_nonblocking_writer(rwlock, key, "write 3 completed")
  287. self.assertFalse(writer3_d.called)
  288. # 5. The second writer is cancelled, but continues waiting for the lock.
  289. # The reader, first writer and third writer should not be cancelled.
  290. # The first writer should still be waiting on the reader.
  291. # The third writer should still be waiting on the second writer.
  292. writer2_d.cancel()
  293. self.assertNoResult(writer2_d)
  294. self.assertFalse(reader_d.called, "Reader was unexpectedly cancelled")
  295. self.assertFalse(writer1_d.called, "First writer was unexpectedly cancelled")
  296. self.assertFalse(
  297. writer3_d.called,
  298. "Third writer was unexpectedly cancelled or given the lock before the first "
  299. "writer finished",
  300. )
  301. # 6. Unblock the reader, which should complete.
  302. # The first writer should be given the lock and block.
  303. # The third writer should still be waiting on the second writer.
  304. unblock_reader.callback(None)
  305. self.assertEqual("read completed", self.successResultOf(reader_d))
  306. self.assertNoResult(writer2_d)
  307. self.assertFalse(
  308. writer3_d.called,
  309. "Third writer was unexpectedly given the lock before the first writer "
  310. "finished",
  311. )
  312. # 7. Unblock the first writer, which should complete.
  313. unblock_writer1.callback(None)
  314. self.assertEqual("write 1 completed", self.successResultOf(writer1_d))
  315. # 8. The second writer should take the lock and release it immediately, since it
  316. # has been cancelled.
  317. self.failureResultOf(writer2_d, CancelledError)
  318. # 9. The third writer should take the lock and complete.
  319. self.assertTrue(
  320. writer3_d.called, "Third writer is stuck waiting for a cancelled writer"
  321. )
  322. self.assertEqual("write 3 completed", self.successResultOf(writer3_d))