test_federation_event.py 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227
  1. # Copyright 2022 The Matrix.org Foundation C.I.C.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. from unittest import mock
  15. from synapse.events import make_event_from_dict
  16. from synapse.events.snapshot import EventContext
  17. from synapse.federation.transport.client import StateRequestResponse
  18. from synapse.logging.context import LoggingContext
  19. from synapse.rest import admin
  20. from synapse.rest.client import login, room
  21. from tests import unittest
  22. from tests.test_utils import event_injection, make_awaitable
  23. class FederationEventHandlerTests(unittest.FederatingHomeserverTestCase):
  24. servlets = [
  25. admin.register_servlets,
  26. login.register_servlets,
  27. room.register_servlets,
  28. ]
  29. def make_homeserver(self, reactor, clock):
  30. # mock out the federation transport client
  31. self.mock_federation_transport_client = mock.Mock(
  32. spec=["get_room_state_ids", "get_room_state", "get_event"]
  33. )
  34. return super().setup_test_homeserver(
  35. federation_transport_client=self.mock_federation_transport_client
  36. )
  37. def test_process_pulled_event_with_missing_state(self) -> None:
  38. """Ensure that we correctly handle pulled events with lots of missing state
  39. In this test, we pretend we are processing a "pulled" event (eg, via backfill
  40. or get_missing_events). The pulled event has a prev_event we haven't previously
  41. seen, so the server requests the state at that prev_event. There is a lot
  42. of state we don't have, so we expect the server to make a /state request.
  43. We check that the pulled event is correctly persisted, and that the state is
  44. as we expect.
  45. """
  46. return self._test_process_pulled_event_with_missing_state(False)
  47. def test_process_pulled_event_with_missing_state_where_prev_is_outlier(
  48. self,
  49. ) -> None:
  50. """Ensure that we correctly handle pulled events with lots of missing state
  51. A slight modification to test_process_pulled_event_with_missing_state. Again
  52. we have a "pulled" event which refers to a prev_event with lots of state,
  53. but in this case we already have the prev_event (as an outlier, obviously -
  54. if it were a regular event, we wouldn't need to request the state).
  55. """
  56. return self._test_process_pulled_event_with_missing_state(True)
  57. def _test_process_pulled_event_with_missing_state(
  58. self, prev_exists_as_outlier: bool
  59. ) -> None:
  60. OTHER_USER = f"@user:{self.OTHER_SERVER_NAME}"
  61. main_store = self.hs.get_datastores().main
  62. state_storage = self.hs.get_storage().state
  63. # create the room
  64. user_id = self.register_user("kermit", "test")
  65. tok = self.login("kermit", "test")
  66. room_id = self.helper.create_room_as(room_creator=user_id, tok=tok)
  67. room_version = self.get_success(main_store.get_room_version(room_id))
  68. # allow the remote user to send state events
  69. self.helper.send_state(
  70. room_id,
  71. "m.room.power_levels",
  72. {"events_default": 0, "state_default": 0},
  73. tok=tok,
  74. )
  75. # add the remote user to the room
  76. member_event = self.get_success(
  77. event_injection.inject_member_event(self.hs, room_id, OTHER_USER, "join")
  78. )
  79. initial_state_map = self.get_success(main_store.get_current_state_ids(room_id))
  80. auth_event_ids = [
  81. initial_state_map[("m.room.create", "")],
  82. initial_state_map[("m.room.power_levels", "")],
  83. initial_state_map[("m.room.join_rules", "")],
  84. member_event.event_id,
  85. ]
  86. # mock up a load of state events which we are missing
  87. state_events = [
  88. make_event_from_dict(
  89. self.add_hashes_and_signatures(
  90. {
  91. "type": "test_state_type",
  92. "state_key": f"state_{i}",
  93. "room_id": room_id,
  94. "sender": OTHER_USER,
  95. "prev_events": [member_event.event_id],
  96. "auth_events": auth_event_ids,
  97. "origin_server_ts": 1,
  98. "depth": 10,
  99. "content": {"body": f"state_{i}"},
  100. }
  101. ),
  102. room_version,
  103. )
  104. for i in range(1, 10)
  105. ]
  106. # this is the state that we are going to claim is active at the prev_event.
  107. state_at_prev_event = state_events + self.get_success(
  108. main_store.get_events_as_list(initial_state_map.values())
  109. )
  110. # mock up a prev event.
  111. # Depending on the test, we either persist this upfront (as an outlier),
  112. # or let the server request it.
  113. prev_event = make_event_from_dict(
  114. self.add_hashes_and_signatures(
  115. {
  116. "type": "test_regular_type",
  117. "room_id": room_id,
  118. "sender": OTHER_USER,
  119. "prev_events": [],
  120. "auth_events": auth_event_ids,
  121. "origin_server_ts": 1,
  122. "depth": 11,
  123. "content": {"body": "missing_prev"},
  124. }
  125. ),
  126. room_version,
  127. )
  128. if prev_exists_as_outlier:
  129. prev_event.internal_metadata.outlier = True
  130. persistence = self.hs.get_storage().persistence
  131. self.get_success(
  132. persistence.persist_event(
  133. prev_event, EventContext.for_outlier(self.hs.get_storage())
  134. )
  135. )
  136. else:
  137. async def get_event(destination: str, event_id: str, timeout=None):
  138. self.assertEqual(destination, self.OTHER_SERVER_NAME)
  139. self.assertEqual(event_id, prev_event.event_id)
  140. return {"pdus": [prev_event.get_pdu_json()]}
  141. self.mock_federation_transport_client.get_event.side_effect = get_event
  142. # mock up a regular event to pass into _process_pulled_event
  143. pulled_event = make_event_from_dict(
  144. self.add_hashes_and_signatures(
  145. {
  146. "type": "test_regular_type",
  147. "room_id": room_id,
  148. "sender": OTHER_USER,
  149. "prev_events": [prev_event.event_id],
  150. "auth_events": auth_event_ids,
  151. "origin_server_ts": 1,
  152. "depth": 12,
  153. "content": {"body": "pulled"},
  154. }
  155. ),
  156. room_version,
  157. )
  158. # we expect an outbound request to /state_ids, so stub that out
  159. self.mock_federation_transport_client.get_room_state_ids.return_value = (
  160. make_awaitable(
  161. {
  162. "pdu_ids": [e.event_id for e in state_at_prev_event],
  163. "auth_chain_ids": [],
  164. }
  165. )
  166. )
  167. # we also expect an outbound request to /state
  168. self.mock_federation_transport_client.get_room_state.return_value = (
  169. make_awaitable(
  170. StateRequestResponse(auth_events=[], state=state_at_prev_event)
  171. )
  172. )
  173. # we have to bump the clock a bit, to keep the retry logic in
  174. # FederationClient.get_pdu happy
  175. self.reactor.advance(60000)
  176. # Finally, the call under test: send the pulled event into _process_pulled_event
  177. with LoggingContext("test"):
  178. self.get_success(
  179. self.hs.get_federation_event_handler()._process_pulled_event(
  180. self.OTHER_SERVER_NAME, pulled_event, backfilled=False
  181. )
  182. )
  183. # check that the event is correctly persisted
  184. persisted = self.get_success(main_store.get_event(pulled_event.event_id))
  185. self.assertIsNotNone(persisted, "pulled event was not persisted at all")
  186. self.assertFalse(
  187. persisted.internal_metadata.is_outlier(), "pulled event was an outlier"
  188. )
  189. # check that the state at that event is as expected
  190. state = self.get_success(
  191. state_storage.get_state_ids_for_event(pulled_event.event_id)
  192. )
  193. expected_state = {
  194. (e.type, e.state_key): e.event_id for e in state_at_prev_event
  195. }
  196. self.assertEqual(state, expected_state)
  197. if prev_exists_as_outlier:
  198. self.mock_federation_transport_client.get_event.assert_not_called()