state_deltas.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2018 Vector Creations Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. import logging
  16. from synapse.storage._base import SQLBaseStore
  17. logger = logging.getLogger(__name__)
  18. class StateDeltasStore(SQLBaseStore):
  19. def get_current_state_deltas(self, prev_stream_id: int, max_stream_id: int):
  20. """Fetch a list of room state changes since the given stream id
  21. Each entry in the result contains the following fields:
  22. - stream_id (int)
  23. - room_id (str)
  24. - type (str): event type
  25. - state_key (str):
  26. - event_id (str|None): new event_id for this state key. None if the
  27. state has been deleted.
  28. - prev_event_id (str|None): previous event_id for this state key. None
  29. if it's new state.
  30. Args:
  31. prev_stream_id (int): point to get changes since (exclusive)
  32. max_stream_id (int): the point that we know has been correctly persisted
  33. - ie, an upper limit to return changes from.
  34. Returns:
  35. Deferred[tuple[int, list[dict]]: A tuple consisting of:
  36. - the stream id which these results go up to
  37. - list of current_state_delta_stream rows. If it is empty, we are
  38. up to date.
  39. """
  40. prev_stream_id = int(prev_stream_id)
  41. # check we're not going backwards
  42. assert prev_stream_id <= max_stream_id
  43. if not self._curr_state_delta_stream_cache.has_any_entity_changed(
  44. prev_stream_id
  45. ):
  46. # if the CSDs haven't changed between prev_stream_id and now, we
  47. # know for certain that they haven't changed between prev_stream_id and
  48. # max_stream_id.
  49. return max_stream_id, []
  50. def get_current_state_deltas_txn(txn):
  51. # First we calculate the max stream id that will give us less than
  52. # N results.
  53. # We arbitarily limit to 100 stream_id entries to ensure we don't
  54. # select toooo many.
  55. sql = """
  56. SELECT stream_id, count(*)
  57. FROM current_state_delta_stream
  58. WHERE stream_id > ? AND stream_id <= ?
  59. GROUP BY stream_id
  60. ORDER BY stream_id ASC
  61. LIMIT 100
  62. """
  63. txn.execute(sql, (prev_stream_id, max_stream_id))
  64. total = 0
  65. for stream_id, count in txn:
  66. total += count
  67. if total > 100:
  68. # We arbitarily limit to 100 entries to ensure we don't
  69. # select toooo many.
  70. logger.debug(
  71. "Clipping current_state_delta_stream rows to stream_id %i",
  72. stream_id,
  73. )
  74. clipped_stream_id = stream_id
  75. break
  76. else:
  77. # if there's no problem, we may as well go right up to the max_stream_id
  78. clipped_stream_id = max_stream_id
  79. # Now actually get the deltas
  80. sql = """
  81. SELECT stream_id, room_id, type, state_key, event_id, prev_event_id
  82. FROM current_state_delta_stream
  83. WHERE ? < stream_id AND stream_id <= ?
  84. ORDER BY stream_id ASC
  85. """
  86. txn.execute(sql, (prev_stream_id, clipped_stream_id))
  87. return clipped_stream_id, self.db.cursor_to_dict(txn)
  88. return self.db.runInteraction(
  89. "get_current_state_deltas", get_current_state_deltas_txn
  90. )
  91. def _get_max_stream_id_in_current_state_deltas_txn(self, txn):
  92. return self.db.simple_select_one_onecol_txn(
  93. txn,
  94. table="current_state_delta_stream",
  95. keyvalues={},
  96. retcol="COALESCE(MAX(stream_id), -1)",
  97. )
  98. def get_max_stream_id_in_current_state_deltas(self):
  99. return self.db.runInteraction(
  100. "get_max_stream_id_in_current_state_deltas",
  101. self._get_max_stream_id_in_current_state_deltas_txn,
  102. )