test_account_data.py 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
# Copyright 2020 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
  14. from synapse.replication.tcp.streams._base import (
  15. _STREAM_UPDATE_TARGET_ROW_COUNT,
  16. AccountDataStream,
  17. )
  18. from tests.replication._base import BaseStreamTestCase
  19. class AccountDataStreamTestCase(BaseStreamTestCase):
  20. def test_update_function_room_account_data_limit(self) -> None:
  21. """Test replication with many room account data updates"""
  22. store = self.hs.get_datastores().main
  23. # generate lots of account data updates
  24. updates = []
  25. for i in range(_STREAM_UPDATE_TARGET_ROW_COUNT + 5):
  26. update = "m.test_type.%i" % (i,)
  27. self.get_success(
  28. store.add_account_data_to_room("test_user", "test_room", update, {})
  29. )
  30. updates.append(update)
  31. # also one global update
  32. self.get_success(store.add_account_data_for_user("test_user", "m.global", {}))
  33. # check we're testing what we think we are: no rows should yet have been
  34. # received
  35. self.assertEqual([], self.test_handler.received_rdata_rows)
  36. # now reconnect to pull the updates
  37. self.reconnect()
  38. self.replicate()
  39. # we should have received all the expected rows in the right order
  40. received_rows = self.test_handler.received_rdata_rows
  41. for t in updates:
  42. (stream_name, token, row) = received_rows.pop(0)
  43. self.assertEqual(stream_name, AccountDataStream.NAME)
  44. self.assertIsInstance(row, AccountDataStream.AccountDataStreamRow)
  45. self.assertEqual(row.data_type, t)
  46. self.assertEqual(row.room_id, "test_room")
  47. (stream_name, token, row) = received_rows.pop(0)
  48. self.assertIsInstance(row, AccountDataStream.AccountDataStreamRow)
  49. self.assertEqual(row.data_type, "m.global")
  50. self.assertIsNone(row.room_id)
  51. self.assertEqual([], received_rows)
  52. def test_update_function_global_account_data_limit(self) -> None:
  53. """Test replication with many global account data updates"""
  54. store = self.hs.get_datastores().main
  55. # generate lots of account data updates
  56. updates = []
  57. for i in range(_STREAM_UPDATE_TARGET_ROW_COUNT + 5):
  58. update = "m.test_type.%i" % (i,)
  59. self.get_success(store.add_account_data_for_user("test_user", update, {}))
  60. updates.append(update)
  61. # also one per-room update
  62. self.get_success(
  63. store.add_account_data_to_room("test_user", "test_room", "m.per_room", {})
  64. )
  65. # tell the notifier to catch up to avoid duplicate rows.
  66. # workaround for https://github.com/matrix-org/synapse/issues/7360
  67. # FIXME remove this when the above is fixed
  68. self.replicate()
  69. # check we're testing what we think we are: no rows should yet have been
  70. # received
  71. self.assertEqual([], self.test_handler.received_rdata_rows)
  72. # now reconnect to pull the updates
  73. self.reconnect()
  74. self.replicate()
  75. # we should have received all the expected rows in the right order
  76. received_rows = self.test_handler.received_rdata_rows
  77. for t in updates:
  78. (stream_name, token, row) = received_rows.pop(0)
  79. self.assertEqual(stream_name, AccountDataStream.NAME)
  80. self.assertIsInstance(row, AccountDataStream.AccountDataStreamRow)
  81. self.assertEqual(row.data_type, t)
  82. self.assertIsNone(row.room_id)
  83. (stream_name, token, row) = received_rows.pop(0)
  84. self.assertIsInstance(row, AccountDataStream.AccountDataStreamRow)
  85. self.assertEqual(row.data_type, "m.per_room")
  86. self.assertEqual(row.room_id, "test_room")
  87. self.assertEqual([], received_rows)