test_handler.py 7.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204
  1. # Copyright 2022 The Matrix.org Foundation C.I.C.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. from twisted.internet import defer
  15. from synapse.replication.tcp.commands import PositionCommand
  16. from tests.replication._base import BaseMultiWorkerStreamTestCase
  17. class ChannelsTestCase(BaseMultiWorkerStreamTestCase):
  18. def test_subscribed_to_enough_redis_channels(self) -> None:
  19. # The default main process is subscribed to the USER_IP channel.
  20. self.assertCountEqual(
  21. self.hs.get_replication_command_handler()._channels_to_subscribe_to,
  22. ["USER_IP"],
  23. )
  24. def test_background_worker_subscribed_to_user_ip(self) -> None:
  25. # The default main process is subscribed to the USER_IP channel.
  26. worker1 = self.make_worker_hs(
  27. "synapse.app.generic_worker",
  28. extra_config={
  29. "worker_name": "worker1",
  30. "run_background_tasks_on": "worker1",
  31. "redis": {"enabled": True},
  32. },
  33. )
  34. self.assertIn(
  35. "USER_IP",
  36. worker1.get_replication_command_handler()._channels_to_subscribe_to,
  37. )
  38. # Advance so the Redis subscription gets processed
  39. self.pump(0.1)
  40. # The counts are 2 because both the main process and the worker are subscribed.
  41. self.assertEqual(len(self._redis_server._subscribers_by_channel[b"test"]), 2)
  42. self.assertEqual(
  43. len(self._redis_server._subscribers_by_channel[b"test/USER_IP"]), 2
  44. )
  45. def test_non_background_worker_not_subscribed_to_user_ip(self) -> None:
  46. # The default main process is subscribed to the USER_IP channel.
  47. worker2 = self.make_worker_hs(
  48. "synapse.app.generic_worker",
  49. extra_config={
  50. "worker_name": "worker2",
  51. "run_background_tasks_on": "worker1",
  52. "redis": {"enabled": True},
  53. },
  54. )
  55. self.assertNotIn(
  56. "USER_IP",
  57. worker2.get_replication_command_handler()._channels_to_subscribe_to,
  58. )
  59. # Advance so the Redis subscription gets processed
  60. self.pump(0.1)
  61. # The count is 2 because both the main process and the worker are subscribed.
  62. self.assertEqual(len(self._redis_server._subscribers_by_channel[b"test"]), 2)
  63. # For USER_IP, the count is 1 because only the main process is subscribed.
  64. self.assertEqual(
  65. len(self._redis_server._subscribers_by_channel[b"test/USER_IP"]), 1
  66. )
  67. def test_wait_for_stream_position(self) -> None:
  68. """Check that wait for stream position correctly waits for an update from the
  69. correct instance.
  70. """
  71. store = self.hs.get_datastores().main
  72. cmd_handler = self.hs.get_replication_command_handler()
  73. data_handler = self.hs.get_replication_data_handler()
  74. worker1 = self.make_worker_hs(
  75. "synapse.app.generic_worker",
  76. extra_config={
  77. "worker_name": "worker1",
  78. "run_background_tasks_on": "worker1",
  79. "redis": {"enabled": True},
  80. },
  81. )
  82. cache_id_gen = worker1.get_datastores().main._cache_id_gen
  83. assert cache_id_gen is not None
  84. self.replicate()
  85. # First, make sure the master knows that `worker1` exists.
  86. initial_token = cache_id_gen.get_current_token()
  87. cmd_handler.send_command(
  88. PositionCommand("caches", "worker1", initial_token, initial_token)
  89. )
  90. self.replicate()
  91. # Next send out a normal RDATA, and check that waiting for that stream
  92. # ID returns immediately.
  93. ctx = cache_id_gen.get_next()
  94. next_token = self.get_success(ctx.__aenter__())
  95. self.get_success(ctx.__aexit__(None, None, None))
  96. self.get_success(
  97. data_handler.wait_for_stream_position("worker1", "caches", next_token)
  98. )
  99. # `wait_for_stream_position` should only return once master receives a
  100. # notification that `next_token` has persisted.
  101. ctx_worker1 = cache_id_gen.get_next()
  102. next_token = self.get_success(ctx_worker1.__aenter__())
  103. d = defer.ensureDeferred(
  104. data_handler.wait_for_stream_position("worker1", "caches", next_token)
  105. )
  106. self.assertFalse(d.called)
  107. # ... updating the cache ID gen on the master still shouldn't cause the
  108. # deferred to wake up.
  109. assert store._cache_id_gen is not None
  110. ctx = store._cache_id_gen.get_next()
  111. self.get_success(ctx.__aenter__())
  112. self.get_success(ctx.__aexit__(None, None, None))
  113. d = defer.ensureDeferred(
  114. data_handler.wait_for_stream_position("worker1", "caches", next_token)
  115. )
  116. self.assertFalse(d.called)
  117. # ... but worker1 finishing (and so sending an update) should.
  118. self.get_success(ctx_worker1.__aexit__(None, None, None))
  119. self.assertTrue(d.called)
  120. def test_wait_for_stream_position_rdata(self) -> None:
  121. """Check that wait for stream position correctly waits for an update
  122. from the correct instance, when RDATA is sent.
  123. """
  124. store = self.hs.get_datastores().main
  125. cmd_handler = self.hs.get_replication_command_handler()
  126. data_handler = self.hs.get_replication_data_handler()
  127. worker1 = self.make_worker_hs(
  128. "synapse.app.generic_worker",
  129. extra_config={
  130. "worker_name": "worker1",
  131. "run_background_tasks_on": "worker1",
  132. "redis": {"enabled": True},
  133. },
  134. )
  135. cache_id_gen = worker1.get_datastores().main._cache_id_gen
  136. assert cache_id_gen is not None
  137. self.replicate()
  138. # First, make sure the master knows that `worker1` exists.
  139. initial_token = cache_id_gen.get_current_token()
  140. cmd_handler.send_command(
  141. PositionCommand("caches", "worker1", initial_token, initial_token)
  142. )
  143. self.replicate()
  144. # `wait_for_stream_position` should only return once master receives a
  145. # notification that `next_token2` has persisted.
  146. ctx_worker1 = cache_id_gen.get_next_mult(2)
  147. next_token1, next_token2 = self.get_success(ctx_worker1.__aenter__())
  148. d = defer.ensureDeferred(
  149. data_handler.wait_for_stream_position("worker1", "caches", next_token2)
  150. )
  151. self.assertFalse(d.called)
  152. # Insert an entry into the cache stream with token `next_token1`, but
  153. # not `next_token2`.
  154. self.get_success(
  155. store.db_pool.simple_insert(
  156. table="cache_invalidation_stream_by_instance",
  157. values={
  158. "stream_id": next_token1,
  159. "instance_name": "worker1",
  160. "cache_func": "foo",
  161. "keys": [],
  162. "invalidation_ts": 0,
  163. },
  164. )
  165. )
  166. # Finish the context manager, triggering the data to be sent to master.
  167. self.get_success(ctx_worker1.__aexit__(None, None, None))
  168. # Master should get told about `next_token2`, so the deferred should
  169. # resolve.
  170. self.assertTrue(d.called)