test_federation_sender_shard.py 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2020 The Matrix.org Foundation C.I.C.
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. import logging
  16. from mock import Mock
  17. from synapse.api.constants import EventTypes, Membership
  18. from synapse.events.builder import EventBuilderFactory
  19. from synapse.rest.admin import register_servlets_for_client_rest_resource
  20. from synapse.rest.client.v1 import login, room
  21. from synapse.types import UserID
  22. from tests.replication._base import BaseMultiWorkerStreamTestCase
  23. from tests.test_utils import make_awaitable
  24. logger = logging.getLogger(__name__)
  25. class FederationSenderTestCase(BaseMultiWorkerStreamTestCase):
  26. servlets = [
  27. login.register_servlets,
  28. register_servlets_for_client_rest_resource,
  29. room.register_servlets,
  30. ]
  31. def default_config(self):
  32. conf = super().default_config()
  33. conf["send_federation"] = False
  34. return conf
  35. def test_send_event_single_sender(self):
  36. """Test that using a single federation sender worker correctly sends a
  37. new event.
  38. """
  39. mock_client = Mock(spec=["put_json"])
  40. mock_client.put_json.side_effect = lambda *_, **__: make_awaitable({})
  41. self.make_worker_hs(
  42. "synapse.app.federation_sender",
  43. {"send_federation": True},
  44. http_client=mock_client,
  45. )
  46. user = self.register_user("user", "pass")
  47. token = self.login("user", "pass")
  48. room = self.create_room_with_remote_server(user, token)
  49. mock_client.put_json.reset_mock()
  50. self.create_and_send_event(room, UserID.from_string(user))
  51. self.replicate()
  52. # Assert that the event was sent out over federation.
  53. mock_client.put_json.assert_called()
  54. self.assertEqual(mock_client.put_json.call_args[0][0], "other_server")
  55. self.assertTrue(mock_client.put_json.call_args[1]["data"].get("pdus"))
  56. def test_send_event_sharded(self):
  57. """Test that using two federation sender workers correctly sends
  58. new events.
  59. """
  60. mock_client1 = Mock(spec=["put_json"])
  61. mock_client1.put_json.side_effect = lambda *_, **__: make_awaitable({})
  62. self.make_worker_hs(
  63. "synapse.app.federation_sender",
  64. {
  65. "send_federation": True,
  66. "worker_name": "sender1",
  67. "federation_sender_instances": ["sender1", "sender2"],
  68. },
  69. http_client=mock_client1,
  70. )
  71. mock_client2 = Mock(spec=["put_json"])
  72. mock_client2.put_json.side_effect = lambda *_, **__: make_awaitable({})
  73. self.make_worker_hs(
  74. "synapse.app.federation_sender",
  75. {
  76. "send_federation": True,
  77. "worker_name": "sender2",
  78. "federation_sender_instances": ["sender1", "sender2"],
  79. },
  80. http_client=mock_client2,
  81. )
  82. user = self.register_user("user2", "pass")
  83. token = self.login("user2", "pass")
  84. sent_on_1 = False
  85. sent_on_2 = False
  86. for i in range(20):
  87. server_name = "other_server_%d" % (i,)
  88. room = self.create_room_with_remote_server(user, token, server_name)
  89. mock_client1.reset_mock() # type: ignore[attr-defined]
  90. mock_client2.reset_mock() # type: ignore[attr-defined]
  91. self.create_and_send_event(room, UserID.from_string(user))
  92. self.replicate()
  93. if mock_client1.put_json.called:
  94. sent_on_1 = True
  95. mock_client2.put_json.assert_not_called()
  96. self.assertEqual(mock_client1.put_json.call_args[0][0], server_name)
  97. self.assertTrue(mock_client1.put_json.call_args[1]["data"].get("pdus"))
  98. elif mock_client2.put_json.called:
  99. sent_on_2 = True
  100. mock_client1.put_json.assert_not_called()
  101. self.assertEqual(mock_client2.put_json.call_args[0][0], server_name)
  102. self.assertTrue(mock_client2.put_json.call_args[1]["data"].get("pdus"))
  103. else:
  104. raise AssertionError(
  105. "Expected send transaction from one or the other sender"
  106. )
  107. if sent_on_1 and sent_on_2:
  108. break
  109. self.assertTrue(sent_on_1)
  110. self.assertTrue(sent_on_2)
  111. def test_send_typing_sharded(self):
  112. """Test that using two federation sender workers correctly sends
  113. new typing EDUs.
  114. """
  115. mock_client1 = Mock(spec=["put_json"])
  116. mock_client1.put_json.side_effect = lambda *_, **__: make_awaitable({})
  117. self.make_worker_hs(
  118. "synapse.app.federation_sender",
  119. {
  120. "send_federation": True,
  121. "worker_name": "sender1",
  122. "federation_sender_instances": ["sender1", "sender2"],
  123. },
  124. http_client=mock_client1,
  125. )
  126. mock_client2 = Mock(spec=["put_json"])
  127. mock_client2.put_json.side_effect = lambda *_, **__: make_awaitable({})
  128. self.make_worker_hs(
  129. "synapse.app.federation_sender",
  130. {
  131. "send_federation": True,
  132. "worker_name": "sender2",
  133. "federation_sender_instances": ["sender1", "sender2"],
  134. },
  135. http_client=mock_client2,
  136. )
  137. user = self.register_user("user3", "pass")
  138. token = self.login("user3", "pass")
  139. typing_handler = self.hs.get_typing_handler()
  140. sent_on_1 = False
  141. sent_on_2 = False
  142. for i in range(20):
  143. server_name = "other_server_%d" % (i,)
  144. room = self.create_room_with_remote_server(user, token, server_name)
  145. mock_client1.reset_mock() # type: ignore[attr-defined]
  146. mock_client2.reset_mock() # type: ignore[attr-defined]
  147. self.get_success(
  148. typing_handler.started_typing(
  149. target_user=UserID.from_string(user),
  150. auth_user=UserID.from_string(user),
  151. room_id=room,
  152. timeout=20000,
  153. )
  154. )
  155. self.replicate()
  156. if mock_client1.put_json.called:
  157. sent_on_1 = True
  158. mock_client2.put_json.assert_not_called()
  159. self.assertEqual(mock_client1.put_json.call_args[0][0], server_name)
  160. self.assertTrue(mock_client1.put_json.call_args[1]["data"].get("edus"))
  161. elif mock_client2.put_json.called:
  162. sent_on_2 = True
  163. mock_client1.put_json.assert_not_called()
  164. self.assertEqual(mock_client2.put_json.call_args[0][0], server_name)
  165. self.assertTrue(mock_client2.put_json.call_args[1]["data"].get("edus"))
  166. else:
  167. raise AssertionError(
  168. "Expected send transaction from one or the other sender"
  169. )
  170. if sent_on_1 and sent_on_2:
  171. break
  172. self.assertTrue(sent_on_1)
  173. self.assertTrue(sent_on_2)
  174. def create_room_with_remote_server(self, user, token, remote_server="other_server"):
  175. room = self.helper.create_room_as(user, tok=token)
  176. store = self.hs.get_datastore()
  177. federation = self.hs.get_handlers().federation_handler
  178. prev_event_ids = self.get_success(store.get_latest_event_ids_in_room(room))
  179. room_version = self.get_success(store.get_room_version(room))
  180. factory = EventBuilderFactory(self.hs)
  181. factory.hostname = remote_server
  182. user_id = UserID("user", remote_server).to_string()
  183. event_dict = {
  184. "type": EventTypes.Member,
  185. "state_key": user_id,
  186. "content": {"membership": Membership.JOIN},
  187. "sender": user_id,
  188. "room_id": room,
  189. }
  190. builder = factory.for_room_version(room_version, event_dict)
  191. join_event = self.get_success(builder.build(prev_event_ids))
  192. self.get_success(federation.on_send_join_request(remote_server, join_event))
  193. self.replicate()
  194. return room