events.py 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2014-2016 OpenMarket Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. from twisted.internet import defer
  16. from synapse.types import StreamToken
  17. from synapse.handlers.presence import PresenceEventSource
  18. from synapse.handlers.room import RoomEventSource
  19. from synapse.handlers.typing import TypingNotificationEventSource
  20. from synapse.handlers.receipts import ReceiptEventSource
  21. from synapse.handlers.account_data import AccountDataEventSource
  22. class EventSources(object):
  23. SOURCE_TYPES = {
  24. "room": RoomEventSource,
  25. "presence": PresenceEventSource,
  26. "typing": TypingNotificationEventSource,
  27. "receipt": ReceiptEventSource,
  28. "account_data": AccountDataEventSource,
  29. }
  30. def __init__(self, hs):
  31. self.sources = {
  32. name: cls(hs)
  33. for name, cls in EventSources.SOURCE_TYPES.items()
  34. }
  35. self.store = hs.get_datastore()
  36. @defer.inlineCallbacks
  37. def get_current_token(self):
  38. push_rules_key, _ = self.store.get_push_rules_stream_token()
  39. to_device_key = self.store.get_to_device_stream_token()
  40. device_list_key = self.store.get_device_stream_token()
  41. groups_key = self.store.get_group_stream_token()
  42. token = StreamToken(
  43. room_key=(
  44. yield self.sources["room"].get_current_key()
  45. ),
  46. presence_key=(
  47. yield self.sources["presence"].get_current_key()
  48. ),
  49. typing_key=(
  50. yield self.sources["typing"].get_current_key()
  51. ),
  52. receipt_key=(
  53. yield self.sources["receipt"].get_current_key()
  54. ),
  55. account_data_key=(
  56. yield self.sources["account_data"].get_current_key()
  57. ),
  58. push_rules_key=push_rules_key,
  59. to_device_key=to_device_key,
  60. device_list_key=device_list_key,
  61. groups_key=groups_key,
  62. )
  63. defer.returnValue(token)
  64. @defer.inlineCallbacks
  65. def get_current_token_for_room(self, room_id):
  66. push_rules_key, _ = self.store.get_push_rules_stream_token()
  67. to_device_key = self.store.get_to_device_stream_token()
  68. device_list_key = self.store.get_device_stream_token()
  69. groups_key = self.store.get_group_stream_token()
  70. token = StreamToken(
  71. room_key=(
  72. yield self.sources["room"].get_current_key_for_room(room_id)
  73. ),
  74. presence_key=(
  75. yield self.sources["presence"].get_current_key()
  76. ),
  77. typing_key=(
  78. yield self.sources["typing"].get_current_key()
  79. ),
  80. receipt_key=(
  81. yield self.sources["receipt"].get_current_key()
  82. ),
  83. account_data_key=(
  84. yield self.sources["account_data"].get_current_key()
  85. ),
  86. push_rules_key=push_rules_key,
  87. to_device_key=to_device_key,
  88. device_list_key=device_list_key,
  89. groups_key=groups_key,
  90. )
  91. defer.returnValue(token)