streams.py 3.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394
  1. # Copyright 2020 The Matrix.org Foundation C.I.C.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import logging
  15. from typing import TYPE_CHECKING, Tuple
  16. from twisted.web.server import Request
  17. from synapse.api.errors import SynapseError
  18. from synapse.http.server import HttpServer
  19. from synapse.http.servlet import parse_integer
  20. from synapse.replication.http._base import ReplicationEndpoint
  21. from synapse.types import JsonDict
  22. if TYPE_CHECKING:
  23. from synapse.server import HomeServer
  24. logger = logging.getLogger(__name__)
  25. class ReplicationGetStreamUpdates(ReplicationEndpoint):
  26. """Fetches stream updates from a server. Used for streams not persisted to
  27. the database, e.g. typing notifications.
  28. The API looks like:
  29. GET /_synapse/replication/get_repl_stream_updates/<stream name>?from_token=0&to_token=10
  30. 200 OK
  31. {
  32. updates: [ ... ],
  33. upto_token: 10,
  34. limited: False,
  35. }
  36. If there are more rows than can sensibly be returned in one lump, `limited` will be
  37. set to true, and the caller should call again with a new `from_token`.
  38. """
  39. NAME = "get_repl_stream_updates"
  40. PATH_ARGS = ("stream_name",)
  41. METHOD = "GET"
  42. # We don't want to wait for replication streams to catch up, as this gets
  43. # called in the process of catching replication streams up.
  44. WAIT_FOR_STREAMS = False
  45. def __init__(self, hs: "HomeServer"):
  46. super().__init__(hs)
  47. self._instance_name = hs.get_instance_name()
  48. self.streams = hs.get_replication_streams()
  49. @staticmethod
  50. async def _serialize_payload( # type: ignore[override]
  51. stream_name: str, from_token: int, upto_token: int
  52. ) -> JsonDict:
  53. return {"from_token": from_token, "upto_token": upto_token}
  54. async def _handle_request( # type: ignore[override]
  55. self, request: Request, content: JsonDict, stream_name: str
  56. ) -> Tuple[int, JsonDict]:
  57. stream = self.streams.get(stream_name)
  58. if stream is None:
  59. raise SynapseError(400, "Unknown stream")
  60. from_token = parse_integer(request, "from_token", required=True)
  61. upto_token = parse_integer(request, "upto_token", required=True)
  62. updates, upto_token, limited = await stream.get_updates_since(
  63. self._instance_name, from_token, upto_token
  64. )
  65. return (
  66. 200,
  67. {"updates": updates, "upto_token": upto_token, "limited": limited},
  68. )
  69. def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
  70. ReplicationGetStreamUpdates(hs).register(http_server)