test_linearizer.py 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2016 OpenMarket Ltd
  3. # Copyright 2018 New Vector Ltd
  4. #
  5. # Licensed under the Apache License, Version 2.0 (the "License");
  6. # you may not use this file except in compliance with the License.
  7. # You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing, software
  12. # distributed under the License is distributed on an "AS IS" BASIS,
  13. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. # See the License for the specific language governing permissions and
  15. # limitations under the License.
  16. from six.moves import range
  17. from twisted.internet import defer, reactor
  18. from twisted.internet.defer import CancelledError
  19. from synapse.logging.context import LoggingContext
  20. from synapse.util import Clock
  21. from synapse.util.async_helpers import Linearizer
  22. from tests import unittest
  23. class LinearizerTestCase(unittest.TestCase):
  24. @defer.inlineCallbacks
  25. def test_linearizer(self):
  26. linearizer = Linearizer()
  27. key = object()
  28. d1 = linearizer.queue(key)
  29. cm1 = yield d1
  30. d2 = linearizer.queue(key)
  31. self.assertFalse(d2.called)
  32. with cm1:
  33. self.assertFalse(d2.called)
  34. with (yield d2):
  35. pass
  36. def test_lots_of_queued_things(self):
  37. # we have one slow thing, and lots of fast things queued up behind it.
  38. # it should *not* explode the stack.
  39. linearizer = Linearizer()
  40. @defer.inlineCallbacks
  41. def func(i, sleep=False):
  42. with LoggingContext("func(%s)" % i) as lc:
  43. with (yield linearizer.queue("")):
  44. self.assertEqual(LoggingContext.current_context(), lc)
  45. if sleep:
  46. yield Clock(reactor).sleep(0)
  47. self.assertEqual(LoggingContext.current_context(), lc)
  48. func(0, sleep=True)
  49. for i in range(1, 100):
  50. func(i)
  51. return func(1000)
  52. @defer.inlineCallbacks
  53. def test_multiple_entries(self):
  54. limiter = Linearizer(max_count=3)
  55. key = object()
  56. d1 = limiter.queue(key)
  57. cm1 = yield d1
  58. d2 = limiter.queue(key)
  59. cm2 = yield d2
  60. d3 = limiter.queue(key)
  61. cm3 = yield d3
  62. d4 = limiter.queue(key)
  63. self.assertFalse(d4.called)
  64. d5 = limiter.queue(key)
  65. self.assertFalse(d5.called)
  66. with cm1:
  67. self.assertFalse(d4.called)
  68. self.assertFalse(d5.called)
  69. cm4 = yield d4
  70. self.assertFalse(d5.called)
  71. with cm3:
  72. self.assertFalse(d5.called)
  73. cm5 = yield d5
  74. with cm2:
  75. pass
  76. with cm4:
  77. pass
  78. with cm5:
  79. pass
  80. d6 = limiter.queue(key)
  81. with (yield d6):
  82. pass
  83. @defer.inlineCallbacks
  84. def test_cancellation(self):
  85. linearizer = Linearizer()
  86. key = object()
  87. d1 = linearizer.queue(key)
  88. cm1 = yield d1
  89. d2 = linearizer.queue(key)
  90. self.assertFalse(d2.called)
  91. d3 = linearizer.queue(key)
  92. self.assertFalse(d3.called)
  93. d2.cancel()
  94. with cm1:
  95. pass
  96. self.assertTrue(d2.called)
  97. try:
  98. yield d2
  99. self.fail("Expected d2 to raise CancelledError")
  100. except CancelledError:
  101. pass
  102. with (yield d3):
  103. pass