test_id_generators.py 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2020 The Matrix.org Foundation C.I.C.
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. from synapse.storage.database import Database
  16. from synapse.storage.util.id_generators import MultiWriterIdGenerator
  17. from tests.unittest import HomeserverTestCase
  18. from tests.utils import USE_POSTGRES_FOR_TESTS
  19. class MultiWriterIdGeneratorTestCase(HomeserverTestCase):
  20. if not USE_POSTGRES_FOR_TESTS:
  21. skip = "Requires Postgres"
  22. def prepare(self, reactor, clock, hs):
  23. self.store = hs.get_datastore()
  24. self.db = self.store.db # type: Database
  25. self.get_success(self.db.runInteraction("_setup_db", self._setup_db))
  26. def _setup_db(self, txn):
  27. txn.execute("CREATE SEQUENCE foobar_seq")
  28. txn.execute(
  29. """
  30. CREATE TABLE foobar (
  31. stream_id BIGINT NOT NULL,
  32. instance_name TEXT NOT NULL,
  33. data TEXT
  34. );
  35. """
  36. )
  37. def _create_id_generator(self, instance_name="master") -> MultiWriterIdGenerator:
  38. def _create(conn):
  39. return MultiWriterIdGenerator(
  40. conn,
  41. self.db,
  42. instance_name=instance_name,
  43. table="foobar",
  44. instance_column="instance_name",
  45. id_column="stream_id",
  46. sequence_name="foobar_seq",
  47. )
  48. return self.get_success(self.db.runWithConnection(_create))
  49. def _insert_rows(self, instance_name: str, number: int):
  50. def _insert(txn):
  51. for _ in range(number):
  52. txn.execute(
  53. "INSERT INTO foobar VALUES (nextval('foobar_seq'), ?)",
  54. (instance_name,),
  55. )
  56. self.get_success(self.db.runInteraction("test_single_instance", _insert))
  57. def test_empty(self):
  58. """Test an ID generator against an empty database gives sensible
  59. current positions.
  60. """
  61. id_gen = self._create_id_generator()
  62. # The table is empty so we expect an empty map for positions
  63. self.assertEqual(id_gen.get_positions(), {})
  64. def test_single_instance(self):
  65. """Test that reads and writes from a single process are handled
  66. correctly.
  67. """
  68. # Prefill table with 7 rows written by 'master'
  69. self._insert_rows("master", 7)
  70. id_gen = self._create_id_generator()
  71. self.assertEqual(id_gen.get_positions(), {"master": 7})
  72. self.assertEqual(id_gen.get_current_token("master"), 7)
  73. # Try allocating a new ID gen and check that we only see position
  74. # advanced after we leave the context manager.
  75. async def _get_next_async():
  76. with await id_gen.get_next() as stream_id:
  77. self.assertEqual(stream_id, 8)
  78. self.assertEqual(id_gen.get_positions(), {"master": 7})
  79. self.assertEqual(id_gen.get_current_token("master"), 7)
  80. self.get_success(_get_next_async())
  81. self.assertEqual(id_gen.get_positions(), {"master": 8})
  82. self.assertEqual(id_gen.get_current_token("master"), 8)
  83. def test_multi_instance(self):
  84. """Test that reads and writes from multiple processes are handled
  85. correctly.
  86. """
  87. self._insert_rows("first", 3)
  88. self._insert_rows("second", 4)
  89. first_id_gen = self._create_id_generator("first")
  90. second_id_gen = self._create_id_generator("second")
  91. self.assertEqual(first_id_gen.get_positions(), {"first": 3, "second": 7})
  92. self.assertEqual(first_id_gen.get_current_token("first"), 3)
  93. self.assertEqual(first_id_gen.get_current_token("second"), 7)
  94. # Try allocating a new ID gen and check that we only see position
  95. # advanced after we leave the context manager.
  96. async def _get_next_async():
  97. with await first_id_gen.get_next() as stream_id:
  98. self.assertEqual(stream_id, 8)
  99. self.assertEqual(
  100. first_id_gen.get_positions(), {"first": 3, "second": 7}
  101. )
  102. self.get_success(_get_next_async())
  103. self.assertEqual(first_id_gen.get_positions(), {"first": 8, "second": 7})
  104. # However the ID gen on the second instance won't have seen the update
  105. self.assertEqual(second_id_gen.get_positions(), {"first": 3, "second": 7})
  106. # ... but calling `get_next` on the second instance should give a unique
  107. # stream ID
  108. async def _get_next_async():
  109. with await second_id_gen.get_next() as stream_id:
  110. self.assertEqual(stream_id, 9)
  111. self.assertEqual(
  112. second_id_gen.get_positions(), {"first": 3, "second": 7}
  113. )
  114. self.get_success(_get_next_async())
  115. self.assertEqual(second_id_gen.get_positions(), {"first": 3, "second": 9})
  116. # If the second ID gen gets told about the first, it correctly updates
  117. second_id_gen.advance("first", 8)
  118. self.assertEqual(second_id_gen.get_positions(), {"first": 8, "second": 9})
  119. def test_get_next_txn(self):
  120. """Test that the `get_next_txn` function works correctly.
  121. """
  122. # Prefill table with 7 rows written by 'master'
  123. self._insert_rows("master", 7)
  124. id_gen = self._create_id_generator()
  125. self.assertEqual(id_gen.get_positions(), {"master": 7})
  126. self.assertEqual(id_gen.get_current_token("master"), 7)
  127. # Try allocating a new ID gen and check that we only see position
  128. # advanced after we leave the context manager.
  129. def _get_next_txn(txn):
  130. stream_id = id_gen.get_next_txn(txn)
  131. self.assertEqual(stream_id, 8)
  132. self.assertEqual(id_gen.get_positions(), {"master": 7})
  133. self.assertEqual(id_gen.get_current_token("master"), 7)
  134. self.get_success(self.db.runInteraction("test", _get_next_txn))
  135. self.assertEqual(id_gen.get_positions(), {"master": 8})
  136. self.assertEqual(id_gen.get_current_token("master"), 8)