test_room_batch.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301
  1. import logging
  2. from typing import List, Tuple
  3. from unittest.mock import Mock, patch
  4. from twisted.test.proto_helpers import MemoryReactor
  5. from synapse.api.constants import EventContentFields, EventTypes
  6. from synapse.appservice import ApplicationService
  7. from synapse.rest import admin
  8. from synapse.rest.client import login, register, room, room_batch, sync
  9. from synapse.server import HomeServer
  10. from synapse.types import JsonDict, RoomStreamToken
  11. from synapse.util import Clock
  12. from tests import unittest
  13. logger = logging.getLogger(__name__)
  14. def _create_join_state_events_for_batch_send_request(
  15. virtual_user_ids: List[str],
  16. insert_time: int,
  17. ) -> List[JsonDict]:
  18. return [
  19. {
  20. "type": EventTypes.Member,
  21. "sender": virtual_user_id,
  22. "origin_server_ts": insert_time,
  23. "content": {
  24. "membership": "join",
  25. "displayname": "display-name-for-%s" % (virtual_user_id,),
  26. },
  27. "state_key": virtual_user_id,
  28. }
  29. for virtual_user_id in virtual_user_ids
  30. ]
  31. def _create_message_events_for_batch_send_request(
  32. virtual_user_id: str, insert_time: int, count: int
  33. ) -> List[JsonDict]:
  34. return [
  35. {
  36. "type": EventTypes.Message,
  37. "sender": virtual_user_id,
  38. "origin_server_ts": insert_time,
  39. "content": {
  40. "msgtype": "m.text",
  41. "body": "Historical %d" % (i),
  42. EventContentFields.MSC2716_HISTORICAL: True,
  43. },
  44. }
  45. for i in range(count)
  46. ]
  47. class RoomBatchTestCase(unittest.HomeserverTestCase):
  48. """Test importing batches of historical messages."""
  49. servlets = [
  50. admin.register_servlets_for_client_rest_resource,
  51. room_batch.register_servlets,
  52. room.register_servlets,
  53. register.register_servlets,
  54. login.register_servlets,
  55. sync.register_servlets,
  56. ]
  57. def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
  58. config = self.default_config()
  59. self.appservice = ApplicationService(
  60. token="i_am_an_app_service",
  61. hostname="test",
  62. id="1234",
  63. namespaces={"users": [{"regex": r"@as_user.*", "exclusive": True}]},
  64. # Note: this user does not have to match the regex above
  65. sender="@as_main:test",
  66. )
  67. mock_load_appservices = Mock(return_value=[self.appservice])
  68. with patch(
  69. "synapse.storage.databases.main.appservice.load_appservices",
  70. mock_load_appservices,
  71. ):
  72. hs = self.setup_test_homeserver(config=config)
  73. return hs
  74. def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
  75. self.clock = clock
  76. self.storage = hs.get_storage()
  77. self.virtual_user_id, _ = self.register_appservice_user(
  78. "as_user_potato", self.appservice.token
  79. )
  80. def _create_test_room(self) -> Tuple[str, str, str, str]:
  81. room_id = self.helper.create_room_as(
  82. self.appservice.sender, tok=self.appservice.token
  83. )
  84. res_a = self.helper.send_event(
  85. room_id=room_id,
  86. type=EventTypes.Message,
  87. content={
  88. "msgtype": "m.text",
  89. "body": "A",
  90. },
  91. tok=self.appservice.token,
  92. )
  93. event_id_a = res_a["event_id"]
  94. res_b = self.helper.send_event(
  95. room_id=room_id,
  96. type=EventTypes.Message,
  97. content={
  98. "msgtype": "m.text",
  99. "body": "B",
  100. },
  101. tok=self.appservice.token,
  102. )
  103. event_id_b = res_b["event_id"]
  104. res_c = self.helper.send_event(
  105. room_id=room_id,
  106. type=EventTypes.Message,
  107. content={
  108. "msgtype": "m.text",
  109. "body": "C",
  110. },
  111. tok=self.appservice.token,
  112. )
  113. event_id_c = res_c["event_id"]
  114. return room_id, event_id_a, event_id_b, event_id_c
  115. @unittest.override_config({"experimental_features": {"msc2716_enabled": True}})
  116. def test_same_state_groups_for_whole_historical_batch(self) -> None:
  117. """Make sure that when using the `/batch_send` endpoint to import a
  118. bunch of historical messages, it re-uses the same `state_group` across
  119. the whole batch. This is an easy optimization to make sure we're getting
  120. right because the state for the whole batch is contained in
  121. `state_events_at_start` and can be shared across everything.
  122. """
  123. time_before_room = int(self.clock.time_msec())
  124. room_id, event_id_a, _, _ = self._create_test_room()
  125. channel = self.make_request(
  126. "POST",
  127. "/_matrix/client/unstable/org.matrix.msc2716/rooms/%s/batch_send?prev_event_id=%s"
  128. % (room_id, event_id_a),
  129. content={
  130. "events": _create_message_events_for_batch_send_request(
  131. self.virtual_user_id, time_before_room, 3
  132. ),
  133. "state_events_at_start": _create_join_state_events_for_batch_send_request(
  134. [self.virtual_user_id], time_before_room
  135. ),
  136. },
  137. access_token=self.appservice.token,
  138. )
  139. self.assertEqual(channel.code, 200, channel.result)
  140. # Get the historical event IDs that we just imported
  141. historical_event_ids = channel.json_body["event_ids"]
  142. self.assertEqual(len(historical_event_ids), 3)
  143. # Fetch the state_groups
  144. state_group_map = self.get_success(
  145. self.storage.state.get_state_groups_ids(room_id, historical_event_ids)
  146. )
  147. # We expect all of the historical events to be using the same state_group
  148. # so there should only be a single state_group here!
  149. self.assertEqual(
  150. len(state_group_map.keys()),
  151. 1,
  152. "Expected a single state_group to be returned by saw state_groups=%s"
  153. % (state_group_map.keys(),),
  154. )
  155. @unittest.override_config({"experimental_features": {"msc2716_enabled": True}})
  156. def test_sync_while_batch_importing(self) -> None:
  157. """
  158. Make sure that /sync correctly returns full room state when a user joins
  159. during ongoing batch backfilling.
  160. See: https://github.com/matrix-org/synapse/issues/12281
  161. """
  162. # Create user who will be invited & join room
  163. user_id = self.register_user("beep", "test")
  164. user_tok = self.login("beep", "test")
  165. time_before_room = int(self.clock.time_msec())
  166. # Create a room with some events
  167. room_id, _, _, _ = self._create_test_room()
  168. # Invite the user
  169. self.helper.invite(
  170. room_id, src=self.appservice.sender, tok=self.appservice.token, targ=user_id
  171. )
  172. # Create another room, send a bunch of events to advance the stream token
  173. other_room_id = self.helper.create_room_as(
  174. self.appservice.sender, tok=self.appservice.token
  175. )
  176. for _ in range(5):
  177. self.helper.send_event(
  178. room_id=other_room_id,
  179. type=EventTypes.Message,
  180. content={"msgtype": "m.text", "body": "C"},
  181. tok=self.appservice.token,
  182. )
  183. # Join the room as the normal user
  184. self.helper.join(room_id, user_id, tok=user_tok)
  185. # Create an event to hang the historical batch from - In order to see
  186. # the failure case originally reported in #12281, the historical batch
  187. # must be hung from the most recent event in the room so the base
  188. # insertion event ends up with the highest `topogological_ordering`
  189. # (`depth`) in the room but will have a negative `stream_ordering`
  190. # because it's a `historical` event. Previously, when assembling the
  191. # `state` for the `/sync` response, the bugged logic would sort by
  192. # `topological_ordering` descending and pick up the base insertion
  193. # event because it has a negative `stream_ordering` below the given
  194. # pagination token. Now we properly sort by `stream_ordering`
  195. # descending which puts `historical` events with a negative
  196. # `stream_ordering` way at the bottom and aren't selected as expected.
  197. response = self.helper.send_event(
  198. room_id=room_id,
  199. type=EventTypes.Message,
  200. content={
  201. "msgtype": "m.text",
  202. "body": "C",
  203. },
  204. tok=self.appservice.token,
  205. )
  206. event_to_hang_id = response["event_id"]
  207. channel = self.make_request(
  208. "POST",
  209. "/_matrix/client/unstable/org.matrix.msc2716/rooms/%s/batch_send?prev_event_id=%s"
  210. % (room_id, event_to_hang_id),
  211. content={
  212. "events": _create_message_events_for_batch_send_request(
  213. self.virtual_user_id, time_before_room, 3
  214. ),
  215. "state_events_at_start": _create_join_state_events_for_batch_send_request(
  216. [self.virtual_user_id], time_before_room
  217. ),
  218. },
  219. access_token=self.appservice.token,
  220. )
  221. self.assertEqual(channel.code, 200, channel.result)
  222. # Now we need to find the invite + join events stream tokens so we can sync between
  223. main_store = self.hs.get_datastores().main
  224. events, next_key = self.get_success(
  225. main_store.get_recent_events_for_room(
  226. room_id,
  227. 50,
  228. end_token=main_store.get_room_max_token(),
  229. ),
  230. )
  231. invite_event_position = None
  232. for event in events:
  233. if (
  234. event.type == "m.room.member"
  235. and event.content["membership"] == "invite"
  236. ):
  237. invite_event_position = self.get_success(
  238. main_store.get_topological_token_for_event(event.event_id)
  239. )
  240. break
  241. assert invite_event_position is not None, "No invite event found"
  242. # Remove the topological order from the token by re-creating w/stream only
  243. invite_event_position = RoomStreamToken(None, invite_event_position.stream)
  244. # Sync everything after this token
  245. since_token = self.get_success(invite_event_position.to_string(main_store))
  246. sync_response = self.make_request(
  247. "GET",
  248. f"/sync?since={since_token}",
  249. access_token=user_tok,
  250. )
  251. # Assert that, for this room, the user was considered to have joined and thus
  252. # receives the full state history
  253. state_event_types = [
  254. event["type"]
  255. for event in sync_response.json_body["rooms"]["join"][room_id]["state"][
  256. "events"
  257. ]
  258. ]
  259. assert (
  260. "m.room.create" in state_event_types
  261. ), "Missing room full state in sync response"