test_federation_sender_shard.py 8.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235
# -*- coding: utf-8 -*-
# Copyright 2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
  15. import logging
  16. from mock import Mock
  17. from twisted.internet import defer
  18. from synapse.api.constants import EventTypes, Membership
  19. from synapse.events.builder import EventBuilderFactory
  20. from synapse.rest.admin import register_servlets_for_client_rest_resource
  21. from synapse.rest.client.v1 import login, room
  22. from synapse.types import UserID
  23. from tests.replication._base import BaseMultiWorkerStreamTestCase
  24. logger = logging.getLogger(__name__)
  25. class FederationSenderTestCase(BaseMultiWorkerStreamTestCase):
  26. servlets = [
  27. login.register_servlets,
  28. register_servlets_for_client_rest_resource,
  29. room.register_servlets,
  30. ]
  31. def default_config(self):
  32. conf = super().default_config()
  33. conf["send_federation"] = False
  34. return conf
  35. def test_send_event_single_sender(self):
  36. """Test that using a single federation sender worker correctly sends a
  37. new event.
  38. """
  39. mock_client = Mock(spec=["put_json"])
  40. mock_client.put_json.side_effect = lambda *_, **__: defer.succeed({})
  41. self.make_worker_hs(
  42. "synapse.app.federation_sender",
  43. {"send_federation": True},
  44. http_client=mock_client,
  45. )
  46. user = self.register_user("user", "pass")
  47. token = self.login("user", "pass")
  48. room = self.create_room_with_remote_server(user, token)
  49. mock_client.put_json.reset_mock()
  50. self.create_and_send_event(room, UserID.from_string(user))
  51. self.replicate()
  52. # Assert that the event was sent out over federation.
  53. mock_client.put_json.assert_called()
  54. self.assertEqual(mock_client.put_json.call_args[0][0], "other_server")
  55. self.assertTrue(mock_client.put_json.call_args[1]["data"].get("pdus"))
  56. def test_send_event_sharded(self):
  57. """Test that using two federation sender workers correctly sends
  58. new events.
  59. """
  60. mock_client1 = Mock(spec=["put_json"])
  61. mock_client1.put_json.side_effect = lambda *_, **__: defer.succeed({})
  62. self.make_worker_hs(
  63. "synapse.app.federation_sender",
  64. {
  65. "send_federation": True,
  66. "worker_name": "sender1",
  67. "federation_sender_instances": ["sender1", "sender2"],
  68. },
  69. http_client=mock_client1,
  70. )
  71. mock_client2 = Mock(spec=["put_json"])
  72. mock_client2.put_json.side_effect = lambda *_, **__: defer.succeed({})
  73. self.make_worker_hs(
  74. "synapse.app.federation_sender",
  75. {
  76. "send_federation": True,
  77. "worker_name": "sender2",
  78. "federation_sender_instances": ["sender1", "sender2"],
  79. },
  80. http_client=mock_client2,
  81. )
  82. user = self.register_user("user2", "pass")
  83. token = self.login("user2", "pass")
  84. sent_on_1 = False
  85. sent_on_2 = False
  86. for i in range(20):
  87. server_name = "other_server_%d" % (i,)
  88. room = self.create_room_with_remote_server(user, token, server_name)
  89. mock_client1.reset_mock() # type: ignore[attr-defined]
  90. mock_client2.reset_mock() # type: ignore[attr-defined]
  91. self.create_and_send_event(room, UserID.from_string(user))
  92. self.replicate()
  93. if mock_client1.put_json.called:
  94. sent_on_1 = True
  95. mock_client2.put_json.assert_not_called()
  96. self.assertEqual(mock_client1.put_json.call_args[0][0], server_name)
  97. self.assertTrue(mock_client1.put_json.call_args[1]["data"].get("pdus"))
  98. elif mock_client2.put_json.called:
  99. sent_on_2 = True
  100. mock_client1.put_json.assert_not_called()
  101. self.assertEqual(mock_client2.put_json.call_args[0][0], server_name)
  102. self.assertTrue(mock_client2.put_json.call_args[1]["data"].get("pdus"))
  103. else:
  104. raise AssertionError(
  105. "Expected send transaction from one or the other sender"
  106. )
  107. if sent_on_1 and sent_on_2:
  108. break
  109. self.assertTrue(sent_on_1)
  110. self.assertTrue(sent_on_2)
  111. def test_send_typing_sharded(self):
  112. """Test that using two federation sender workers correctly sends
  113. new typing EDUs.
  114. """
  115. mock_client1 = Mock(spec=["put_json"])
  116. mock_client1.put_json.side_effect = lambda *_, **__: defer.succeed({})
  117. self.make_worker_hs(
  118. "synapse.app.federation_sender",
  119. {
  120. "send_federation": True,
  121. "worker_name": "sender1",
  122. "federation_sender_instances": ["sender1", "sender2"],
  123. },
  124. http_client=mock_client1,
  125. )
  126. mock_client2 = Mock(spec=["put_json"])
  127. mock_client2.put_json.side_effect = lambda *_, **__: defer.succeed({})
  128. self.make_worker_hs(
  129. "synapse.app.federation_sender",
  130. {
  131. "send_federation": True,
  132. "worker_name": "sender2",
  133. "federation_sender_instances": ["sender1", "sender2"],
  134. },
  135. http_client=mock_client2,
  136. )
  137. user = self.register_user("user3", "pass")
  138. token = self.login("user3", "pass")
  139. typing_handler = self.hs.get_typing_handler()
  140. sent_on_1 = False
  141. sent_on_2 = False
  142. for i in range(20):
  143. server_name = "other_server_%d" % (i,)
  144. room = self.create_room_with_remote_server(user, token, server_name)
  145. mock_client1.reset_mock() # type: ignore[attr-defined]
  146. mock_client2.reset_mock() # type: ignore[attr-defined]
  147. self.get_success(
  148. typing_handler.started_typing(
  149. target_user=UserID.from_string(user),
  150. auth_user=UserID.from_string(user),
  151. room_id=room,
  152. timeout=20000,
  153. )
  154. )
  155. self.replicate()
  156. if mock_client1.put_json.called:
  157. sent_on_1 = True
  158. mock_client2.put_json.assert_not_called()
  159. self.assertEqual(mock_client1.put_json.call_args[0][0], server_name)
  160. self.assertTrue(mock_client1.put_json.call_args[1]["data"].get("edus"))
  161. elif mock_client2.put_json.called:
  162. sent_on_2 = True
  163. mock_client1.put_json.assert_not_called()
  164. self.assertEqual(mock_client2.put_json.call_args[0][0], server_name)
  165. self.assertTrue(mock_client2.put_json.call_args[1]["data"].get("edus"))
  166. else:
  167. raise AssertionError(
  168. "Expected send transaction from one or the other sender"
  169. )
  170. if sent_on_1 and sent_on_2:
  171. break
  172. self.assertTrue(sent_on_1)
  173. self.assertTrue(sent_on_2)
  174. def create_room_with_remote_server(self, user, token, remote_server="other_server"):
  175. room = self.helper.create_room_as(user, tok=token)
  176. store = self.hs.get_datastore()
  177. federation = self.hs.get_handlers().federation_handler
  178. prev_event_ids = self.get_success(store.get_latest_event_ids_in_room(room))
  179. room_version = self.get_success(store.get_room_version(room))
  180. factory = EventBuilderFactory(self.hs)
  181. factory.hostname = remote_server
  182. user_id = UserID("user", remote_server).to_string()
  183. event_dict = {
  184. "type": EventTypes.Member,
  185. "state_key": user_id,
  186. "content": {"membership": Membership.JOIN},
  187. "sender": user_id,
  188. "room_id": room,
  189. }
  190. builder = factory.for_room_version(room_version, event_dict)
  191. join_event = self.get_success(builder.build(prev_event_ids))
  192. self.get_success(federation.on_send_join_request(remote_server, join_event))
  193. self.replicate()
  194. return room