test_async_helpers.py 20 KB


  1. # Copyright 2019 New Vector Ltd
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import traceback
  15. from typing import Generator, List, NoReturn, Optional
  16. from parameterized import parameterized_class
  17. from twisted.internet import defer
  18. from twisted.internet.defer import CancelledError, Deferred, ensureDeferred
  19. from twisted.internet.task import Clock
  20. from twisted.python.failure import Failure
  21. from synapse.logging.context import (
  22. SENTINEL_CONTEXT,
  23. LoggingContext,
  24. PreserveLoggingContext,
  25. current_context,
  26. make_deferred_yieldable,
  27. )
  28. from synapse.util.async_helpers import (
  29. AwakenableSleeper,
  30. ObservableDeferred,
  31. concurrently_execute,
  32. delay_cancellation,
  33. stop_cancellation,
  34. timeout_deferred,
  35. )
  36. from tests.server import get_clock
  37. from tests.unittest import TestCase
  38. class ObservableDeferredTest(TestCase):
  39. def test_succeed(self) -> None:
  40. origin_d: "Deferred[int]" = Deferred()
  41. observable = ObservableDeferred(origin_d)
  42. observer1 = observable.observe()
  43. observer2 = observable.observe()
  44. self.assertFalse(observer1.called)
  45. self.assertFalse(observer2.called)
  46. # check the first observer is called first
  47. def check_called_first(res: int) -> int:
  48. self.assertFalse(observer2.called)
  49. return res
  50. observer1.addBoth(check_called_first)
  51. # store the results
  52. results: List[Optional[ObservableDeferred[int]]] = [None, None]
  53. def check_val(
  54. res: ObservableDeferred[int], idx: int
  55. ) -> ObservableDeferred[int]:
  56. results[idx] = res
  57. return res
  58. observer1.addCallback(check_val, 0)
  59. observer2.addCallback(check_val, 1)
  60. origin_d.callback(123)
  61. self.assertEqual(results[0], 123, "observer 1 callback result")
  62. self.assertEqual(results[1], 123, "observer 2 callback result")
  63. def test_failure(self) -> None:
  64. origin_d: Deferred = Deferred()
  65. observable = ObservableDeferred(origin_d, consumeErrors=True)
  66. observer1 = observable.observe()
  67. observer2 = observable.observe()
  68. self.assertFalse(observer1.called)
  69. self.assertFalse(observer2.called)
  70. # check the first observer is called first
  71. def check_called_first(res: int) -> int:
  72. self.assertFalse(observer2.called)
  73. return res
  74. observer1.addBoth(check_called_first)
  75. # store the results
  76. results: List[Optional[ObservableDeferred[str]]] = [None, None]
  77. def check_val(res: ObservableDeferred[str], idx: int) -> None:
  78. results[idx] = res
  79. return None
  80. observer1.addErrback(check_val, 0)
  81. observer2.addErrback(check_val, 1)
  82. try:
  83. raise Exception("gah!")
  84. except Exception as e:
  85. origin_d.errback(e)
  86. assert results[0] is not None
  87. self.assertEqual(str(results[0].value), "gah!", "observer 1 errback result")
  88. assert results[1] is not None
  89. self.assertEqual(str(results[1].value), "gah!", "observer 2 errback result")
  90. def test_cancellation(self) -> None:
  91. """Test that cancelling an observer does not affect other observers."""
  92. origin_d: "Deferred[int]" = Deferred()
  93. observable = ObservableDeferred(origin_d, consumeErrors=True)
  94. observer1 = observable.observe()
  95. observer2 = observable.observe()
  96. observer3 = observable.observe()
  97. self.assertFalse(observer1.called)
  98. self.assertFalse(observer2.called)
  99. self.assertFalse(observer3.called)
  100. # cancel the second observer
  101. observer2.cancel()
  102. self.assertFalse(observer1.called)
  103. self.failureResultOf(observer2, CancelledError)
  104. self.assertFalse(observer3.called)
  105. # other observers resolve as normal
  106. origin_d.callback(123)
  107. self.assertEqual(observer1.result, 123, "observer 1 callback result")
  108. self.assertEqual(observer3.result, 123, "observer 3 callback result")
  109. # additional observers resolve as normal
  110. observer4 = observable.observe()
  111. self.assertEqual(observer4.result, 123, "observer 4 callback result")
  112. class TimeoutDeferredTest(TestCase):
  113. def setUp(self) -> None:
  114. self.clock = Clock()
  115. def test_times_out(self) -> None:
  116. """Basic test case that checks that the original deferred is cancelled and that
  117. the timing-out deferred is errbacked
  118. """
  119. cancelled = False
  120. def canceller(_d: Deferred) -> None:
  121. nonlocal cancelled
  122. cancelled = True
  123. non_completing_d: Deferred = Deferred(canceller)
  124. timing_out_d = timeout_deferred(non_completing_d, 1.0, self.clock)
  125. self.assertNoResult(timing_out_d)
  126. self.assertFalse(cancelled, "deferred was cancelled prematurely")
  127. self.clock.pump((1.0,))
  128. self.assertTrue(cancelled, "deferred was not cancelled by timeout")
  129. self.failureResultOf(timing_out_d, defer.TimeoutError)
  130. def test_times_out_when_canceller_throws(self) -> None:
  131. """Test that we have successfully worked around
  132. https://twistedmatrix.com/trac/ticket/9534"""
  133. def canceller(_d: Deferred) -> None:
  134. raise Exception("can't cancel this deferred")
  135. non_completing_d: Deferred = Deferred(canceller)
  136. timing_out_d = timeout_deferred(non_completing_d, 1.0, self.clock)
  137. self.assertNoResult(timing_out_d)
  138. self.clock.pump((1.0,))
  139. self.failureResultOf(timing_out_d, defer.TimeoutError)
  140. def test_logcontext_is_preserved_on_cancellation(self) -> None:
  141. blocking_was_cancelled = False
  142. @defer.inlineCallbacks
  143. def blocking() -> Generator["Deferred[object]", object, None]:
  144. nonlocal blocking_was_cancelled
  145. non_completing_d: Deferred = Deferred()
  146. with PreserveLoggingContext():
  147. try:
  148. yield non_completing_d
  149. except CancelledError:
  150. blocking_was_cancelled = True
  151. raise
  152. with LoggingContext("one") as context_one:
  153. # the errbacks should be run in the test logcontext
  154. def errback(res: Failure, deferred_name: str) -> Failure:
  155. self.assertIs(
  156. current_context(),
  157. context_one,
  158. "errback %s run in unexpected logcontext %s"
  159. % (deferred_name, current_context()),
  160. )
  161. return res
  162. original_deferred = blocking()
  163. original_deferred.addErrback(errback, "orig")
  164. timing_out_d = timeout_deferred(original_deferred, 1.0, self.clock)
  165. self.assertNoResult(timing_out_d)
  166. self.assertIs(current_context(), SENTINEL_CONTEXT)
  167. timing_out_d.addErrback(errback, "timingout")
  168. self.clock.pump((1.0,))
  169. self.assertTrue(
  170. blocking_was_cancelled, "non-completing deferred was not cancelled"
  171. )
  172. self.failureResultOf(timing_out_d, defer.TimeoutError)
  173. self.assertIs(current_context(), context_one)
  174. class _TestException(Exception):
  175. pass
  176. class ConcurrentlyExecuteTest(TestCase):
  177. def test_limits_runners(self) -> None:
  178. """If we have more tasks than runners, we should get the limit of runners"""
  179. started = 0
  180. waiters = []
  181. processed = []
  182. async def callback(v: int) -> None:
  183. # when we first enter, bump the start count
  184. nonlocal started
  185. started += 1
  186. # record the fact we got an item
  187. processed.append(v)
  188. # wait for the goahead before returning
  189. d2: "Deferred[int]" = Deferred()
  190. waiters.append(d2)
  191. await d2
  192. # set it going
  193. d2 = ensureDeferred(concurrently_execute(callback, [1, 2, 3, 4, 5], 3))
  194. # check we got exactly 3 processes
  195. self.assertEqual(started, 3)
  196. self.assertEqual(len(waiters), 3)
  197. # let one finish
  198. waiters.pop().callback(0)
  199. # ... which should start another
  200. self.assertEqual(started, 4)
  201. self.assertEqual(len(waiters), 3)
  202. # we still shouldn't be done
  203. self.assertNoResult(d2)
  204. # finish the job
  205. while waiters:
  206. waiters.pop().callback(0)
  207. # check everything got done
  208. self.assertEqual(started, 5)
  209. self.assertCountEqual(processed, [1, 2, 3, 4, 5])
  210. self.successResultOf(d2)
  211. def test_preserves_stacktraces(self) -> None:
  212. """Test that the stacktrace from an exception thrown in the callback is preserved"""
  213. d1: "Deferred[int]" = Deferred()
  214. async def callback(v: int) -> None:
  215. # alas, this doesn't work at all without an await here
  216. await d1
  217. raise _TestException("bah")
  218. async def caller() -> None:
  219. try:
  220. await concurrently_execute(callback, [1], 2)
  221. except _TestException as e:
  222. tb = traceback.extract_tb(e.__traceback__)
  223. # we expect to see "caller", "concurrently_execute" and "callback".
  224. self.assertEqual(tb[0].name, "caller")
  225. self.assertEqual(tb[1].name, "concurrently_execute")
  226. self.assertEqual(tb[-1].name, "callback")
  227. else:
  228. self.fail("No exception thrown")
  229. d2 = ensureDeferred(caller())
  230. d1.callback(0)
  231. self.successResultOf(d2)
  232. def test_preserves_stacktraces_on_preformed_failure(self) -> None:
  233. """Test that the stacktrace on a Failure returned by the callback is preserved"""
  234. d1: "Deferred[int]" = Deferred()
  235. f = Failure(_TestException("bah"))
  236. async def callback(v: int) -> None:
  237. # alas, this doesn't work at all without an await here
  238. await d1
  239. await defer.fail(f)
  240. async def caller() -> None:
  241. try:
  242. await concurrently_execute(callback, [1], 2)
  243. except _TestException as e:
  244. tb = traceback.extract_tb(e.__traceback__)
  245. # we expect to see "caller", "concurrently_execute", "callback",
  246. # and some magic from inside ensureDeferred that happens when .fail
  247. # is called.
  248. self.assertEqual(tb[0].name, "caller")
  249. self.assertEqual(tb[1].name, "concurrently_execute")
  250. self.assertEqual(tb[-2].name, "callback")
  251. else:
  252. self.fail("No exception thrown")
  253. d2 = ensureDeferred(caller())
  254. d1.callback(0)
  255. self.successResultOf(d2)
  256. @parameterized_class(
  257. ("wrapper",),
  258. [("stop_cancellation",), ("delay_cancellation",)],
  259. )
  260. class CancellationWrapperTests(TestCase):
  261. """Common tests for the `stop_cancellation` and `delay_cancellation` functions."""
  262. wrapper: str
  263. def wrap_deferred(self, deferred: "Deferred[str]") -> "Deferred[str]":
  264. if self.wrapper == "stop_cancellation":
  265. return stop_cancellation(deferred)
  266. elif self.wrapper == "delay_cancellation":
  267. return delay_cancellation(deferred)
  268. else:
  269. raise ValueError(f"Unsupported wrapper type: {self.wrapper}")
  270. def test_succeed(self) -> None:
  271. """Test that the new `Deferred` receives the result."""
  272. deferred: "Deferred[str]" = Deferred()
  273. wrapper_deferred = self.wrap_deferred(deferred)
  274. # Success should propagate through.
  275. deferred.callback("success")
  276. self.assertTrue(wrapper_deferred.called)
  277. self.assertEqual("success", self.successResultOf(wrapper_deferred))
  278. def test_failure(self) -> None:
  279. """Test that the new `Deferred` receives the `Failure`."""
  280. deferred: "Deferred[str]" = Deferred()
  281. wrapper_deferred = self.wrap_deferred(deferred)
  282. # Failure should propagate through.
  283. deferred.errback(ValueError("abc"))
  284. self.assertTrue(wrapper_deferred.called)
  285. self.failureResultOf(wrapper_deferred, ValueError)
  286. self.assertIsNone(deferred.result, "`Failure` was not consumed")
  287. class StopCancellationTests(TestCase):
  288. """Tests for the `stop_cancellation` function."""
  289. def test_cancellation(self) -> None:
  290. """Test that cancellation of the new `Deferred` leaves the original running."""
  291. deferred: "Deferred[str]" = Deferred()
  292. wrapper_deferred = stop_cancellation(deferred)
  293. # Cancel the new `Deferred`.
  294. wrapper_deferred.cancel()
  295. self.assertTrue(wrapper_deferred.called)
  296. self.failureResultOf(wrapper_deferred, CancelledError)
  297. self.assertFalse(
  298. deferred.called, "Original `Deferred` was unexpectedly cancelled"
  299. )
  300. # Now make the original `Deferred` fail.
  301. # The `Failure` must be consumed, otherwise unwanted tracebacks will be printed
  302. # in logs.
  303. deferred.errback(ValueError("abc"))
  304. self.assertIsNone(deferred.result, "`Failure` was not consumed")
  305. class DelayCancellationTests(TestCase):
  306. """Tests for the `delay_cancellation` function."""
  307. def test_deferred_cancellation(self) -> None:
  308. """Test that cancellation of the new `Deferred` waits for the original."""
  309. deferred: "Deferred[str]" = Deferred()
  310. wrapper_deferred = delay_cancellation(deferred)
  311. # Cancel the new `Deferred`.
  312. wrapper_deferred.cancel()
  313. self.assertNoResult(wrapper_deferred)
  314. self.assertFalse(
  315. deferred.called, "Original `Deferred` was unexpectedly cancelled"
  316. )
  317. # Now make the original `Deferred` fail.
  318. # The `Failure` must be consumed, otherwise unwanted tracebacks will be printed
  319. # in logs.
  320. deferred.errback(ValueError("abc"))
  321. self.assertIsNone(deferred.result, "`Failure` was not consumed")
  322. # Now that the original `Deferred` has failed, we should get a `CancelledError`.
  323. self.failureResultOf(wrapper_deferred, CancelledError)
  324. def test_coroutine_cancellation(self) -> None:
  325. """Test that cancellation of the new `Deferred` waits for the original."""
  326. blocking_deferred: "Deferred[None]" = Deferred()
  327. completion_deferred: "Deferred[None]" = Deferred()
  328. async def task() -> NoReturn:
  329. await blocking_deferred
  330. completion_deferred.callback(None)
  331. # Raise an exception. Twisted should consume it, otherwise unwanted
  332. # tracebacks will be printed in logs.
  333. raise ValueError("abc")
  334. wrapper_deferred = delay_cancellation(task())
  335. # Cancel the new `Deferred`.
  336. wrapper_deferred.cancel()
  337. self.assertNoResult(wrapper_deferred)
  338. self.assertFalse(
  339. blocking_deferred.called, "Cancellation was propagated too deep"
  340. )
  341. self.assertFalse(completion_deferred.called)
  342. # Unblock the task.
  343. blocking_deferred.callback(None)
  344. self.assertTrue(completion_deferred.called)
  345. # Now that the original coroutine has failed, we should get a `CancelledError`.
  346. self.failureResultOf(wrapper_deferred, CancelledError)
  347. def test_suppresses_second_cancellation(self) -> None:
  348. """Test that a second cancellation is suppressed.
  349. Identical to `test_cancellation` except the new `Deferred` is cancelled twice.
  350. """
  351. deferred: "Deferred[str]" = Deferred()
  352. wrapper_deferred = delay_cancellation(deferred)
  353. # Cancel the new `Deferred`, twice.
  354. wrapper_deferred.cancel()
  355. wrapper_deferred.cancel()
  356. self.assertNoResult(wrapper_deferred)
  357. self.assertFalse(
  358. deferred.called, "Original `Deferred` was unexpectedly cancelled"
  359. )
  360. # Now make the original `Deferred` fail.
  361. # The `Failure` must be consumed, otherwise unwanted tracebacks will be printed
  362. # in logs.
  363. deferred.errback(ValueError("abc"))
  364. self.assertIsNone(deferred.result, "`Failure` was not consumed")
  365. # Now that the original `Deferred` has failed, we should get a `CancelledError`.
  366. self.failureResultOf(wrapper_deferred, CancelledError)
  367. def test_propagates_cancelled_error(self) -> None:
  368. """Test that a `CancelledError` from the original `Deferred` gets propagated."""
  369. deferred: "Deferred[str]" = Deferred()
  370. wrapper_deferred = delay_cancellation(deferred)
  371. # Fail the original `Deferred` with a `CancelledError`.
  372. cancelled_error = CancelledError()
  373. deferred.errback(cancelled_error)
  374. # The new `Deferred` should fail with exactly the same `CancelledError`.
  375. self.assertTrue(wrapper_deferred.called)
  376. self.assertIs(cancelled_error, self.failureResultOf(wrapper_deferred).value)
  377. def test_preserves_logcontext(self) -> None:
  378. """Test that logging contexts are preserved."""
  379. blocking_d: "Deferred[None]" = Deferred()
  380. async def inner() -> None:
  381. await make_deferred_yieldable(blocking_d)
  382. async def outer() -> None:
  383. with LoggingContext("c") as c:
  384. try:
  385. await delay_cancellation(inner())
  386. self.fail("`CancelledError` was not raised")
  387. except CancelledError:
  388. self.assertEqual(c, current_context())
  389. # Succeed with no error, unless the logging context is wrong.
  390. # Run and block inside `inner()`.
  391. d = defer.ensureDeferred(outer())
  392. self.assertEqual(SENTINEL_CONTEXT, current_context())
  393. d.cancel()
  394. # Now unblock. `outer()` will consume the `CancelledError` and check the
  395. # logging context.
  396. blocking_d.callback(None)
  397. self.successResultOf(d)
  398. class AwakenableSleeperTests(TestCase):
  399. "Tests AwakenableSleeper"
  400. def test_sleep(self) -> None:
  401. reactor, _ = get_clock()
  402. sleeper = AwakenableSleeper(reactor)
  403. d = defer.ensureDeferred(sleeper.sleep("name", 1000))
  404. reactor.pump([0.0])
  405. self.assertFalse(d.called)
  406. reactor.advance(0.5)
  407. self.assertFalse(d.called)
  408. reactor.advance(0.6)
  409. self.assertTrue(d.called)
  410. def test_explicit_wake(self) -> None:
  411. reactor, _ = get_clock()
  412. sleeper = AwakenableSleeper(reactor)
  413. d = defer.ensureDeferred(sleeper.sleep("name", 1000))
  414. reactor.pump([0.0])
  415. self.assertFalse(d.called)
  416. reactor.advance(0.5)
  417. self.assertFalse(d.called)
  418. sleeper.wake("name")
  419. self.assertTrue(d.called)
  420. reactor.advance(0.6)
  421. def test_multiple_sleepers_timeout(self) -> None:
  422. reactor, _ = get_clock()
  423. sleeper = AwakenableSleeper(reactor)
  424. d1 = defer.ensureDeferred(sleeper.sleep("name", 1000))
  425. reactor.advance(0.6)
  426. self.assertFalse(d1.called)
  427. # Add another sleeper
  428. d2 = defer.ensureDeferred(sleeper.sleep("name", 1000))
  429. # Only the first sleep should time out now.
  430. reactor.advance(0.6)
  431. self.assertTrue(d1.called)
  432. self.assertFalse(d2.called)
  433. reactor.advance(0.6)
  434. self.assertTrue(d2.called)
  435. def test_multiple_sleepers_wake(self) -> None:
  436. reactor, _ = get_clock()
  437. sleeper = AwakenableSleeper(reactor)
  438. d1 = defer.ensureDeferred(sleeper.sleep("name", 1000))
  439. reactor.advance(0.5)
  440. self.assertFalse(d1.called)
  441. # Add another sleeper
  442. d2 = defer.ensureDeferred(sleeper.sleep("name", 1000))
  443. # Neither should fire yet
  444. reactor.advance(0.3)
  445. self.assertFalse(d1.called)
  446. self.assertFalse(d2.called)
  447. # Explicitly waking both up works
  448. sleeper.wake("name")
  449. self.assertTrue(d1.called)
  450. self.assertTrue(d2.called)