test_commands.py 1.7 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2020 The Matrix.org Foundation C.I.C.
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. from synapse.replication.tcp.commands import (
  16. RdataCommand,
  17. ReplicateCommand,
  18. parse_command_from_line,
  19. )
  20. from tests.unittest import TestCase
  21. class ParseCommandTestCase(TestCase):
  22. def test_parse_one_word_command(self):
  23. line = "REPLICATE"
  24. cmd = parse_command_from_line(line)
  25. self.assertIsInstance(cmd, ReplicateCommand)
  26. def test_parse_rdata(self):
  27. line = 'RDATA events master 6287863 ["ev", ["$eventid", "!roomid", "type", null, null, null]]'
  28. cmd = parse_command_from_line(line)
  29. assert isinstance(cmd, RdataCommand)
  30. self.assertEqual(cmd.stream_name, "events")
  31. self.assertEqual(cmd.instance_name, "master")
  32. self.assertEqual(cmd.token, 6287863)
  33. def test_parse_rdata_batch(self):
  34. line = 'RDATA presence master batch ["@foo:example.com", "online"]'
  35. cmd = parse_command_from_line(line)
  36. assert isinstance(cmd, RdataCommand)
  37. self.assertEqual(cmd.stream_name, "presence")
  38. self.assertEqual(cmd.instance_name, "master")
  39. self.assertIsNone(cmd.token)