__init__.py 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. # Copyright 2015, 2016 OpenMarket Ltd
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import abc
  15. from typing import TYPE_CHECKING, Any, Dict, Optional
  16. import attr
  17. from synapse.types import JsonDict, RoomStreamToken
  18. if TYPE_CHECKING:
  19. from synapse.server import HomeServer
  20. @attr.s(slots=True, auto_attribs=True)
  21. class PusherConfig:
  22. """Parameters necessary to configure a pusher."""
  23. id: Optional[str]
  24. user_name: str
  25. access_token: Optional[int]
  26. profile_tag: str
  27. kind: str
  28. app_id: str
  29. app_display_name: str
  30. device_display_name: str
  31. pushkey: str
  32. ts: int
  33. lang: Optional[str]
  34. data: Optional[JsonDict]
  35. last_stream_ordering: int
  36. last_success: Optional[int]
  37. failing_since: Optional[int]
  38. def as_dict(self) -> Dict[str, Any]:
  39. """Information that can be retrieved about a pusher after creation."""
  40. return {
  41. "app_display_name": self.app_display_name,
  42. "app_id": self.app_id,
  43. "data": self.data,
  44. "device_display_name": self.device_display_name,
  45. "kind": self.kind,
  46. "lang": self.lang,
  47. "profile_tag": self.profile_tag,
  48. "pushkey": self.pushkey,
  49. }
  50. @attr.s(slots=True, auto_attribs=True)
  51. class ThrottleParams:
  52. """Parameters for controlling the rate of sending pushes via email."""
  53. last_sent_ts: int
  54. throttle_ms: int
  55. class Pusher(metaclass=abc.ABCMeta):
  56. def __init__(self, hs: "HomeServer", pusher_config: PusherConfig):
  57. self.hs = hs
  58. self.store = self.hs.get_datastores().main
  59. self.clock = self.hs.get_clock()
  60. self.pusher_id = pusher_config.id
  61. self.user_id = pusher_config.user_name
  62. self.app_id = pusher_config.app_id
  63. self.pushkey = pusher_config.pushkey
  64. self.last_stream_ordering = pusher_config.last_stream_ordering
  65. # This is the highest stream ordering we know it's safe to process.
  66. # When new events arrive, we'll be given a window of new events: we
  67. # should honour this rather than just looking for anything higher
  68. # because of potential out-of-order event serialisation.
  69. self.max_stream_ordering = self.store.get_room_max_stream_ordering()
  70. def on_new_notifications(self, max_token: RoomStreamToken) -> None:
  71. # We just use the minimum stream ordering and ignore the vector clock
  72. # component. This is safe to do as long as we *always* ignore the vector
  73. # clock components.
  74. max_stream_ordering = max_token.stream
  75. self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering)
  76. self._start_processing()
  77. @abc.abstractmethod
  78. def _start_processing(self) -> None:
  79. """Start processing push notifications."""
  80. raise NotImplementedError()
  81. @abc.abstractmethod
  82. def on_new_receipts(self, min_stream_id: int, max_stream_id: int) -> None:
  83. raise NotImplementedError()
  84. @abc.abstractmethod
  85. def on_started(self, have_notifs: bool) -> None:
  86. """Called when this pusher has been started.
  87. Args:
  88. should_check_for_notifs: Whether we should immediately
  89. check for push to send. Set to False only if it's known there
  90. is nothing to send
  91. """
  92. raise NotImplementedError()
  93. @abc.abstractmethod
  94. def on_stop(self) -> None:
  95. raise NotImplementedError()
  96. class PusherConfigException(Exception):
  97. """An error occurred when creating a pusher."""