test_lock.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. # Copyright 2021 The Matrix.org Foundation C.I.C.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. from twisted.internet import defer, reactor
  15. from twisted.internet.base import ReactorBase
  16. from twisted.internet.defer import Deferred
  17. from synapse.server import HomeServer
  18. from synapse.storage.databases.main.lock import _LOCK_TIMEOUT_MS
  19. from tests import unittest
  class LockTestCase(unittest.HomeserverTestCase):
      """Tests for the lock API (`try_acquire_lock`) of the main datastore."""

      def prepare(self, reactor, clock, hs: HomeServer):
          """Keep a handle on the homeserver's main datastore for the tests."""
          self.store = hs.get_datastores().main

      def test_acquire_contention(self):
          """Race three tasks for one lock and check the peak number of
          concurrent holders is exactly one.
          """
          # `holders` counts the tasks currently inside the lock and
          # `peak_holders` remembers the largest value it ever reached.
          holders = 0
          peak_holders = 0

          # Fired by the test body to let the lock holder finish.
          release_gate: "Deferred[None]" = Deferred()

          async def contender() -> None:
              nonlocal holders, peak_holders

              lock = await self.store.try_acquire_lock("name", "key")
              if lock:
                  async with lock:
                      holders += 1
                      peak_holders = max(peak_holders, holders)

                      # Park here so the other tasks get a chance to try to
                      # take the lock while this one is holding it.
                      await release_gate

                      holders -= 1

          # Kick off 3 contending tasks, one after the other.
          contenders = [defer.ensureDeferred(contender()) for _ in range(3)]

          # Nudge the reactor so that the database transaction returns.
          self.pump()

          release_gate.callback(None)

          # Drive every task to completion.
          # `Linearizer`s sleep on a different reactor when contended (#12841),
          # so between tasks we call `runUntilCurrent` on
          # `twisted.internet.reactor`, which is not the reactor used by the
          # homeserver.
          assert isinstance(reactor, ReactorBase)
          first, *remaining = contenders
          self.get_success(first)
          for pending in remaining:
              reactor.runUntilCurrent()
              self.get_success(pending)

          # The lock must never have been held by more than one task at once.
          self.assertEqual(peak_holders, 1)

      def test_simple_lock(self):
          """Check that a held lock cannot be taken by anyone else, and that it
          can be taken again once it has been released.
          """
          # Nobody holds the lock yet, so this acquisition succeeds.
          first = self.get_success(self.store.try_acquire_lock("name", "key"))
          assert first is not None

          # Step into the context manager.
          self.get_success(first.__aenter__())

          # A second attempt on the same lock comes back empty-handed.
          second = self.get_success(self.store.try_acquire_lock("name", "key"))
          self.assertIsNone(second)

          # The held lock still reports itself as valid.
          still_valid = self.get_success(first.is_still_valid())
          self.assertTrue(still_valid)

          # Release the lock.
          self.get_success(first.__aexit__(None, None, None))

          # With the lock released, it can be acquired once more.
          third = self.get_success(self.store.try_acquire_lock("name", "key"))
          assert third is not None
          self.get_success(third.__aenter__())
          self.get_success(third.__aexit__(None, None, None))

      def test_maintain_lock(self):
          """Check that a lock which is still held is not timed out."""
          held = self.get_success(self.store.try_acquire_lock("name", "key"))
          assert held is not None

          self.get_success(held.__aenter__())

          # Sit on the lock for several timeouts' worth of time; it must still
          # be unavailable to others afterwards.
          long_wait = 5 * _LOCK_TIMEOUT_MS / 1000
          self.reactor.advance(long_wait)

          other = self.get_success(self.store.try_acquire_lock("name", "key"))
          self.assertIsNone(other)

          self.get_success(held.__aexit__(None, None, None))

      def test_timeout_lock(self):
          """Check that a lock which stops being refreshed eventually times out."""
          stale = self.get_success(self.store.try_acquire_lock("name", "key"))
          assert stale is not None

          self.get_success(stale.__aenter__())

          # Pretend the process has got stuck: stop the looping call that keeps
          # the lock active.
          stale._looping_call.stop()

          # Let enough time pass for the lock to time out.
          expiry_wait = 2 * _LOCK_TIMEOUT_MS / 1000
          self.reactor.advance(expiry_wait)

          # Someone else can now take the lock...
          successor = self.get_success(self.store.try_acquire_lock("name", "key"))
          self.assertIsNotNone(successor)

          # ...and the original lock no longer counts as valid.
          stale_valid = self.get_success(stale.is_still_valid())
          self.assertFalse(stale_valid)

      def test_drop(self):
          """Check that dropping the context manager means the lock is no
          longer renewed.
          """
          abandoned = self.get_success(self.store.try_acquire_lock("name", "key"))
          self.assertIsNotNone(abandoned)

          # Discard our only reference to the lock.
          del abandoned

          # Let enough time pass for the lock to time out.
          expiry_wait = 2 * _LOCK_TIMEOUT_MS / 1000
          self.reactor.advance(expiry_wait)

          successor = self.get_success(self.store.try_acquire_lock("name", "key"))
          self.assertIsNotNone(successor)

      def test_shutdown(self):
          """Check that shutting down Synapse releases the locks."""
          # Take out two locks under different keys.
          lock_one = self.get_success(self.store.try_acquire_lock("name", "key1"))
          self.assertIsNotNone(lock_one)

          lock_two = self.get_success(self.store.try_acquire_lock("name", "key2"))
          self.assertIsNotNone(lock_two)

          # Run the store's shutdown code.
          self.get_success(self.store._on_shutdown())

          # No live lock tokens should remain afterwards.
          self.assertEqual(self.store._live_tokens, {})