test_domain_blocking.py 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139
  1. # Copyright 2023 The Matrix.org Foundation C.I.C.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. from typing import Dict
  15. from twisted.test.proto_helpers import MemoryReactor
  16. from twisted.web.resource import Resource
  17. from synapse.media._base import FileInfo
  18. from synapse.server import HomeServer
  19. from synapse.util import Clock
  20. from tests import unittest
  21. from tests.test_utils import SMALL_PNG
  22. from tests.unittest import override_config
  23. class MediaDomainBlockingTests(unittest.HomeserverTestCase):
  24. remote_media_id = "doesnotmatter"
  25. remote_server_name = "evil.com"
  26. def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
  27. self.store = hs.get_datastores().main
  28. # Inject a piece of media. We'll use this to ensure we're returning a sane
  29. # response when we're not supposed to block it, distinguishing a media block
  30. # from a regular 404.
  31. file_id = "abcdefg12345"
  32. file_info = FileInfo(server_name=self.remote_server_name, file_id=file_id)
  33. with hs.get_media_repository().media_storage.store_into_file(file_info) as (
  34. f,
  35. fname,
  36. finish,
  37. ):
  38. f.write(SMALL_PNG)
  39. self.get_success(finish())
  40. self.get_success(
  41. self.store.store_cached_remote_media(
  42. origin=self.remote_server_name,
  43. media_id=self.remote_media_id,
  44. media_type="image/png",
  45. media_length=1,
  46. time_now_ms=clock.time_msec(),
  47. upload_name="test.png",
  48. filesystem_id=file_id,
  49. )
  50. )
  51. def create_resource_dict(self) -> Dict[str, Resource]:
  52. # We need to manually set the resource tree to include media, the
  53. # default only does `/_matrix/client` APIs.
  54. return {"/_matrix/media": self.hs.get_media_repository_resource()}
  55. @override_config(
  56. {
  57. # Disable downloads from the domain we'll be trying to download from.
  58. # Should result in a 404.
  59. "prevent_media_downloads_from": ["evil.com"]
  60. }
  61. )
  62. def test_cannot_download_blocked_media(self) -> None:
  63. """
  64. Tests to ensure that remote media which is blocked cannot be downloaded.
  65. """
  66. response = self.make_request(
  67. "GET",
  68. f"/_matrix/media/v3/download/evil.com/{self.remote_media_id}",
  69. shorthand=False,
  70. )
  71. self.assertEqual(response.code, 404)
  72. @override_config(
  73. {
  74. # Disable downloads from a domain we won't be requesting downloads from.
  75. # This proves we haven't broken anything.
  76. "prevent_media_downloads_from": ["not-listed.com"]
  77. }
  78. )
  79. def test_remote_media_normally_unblocked(self) -> None:
  80. """
  81. Tests to ensure that remote media is normally able to be downloaded
  82. when no domain block is in place.
  83. """
  84. response = self.make_request(
  85. "GET",
  86. f"/_matrix/media/v3/download/evil.com/{self.remote_media_id}",
  87. shorthand=False,
  88. )
  89. self.assertEqual(response.code, 200)
  90. @override_config(
  91. {
  92. # Disable downloads from the domain we'll be trying to download from.
  93. # Should result in a 404.
  94. "prevent_media_downloads_from": ["evil.com"],
  95. "dynamic_thumbnails": True,
  96. }
  97. )
  98. def test_cannot_download_blocked_media_thumbnail(self) -> None:
  99. """
  100. Same test as test_cannot_download_blocked_media but for thumbnails.
  101. """
  102. response = self.make_request(
  103. "GET",
  104. f"/_matrix/media/v3/thumbnail/evil.com/{self.remote_media_id}?width=100&height=100",
  105. shorthand=False,
  106. content={"width": 100, "height": 100},
  107. )
  108. self.assertEqual(response.code, 404)
  109. @override_config(
  110. {
  111. # Disable downloads from a domain we won't be requesting downloads from.
  112. # This proves we haven't broken anything.
  113. "prevent_media_downloads_from": ["not-listed.com"],
  114. "dynamic_thumbnails": True,
  115. }
  116. )
  117. def test_remote_media_thumbnail_normally_unblocked(self) -> None:
  118. """
  119. Same test as test_remote_media_normally_unblocked but for thumbnails.
  120. """
  121. response = self.make_request(
  122. "GET",
  123. f"/_matrix/media/v3/thumbnail/evil.com/{self.remote_media_id}?width=100&height=100",
  124. shorthand=False,
  125. )
  126. self.assertEqual(response.code, 200)