test_federation_ack.py 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2020 The Matrix.org Foundation C.I.C.
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. import mock
  16. from synapse.app.generic_worker import GenericWorkerServer
  17. from synapse.replication.tcp.commands import FederationAckCommand
  18. from synapse.replication.tcp.protocol import AbstractConnection
  19. from synapse.replication.tcp.streams.federation import FederationStream
  20. from tests.unittest import HomeserverTestCase
  21. class FederationAckTestCase(HomeserverTestCase):
  22. def default_config(self) -> dict:
  23. config = super().default_config()
  24. config["worker_app"] = "synapse.app.federation_sender"
  25. config["send_federation"] = True
  26. return config
  27. def make_homeserver(self, reactor, clock):
  28. hs = self.setup_test_homeserver(homeserver_to_use=GenericWorkerServer)
  29. return hs
  30. def test_federation_ack_sent(self):
  31. """A FEDERATION_ACK should be sent back after each RDATA federation
  32. This test checks that the federation sender is correctly sending back
  33. FEDERATION_ACK messages. The test works by spinning up a federation_sender
  34. worker server, and then fishing out its ReplicationCommandHandler. We wire
  35. the RCH up to a mock connection (so that we can observe the command being sent)
  36. and then poke in an RDATA row.
  37. XXX: it might be nice to do this by pretending to be a synapse master worker
  38. (or a redis server), and having the worker connect to us via a mocked-up TCP
  39. transport, rather than assuming that the implementation has a
  40. ReplicationCommandHandler.
  41. """
  42. rch = self.hs.get_tcp_replication()
  43. # wire up the ReplicationCommandHandler to a mock connection
  44. mock_connection = mock.Mock(spec=AbstractConnection)
  45. rch.new_connection(mock_connection)
  46. # tell it it received an RDATA row
  47. self.get_success(
  48. rch.on_rdata(
  49. "federation",
  50. "master",
  51. token=10,
  52. rows=[FederationStream.FederationStreamRow(type="x", data=[1, 2, 3])],
  53. )
  54. )
  55. # now check that the FEDERATION_ACK was sent
  56. mock_connection.send_command.assert_called_once()
  57. cmd = mock_connection.send_command.call_args[0][0]
  58. assert isinstance(cmd, FederationAckCommand)
  59. self.assertEqual(cmd.token, 10)