test_transactions.py 7.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191
  1. # Copyright 2018-2021 The Matrix.org Foundation C.I.C.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. from http import HTTPStatus
  15. from typing import Any, Generator, Tuple, cast
  16. from unittest.mock import Mock, call
  17. from twisted.internet import defer, reactor as _reactor
  18. from synapse.logging.context import SENTINEL_CONTEXT, LoggingContext, current_context
  19. from synapse.rest.client.transactions import CLEANUP_PERIOD_MS, HttpTransactionCache
  20. from synapse.types import ISynapseReactor, JsonDict
  21. from synapse.util import Clock
  22. from tests import unittest
  23. from tests.test_utils import make_awaitable
  24. from tests.utils import MockClock
# Twisted's global reactor, cast so that type checkers accept it wherever an
# ISynapseReactor is expected (e.g. when constructing a Clock in the tests below).
reactor = cast(ISynapseReactor, _reactor)
  26. class HttpTransactionCacheTestCase(unittest.TestCase):
  27. def setUp(self) -> None:
  28. self.clock = MockClock()
  29. self.hs = Mock()
  30. self.hs.get_clock = Mock(return_value=self.clock)
  31. self.hs.get_auth = Mock()
  32. self.cache = HttpTransactionCache(self.hs)
  33. self.mock_http_response = (HTTPStatus.OK, {"result": "GOOD JOB!"})
  34. # Here we make sure that we're setting all the fields that HttpTransactionCache
  35. # uses to build the transaction key.
  36. self.mock_request = Mock()
  37. self.mock_request.path = b"/foo/bar"
  38. self.mock_requester = Mock()
  39. self.mock_requester.app_service = None
  40. self.mock_requester.is_guest = False
  41. self.mock_requester.access_token_id = 1234
  42. @defer.inlineCallbacks
  43. def test_executes_given_function(
  44. self,
  45. ) -> Generator["defer.Deferred[Any]", object, None]:
  46. cb = Mock(return_value=make_awaitable(self.mock_http_response))
  47. res = yield self.cache.fetch_or_execute_request(
  48. self.mock_request, self.mock_requester, cb, "some_arg", keyword="arg"
  49. )
  50. cb.assert_called_once_with("some_arg", keyword="arg")
  51. self.assertEqual(res, self.mock_http_response)
  52. @defer.inlineCallbacks
  53. def test_deduplicates_based_on_key(
  54. self,
  55. ) -> Generator["defer.Deferred[Any]", object, None]:
  56. cb = Mock(return_value=make_awaitable(self.mock_http_response))
  57. for i in range(3): # invoke multiple times
  58. res = yield self.cache.fetch_or_execute_request(
  59. self.mock_request,
  60. self.mock_requester,
  61. cb,
  62. "some_arg",
  63. keyword="arg",
  64. changing_args=i,
  65. )
  66. self.assertEqual(res, self.mock_http_response)
  67. # expect only a single call to do the work
  68. cb.assert_called_once_with("some_arg", keyword="arg", changing_args=0)
  69. @defer.inlineCallbacks
  70. def test_logcontexts_with_async_result(
  71. self,
  72. ) -> Generator["defer.Deferred[Any]", object, None]:
  73. @defer.inlineCallbacks
  74. def cb() -> Generator["defer.Deferred[object]", object, Tuple[int, JsonDict]]:
  75. yield Clock(reactor).sleep(0)
  76. return 1, {}
  77. @defer.inlineCallbacks
  78. def test() -> Generator["defer.Deferred[Any]", object, None]:
  79. with LoggingContext("c") as c1:
  80. res = yield self.cache.fetch_or_execute_request(
  81. self.mock_request, self.mock_requester, cb
  82. )
  83. self.assertIs(current_context(), c1)
  84. self.assertEqual(res, (1, {}))
  85. # run the test twice in parallel
  86. d = defer.gatherResults([test(), test()])
  87. self.assertIs(current_context(), SENTINEL_CONTEXT)
  88. yield d
  89. self.assertIs(current_context(), SENTINEL_CONTEXT)
  90. @defer.inlineCallbacks
  91. def test_does_not_cache_exceptions(
  92. self,
  93. ) -> Generator["defer.Deferred[Any]", object, None]:
  94. """Checks that, if the callback throws an exception, it is called again
  95. for the next request.
  96. """
  97. called = [False]
  98. def cb() -> "defer.Deferred[Tuple[int, JsonDict]]":
  99. if called[0]:
  100. # return a valid result the second time
  101. return defer.succeed(self.mock_http_response)
  102. called[0] = True
  103. raise Exception("boo")
  104. with LoggingContext("test") as test_context:
  105. try:
  106. yield self.cache.fetch_or_execute_request(
  107. self.mock_request, self.mock_requester, cb
  108. )
  109. except Exception as e:
  110. self.assertEqual(e.args[0], "boo")
  111. self.assertIs(current_context(), test_context)
  112. res = yield self.cache.fetch_or_execute_request(
  113. self.mock_request, self.mock_requester, cb
  114. )
  115. self.assertEqual(res, self.mock_http_response)
  116. self.assertIs(current_context(), test_context)
  117. @defer.inlineCallbacks
  118. def test_does_not_cache_failures(
  119. self,
  120. ) -> Generator["defer.Deferred[Any]", object, None]:
  121. """Checks that, if the callback returns a failure, it is called again
  122. for the next request.
  123. """
  124. called = [False]
  125. def cb() -> "defer.Deferred[Tuple[int, JsonDict]]":
  126. if called[0]:
  127. # return a valid result the second time
  128. return defer.succeed(self.mock_http_response)
  129. called[0] = True
  130. return defer.fail(Exception("boo"))
  131. with LoggingContext("test") as test_context:
  132. try:
  133. yield self.cache.fetch_or_execute_request(
  134. self.mock_request, self.mock_requester, cb
  135. )
  136. except Exception as e:
  137. self.assertEqual(e.args[0], "boo")
  138. self.assertIs(current_context(), test_context)
  139. res = yield self.cache.fetch_or_execute_request(
  140. self.mock_request, self.mock_requester, cb
  141. )
  142. self.assertEqual(res, self.mock_http_response)
  143. self.assertIs(current_context(), test_context)
  144. @defer.inlineCallbacks
  145. def test_cleans_up(self) -> Generator["defer.Deferred[Any]", object, None]:
  146. cb = Mock(return_value=make_awaitable(self.mock_http_response))
  147. yield self.cache.fetch_or_execute_request(
  148. self.mock_request, self.mock_requester, cb, "an arg"
  149. )
  150. # should NOT have cleaned up yet
  151. self.clock.advance_time_msec(CLEANUP_PERIOD_MS / 2)
  152. yield self.cache.fetch_or_execute_request(
  153. self.mock_request, self.mock_requester, cb, "an arg"
  154. )
  155. # still using cache
  156. cb.assert_called_once_with("an arg")
  157. self.clock.advance_time_msec(CLEANUP_PERIOD_MS)
  158. yield self.cache.fetch_or_execute_request(
  159. self.mock_request, self.mock_requester, cb, "an arg"
  160. )
  161. # no longer using cache
  162. self.assertEqual(cb.call_count, 2)
  163. self.assertEqual(cb.call_args_list, [call("an arg"), call("an arg")])