1
0

test_federation_sender_shard.py 9.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271
  1. # Copyright 2020 The Matrix.org Foundation C.I.C.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import logging
  15. from unittest.mock import Mock
  16. from netaddr import IPSet
  17. from synapse.api.constants import EventTypes, Membership
  18. from synapse.events.builder import EventBuilderFactory
  19. from synapse.handlers.typing import TypingWriterHandler
  20. from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent
  21. from synapse.rest.admin import register_servlets_for_client_rest_resource
  22. from synapse.rest.client import login, room
  23. from synapse.types import UserID, create_requester
  24. from tests.replication._base import BaseMultiWorkerStreamTestCase
  25. from tests.server import get_clock
  26. from tests.test_utils import make_awaitable
  27. logger = logging.getLogger(__name__)
  28. class FederationSenderTestCase(BaseMultiWorkerStreamTestCase):
  29. """
  30. Various tests for federation sending on workers.
  31. Federation sending is disabled by default, it will be enabled in each test by
  32. updating 'federation_sender_instances'.
  33. """
  34. servlets = [
  35. login.register_servlets,
  36. register_servlets_for_client_rest_resource,
  37. room.register_servlets,
  38. ]
  39. def setUp(self) -> None:
  40. super().setUp()
  41. reactor, _ = get_clock()
  42. self.matrix_federation_agent = MatrixFederationAgent(
  43. reactor,
  44. tls_client_options_factory=None,
  45. user_agent=b"SynapseInTrialTest/0.0.0",
  46. ip_allowlist=None,
  47. ip_blocklist=IPSet(),
  48. )
  49. def test_send_event_single_sender(self) -> None:
  50. """Test that using a single federation sender worker correctly sends a
  51. new event.
  52. """
  53. mock_client = Mock(spec=["put_json"])
  54. mock_client.put_json.return_value = make_awaitable({})
  55. mock_client.agent = self.matrix_federation_agent
  56. self.make_worker_hs(
  57. "synapse.app.generic_worker",
  58. {
  59. "worker_name": "federation_sender1",
  60. "federation_sender_instances": ["federation_sender1"],
  61. },
  62. federation_http_client=mock_client,
  63. )
  64. user = self.register_user("user", "pass")
  65. token = self.login("user", "pass")
  66. room = self.create_room_with_remote_server(user, token)
  67. mock_client.put_json.reset_mock()
  68. self.create_and_send_event(room, UserID.from_string(user))
  69. self.replicate()
  70. # Assert that the event was sent out over federation.
  71. mock_client.put_json.assert_called()
  72. self.assertEqual(mock_client.put_json.call_args[0][0], "other_server")
  73. self.assertTrue(mock_client.put_json.call_args[1]["data"].get("pdus"))
  74. def test_send_event_sharded(self) -> None:
  75. """Test that using two federation sender workers correctly sends
  76. new events.
  77. """
  78. mock_client1 = Mock(spec=["put_json"])
  79. mock_client1.put_json.return_value = make_awaitable({})
  80. mock_client1.agent = self.matrix_federation_agent
  81. self.make_worker_hs(
  82. "synapse.app.generic_worker",
  83. {
  84. "worker_name": "federation_sender1",
  85. "federation_sender_instances": [
  86. "federation_sender1",
  87. "federation_sender2",
  88. ],
  89. },
  90. federation_http_client=mock_client1,
  91. )
  92. mock_client2 = Mock(spec=["put_json"])
  93. mock_client2.put_json.return_value = make_awaitable({})
  94. mock_client2.agent = self.matrix_federation_agent
  95. self.make_worker_hs(
  96. "synapse.app.generic_worker",
  97. {
  98. "worker_name": "federation_sender2",
  99. "federation_sender_instances": [
  100. "federation_sender1",
  101. "federation_sender2",
  102. ],
  103. },
  104. federation_http_client=mock_client2,
  105. )
  106. user = self.register_user("user2", "pass")
  107. token = self.login("user2", "pass")
  108. sent_on_1 = False
  109. sent_on_2 = False
  110. for i in range(20):
  111. server_name = "other_server_%d" % (i,)
  112. room = self.create_room_with_remote_server(user, token, server_name)
  113. mock_client1.reset_mock()
  114. mock_client2.reset_mock()
  115. self.create_and_send_event(room, UserID.from_string(user))
  116. self.replicate()
  117. if mock_client1.put_json.called:
  118. sent_on_1 = True
  119. mock_client2.put_json.assert_not_called()
  120. self.assertEqual(mock_client1.put_json.call_args[0][0], server_name)
  121. self.assertTrue(mock_client1.put_json.call_args[1]["data"].get("pdus"))
  122. elif mock_client2.put_json.called:
  123. sent_on_2 = True
  124. mock_client1.put_json.assert_not_called()
  125. self.assertEqual(mock_client2.put_json.call_args[0][0], server_name)
  126. self.assertTrue(mock_client2.put_json.call_args[1]["data"].get("pdus"))
  127. else:
  128. raise AssertionError(
  129. "Expected send transaction from one or the other sender"
  130. )
  131. if sent_on_1 and sent_on_2:
  132. break
  133. self.assertTrue(sent_on_1)
  134. self.assertTrue(sent_on_2)
  135. def test_send_typing_sharded(self) -> None:
  136. """Test that using two federation sender workers correctly sends
  137. new typing EDUs.
  138. """
  139. mock_client1 = Mock(spec=["put_json"])
  140. mock_client1.put_json.return_value = make_awaitable({})
  141. mock_client1.agent = self.matrix_federation_agent
  142. self.make_worker_hs(
  143. "synapse.app.generic_worker",
  144. {
  145. "worker_name": "federation_sender1",
  146. "federation_sender_instances": [
  147. "federation_sender1",
  148. "federation_sender2",
  149. ],
  150. },
  151. federation_http_client=mock_client1,
  152. )
  153. mock_client2 = Mock(spec=["put_json"])
  154. mock_client2.put_json.return_value = make_awaitable({})
  155. mock_client2.agent = self.matrix_federation_agent
  156. self.make_worker_hs(
  157. "synapse.app.generic_worker",
  158. {
  159. "worker_name": "federation_sender2",
  160. "federation_sender_instances": [
  161. "federation_sender1",
  162. "federation_sender2",
  163. ],
  164. },
  165. federation_http_client=mock_client2,
  166. )
  167. user = self.register_user("user3", "pass")
  168. token = self.login("user3", "pass")
  169. typing_handler = self.hs.get_typing_handler()
  170. assert isinstance(typing_handler, TypingWriterHandler)
  171. sent_on_1 = False
  172. sent_on_2 = False
  173. for i in range(20):
  174. server_name = "other_server_%d" % (i,)
  175. room = self.create_room_with_remote_server(user, token, server_name)
  176. mock_client1.reset_mock()
  177. mock_client2.reset_mock()
  178. self.get_success(
  179. typing_handler.started_typing(
  180. target_user=UserID.from_string(user),
  181. requester=create_requester(user),
  182. room_id=room,
  183. timeout=20000,
  184. )
  185. )
  186. self.replicate()
  187. if mock_client1.put_json.called:
  188. sent_on_1 = True
  189. mock_client2.put_json.assert_not_called()
  190. self.assertEqual(mock_client1.put_json.call_args[0][0], server_name)
  191. self.assertTrue(mock_client1.put_json.call_args[1]["data"].get("edus"))
  192. elif mock_client2.put_json.called:
  193. sent_on_2 = True
  194. mock_client1.put_json.assert_not_called()
  195. self.assertEqual(mock_client2.put_json.call_args[0][0], server_name)
  196. self.assertTrue(mock_client2.put_json.call_args[1]["data"].get("edus"))
  197. else:
  198. raise AssertionError(
  199. "Expected send transaction from one or the other sender"
  200. )
  201. if sent_on_1 and sent_on_2:
  202. break
  203. self.assertTrue(sent_on_1)
  204. self.assertTrue(sent_on_2)
  205. def create_room_with_remote_server(
  206. self, user: str, token: str, remote_server: str = "other_server"
  207. ) -> str:
  208. room = self.helper.create_room_as(user, tok=token)
  209. store = self.hs.get_datastores().main
  210. federation = self.hs.get_federation_event_handler()
  211. prev_event_ids = self.get_success(store.get_latest_event_ids_in_room(room))
  212. room_version = self.get_success(store.get_room_version(room))
  213. factory = EventBuilderFactory(self.hs)
  214. factory.hostname = remote_server
  215. user_id = UserID("user", remote_server).to_string()
  216. event_dict = {
  217. "type": EventTypes.Member,
  218. "state_key": user_id,
  219. "content": {"membership": Membership.JOIN},
  220. "sender": user_id,
  221. "room_id": room,
  222. }
  223. builder = factory.for_room_version(room_version, event_dict)
  224. join_event = self.get_success(
  225. builder.build(prev_event_ids=prev_event_ids, auth_event_ids=None)
  226. )
  227. self.get_success(federation.on_send_membership_event(remote_server, join_event))
  228. self.replicate()
  229. return room