__init__.py 3.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129
  1. # Copyright 2019-2021 The Matrix.org Foundation C.I.C.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. """
  15. Utilities for running the unit tests
  16. """
  17. import sys
  18. import warnings
  19. from asyncio import Future
  20. from binascii import unhexlify
  21. from typing import Awaitable, Callable, TypeVar
  22. from unittest.mock import Mock
  23. import attr
  24. from twisted.python.failure import Failure
  25. from twisted.web.client import ResponseDone
  26. TV = TypeVar("TV")
  27. def get_awaitable_result(awaitable: Awaitable[TV]) -> TV:
  28. """Get the result from an Awaitable which should have completed
  29. Asserts that the given awaitable has a result ready, and returns its value
  30. """
  31. i = awaitable.__await__()
  32. try:
  33. next(i)
  34. except StopIteration as e:
  35. # awaitable returned a result
  36. return e.value
  37. # if next didn't raise, the awaitable hasn't completed.
  38. raise Exception("awaitable has not yet completed")
  39. def make_awaitable(result: TV) -> Awaitable[TV]:
  40. """
  41. Makes an awaitable, suitable for mocking an `async` function.
  42. This uses Futures as they can be awaited multiple times so can be returned
  43. to multiple callers.
  44. """
  45. future: Future[TV] = Future()
  46. future.set_result(result)
  47. return future
  48. def setup_awaitable_errors() -> Callable[[], None]:
  49. """
  50. Convert warnings from a non-awaited coroutines into errors.
  51. """
  52. warnings.simplefilter("error", RuntimeWarning)
  53. # unraisablehook was added in Python 3.8.
  54. if not hasattr(sys, "unraisablehook"):
  55. return lambda: None
  56. # State shared between unraisablehook and check_for_unraisable_exceptions.
  57. unraisable_exceptions = []
  58. orig_unraisablehook = sys.unraisablehook
  59. def unraisablehook(unraisable):
  60. unraisable_exceptions.append(unraisable.exc_value)
  61. def cleanup():
  62. """
  63. A method to be used as a clean-up that fails a test-case if there are any new unraisable exceptions.
  64. """
  65. sys.unraisablehook = orig_unraisablehook
  66. if unraisable_exceptions:
  67. raise unraisable_exceptions.pop()
  68. sys.unraisablehook = unraisablehook
  69. return cleanup
  70. def simple_async_mock(return_value=None, raises=None) -> Mock:
  71. # AsyncMock is not available in python3.5, this mimics part of its behaviour
  72. async def cb(*args, **kwargs):
  73. if raises:
  74. raise raises
  75. return return_value
  76. return Mock(side_effect=cb)
  77. @attr.s
  78. class FakeResponse:
  79. """A fake twisted.web.IResponse object
  80. there is a similar class at treq.test.test_response, but it lacks a `phrase`
  81. attribute, and didn't support deliverBody until recently.
  82. """
  83. # HTTP response code
  84. code = attr.ib(type=int)
  85. # HTTP response phrase (eg b'OK' for a 200)
  86. phrase = attr.ib(type=bytes)
  87. # body of the response
  88. body = attr.ib(type=bytes)
  89. def deliverBody(self, protocol):
  90. protocol.dataReceived(self.body)
  91. protocol.connectionLost(Failure(ResponseDone()))
  92. # A small image used in some tests.
  93. #
  94. # Resolution: 1×1, MIME type: image/png, Extension: png, Size: 67 B
  95. SMALL_PNG = unhexlify(
  96. b"89504e470d0a1a0a0000000d4948445200000001000000010806"
  97. b"0000001f15c4890000000a49444154789c63000100000500010d"
  98. b"0a2db40000000049454e44ae426082"
  99. )