# Copyright 2022 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.internet import defer

from synapse.replication.tcp.commands import PositionCommand

from tests.replication._base import BaseMultiWorkerStreamTestCase

class ChannelsTestCase(BaseMultiWorkerStreamTestCase):
    """Tests which Redis channels each replication process subscribes to, and
    that `wait_for_stream_position` only resolves once the *correct* instance
    has advanced past the awaited stream position.
    """

    def test_subscribed_to_enough_redis_channels(self) -> None:
        """The main process subscribes to USER_IP, and only USER_IP."""
        # The default main process is subscribed to the USER_IP channel.
        self.assertCountEqual(
            self.hs.get_replication_command_handler()._channels_to_subscribe_to,
            ["USER_IP"],
        )

    def test_background_worker_subscribed_to_user_ip(self) -> None:
        """A worker that runs background tasks also subscribes to USER_IP."""
        # The default main process is subscribed to the USER_IP channel.
        worker1 = self.make_worker_hs(
            "synapse.app.generic_worker",
            extra_config={
                "worker_name": "worker1",
                # This worker runs the background tasks, so it needs USER_IP.
                "run_background_tasks_on": "worker1",
                "redis": {"enabled": True},
            },
        )
        self.assertIn(
            "USER_IP",
            worker1.get_replication_command_handler()._channels_to_subscribe_to,
        )

        # Advance so the Redis subscription gets processed
        self.pump(0.1)

        # The counts are 2 because both the main process and the worker are
        # subscribed to each channel.
        # NOTE(review): b"test" is presumably the Redis channel prefix set up
        # by the fake Redis server in the base class — confirm there.
        self.assertEqual(len(self._redis_server._subscribers_by_channel[b"test"]), 2)
        self.assertEqual(
            len(self._redis_server._subscribers_by_channel[b"test/USER_IP"]), 2
        )

    def test_non_background_worker_not_subscribed_to_user_ip(self) -> None:
        """A worker NOT running background tasks does not subscribe to USER_IP."""
        # The default main process is subscribed to the USER_IP channel.
        worker2 = self.make_worker_hs(
            "synapse.app.generic_worker",
            extra_config={
                "worker_name": "worker2",
                # Background tasks run on a *different* worker, so worker2 has
                # no reason to subscribe to USER_IP.
                "run_background_tasks_on": "worker1",
                "redis": {"enabled": True},
            },
        )
        self.assertNotIn(
            "USER_IP",
            worker2.get_replication_command_handler()._channels_to_subscribe_to,
        )

        # Advance so the Redis subscription gets processed
        self.pump(0.1)

        # The count is 2 because both the main process and the worker are subscribed.
        self.assertEqual(len(self._redis_server._subscribers_by_channel[b"test"]), 2)
        # For USER_IP, the count is 1 because only the main process is subscribed.
        self.assertEqual(
            len(self._redis_server._subscribers_by_channel[b"test/USER_IP"]), 1
        )

    def test_wait_for_stream_position(self) -> None:
        """Check that wait for stream position correctly waits for an update from the
        correct instance.
        """
        store = self.hs.get_datastores().main
        cmd_handler = self.hs.get_replication_command_handler()
        data_handler = self.hs.get_replication_data_handler()

        worker1 = self.make_worker_hs(
            "synapse.app.generic_worker",
            extra_config={
                "worker_name": "worker1",
                "run_background_tasks_on": "worker1",
                "redis": {"enabled": True},
            },
        )

        # The worker's cache stream ID generator; this is what "worker1"
        # advances below.
        cache_id_gen = worker1.get_datastores().main._cache_id_gen
        assert cache_id_gen is not None

        self.replicate()

        # First, make sure the master knows that `worker1` exists.
        initial_token = cache_id_gen.get_current_token()
        cmd_handler.send_command(
            PositionCommand("caches", "worker1", initial_token, initial_token)
        )
        self.replicate()

        # Next send out a normal RDATA, and check that waiting for that stream
        # ID returns immediately.
        ctx = cache_id_gen.get_next()
        next_token = self.get_success(ctx.__aenter__())
        self.get_success(ctx.__aexit__(None, None, None))

        self.get_success(
            data_handler.wait_for_stream_position("worker1", "caches", next_token)
        )

        # `wait_for_stream_position` should only return once master receives a
        # notification that `next_token` has persisted.
        # The ID-gen context is deliberately held open (no __aexit__ yet) so
        # the position update is not sent until the end of the test.
        ctx_worker1 = cache_id_gen.get_next()
        next_token = self.get_success(ctx_worker1.__aenter__())

        d = defer.ensureDeferred(
            data_handler.wait_for_stream_position("worker1", "caches", next_token)
        )
        self.assertFalse(d.called)

        # ... updating the cache ID gen on the master still shouldn't cause the
        # deferred to wake up: the wait is keyed on worker1's position, not the
        # master's.
        ctx = store._cache_id_gen.get_next()
        self.get_success(ctx.__aenter__())
        self.get_success(ctx.__aexit__(None, None, None))

        # Re-issue the wait to confirm it is still unsatisfied after the
        # master's advance.
        d = defer.ensureDeferred(
            data_handler.wait_for_stream_position("worker1", "caches", next_token)
        )
        self.assertFalse(d.called)

        # ... but worker1 finishing (and so sending an update) should.
        self.get_success(ctx_worker1.__aexit__(None, None, None))
        self.assertTrue(d.called)