test_federation_sender_shard.py 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247
  1. # Copyright 2020 The Matrix.org Foundation C.I.C.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import logging
  15. from unittest.mock import Mock
  16. from synapse.api.constants import EventTypes, Membership
  17. from synapse.events.builder import EventBuilderFactory
  18. from synapse.rest.admin import register_servlets_for_client_rest_resource
  19. from synapse.rest.client import login, room
  20. from synapse.types import UserID, create_requester
  21. from tests.replication._base import BaseMultiWorkerStreamTestCase
  22. from tests.test_utils import make_awaitable
  23. logger = logging.getLogger(__name__)
  24. class FederationSenderTestCase(BaseMultiWorkerStreamTestCase):
  25. """
  26. Various tests for federation sending on workers.
  27. Federation sending is disabled by default, it will be enabled in each test by
  28. updating 'federation_sender_instances'.
  29. """
  30. servlets = [
  31. login.register_servlets,
  32. register_servlets_for_client_rest_resource,
  33. room.register_servlets,
  34. ]
  35. def test_send_event_single_sender(self):
  36. """Test that using a single federation sender worker correctly sends a
  37. new event.
  38. """
  39. mock_client = Mock(spec=["put_json"])
  40. mock_client.put_json.return_value = make_awaitable({})
  41. self.make_worker_hs(
  42. "synapse.app.generic_worker",
  43. {
  44. "worker_name": "federation_sender1",
  45. "federation_sender_instances": ["federation_sender1"],
  46. },
  47. federation_http_client=mock_client,
  48. )
  49. user = self.register_user("user", "pass")
  50. token = self.login("user", "pass")
  51. room = self.create_room_with_remote_server(user, token)
  52. mock_client.put_json.reset_mock()
  53. self.create_and_send_event(room, UserID.from_string(user))
  54. self.replicate()
  55. # Assert that the event was sent out over federation.
  56. mock_client.put_json.assert_called()
  57. self.assertEqual(mock_client.put_json.call_args[0][0], "other_server")
  58. self.assertTrue(mock_client.put_json.call_args[1]["data"].get("pdus"))
  59. def test_send_event_sharded(self):
  60. """Test that using two federation sender workers correctly sends
  61. new events.
  62. """
  63. mock_client1 = Mock(spec=["put_json"])
  64. mock_client1.put_json.return_value = make_awaitable({})
  65. self.make_worker_hs(
  66. "synapse.app.generic_worker",
  67. {
  68. "worker_name": "federation_sender1",
  69. "federation_sender_instances": [
  70. "federation_sender1",
  71. "federation_sender2",
  72. ],
  73. },
  74. federation_http_client=mock_client1,
  75. )
  76. mock_client2 = Mock(spec=["put_json"])
  77. mock_client2.put_json.return_value = make_awaitable({})
  78. self.make_worker_hs(
  79. "synapse.app.generic_worker",
  80. {
  81. "worker_name": "federation_sender2",
  82. "federation_sender_instances": [
  83. "federation_sender1",
  84. "federation_sender2",
  85. ],
  86. },
  87. federation_http_client=mock_client2,
  88. )
  89. user = self.register_user("user2", "pass")
  90. token = self.login("user2", "pass")
  91. sent_on_1 = False
  92. sent_on_2 = False
  93. for i in range(20):
  94. server_name = "other_server_%d" % (i,)
  95. room = self.create_room_with_remote_server(user, token, server_name)
  96. mock_client1.reset_mock()
  97. mock_client2.reset_mock()
  98. self.create_and_send_event(room, UserID.from_string(user))
  99. self.replicate()
  100. if mock_client1.put_json.called:
  101. sent_on_1 = True
  102. mock_client2.put_json.assert_not_called()
  103. self.assertEqual(mock_client1.put_json.call_args[0][0], server_name)
  104. self.assertTrue(mock_client1.put_json.call_args[1]["data"].get("pdus"))
  105. elif mock_client2.put_json.called:
  106. sent_on_2 = True
  107. mock_client1.put_json.assert_not_called()
  108. self.assertEqual(mock_client2.put_json.call_args[0][0], server_name)
  109. self.assertTrue(mock_client2.put_json.call_args[1]["data"].get("pdus"))
  110. else:
  111. raise AssertionError(
  112. "Expected send transaction from one or the other sender"
  113. )
  114. if sent_on_1 and sent_on_2:
  115. break
  116. self.assertTrue(sent_on_1)
  117. self.assertTrue(sent_on_2)
  118. def test_send_typing_sharded(self):
  119. """Test that using two federation sender workers correctly sends
  120. new typing EDUs.
  121. """
  122. mock_client1 = Mock(spec=["put_json"])
  123. mock_client1.put_json.return_value = make_awaitable({})
  124. self.make_worker_hs(
  125. "synapse.app.generic_worker",
  126. {
  127. "worker_name": "federation_sender1",
  128. "federation_sender_instances": [
  129. "federation_sender1",
  130. "federation_sender2",
  131. ],
  132. },
  133. federation_http_client=mock_client1,
  134. )
  135. mock_client2 = Mock(spec=["put_json"])
  136. mock_client2.put_json.return_value = make_awaitable({})
  137. self.make_worker_hs(
  138. "synapse.app.generic_worker",
  139. {
  140. "worker_name": "federation_sender2",
  141. "federation_sender_instances": [
  142. "federation_sender1",
  143. "federation_sender2",
  144. ],
  145. },
  146. federation_http_client=mock_client2,
  147. )
  148. user = self.register_user("user3", "pass")
  149. token = self.login("user3", "pass")
  150. typing_handler = self.hs.get_typing_handler()
  151. sent_on_1 = False
  152. sent_on_2 = False
  153. for i in range(20):
  154. server_name = "other_server_%d" % (i,)
  155. room = self.create_room_with_remote_server(user, token, server_name)
  156. mock_client1.reset_mock()
  157. mock_client2.reset_mock()
  158. self.get_success(
  159. typing_handler.started_typing(
  160. target_user=UserID.from_string(user),
  161. requester=create_requester(user),
  162. room_id=room,
  163. timeout=20000,
  164. )
  165. )
  166. self.replicate()
  167. if mock_client1.put_json.called:
  168. sent_on_1 = True
  169. mock_client2.put_json.assert_not_called()
  170. self.assertEqual(mock_client1.put_json.call_args[0][0], server_name)
  171. self.assertTrue(mock_client1.put_json.call_args[1]["data"].get("edus"))
  172. elif mock_client2.put_json.called:
  173. sent_on_2 = True
  174. mock_client1.put_json.assert_not_called()
  175. self.assertEqual(mock_client2.put_json.call_args[0][0], server_name)
  176. self.assertTrue(mock_client2.put_json.call_args[1]["data"].get("edus"))
  177. else:
  178. raise AssertionError(
  179. "Expected send transaction from one or the other sender"
  180. )
  181. if sent_on_1 and sent_on_2:
  182. break
  183. self.assertTrue(sent_on_1)
  184. self.assertTrue(sent_on_2)
  185. def create_room_with_remote_server(self, user, token, remote_server="other_server"):
  186. room = self.helper.create_room_as(user, tok=token)
  187. store = self.hs.get_datastores().main
  188. federation = self.hs.get_federation_event_handler()
  189. prev_event_ids = self.get_success(store.get_latest_event_ids_in_room(room))
  190. room_version = self.get_success(store.get_room_version(room))
  191. factory = EventBuilderFactory(self.hs)
  192. factory.hostname = remote_server
  193. user_id = UserID("user", remote_server).to_string()
  194. event_dict = {
  195. "type": EventTypes.Member,
  196. "state_key": user_id,
  197. "content": {"membership": Membership.JOIN},
  198. "sender": user_id,
  199. "room_id": room,
  200. }
  201. builder = factory.for_room_version(room_version, event_dict)
  202. join_event = self.get_success(
  203. builder.build(prev_event_ids=prev_event_ids, auth_event_ids=None)
  204. )
  205. self.get_success(federation.on_send_membership_event(remote_server, join_event))
  206. self.replicate()
  207. return room