1
0

test_federation_sender_shard.py 8.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251
  1. # Copyright 2020 The Matrix.org Foundation C.I.C.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import logging
  15. from unittest.mock import Mock
  16. from synapse.api.constants import EventTypes, Membership
  17. from synapse.events.builder import EventBuilderFactory
  18. from synapse.handlers.typing import TypingWriterHandler
  19. from synapse.rest.admin import register_servlets_for_client_rest_resource
  20. from synapse.rest.client import login, room
  21. from synapse.types import UserID, create_requester
  22. from tests.replication._base import BaseMultiWorkerStreamTestCase
  23. from tests.test_utils import make_awaitable
  24. logger = logging.getLogger(__name__)
  25. class FederationSenderTestCase(BaseMultiWorkerStreamTestCase):
  26. """
  27. Various tests for federation sending on workers.
  28. Federation sending is disabled by default, it will be enabled in each test by
  29. updating 'federation_sender_instances'.
  30. """
  31. servlets = [
  32. login.register_servlets,
  33. register_servlets_for_client_rest_resource,
  34. room.register_servlets,
  35. ]
  36. def test_send_event_single_sender(self) -> None:
  37. """Test that using a single federation sender worker correctly sends a
  38. new event.
  39. """
  40. mock_client = Mock(spec=["put_json"])
  41. mock_client.put_json.return_value = make_awaitable({})
  42. self.make_worker_hs(
  43. "synapse.app.generic_worker",
  44. {
  45. "worker_name": "federation_sender1",
  46. "federation_sender_instances": ["federation_sender1"],
  47. },
  48. federation_http_client=mock_client,
  49. )
  50. user = self.register_user("user", "pass")
  51. token = self.login("user", "pass")
  52. room = self.create_room_with_remote_server(user, token)
  53. mock_client.put_json.reset_mock()
  54. self.create_and_send_event(room, UserID.from_string(user))
  55. self.replicate()
  56. # Assert that the event was sent out over federation.
  57. mock_client.put_json.assert_called()
  58. self.assertEqual(mock_client.put_json.call_args[0][0], "other_server")
  59. self.assertTrue(mock_client.put_json.call_args[1]["data"].get("pdus"))
  60. def test_send_event_sharded(self) -> None:
  61. """Test that using two federation sender workers correctly sends
  62. new events.
  63. """
  64. mock_client1 = Mock(spec=["put_json"])
  65. mock_client1.put_json.return_value = make_awaitable({})
  66. self.make_worker_hs(
  67. "synapse.app.generic_worker",
  68. {
  69. "worker_name": "federation_sender1",
  70. "federation_sender_instances": [
  71. "federation_sender1",
  72. "federation_sender2",
  73. ],
  74. },
  75. federation_http_client=mock_client1,
  76. )
  77. mock_client2 = Mock(spec=["put_json"])
  78. mock_client2.put_json.return_value = make_awaitable({})
  79. self.make_worker_hs(
  80. "synapse.app.generic_worker",
  81. {
  82. "worker_name": "federation_sender2",
  83. "federation_sender_instances": [
  84. "federation_sender1",
  85. "federation_sender2",
  86. ],
  87. },
  88. federation_http_client=mock_client2,
  89. )
  90. user = self.register_user("user2", "pass")
  91. token = self.login("user2", "pass")
  92. sent_on_1 = False
  93. sent_on_2 = False
  94. for i in range(20):
  95. server_name = "other_server_%d" % (i,)
  96. room = self.create_room_with_remote_server(user, token, server_name)
  97. mock_client1.reset_mock()
  98. mock_client2.reset_mock()
  99. self.create_and_send_event(room, UserID.from_string(user))
  100. self.replicate()
  101. if mock_client1.put_json.called:
  102. sent_on_1 = True
  103. mock_client2.put_json.assert_not_called()
  104. self.assertEqual(mock_client1.put_json.call_args[0][0], server_name)
  105. self.assertTrue(mock_client1.put_json.call_args[1]["data"].get("pdus"))
  106. elif mock_client2.put_json.called:
  107. sent_on_2 = True
  108. mock_client1.put_json.assert_not_called()
  109. self.assertEqual(mock_client2.put_json.call_args[0][0], server_name)
  110. self.assertTrue(mock_client2.put_json.call_args[1]["data"].get("pdus"))
  111. else:
  112. raise AssertionError(
  113. "Expected send transaction from one or the other sender"
  114. )
  115. if sent_on_1 and sent_on_2:
  116. break
  117. self.assertTrue(sent_on_1)
  118. self.assertTrue(sent_on_2)
  119. def test_send_typing_sharded(self) -> None:
  120. """Test that using two federation sender workers correctly sends
  121. new typing EDUs.
  122. """
  123. mock_client1 = Mock(spec=["put_json"])
  124. mock_client1.put_json.return_value = make_awaitable({})
  125. self.make_worker_hs(
  126. "synapse.app.generic_worker",
  127. {
  128. "worker_name": "federation_sender1",
  129. "federation_sender_instances": [
  130. "federation_sender1",
  131. "federation_sender2",
  132. ],
  133. },
  134. federation_http_client=mock_client1,
  135. )
  136. mock_client2 = Mock(spec=["put_json"])
  137. mock_client2.put_json.return_value = make_awaitable({})
  138. self.make_worker_hs(
  139. "synapse.app.generic_worker",
  140. {
  141. "worker_name": "federation_sender2",
  142. "federation_sender_instances": [
  143. "federation_sender1",
  144. "federation_sender2",
  145. ],
  146. },
  147. federation_http_client=mock_client2,
  148. )
  149. user = self.register_user("user3", "pass")
  150. token = self.login("user3", "pass")
  151. typing_handler = self.hs.get_typing_handler()
  152. assert isinstance(typing_handler, TypingWriterHandler)
  153. sent_on_1 = False
  154. sent_on_2 = False
  155. for i in range(20):
  156. server_name = "other_server_%d" % (i,)
  157. room = self.create_room_with_remote_server(user, token, server_name)
  158. mock_client1.reset_mock()
  159. mock_client2.reset_mock()
  160. self.get_success(
  161. typing_handler.started_typing(
  162. target_user=UserID.from_string(user),
  163. requester=create_requester(user),
  164. room_id=room,
  165. timeout=20000,
  166. )
  167. )
  168. self.replicate()
  169. if mock_client1.put_json.called:
  170. sent_on_1 = True
  171. mock_client2.put_json.assert_not_called()
  172. self.assertEqual(mock_client1.put_json.call_args[0][0], server_name)
  173. self.assertTrue(mock_client1.put_json.call_args[1]["data"].get("edus"))
  174. elif mock_client2.put_json.called:
  175. sent_on_2 = True
  176. mock_client1.put_json.assert_not_called()
  177. self.assertEqual(mock_client2.put_json.call_args[0][0], server_name)
  178. self.assertTrue(mock_client2.put_json.call_args[1]["data"].get("edus"))
  179. else:
  180. raise AssertionError(
  181. "Expected send transaction from one or the other sender"
  182. )
  183. if sent_on_1 and sent_on_2:
  184. break
  185. self.assertTrue(sent_on_1)
  186. self.assertTrue(sent_on_2)
  187. def create_room_with_remote_server(
  188. self, user: str, token: str, remote_server: str = "other_server"
  189. ) -> str:
  190. room = self.helper.create_room_as(user, tok=token)
  191. store = self.hs.get_datastores().main
  192. federation = self.hs.get_federation_event_handler()
  193. prev_event_ids = self.get_success(store.get_latest_event_ids_in_room(room))
  194. room_version = self.get_success(store.get_room_version(room))
  195. factory = EventBuilderFactory(self.hs)
  196. factory.hostname = remote_server
  197. user_id = UserID("user", remote_server).to_string()
  198. event_dict = {
  199. "type": EventTypes.Member,
  200. "state_key": user_id,
  201. "content": {"membership": Membership.JOIN},
  202. "sender": user_id,
  203. "room_id": room,
  204. }
  205. builder = factory.for_room_version(room_version, event_dict)
  206. join_event = self.get_success(
  207. builder.build(prev_event_ids=prev_event_ids, auth_event_ids=None)
  208. )
  209. self.get_success(federation.on_send_membership_event(remote_server, join_event))
  210. self.replicate()
  211. return room