test_federation_sender_shard.py 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234
  1. # Copyright 2020 The Matrix.org Foundation C.I.C.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import logging
  15. from unittest.mock import Mock
  16. from synapse.api.constants import EventTypes, Membership
  17. from synapse.events.builder import EventBuilderFactory
  18. from synapse.rest.admin import register_servlets_for_client_rest_resource
  19. from synapse.rest.client.v1 import login, room
  20. from synapse.types import UserID, create_requester
  21. from tests.replication._base import BaseMultiWorkerStreamTestCase
  22. from tests.test_utils import make_awaitable
  23. logger = logging.getLogger(__name__)
  24. class FederationSenderTestCase(BaseMultiWorkerStreamTestCase):
  25. servlets = [
  26. login.register_servlets,
  27. register_servlets_for_client_rest_resource,
  28. room.register_servlets,
  29. ]
  30. def default_config(self):
  31. conf = super().default_config()
  32. conf["send_federation"] = False
  33. return conf
  34. def test_send_event_single_sender(self):
  35. """Test that using a single federation sender worker correctly sends a
  36. new event.
  37. """
  38. mock_client = Mock(spec=["put_json"])
  39. mock_client.put_json.return_value = make_awaitable({})
  40. self.make_worker_hs(
  41. "synapse.app.federation_sender",
  42. {"send_federation": False},
  43. federation_http_client=mock_client,
  44. )
  45. user = self.register_user("user", "pass")
  46. token = self.login("user", "pass")
  47. room = self.create_room_with_remote_server(user, token)
  48. mock_client.put_json.reset_mock()
  49. self.create_and_send_event(room, UserID.from_string(user))
  50. self.replicate()
  51. # Assert that the event was sent out over federation.
  52. mock_client.put_json.assert_called()
  53. self.assertEqual(mock_client.put_json.call_args[0][0], "other_server")
  54. self.assertTrue(mock_client.put_json.call_args[1]["data"].get("pdus"))
  55. def test_send_event_sharded(self):
  56. """Test that using two federation sender workers correctly sends
  57. new events.
  58. """
  59. mock_client1 = Mock(spec=["put_json"])
  60. mock_client1.put_json.return_value = make_awaitable({})
  61. self.make_worker_hs(
  62. "synapse.app.federation_sender",
  63. {
  64. "send_federation": True,
  65. "worker_name": "sender1",
  66. "federation_sender_instances": ["sender1", "sender2"],
  67. },
  68. federation_http_client=mock_client1,
  69. )
  70. mock_client2 = Mock(spec=["put_json"])
  71. mock_client2.put_json.return_value = make_awaitable({})
  72. self.make_worker_hs(
  73. "synapse.app.federation_sender",
  74. {
  75. "send_federation": True,
  76. "worker_name": "sender2",
  77. "federation_sender_instances": ["sender1", "sender2"],
  78. },
  79. federation_http_client=mock_client2,
  80. )
  81. user = self.register_user("user2", "pass")
  82. token = self.login("user2", "pass")
  83. sent_on_1 = False
  84. sent_on_2 = False
  85. for i in range(20):
  86. server_name = "other_server_%d" % (i,)
  87. room = self.create_room_with_remote_server(user, token, server_name)
  88. mock_client1.reset_mock() # type: ignore[attr-defined]
  89. mock_client2.reset_mock() # type: ignore[attr-defined]
  90. self.create_and_send_event(room, UserID.from_string(user))
  91. self.replicate()
  92. if mock_client1.put_json.called:
  93. sent_on_1 = True
  94. mock_client2.put_json.assert_not_called()
  95. self.assertEqual(mock_client1.put_json.call_args[0][0], server_name)
  96. self.assertTrue(mock_client1.put_json.call_args[1]["data"].get("pdus"))
  97. elif mock_client2.put_json.called:
  98. sent_on_2 = True
  99. mock_client1.put_json.assert_not_called()
  100. self.assertEqual(mock_client2.put_json.call_args[0][0], server_name)
  101. self.assertTrue(mock_client2.put_json.call_args[1]["data"].get("pdus"))
  102. else:
  103. raise AssertionError(
  104. "Expected send transaction from one or the other sender"
  105. )
  106. if sent_on_1 and sent_on_2:
  107. break
  108. self.assertTrue(sent_on_1)
  109. self.assertTrue(sent_on_2)
  110. def test_send_typing_sharded(self):
  111. """Test that using two federation sender workers correctly sends
  112. new typing EDUs.
  113. """
  114. mock_client1 = Mock(spec=["put_json"])
  115. mock_client1.put_json.return_value = make_awaitable({})
  116. self.make_worker_hs(
  117. "synapse.app.federation_sender",
  118. {
  119. "send_federation": True,
  120. "worker_name": "sender1",
  121. "federation_sender_instances": ["sender1", "sender2"],
  122. },
  123. federation_http_client=mock_client1,
  124. )
  125. mock_client2 = Mock(spec=["put_json"])
  126. mock_client2.put_json.return_value = make_awaitable({})
  127. self.make_worker_hs(
  128. "synapse.app.federation_sender",
  129. {
  130. "send_federation": True,
  131. "worker_name": "sender2",
  132. "federation_sender_instances": ["sender1", "sender2"],
  133. },
  134. federation_http_client=mock_client2,
  135. )
  136. user = self.register_user("user3", "pass")
  137. token = self.login("user3", "pass")
  138. typing_handler = self.hs.get_typing_handler()
  139. sent_on_1 = False
  140. sent_on_2 = False
  141. for i in range(20):
  142. server_name = "other_server_%d" % (i,)
  143. room = self.create_room_with_remote_server(user, token, server_name)
  144. mock_client1.reset_mock() # type: ignore[attr-defined]
  145. mock_client2.reset_mock() # type: ignore[attr-defined]
  146. self.get_success(
  147. typing_handler.started_typing(
  148. target_user=UserID.from_string(user),
  149. requester=create_requester(user),
  150. room_id=room,
  151. timeout=20000,
  152. )
  153. )
  154. self.replicate()
  155. if mock_client1.put_json.called:
  156. sent_on_1 = True
  157. mock_client2.put_json.assert_not_called()
  158. self.assertEqual(mock_client1.put_json.call_args[0][0], server_name)
  159. self.assertTrue(mock_client1.put_json.call_args[1]["data"].get("edus"))
  160. elif mock_client2.put_json.called:
  161. sent_on_2 = True
  162. mock_client1.put_json.assert_not_called()
  163. self.assertEqual(mock_client2.put_json.call_args[0][0], server_name)
  164. self.assertTrue(mock_client2.put_json.call_args[1]["data"].get("edus"))
  165. else:
  166. raise AssertionError(
  167. "Expected send transaction from one or the other sender"
  168. )
  169. if sent_on_1 and sent_on_2:
  170. break
  171. self.assertTrue(sent_on_1)
  172. self.assertTrue(sent_on_2)
  173. def create_room_with_remote_server(self, user, token, remote_server="other_server"):
  174. room = self.helper.create_room_as(user, tok=token)
  175. store = self.hs.get_datastore()
  176. federation = self.hs.get_federation_handler()
  177. prev_event_ids = self.get_success(store.get_latest_event_ids_in_room(room))
  178. room_version = self.get_success(store.get_room_version(room))
  179. factory = EventBuilderFactory(self.hs)
  180. factory.hostname = remote_server
  181. user_id = UserID("user", remote_server).to_string()
  182. event_dict = {
  183. "type": EventTypes.Member,
  184. "state_key": user_id,
  185. "content": {"membership": Membership.JOIN},
  186. "sender": user_id,
  187. "room_id": room,
  188. }
  189. builder = factory.for_room_version(room_version, event_dict)
  190. join_event = self.get_success(
  191. builder.build(prev_event_ids=prev_event_ids, auth_event_ids=None)
  192. )
  193. self.get_success(federation.on_send_membership_event(remote_server, join_event))
  194. self.replicate()
  195. return room