# test_pusher_shard.py
  1. # Copyright 2020 The Matrix.org Foundation C.I.C.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import logging
  15. from unittest.mock import Mock
  16. from twisted.internet import defer
  17. from twisted.test.proto_helpers import MemoryReactor
  18. from synapse.rest import admin
  19. from synapse.rest.client import login, room
  20. from synapse.server import HomeServer
  21. from synapse.util import Clock
  22. from tests.replication._base import BaseMultiWorkerStreamTestCase
# Module-level logger, named after this module via ``__name__``.
# NOTE(review): not referenced by any code visible in this file.
logger = logging.getLogger(__name__)
  24. class PusherShardTestCase(BaseMultiWorkerStreamTestCase):
  25. """Checks pusher sharding works"""
  26. servlets = [
  27. admin.register_servlets_for_client_rest_resource,
  28. room.register_servlets,
  29. login.register_servlets,
  30. ]
  31. def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
  32. # Register a user who sends a message that we'll get notified about
  33. self.other_user_id = self.register_user("otheruser", "pass")
  34. self.other_access_token = self.login("otheruser", "pass")
  35. def _create_pusher_and_send_msg(self, localpart: str) -> str:
  36. # Create a user that will get push notifications
  37. user_id = self.register_user(localpart, "pass")
  38. access_token = self.login(localpart, "pass")
  39. # Register a pusher
  40. user_dict = self.get_success(
  41. self.hs.get_datastores().main.get_user_by_access_token(access_token)
  42. )
  43. assert user_dict is not None
  44. token_id = user_dict.token_id
  45. self.get_success(
  46. self.hs.get_pusherpool().add_or_update_pusher(
  47. user_id=user_id,
  48. access_token=token_id,
  49. kind="http",
  50. app_id="m.http",
  51. app_display_name="HTTP Push Notifications",
  52. device_display_name="pushy push",
  53. pushkey="a@example.com",
  54. lang=None,
  55. data={"url": "https://push.example.com/_matrix/push/v1/notify"},
  56. )
  57. )
  58. self.pump()
  59. # Create a room
  60. room = self.helper.create_room_as(user_id, tok=access_token)
  61. # The other user joins
  62. self.helper.join(
  63. room=room, user=self.other_user_id, tok=self.other_access_token
  64. )
  65. # The other user sends some messages
  66. response = self.helper.send(room, body="Hi!", tok=self.other_access_token)
  67. event_id = response["event_id"]
  68. return event_id
  69. def test_send_push_single_worker(self) -> None:
  70. """Test that registration works when using a pusher worker."""
  71. http_client_mock = Mock(spec_set=["post_json_get_json"])
  72. http_client_mock.post_json_get_json.side_effect = (
  73. lambda *_, **__: defer.succeed({})
  74. )
  75. self.make_worker_hs(
  76. "synapse.app.generic_worker",
  77. {"worker_name": "pusher1", "pusher_instances": ["pusher1"]},
  78. proxied_blacklisted_http_client=http_client_mock,
  79. )
  80. event_id = self._create_pusher_and_send_msg("user")
  81. # Advance time a bit, so the pusher will register something has happened
  82. self.pump()
  83. http_client_mock.post_json_get_json.assert_called_once()
  84. self.assertEqual(
  85. http_client_mock.post_json_get_json.call_args[0][0],
  86. "https://push.example.com/_matrix/push/v1/notify",
  87. )
  88. self.assertEqual(
  89. event_id,
  90. http_client_mock.post_json_get_json.call_args[0][1]["notification"][
  91. "event_id"
  92. ],
  93. )
  94. def test_send_push_multiple_workers(self) -> None:
  95. """Test that registration works when using sharded pusher workers."""
  96. http_client_mock1 = Mock(spec_set=["post_json_get_json"])
  97. http_client_mock1.post_json_get_json.side_effect = (
  98. lambda *_, **__: defer.succeed({})
  99. )
  100. self.make_worker_hs(
  101. "synapse.app.generic_worker",
  102. {
  103. "worker_name": "pusher1",
  104. "pusher_instances": ["pusher1", "pusher2"],
  105. },
  106. proxied_blacklisted_http_client=http_client_mock1,
  107. )
  108. http_client_mock2 = Mock(spec_set=["post_json_get_json"])
  109. http_client_mock2.post_json_get_json.side_effect = (
  110. lambda *_, **__: defer.succeed({})
  111. )
  112. self.make_worker_hs(
  113. "synapse.app.generic_worker",
  114. {
  115. "worker_name": "pusher2",
  116. "pusher_instances": ["pusher1", "pusher2"],
  117. },
  118. proxied_blacklisted_http_client=http_client_mock2,
  119. )
  120. # We choose a user name that we know should go to pusher1.
  121. event_id = self._create_pusher_and_send_msg("user2")
  122. # Advance time a bit, so the pusher will register something has happened
  123. self.pump()
  124. http_client_mock1.post_json_get_json.assert_called_once()
  125. http_client_mock2.post_json_get_json.assert_not_called()
  126. self.assertEqual(
  127. http_client_mock1.post_json_get_json.call_args[0][0],
  128. "https://push.example.com/_matrix/push/v1/notify",
  129. )
  130. self.assertEqual(
  131. event_id,
  132. http_client_mock1.post_json_get_json.call_args[0][1]["notification"][
  133. "event_id"
  134. ],
  135. )
  136. http_client_mock1.post_json_get_json.reset_mock()
  137. http_client_mock2.post_json_get_json.reset_mock()
  138. # Now we choose a user name that we know should go to pusher2.
  139. event_id = self._create_pusher_and_send_msg("user4")
  140. # Advance time a bit, so the pusher will register something has happened
  141. self.pump()
  142. http_client_mock1.post_json_get_json.assert_not_called()
  143. http_client_mock2.post_json_get_json.assert_called_once()
  144. self.assertEqual(
  145. http_client_mock2.post_json_get_json.call_args[0][0],
  146. "https://push.example.com/_matrix/push/v1/notify",
  147. )
  148. self.assertEqual(
  149. event_id,
  150. http_client_mock2.post_json_get_json.call_args[0][1]["notification"][
  151. "event_id"
  152. ],
  153. )