test_federation_ack.py 3.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081
  1. # Copyright 2020 The Matrix.org Foundation C.I.C.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. from unittest import mock
  15. from twisted.test.proto_helpers import MemoryReactor
  16. from synapse.app.generic_worker import GenericWorkerServer
  17. from synapse.replication.tcp.commands import FederationAckCommand
  18. from synapse.replication.tcp.protocol import IReplicationConnection
  19. from synapse.replication.tcp.streams.federation import FederationStream
  20. from synapse.server import HomeServer
  21. from synapse.util import Clock
  22. from tests.unittest import HomeserverTestCase
  23. class FederationAckTestCase(HomeserverTestCase):
  24. def default_config(self) -> dict:
  25. config = super().default_config()
  26. config["worker_app"] = "synapse.app.generic_worker"
  27. config["worker_name"] = "federation_sender1"
  28. config["federation_sender_instances"] = ["federation_sender1"]
  29. config["instance_map"] = {"main": {"host": "127.0.0.1", "port": 0}}
  30. return config
  31. def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
  32. return self.setup_test_homeserver(homeserver_to_use=GenericWorkerServer)
  33. def test_federation_ack_sent(self) -> None:
  34. """A FEDERATION_ACK should be sent back after each RDATA federation
  35. This test checks that the federation sender is correctly sending back
  36. FEDERATION_ACK messages. The test works by spinning up a federation_sender
  37. worker server, and then fishing out its ReplicationCommandHandler. We wire
  38. the RCH up to a mock connection (so that we can observe the command being sent)
  39. and then poke in an RDATA row.
  40. XXX: it might be nice to do this by pretending to be a synapse master worker
  41. (or a redis server), and having the worker connect to us via a mocked-up TCP
  42. transport, rather than assuming that the implementation has a
  43. ReplicationCommandHandler.
  44. """
  45. rch = self.hs.get_replication_command_handler()
  46. # wire up the ReplicationCommandHandler to a mock connection, which needs
  47. # to implement IReplicationConnection. (Note that Mock doesn't understand
  48. # interfaces, but casing an interface to a list gives the attributes.)
  49. mock_connection = mock.Mock(spec=list(IReplicationConnection))
  50. rch.new_connection(mock_connection)
  51. # tell it it received an RDATA row
  52. self.get_success(
  53. rch.on_rdata(
  54. "federation",
  55. "master",
  56. token=10,
  57. rows=[
  58. FederationStream.FederationStreamRow(
  59. type="x", data={"test": [1, 2, 3]}
  60. )
  61. ],
  62. )
  63. )
  64. # now check that the FEDERATION_ACK was sent
  65. mock_connection.send_command.assert_called_once()
  66. cmd = mock_connection.send_command.call_args[0][0]
  67. assert isinstance(cmd, FederationAckCommand)
  68. self.assertEqual(cmd.token, 10)