test_to_device.py 3.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889
  1. # Copyright 2023 The Matrix.org Foundation C.I.C.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import logging
  15. import synapse
  16. from synapse.replication.tcp.streams._base import _STREAM_UPDATE_TARGET_ROW_COUNT
  17. from synapse.types import JsonDict
  18. from tests.replication._base import BaseStreamTestCase
  19. logger = logging.getLogger(__name__)
  class ToDeviceStreamTestCase(BaseStreamTestCase):
      """Replication test case covering to-device message updates.

      The ``servlets`` list registers the admin and client login REST servlets.
      """

      servlets = [
          synapse.rest.admin.register_servlets,
          synapse.rest.client.login.register_servlets,
      ]

      def test_to_device_stream(self) -> None:
          """Queue one message more than a stream update batch targets, then
          check that a row is received for each of the two recipients.
          """
          main_store = self.hs.get_datastores().main

          first_user = self.register_user("user1", "pass")
          self.login("user1", "pass", "device")
          second_user = self.register_user("user2", "pass")
          self.login("user2", "pass", "device")

          # Pull the updates caused by creating/logging in the users and
          # discard them, so later assertions only see to-device traffic.
          self.reconnect()
          self.replicate()
          self.test_handler.received_rdata_rows.clear()

          # Drop the connection so new updates accumulate without being pulled.
          self.disconnect()

          payload: JsonDict = {
              "sender": "@sender:example.org",
              "type": "m.new_device",
          }

          def enqueue(recipient: str, message_id: str) -> None:
              # One payload dict is shared by every insert; only its content
              # is reset before each call.
              payload["content"] = {"device": {}}
              self.get_success(
                  main_store.add_messages_from_remote_to_device_inbox(
                      "example.org",
                      message_id,
                      {recipient: {"device": payload}},
                  )
              )

          # Fill user1's device inbox up to the limit defined for a stream
          # update batch.
          for index in range(_STREAM_UPDATE_TARGET_ROW_COUNT):
              enqueue(first_user, f"{index}")

          # One extra message, addressed to user2 this time. This is the
          # message that would be dropped before fixing #15335.
          enqueue(second_user, f"{_STREAM_UPDATE_TARGET_ROW_COUNT}")

          # Replication is still disconnected, so no rows may have arrived.
          self.assertEqual([], self.test_handler.received_rdata_rows)

          # Reconnect and pull everything that accumulated.
          self.reconnect()
          self.replicate()

          # Expect exactly two rows: to-device updates for user1, then user2.
          rows = self.test_handler.received_rdata_rows
          self.assertEqual(len(rows), 2)
          first_row, second_row = rows
          self.assertEqual(first_row[2].entity, first_user)
          self.assertEqual(second_row[2].entity, second_user)
  73. self.assertEqual(received_rows[1][2].entity, user2)