test_retention.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350
  1. # Copyright 2019 New Vector Ltd
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. from unittest.mock import Mock
  15. from twisted.test.proto_helpers import MemoryReactor
  16. from synapse.api.constants import EventTypes
  17. from synapse.rest import admin
  18. from synapse.rest.client import login, room
  19. from synapse.server import HomeServer
  20. from synapse.types import JsonDict
  21. from synapse.util import Clock
  22. from synapse.visibility import filter_events_for_client
  23. from tests import unittest
  24. from tests.unittest import override_config
  25. one_hour_ms = 3600000
  26. one_day_ms = one_hour_ms * 24
  27. class RetentionTestCase(unittest.HomeserverTestCase):
  28. servlets = [
  29. admin.register_servlets,
  30. login.register_servlets,
  31. room.register_servlets,
  32. ]
  33. def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
  34. config = self.default_config()
  35. # merge this default retention config with anything that was specified in
  36. # @override_config
  37. retention_config = {
  38. "enabled": True,
  39. "default_policy": {
  40. "min_lifetime": one_day_ms,
  41. "max_lifetime": one_day_ms * 3,
  42. },
  43. "allowed_lifetime_min": one_day_ms,
  44. "allowed_lifetime_max": one_day_ms * 3,
  45. }
  46. retention_config.update(config.get("retention", {}))
  47. config["retention"] = retention_config
  48. self.hs = self.setup_test_homeserver(config=config)
  49. return self.hs
  50. def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
  51. self.user_id = self.register_user("user", "password")
  52. self.token = self.login("user", "password")
  53. self.store = self.hs.get_datastores().main
  54. self.serializer = self.hs.get_event_client_serializer()
  55. self.clock = self.hs.get_clock()
  56. def test_retention_event_purged_with_state_event(self) -> None:
  57. """Tests that expired events are correctly purged when the room's retention policy
  58. is defined by a state event.
  59. """
  60. room_id = self.helper.create_room_as(self.user_id, tok=self.token)
  61. # Set the room's retention period to 2 days.
  62. lifetime = one_day_ms * 2
  63. self.helper.send_state(
  64. room_id=room_id,
  65. event_type=EventTypes.Retention,
  66. body={"max_lifetime": lifetime},
  67. tok=self.token,
  68. )
  69. self._test_retention_event_purged(room_id, one_day_ms * 1.5)
  70. def test_retention_event_purged_with_state_event_outside_allowed(self) -> None:
  71. """Tests that the server configuration can override the policy for a room when
  72. running the purge jobs.
  73. """
  74. room_id = self.helper.create_room_as(self.user_id, tok=self.token)
  75. # Set a max_lifetime higher than the maximum allowed value.
  76. self.helper.send_state(
  77. room_id=room_id,
  78. event_type=EventTypes.Retention,
  79. body={"max_lifetime": one_day_ms * 4},
  80. tok=self.token,
  81. )
  82. # Check that the event is purged after waiting for the maximum allowed duration
  83. # instead of the one specified in the room's policy.
  84. self._test_retention_event_purged(room_id, one_day_ms * 1.5)
  85. # Set a max_lifetime lower than the minimum allowed value.
  86. self.helper.send_state(
  87. room_id=room_id,
  88. event_type=EventTypes.Retention,
  89. body={"max_lifetime": one_hour_ms},
  90. tok=self.token,
  91. )
  92. # Check that the event is purged after waiting for the minimum allowed duration
  93. # instead of the one specified in the room's policy.
  94. self._test_retention_event_purged(room_id, one_day_ms * 0.5)
  95. def test_retention_event_purged_without_state_event(self) -> None:
  96. """Tests that expired events are correctly purged when the room's retention policy
  97. is defined by the server's configuration's default retention policy.
  98. """
  99. room_id = self.helper.create_room_as(self.user_id, tok=self.token)
  100. self._test_retention_event_purged(room_id, one_day_ms * 2)
  101. @override_config({"retention": {"purge_jobs": [{"interval": "5d"}]}})
  102. def test_visibility(self) -> None:
  103. """Tests that synapse.visibility.filter_events_for_client correctly filters out
  104. outdated events, even if the purge job hasn't got to them yet.
  105. We do this by setting a very long time between purge jobs.
  106. """
  107. store = self.hs.get_datastores().main
  108. storage = self.hs.get_storage()
  109. room_id = self.helper.create_room_as(self.user_id, tok=self.token)
  110. # Send a first event, which should be filtered out at the end of the test.
  111. resp = self.helper.send(room_id=room_id, body="1", tok=self.token)
  112. first_event_id = resp.get("event_id")
  113. # Advance the time by 2 days. We're using the default retention policy, therefore
  114. # after this the first event will still be valid.
  115. self.reactor.advance(one_day_ms * 2 / 1000)
  116. # Send another event, which shouldn't get filtered out.
  117. resp = self.helper.send(room_id=room_id, body="2", tok=self.token)
  118. valid_event_id = resp.get("event_id")
  119. # Advance the time by another 2 days. After this, the first event should be
  120. # outdated but not the second one.
  121. self.reactor.advance(one_day_ms * 2 / 1000)
  122. # Fetch the events, and run filter_events_for_client on them
  123. events = self.get_success(
  124. store.get_events_as_list([first_event_id, valid_event_id])
  125. )
  126. self.assertEqual(2, len(events), "events retrieved from database")
  127. filtered_events = self.get_success(
  128. filter_events_for_client(storage, self.user_id, events)
  129. )
  130. # We should only get one event back.
  131. self.assertEqual(len(filtered_events), 1, filtered_events)
  132. # That event should be the second, not outdated event.
  133. self.assertEqual(filtered_events[0].event_id, valid_event_id, filtered_events)
  134. def _test_retention_event_purged(self, room_id: str, increment: float) -> None:
  135. """Run the following test scenario to test the message retention policy support:
  136. 1. Send event 1
  137. 2. Increment time by `increment`
  138. 3. Send event 2
  139. 4. Increment time by `increment`
  140. 5. Check that event 1 has been purged
  141. 6. Check that event 2 has not been purged
  142. 7. Check that state events that were sent before event 1 aren't purged.
  143. The main reason for sending a second event is because currently Synapse won't
  144. purge the latest message in a room because it would otherwise result in a lack of
  145. forward extremities for this room. It's also a good thing to ensure the purge jobs
  146. aren't too greedy and purge messages they shouldn't.
  147. Args:
  148. room_id: The ID of the room to test retention in.
  149. increment: The number of milliseconds to advance the clock each time. Must be
  150. defined so that events in the room aren't purged if they are `increment`
  151. old but are purged if they are `increment * 2` old.
  152. """
  153. # Get the create event to, later, check that we can still access it.
  154. message_handler = self.hs.get_message_handler()
  155. create_event = self.get_success(
  156. message_handler.get_room_data(
  157. self.user_id, room_id, EventTypes.Create, state_key=""
  158. )
  159. )
  160. # Send a first event to the room. This is the event we'll want to be purged at the
  161. # end of the test.
  162. resp = self.helper.send(room_id=room_id, body="1", tok=self.token)
  163. expired_event_id = resp.get("event_id")
  164. assert expired_event_id is not None
  165. # Check that we can retrieve the event.
  166. expired_event = self.get_event(expired_event_id)
  167. self.assertEqual(
  168. expired_event.get("content", {}).get("body"), "1", expired_event
  169. )
  170. # Advance the time.
  171. self.reactor.advance(increment / 1000)
  172. # Send another event. We need this because the purge job won't purge the most
  173. # recent event in the room.
  174. resp = self.helper.send(room_id=room_id, body="2", tok=self.token)
  175. valid_event_id = resp.get("event_id")
  176. assert valid_event_id is not None
  177. # Advance the time again. Now our first event should have expired but our second
  178. # one should still be kept.
  179. self.reactor.advance(increment / 1000)
  180. # Check that the first event has been purged from the database, i.e. that we
  181. # can't retrieve it anymore, because it has expired.
  182. self.get_event(expired_event_id, expect_none=True)
  183. # Check that the event that hasn't expired can still be retrieved.
  184. valid_event = self.get_event(valid_event_id)
  185. self.assertEqual(valid_event.get("content", {}).get("body"), "2", valid_event)
  186. # Check that we can still access state events that were sent before the event that
  187. # has been purged.
  188. self.get_event(room_id, create_event.event_id)
  189. def get_event(self, event_id: str, expect_none: bool = False) -> JsonDict:
  190. event = self.get_success(self.store.get_event(event_id, allow_none=True))
  191. if expect_none:
  192. self.assertIsNone(event)
  193. return {}
  194. self.assertIsNotNone(event)
  195. time_now = self.clock.time_msec()
  196. serialized = self.serializer.serialize_event(event, time_now)
  197. return serialized
  198. class RetentionNoDefaultPolicyTestCase(unittest.HomeserverTestCase):
  199. servlets = [
  200. admin.register_servlets,
  201. login.register_servlets,
  202. room.register_servlets,
  203. ]
  204. def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
  205. config = self.default_config()
  206. config["retention"] = {
  207. "enabled": True,
  208. }
  209. mock_federation_client = Mock(spec=["backfill"])
  210. self.hs = self.setup_test_homeserver(
  211. config=config,
  212. federation_client=mock_federation_client,
  213. )
  214. return self.hs
  215. def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
  216. self.user_id = self.register_user("user", "password")
  217. self.token = self.login("user", "password")
  218. def test_no_default_policy(self) -> None:
  219. """Tests that an event doesn't get expired if there is neither a default retention
  220. policy nor a policy specific to the room.
  221. """
  222. room_id = self.helper.create_room_as(self.user_id, tok=self.token)
  223. self._test_retention(room_id)
  224. def test_state_policy(self) -> None:
  225. """Tests that an event gets correctly expired if there is no default retention
  226. policy but there's a policy specific to the room.
  227. """
  228. room_id = self.helper.create_room_as(self.user_id, tok=self.token)
  229. # Set the maximum lifetime to 35 days so that the first event gets expired but not
  230. # the second one.
  231. self.helper.send_state(
  232. room_id=room_id,
  233. event_type=EventTypes.Retention,
  234. body={"max_lifetime": one_day_ms * 35},
  235. tok=self.token,
  236. )
  237. self._test_retention(room_id, expected_code_for_first_event=404)
  238. def _test_retention(
  239. self, room_id: str, expected_code_for_first_event: int = 200
  240. ) -> None:
  241. # Send a first event to the room. This is the event we'll want to be purged at the
  242. # end of the test.
  243. resp = self.helper.send(room_id=room_id, body="1", tok=self.token)
  244. first_event_id = resp.get("event_id")
  245. assert first_event_id is not None
  246. # Check that we can retrieve the event.
  247. expired_event = self.get_event(room_id, first_event_id)
  248. self.assertEqual(
  249. expired_event.get("content", {}).get("body"), "1", expired_event
  250. )
  251. # Advance the time by a month.
  252. self.reactor.advance(one_day_ms * 30 / 1000)
  253. # Send another event. We need this because the purge job won't purge the most
  254. # recent event in the room.
  255. resp = self.helper.send(room_id=room_id, body="2", tok=self.token)
  256. second_event_id = resp.get("event_id")
  257. assert second_event_id is not None
  258. # Advance the time by another month.
  259. self.reactor.advance(one_day_ms * 30 / 1000)
  260. # Check if the event has been purged from the database.
  261. first_event = self.get_event(
  262. room_id, first_event_id, expected_code=expected_code_for_first_event
  263. )
  264. if expected_code_for_first_event == 200:
  265. self.assertEqual(
  266. first_event.get("content", {}).get("body"), "1", first_event
  267. )
  268. # Check that the event that hasn't been purged can still be retrieved.
  269. second_event = self.get_event(room_id, second_event_id)
  270. self.assertEqual(second_event.get("content", {}).get("body"), "2", second_event)
  271. def get_event(
  272. self, room_id: str, event_id: str, expected_code: int = 200
  273. ) -> JsonDict:
  274. url = "/_matrix/client/r0/rooms/%s/event/%s" % (room_id, event_id)
  275. channel = self.make_request("GET", url, access_token=self.token)
  276. self.assertEqual(channel.code, expected_code, channel.result)
  277. return channel.json_body