pushers.py 1.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2016 OpenMarket Ltd
  3. # Copyright 2018 New Vector Ltd
  4. #
  5. # Licensed under the Apache License, Version 2.0 (the "License");
  6. # you may not use this file except in compliance with the License.
  7. # You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing, software
  12. # distributed under the License is distributed on an "AS IS" BASIS,
  13. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. # See the License for the specific language governing permissions and
  15. # limitations under the License.
  16. from typing import TYPE_CHECKING
  17. from synapse.replication.tcp.streams import PushersStream
  18. from synapse.storage.database import DatabasePool
  19. from synapse.storage.databases.main.pusher import PusherWorkerStore
  20. from synapse.storage.types import Connection
  21. from ._base import BaseSlavedStore
  22. from ._slaved_id_tracker import SlavedIdTracker
  23. if TYPE_CHECKING:
  24. from synapse.app.homeserver import HomeServer
  25. class SlavedPusherStore(PusherWorkerStore, BaseSlavedStore):
  26. def __init__(self, database: DatabasePool, db_conn: Connection, hs: "HomeServer"):
  27. super().__init__(database, db_conn, hs)
  28. self._pushers_id_gen = SlavedIdTracker( # type: ignore
  29. db_conn, "pushers", "id", extra_tables=[("deleted_pushers", "stream_id")]
  30. )
  31. def get_pushers_stream_token(self) -> int:
  32. return self._pushers_id_gen.get_current_token()
  33. def process_replication_rows(
  34. self, stream_name: str, instance_name: str, token, rows
  35. ) -> None:
  36. if stream_name == PushersStream.NAME:
  37. self._pushers_id_gen.advance(instance_name, token) # type: ignore
  38. return super().process_replication_rows(stream_name, instance_name, token, rows)