test_async_helpers.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584
  1. # Copyright 2019 New Vector Ltd
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import traceback
  15. from typing import Generator, List, NoReturn, Optional
  16. from parameterized import parameterized_class
  17. from twisted.internet import defer
  18. from twisted.internet.defer import CancelledError, Deferred, ensureDeferred
  19. from twisted.internet.task import Clock
  20. from twisted.python.failure import Failure
  21. from synapse.logging.context import (
  22. SENTINEL_CONTEXT,
  23. LoggingContext,
  24. PreserveLoggingContext,
  25. current_context,
  26. make_deferred_yieldable,
  27. )
  28. from synapse.util.async_helpers import (
  29. AwakenableSleeper,
  30. ObservableDeferred,
  31. concurrently_execute,
  32. delay_cancellation,
  33. stop_cancellation,
  34. timeout_deferred,
  35. )
  36. from tests.server import get_clock
  37. from tests.unittest import TestCase
  38. class ObservableDeferredTest(TestCase):
  39. def test_succeed(self) -> None:
  40. origin_d: "Deferred[int]" = Deferred()
  41. observable = ObservableDeferred(origin_d)
  42. observer1 = observable.observe()
  43. observer2 = observable.observe()
  44. self.assertFalse(observer1.called)
  45. self.assertFalse(observer2.called)
  46. # check the first observer is called first
  47. def check_called_first(res: int) -> int:
  48. self.assertFalse(observer2.called)
  49. return res
  50. observer1.addBoth(check_called_first)
  51. # store the results
  52. results: List[Optional[int]] = [None, None]
  53. def check_val(res: int, idx: int) -> int:
  54. results[idx] = res
  55. return res
  56. observer1.addCallback(check_val, 0)
  57. observer2.addCallback(check_val, 1)
  58. origin_d.callback(123)
  59. self.assertEqual(results[0], 123, "observer 1 callback result")
  60. self.assertEqual(results[1], 123, "observer 2 callback result")
  61. def test_failure(self) -> None:
  62. origin_d: Deferred = Deferred()
  63. observable = ObservableDeferred(origin_d, consumeErrors=True)
  64. observer1 = observable.observe()
  65. observer2 = observable.observe()
  66. self.assertFalse(observer1.called)
  67. self.assertFalse(observer2.called)
  68. # check the first observer is called first
  69. def check_called_first(res: int) -> int:
  70. self.assertFalse(observer2.called)
  71. return res
  72. observer1.addBoth(check_called_first)
  73. # store the results
  74. results: List[Optional[Failure]] = [None, None]
  75. def check_failure(res: Failure, idx: int) -> None:
  76. results[idx] = res
  77. return None
  78. observer1.addErrback(check_failure, 0)
  79. observer2.addErrback(check_failure, 1)
  80. try:
  81. raise Exception("gah!")
  82. except Exception as e:
  83. origin_d.errback(e)
  84. assert results[0] is not None
  85. self.assertEqual(str(results[0].value), "gah!", "observer 1 errback result")
  86. assert results[1] is not None
  87. self.assertEqual(str(results[1].value), "gah!", "observer 2 errback result")
  88. def test_cancellation(self) -> None:
  89. """Test that cancelling an observer does not affect other observers."""
  90. origin_d: "Deferred[int]" = Deferred()
  91. observable = ObservableDeferred(origin_d, consumeErrors=True)
  92. observer1 = observable.observe()
  93. observer2 = observable.observe()
  94. observer3 = observable.observe()
  95. self.assertFalse(observer1.called)
  96. self.assertFalse(observer2.called)
  97. self.assertFalse(observer3.called)
  98. # cancel the second observer
  99. observer2.cancel()
  100. self.assertFalse(observer1.called)
  101. self.failureResultOf(observer2, CancelledError)
  102. self.assertFalse(observer3.called)
  103. # other observers resolve as normal
  104. origin_d.callback(123)
  105. self.assertEqual(observer1.result, 123, "observer 1 callback result")
  106. self.assertEqual(observer3.result, 123, "observer 3 callback result")
  107. # additional observers resolve as normal
  108. observer4 = observable.observe()
  109. self.assertEqual(observer4.result, 123, "observer 4 callback result")
  110. class TimeoutDeferredTest(TestCase):
  111. def setUp(self) -> None:
  112. self.clock = Clock()
  113. def test_times_out(self) -> None:
  114. """Basic test case that checks that the original deferred is cancelled and that
  115. the timing-out deferred is errbacked
  116. """
  117. cancelled = False
  118. def canceller(_d: Deferred) -> None:
  119. nonlocal cancelled
  120. cancelled = True
  121. non_completing_d: Deferred = Deferred(canceller)
  122. timing_out_d = timeout_deferred(non_completing_d, 1.0, self.clock)
  123. self.assertNoResult(timing_out_d)
  124. self.assertFalse(cancelled, "deferred was cancelled prematurely")
  125. self.clock.pump((1.0,))
  126. self.assertTrue(cancelled, "deferred was not cancelled by timeout")
  127. self.failureResultOf(timing_out_d, defer.TimeoutError)
  128. def test_times_out_when_canceller_throws(self) -> None:
  129. """Test that we have successfully worked around
  130. https://twistedmatrix.com/trac/ticket/9534"""
  131. def canceller(_d: Deferred) -> None:
  132. raise Exception("can't cancel this deferred")
  133. non_completing_d: Deferred = Deferred(canceller)
  134. timing_out_d = timeout_deferred(non_completing_d, 1.0, self.clock)
  135. self.assertNoResult(timing_out_d)
  136. self.clock.pump((1.0,))
  137. self.failureResultOf(timing_out_d, defer.TimeoutError)
  138. def test_logcontext_is_preserved_on_cancellation(self) -> None:
  139. blocking_was_cancelled = False
  140. @defer.inlineCallbacks
  141. def blocking() -> Generator["Deferred[object]", object, None]:
  142. nonlocal blocking_was_cancelled
  143. non_completing_d: Deferred = Deferred()
  144. with PreserveLoggingContext():
  145. try:
  146. yield non_completing_d
  147. except CancelledError:
  148. blocking_was_cancelled = True
  149. raise
  150. with LoggingContext("one") as context_one:
  151. # the errbacks should be run in the test logcontext
  152. def errback(res: Failure, deferred_name: str) -> Failure:
  153. self.assertIs(
  154. current_context(),
  155. context_one,
  156. "errback %s run in unexpected logcontext %s"
  157. % (deferred_name, current_context()),
  158. )
  159. return res
  160. original_deferred = blocking()
  161. original_deferred.addErrback(errback, "orig")
  162. timing_out_d = timeout_deferred(original_deferred, 1.0, self.clock)
  163. self.assertNoResult(timing_out_d)
  164. self.assertIs(current_context(), SENTINEL_CONTEXT)
  165. timing_out_d.addErrback(errback, "timingout")
  166. self.clock.pump((1.0,))
  167. self.assertTrue(
  168. blocking_was_cancelled, "non-completing deferred was not cancelled"
  169. )
  170. self.failureResultOf(timing_out_d, defer.TimeoutError)
  171. self.assertIs(current_context(), context_one)
  172. class _TestException(Exception):
  173. pass
  174. class ConcurrentlyExecuteTest(TestCase):
  175. def test_limits_runners(self) -> None:
  176. """If we have more tasks than runners, we should get the limit of runners"""
  177. started = 0
  178. waiters = []
  179. processed = []
  180. async def callback(v: int) -> None:
  181. # when we first enter, bump the start count
  182. nonlocal started
  183. started += 1
  184. # record the fact we got an item
  185. processed.append(v)
  186. # wait for the goahead before returning
  187. d2: "Deferred[int]" = Deferred()
  188. waiters.append(d2)
  189. await d2
  190. # set it going
  191. d2 = ensureDeferred(concurrently_execute(callback, [1, 2, 3, 4, 5], 3))
  192. # check we got exactly 3 processes
  193. self.assertEqual(started, 3)
  194. self.assertEqual(len(waiters), 3)
  195. # let one finish
  196. waiters.pop().callback(0)
  197. # ... which should start another
  198. self.assertEqual(started, 4)
  199. self.assertEqual(len(waiters), 3)
  200. # we still shouldn't be done
  201. self.assertNoResult(d2)
  202. # finish the job
  203. while waiters:
  204. waiters.pop().callback(0)
  205. # check everything got done
  206. self.assertEqual(started, 5)
  207. self.assertCountEqual(processed, [1, 2, 3, 4, 5])
  208. self.successResultOf(d2)
  209. def test_preserves_stacktraces(self) -> None:
  210. """Test that the stacktrace from an exception thrown in the callback is preserved"""
  211. d1: "Deferred[int]" = Deferred()
  212. async def callback(v: int) -> None:
  213. # alas, this doesn't work at all without an await here
  214. await d1
  215. raise _TestException("bah")
  216. async def caller() -> None:
  217. try:
  218. await concurrently_execute(callback, [1], 2)
  219. except _TestException as e:
  220. tb = traceback.extract_tb(e.__traceback__)
  221. # we expect to see "caller", "concurrently_execute" and "callback".
  222. self.assertEqual(tb[0].name, "caller")
  223. self.assertEqual(tb[1].name, "concurrently_execute")
  224. self.assertEqual(tb[-1].name, "callback")
  225. else:
  226. self.fail("No exception thrown")
  227. d2 = ensureDeferred(caller())
  228. d1.callback(0)
  229. self.successResultOf(d2)
  230. def test_preserves_stacktraces_on_preformed_failure(self) -> None:
  231. """Test that the stacktrace on a Failure returned by the callback is preserved"""
  232. d1: "Deferred[int]" = Deferred()
  233. f = Failure(_TestException("bah"))
  234. async def callback(v: int) -> None:
  235. # alas, this doesn't work at all without an await here
  236. await d1
  237. await defer.fail(f)
  238. async def caller() -> None:
  239. try:
  240. await concurrently_execute(callback, [1], 2)
  241. except _TestException as e:
  242. tb = traceback.extract_tb(e.__traceback__)
  243. # we expect to see "caller", "concurrently_execute", "callback",
  244. # and some magic from inside ensureDeferred that happens when .fail
  245. # is called.
  246. self.assertEqual(tb[0].name, "caller")
  247. self.assertEqual(tb[1].name, "concurrently_execute")
  248. self.assertEqual(tb[-2].name, "callback")
  249. else:
  250. self.fail("No exception thrown")
  251. d2 = ensureDeferred(caller())
  252. d1.callback(0)
  253. self.successResultOf(d2)
  254. @parameterized_class(
  255. ("wrapper",),
  256. [("stop_cancellation",), ("delay_cancellation",)],
  257. )
  258. class CancellationWrapperTests(TestCase):
  259. """Common tests for the `stop_cancellation` and `delay_cancellation` functions."""
  260. wrapper: str
  261. def wrap_deferred(self, deferred: "Deferred[str]") -> "Deferred[str]":
  262. if self.wrapper == "stop_cancellation":
  263. return stop_cancellation(deferred)
  264. elif self.wrapper == "delay_cancellation":
  265. return delay_cancellation(deferred)
  266. else:
  267. raise ValueError(f"Unsupported wrapper type: {self.wrapper}")
  268. def test_succeed(self) -> None:
  269. """Test that the new `Deferred` receives the result."""
  270. deferred: "Deferred[str]" = Deferred()
  271. wrapper_deferred = self.wrap_deferred(deferred)
  272. # Success should propagate through.
  273. deferred.callback("success")
  274. self.assertTrue(wrapper_deferred.called)
  275. self.assertEqual("success", self.successResultOf(wrapper_deferred))
  276. def test_failure(self) -> None:
  277. """Test that the new `Deferred` receives the `Failure`."""
  278. deferred: "Deferred[str]" = Deferred()
  279. wrapper_deferred = self.wrap_deferred(deferred)
  280. # Failure should propagate through.
  281. deferred.errback(ValueError("abc"))
  282. self.assertTrue(wrapper_deferred.called)
  283. self.failureResultOf(wrapper_deferred, ValueError)
  284. self.assertIsNone(deferred.result, "`Failure` was not consumed")
  285. class StopCancellationTests(TestCase):
  286. """Tests for the `stop_cancellation` function."""
  287. def test_cancellation(self) -> None:
  288. """Test that cancellation of the new `Deferred` leaves the original running."""
  289. deferred: "Deferred[str]" = Deferred()
  290. wrapper_deferred = stop_cancellation(deferred)
  291. # Cancel the new `Deferred`.
  292. wrapper_deferred.cancel()
  293. self.assertTrue(wrapper_deferred.called)
  294. self.failureResultOf(wrapper_deferred, CancelledError)
  295. self.assertFalse(
  296. deferred.called, "Original `Deferred` was unexpectedly cancelled"
  297. )
  298. # Now make the original `Deferred` fail.
  299. # The `Failure` must be consumed, otherwise unwanted tracebacks will be printed
  300. # in logs.
  301. deferred.errback(ValueError("abc"))
  302. self.assertIsNone(deferred.result, "`Failure` was not consumed")
  303. class DelayCancellationTests(TestCase):
  304. """Tests for the `delay_cancellation` function."""
  305. def test_deferred_cancellation(self) -> None:
  306. """Test that cancellation of the new `Deferred` waits for the original."""
  307. deferred: "Deferred[str]" = Deferred()
  308. wrapper_deferred = delay_cancellation(deferred)
  309. # Cancel the new `Deferred`.
  310. wrapper_deferred.cancel()
  311. self.assertNoResult(wrapper_deferred)
  312. self.assertFalse(
  313. deferred.called, "Original `Deferred` was unexpectedly cancelled"
  314. )
  315. # Now make the original `Deferred` fail.
  316. # The `Failure` must be consumed, otherwise unwanted tracebacks will be printed
  317. # in logs.
  318. deferred.errback(ValueError("abc"))
  319. self.assertIsNone(deferred.result, "`Failure` was not consumed")
  320. # Now that the original `Deferred` has failed, we should get a `CancelledError`.
  321. self.failureResultOf(wrapper_deferred, CancelledError)
  322. def test_coroutine_cancellation(self) -> None:
  323. """Test that cancellation of the new `Deferred` waits for the original."""
  324. blocking_deferred: "Deferred[None]" = Deferred()
  325. completion_deferred: "Deferred[None]" = Deferred()
  326. async def task() -> NoReturn:
  327. await blocking_deferred
  328. completion_deferred.callback(None)
  329. # Raise an exception. Twisted should consume it, otherwise unwanted
  330. # tracebacks will be printed in logs.
  331. raise ValueError("abc")
  332. wrapper_deferred = delay_cancellation(task())
  333. # Cancel the new `Deferred`.
  334. wrapper_deferred.cancel()
  335. self.assertNoResult(wrapper_deferred)
  336. self.assertFalse(
  337. blocking_deferred.called, "Cancellation was propagated too deep"
  338. )
  339. self.assertFalse(completion_deferred.called)
  340. # Unblock the task.
  341. blocking_deferred.callback(None)
  342. self.assertTrue(completion_deferred.called)
  343. # Now that the original coroutine has failed, we should get a `CancelledError`.
  344. self.failureResultOf(wrapper_deferred, CancelledError)
  345. def test_suppresses_second_cancellation(self) -> None:
  346. """Test that a second cancellation is suppressed.
  347. Identical to `test_cancellation` except the new `Deferred` is cancelled twice.
  348. """
  349. deferred: "Deferred[str]" = Deferred()
  350. wrapper_deferred = delay_cancellation(deferred)
  351. # Cancel the new `Deferred`, twice.
  352. wrapper_deferred.cancel()
  353. wrapper_deferred.cancel()
  354. self.assertNoResult(wrapper_deferred)
  355. self.assertFalse(
  356. deferred.called, "Original `Deferred` was unexpectedly cancelled"
  357. )
  358. # Now make the original `Deferred` fail.
  359. # The `Failure` must be consumed, otherwise unwanted tracebacks will be printed
  360. # in logs.
  361. deferred.errback(ValueError("abc"))
  362. self.assertIsNone(deferred.result, "`Failure` was not consumed")
  363. # Now that the original `Deferred` has failed, we should get a `CancelledError`.
  364. self.failureResultOf(wrapper_deferred, CancelledError)
  365. def test_propagates_cancelled_error(self) -> None:
  366. """Test that a `CancelledError` from the original `Deferred` gets propagated."""
  367. deferred: "Deferred[str]" = Deferred()
  368. wrapper_deferred = delay_cancellation(deferred)
  369. # Fail the original `Deferred` with a `CancelledError`.
  370. cancelled_error = CancelledError()
  371. deferred.errback(cancelled_error)
  372. # The new `Deferred` should fail with exactly the same `CancelledError`.
  373. self.assertTrue(wrapper_deferred.called)
  374. self.assertIs(cancelled_error, self.failureResultOf(wrapper_deferred).value)
  375. def test_preserves_logcontext(self) -> None:
  376. """Test that logging contexts are preserved."""
  377. blocking_d: "Deferred[None]" = Deferred()
  378. async def inner() -> None:
  379. await make_deferred_yieldable(blocking_d)
  380. async def outer() -> None:
  381. with LoggingContext("c") as c:
  382. try:
  383. await delay_cancellation(inner())
  384. self.fail("`CancelledError` was not raised")
  385. except CancelledError:
  386. self.assertEqual(c, current_context())
  387. # Succeed with no error, unless the logging context is wrong.
  388. # Run and block inside `inner()`.
  389. d = defer.ensureDeferred(outer())
  390. self.assertEqual(SENTINEL_CONTEXT, current_context())
  391. d.cancel()
  392. # Now unblock. `outer()` will consume the `CancelledError` and check the
  393. # logging context.
  394. blocking_d.callback(None)
  395. self.successResultOf(d)
  396. class AwakenableSleeperTests(TestCase):
  397. "Tests AwakenableSleeper"
  398. def test_sleep(self) -> None:
  399. reactor, _ = get_clock()
  400. sleeper = AwakenableSleeper(reactor)
  401. d = defer.ensureDeferred(sleeper.sleep("name", 1000))
  402. reactor.pump([0.0])
  403. self.assertFalse(d.called)
  404. reactor.advance(0.5)
  405. self.assertFalse(d.called)
  406. reactor.advance(0.6)
  407. self.assertTrue(d.called)
  408. def test_explicit_wake(self) -> None:
  409. reactor, _ = get_clock()
  410. sleeper = AwakenableSleeper(reactor)
  411. d = defer.ensureDeferred(sleeper.sleep("name", 1000))
  412. reactor.pump([0.0])
  413. self.assertFalse(d.called)
  414. reactor.advance(0.5)
  415. self.assertFalse(d.called)
  416. sleeper.wake("name")
  417. self.assertTrue(d.called)
  418. reactor.advance(0.6)
  419. def test_multiple_sleepers_timeout(self) -> None:
  420. reactor, _ = get_clock()
  421. sleeper = AwakenableSleeper(reactor)
  422. d1 = defer.ensureDeferred(sleeper.sleep("name", 1000))
  423. reactor.advance(0.6)
  424. self.assertFalse(d1.called)
  425. # Add another sleeper
  426. d2 = defer.ensureDeferred(sleeper.sleep("name", 1000))
  427. # Only the first sleep should time out now.
  428. reactor.advance(0.6)
  429. self.assertTrue(d1.called)
  430. self.assertFalse(d2.called)
  431. reactor.advance(0.6)
  432. self.assertTrue(d2.called)
  433. def test_multiple_sleepers_wake(self) -> None:
  434. reactor, _ = get_clock()
  435. sleeper = AwakenableSleeper(reactor)
  436. d1 = defer.ensureDeferred(sleeper.sleep("name", 1000))
  437. reactor.advance(0.5)
  438. self.assertFalse(d1.called)
  439. # Add another sleeper
  440. d2 = defer.ensureDeferred(sleeper.sleep("name", 1000))
  441. # Neither should fire yet
  442. reactor.advance(0.3)
  443. self.assertFalse(d1.called)
  444. self.assertFalse(d2.called)
  445. # Explicitly waking both up works
  446. sleeper.wake("name")
  447. self.assertTrue(d1.called)
  448. self.assertTrue(d2.called)