streams.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2017 Vector Creations Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. """Defines all the valid streams that clients can subscribe to, and the format
  16. of the rows returned by each stream.
  17. Each stream is defined by the following information:
  18. stream name: The name of the stream
  19. row type: The type that is used to serialise/deserialise the row
  20. current_token: The function that returns the current token for the stream
  21. update_function: The function that returns a list of updates between two tokens
  22. """
  23. import logging
  24. from collections import namedtuple
  25. from twisted.internet import defer
  26. logger = logging.getLogger(__name__)
  27. MAX_EVENTS_BEHIND = 10000
  28. EventStreamRow = namedtuple("EventStreamRow", (
  29. "event_id", # str
  30. "room_id", # str
  31. "type", # str
  32. "state_key", # str, optional
  33. "redacts", # str, optional
  34. ))
  35. BackfillStreamRow = namedtuple("BackfillStreamRow", (
  36. "event_id", # str
  37. "room_id", # str
  38. "type", # str
  39. "state_key", # str, optional
  40. "redacts", # str, optional
  41. ))
  42. PresenceStreamRow = namedtuple("PresenceStreamRow", (
  43. "user_id", # str
  44. "state", # str
  45. "last_active_ts", # int
  46. "last_federation_update_ts", # int
  47. "last_user_sync_ts", # int
  48. "status_msg", # str
  49. "currently_active", # bool
  50. ))
  51. TypingStreamRow = namedtuple("TypingStreamRow", (
  52. "room_id", # str
  53. "user_ids", # list(str)
  54. ))
  55. ReceiptsStreamRow = namedtuple("ReceiptsStreamRow", (
  56. "room_id", # str
  57. "receipt_type", # str
  58. "user_id", # str
  59. "event_id", # str
  60. "data", # dict
  61. ))
  62. PushRulesStreamRow = namedtuple("PushRulesStreamRow", (
  63. "user_id", # str
  64. ))
  65. PushersStreamRow = namedtuple("PushersStreamRow", (
  66. "user_id", # str
  67. "app_id", # str
  68. "pushkey", # str
  69. "deleted", # bool
  70. ))
  71. CachesStreamRow = namedtuple("CachesStreamRow", (
  72. "cache_func", # str
  73. "keys", # list(str)
  74. "invalidation_ts", # int
  75. ))
  76. PublicRoomsStreamRow = namedtuple("PublicRoomsStreamRow", (
  77. "room_id", # str
  78. "visibility", # str
  79. "appservice_id", # str, optional
  80. "network_id", # str, optional
  81. ))
  82. DeviceListsStreamRow = namedtuple("DeviceListsStreamRow", (
  83. "user_id", # str
  84. "destination", # str
  85. ))
  86. ToDeviceStreamRow = namedtuple("ToDeviceStreamRow", (
  87. "entity", # str
  88. ))
  89. FederationStreamRow = namedtuple("FederationStreamRow", (
  90. "type", # str, the type of data as defined in the BaseFederationRows
  91. "data", # dict, serialization of a federation.send_queue.BaseFederationRow
  92. ))
  93. TagAccountDataStreamRow = namedtuple("TagAccountDataStreamRow", (
  94. "user_id", # str
  95. "room_id", # str
  96. "data", # dict
  97. ))
  98. AccountDataStreamRow = namedtuple("AccountDataStream", (
  99. "user_id", # str
  100. "room_id", # str
  101. "data_type", # str
  102. "data", # dict
  103. ))
  104. CurrentStateDeltaStreamRow = namedtuple("CurrentStateDeltaStream", (
  105. "room_id", # str
  106. "type", # str
  107. "state_key", # str
  108. "event_id", # str, optional
  109. ))
  110. GroupsStreamRow = namedtuple("GroupsStreamRow", (
  111. "group_id", # str
  112. "user_id", # str
  113. "type", # str
  114. "content", # dict
  115. ))
  116. class Stream(object):
  117. """Base class for the streams.
  118. Provides a `get_updates()` function that returns new updates since the last
  119. time it was called up until the point `advance_current_token` was called.
  120. """
  121. NAME = None # The name of the stream
  122. ROW_TYPE = None # The type of the row
  123. _LIMITED = True # Whether the update function takes a limit
  124. def __init__(self, hs):
  125. # The token from which we last asked for updates
  126. self.last_token = self.current_token()
  127. # The token that we will get updates up to
  128. self.upto_token = self.current_token()
  129. def advance_current_token(self):
  130. """Updates `upto_token` to "now", which updates up until which point
  131. get_updates[_since] will fetch rows till.
  132. """
  133. self.upto_token = self.current_token()
  134. def discard_updates_and_advance(self):
  135. """Called when the stream should advance but the updates would be discarded,
  136. e.g. when there are no currently connected workers.
  137. """
  138. self.upto_token = self.current_token()
  139. self.last_token = self.upto_token
  140. @defer.inlineCallbacks
  141. def get_updates(self):
  142. """Gets all updates since the last time this function was called (or
  143. since the stream was constructed if it hadn't been called before),
  144. until the `upto_token`
  145. Returns:
  146. (list(ROW_TYPE), int): list of updates plus the token used as an
  147. upper bound of the updates (i.e. the "current token")
  148. """
  149. updates, current_token = yield self.get_updates_since(self.last_token)
  150. self.last_token = current_token
  151. defer.returnValue((updates, current_token))
  152. @defer.inlineCallbacks
  153. def get_updates_since(self, from_token):
  154. """Like get_updates except allows specifying from when we should
  155. stream updates
  156. Returns:
  157. (list(ROW_TYPE), int): list of updates plus the token used as an
  158. upper bound of the updates (i.e. the "current token")
  159. """
  160. if from_token in ("NOW", "now"):
  161. defer.returnValue(([], self.upto_token))
  162. current_token = self.upto_token
  163. from_token = int(from_token)
  164. if from_token == current_token:
  165. defer.returnValue(([], current_token))
  166. if self._LIMITED:
  167. rows = yield self.update_function(
  168. from_token, current_token,
  169. limit=MAX_EVENTS_BEHIND + 1,
  170. )
  171. if len(rows) >= MAX_EVENTS_BEHIND:
  172. raise Exception("stream %s has fallen behind" % (self.NAME))
  173. else:
  174. rows = yield self.update_function(
  175. from_token, current_token,
  176. )
  177. updates = [(row[0], self.ROW_TYPE(*row[1:])) for row in rows]
  178. defer.returnValue((updates, current_token))
  179. def current_token(self):
  180. """Gets the current token of the underlying streams. Should be provided
  181. by the sub classes
  182. Returns:
  183. int
  184. """
  185. raise NotImplementedError()
  186. def update_function(self, from_token, current_token, limit=None):
  187. """Get updates between from_token and to_token. If Stream._LIMITED is
  188. True then limit is provided, otherwise it's not.
  189. Returns:
  190. Deferred(list(tuple)): the first entry in the tuple is the token for
  191. that update, and the rest of the tuple gets used to construct
  192. a ``ROW_TYPE`` instance
  193. """
  194. raise NotImplementedError()
  195. class EventsStream(Stream):
  196. """We received a new event, or an event went from being an outlier to not
  197. """
  198. NAME = "events"
  199. ROW_TYPE = EventStreamRow
  200. def __init__(self, hs):
  201. store = hs.get_datastore()
  202. self.current_token = store.get_current_events_token
  203. self.update_function = store.get_all_new_forward_event_rows
  204. super(EventsStream, self).__init__(hs)
  205. class BackfillStream(Stream):
  206. """We fetched some old events and either we had never seen that event before
  207. or it went from being an outlier to not.
  208. """
  209. NAME = "backfill"
  210. ROW_TYPE = BackfillStreamRow
  211. def __init__(self, hs):
  212. store = hs.get_datastore()
  213. self.current_token = store.get_current_backfill_token
  214. self.update_function = store.get_all_new_backfill_event_rows
  215. super(BackfillStream, self).__init__(hs)
  216. class PresenceStream(Stream):
  217. NAME = "presence"
  218. _LIMITED = False
  219. ROW_TYPE = PresenceStreamRow
  220. def __init__(self, hs):
  221. store = hs.get_datastore()
  222. presence_handler = hs.get_presence_handler()
  223. self.current_token = store.get_current_presence_token
  224. self.update_function = presence_handler.get_all_presence_updates
  225. super(PresenceStream, self).__init__(hs)
  226. class TypingStream(Stream):
  227. NAME = "typing"
  228. _LIMITED = False
  229. ROW_TYPE = TypingStreamRow
  230. def __init__(self, hs):
  231. typing_handler = hs.get_typing_handler()
  232. self.current_token = typing_handler.get_current_token
  233. self.update_function = typing_handler.get_all_typing_updates
  234. super(TypingStream, self).__init__(hs)
  235. class ReceiptsStream(Stream):
  236. NAME = "receipts"
  237. ROW_TYPE = ReceiptsStreamRow
  238. def __init__(self, hs):
  239. store = hs.get_datastore()
  240. self.current_token = store.get_max_receipt_stream_id
  241. self.update_function = store.get_all_updated_receipts
  242. super(ReceiptsStream, self).__init__(hs)
  243. class PushRulesStream(Stream):
  244. """A user has changed their push rules
  245. """
  246. NAME = "push_rules"
  247. ROW_TYPE = PushRulesStreamRow
  248. def __init__(self, hs):
  249. self.store = hs.get_datastore()
  250. super(PushRulesStream, self).__init__(hs)
  251. def current_token(self):
  252. push_rules_token, _ = self.store.get_push_rules_stream_token()
  253. return push_rules_token
  254. @defer.inlineCallbacks
  255. def update_function(self, from_token, to_token, limit):
  256. rows = yield self.store.get_all_push_rule_updates(from_token, to_token, limit)
  257. defer.returnValue([(row[0], row[2]) for row in rows])
  258. class PushersStream(Stream):
  259. """A user has added/changed/removed a pusher
  260. """
  261. NAME = "pushers"
  262. ROW_TYPE = PushersStreamRow
  263. def __init__(self, hs):
  264. store = hs.get_datastore()
  265. self.current_token = store.get_pushers_stream_token
  266. self.update_function = store.get_all_updated_pushers_rows
  267. super(PushersStream, self).__init__(hs)
  268. class CachesStream(Stream):
  269. """A cache was invalidated on the master and no other stream would invalidate
  270. the cache on the workers
  271. """
  272. NAME = "caches"
  273. ROW_TYPE = CachesStreamRow
  274. def __init__(self, hs):
  275. store = hs.get_datastore()
  276. self.current_token = store.get_cache_stream_token
  277. self.update_function = store.get_all_updated_caches
  278. super(CachesStream, self).__init__(hs)
  279. class PublicRoomsStream(Stream):
  280. """The public rooms list changed
  281. """
  282. NAME = "public_rooms"
  283. ROW_TYPE = PublicRoomsStreamRow
  284. def __init__(self, hs):
  285. store = hs.get_datastore()
  286. self.current_token = store.get_current_public_room_stream_id
  287. self.update_function = store.get_all_new_public_rooms
  288. super(PublicRoomsStream, self).__init__(hs)
  289. class DeviceListsStream(Stream):
  290. """Someone added/changed/removed a device
  291. """
  292. NAME = "device_lists"
  293. _LIMITED = False
  294. ROW_TYPE = DeviceListsStreamRow
  295. def __init__(self, hs):
  296. store = hs.get_datastore()
  297. self.current_token = store.get_device_stream_token
  298. self.update_function = store.get_all_device_list_changes_for_remotes
  299. super(DeviceListsStream, self).__init__(hs)
  300. class ToDeviceStream(Stream):
  301. """New to_device messages for a client
  302. """
  303. NAME = "to_device"
  304. ROW_TYPE = ToDeviceStreamRow
  305. def __init__(self, hs):
  306. store = hs.get_datastore()
  307. self.current_token = store.get_to_device_stream_token
  308. self.update_function = store.get_all_new_device_messages
  309. super(ToDeviceStream, self).__init__(hs)
  310. class FederationStream(Stream):
  311. """Data to be sent over federation. Only available when master has federation
  312. sending disabled.
  313. """
  314. NAME = "federation"
  315. ROW_TYPE = FederationStreamRow
  316. def __init__(self, hs):
  317. federation_sender = hs.get_federation_sender()
  318. self.current_token = federation_sender.get_current_token
  319. self.update_function = federation_sender.get_replication_rows
  320. super(FederationStream, self).__init__(hs)
  321. class TagAccountDataStream(Stream):
  322. """Someone added/removed a tag for a room
  323. """
  324. NAME = "tag_account_data"
  325. ROW_TYPE = TagAccountDataStreamRow
  326. def __init__(self, hs):
  327. store = hs.get_datastore()
  328. self.current_token = store.get_max_account_data_stream_id
  329. self.update_function = store.get_all_updated_tags
  330. super(TagAccountDataStream, self).__init__(hs)
  331. class AccountDataStream(Stream):
  332. """Global or per room account data was changed
  333. """
  334. NAME = "account_data"
  335. ROW_TYPE = AccountDataStreamRow
  336. def __init__(self, hs):
  337. self.store = hs.get_datastore()
  338. self.current_token = self.store.get_max_account_data_stream_id
  339. super(AccountDataStream, self).__init__(hs)
  340. @defer.inlineCallbacks
  341. def update_function(self, from_token, to_token, limit):
  342. global_results, room_results = yield self.store.get_all_updated_account_data(
  343. from_token, from_token, to_token, limit
  344. )
  345. results = list(room_results)
  346. results.extend(
  347. (stream_id, user_id, None, account_data_type, content,)
  348. for stream_id, user_id, account_data_type, content in global_results
  349. )
  350. defer.returnValue(results)
  351. class CurrentStateDeltaStream(Stream):
  352. """Current state for a room was changed
  353. """
  354. NAME = "current_state_deltas"
  355. ROW_TYPE = CurrentStateDeltaStreamRow
  356. def __init__(self, hs):
  357. store = hs.get_datastore()
  358. self.current_token = store.get_max_current_state_delta_stream_id
  359. self.update_function = store.get_all_updated_current_state_deltas
  360. super(CurrentStateDeltaStream, self).__init__(hs)
  361. class GroupServerStream(Stream):
  362. NAME = "groups"
  363. ROW_TYPE = GroupsStreamRow
  364. def __init__(self, hs):
  365. store = hs.get_datastore()
  366. self.current_token = store.get_group_stream_token
  367. self.update_function = store.get_all_groups_changes
  368. super(GroupServerStream, self).__init__(hs)
  369. STREAMS_MAP = {
  370. stream.NAME: stream
  371. for stream in (
  372. EventsStream,
  373. BackfillStream,
  374. PresenceStream,
  375. TypingStream,
  376. ReceiptsStream,
  377. PushRulesStream,
  378. PushersStream,
  379. CachesStream,
  380. PublicRoomsStream,
  381. DeviceListsStream,
  382. ToDeviceStream,
  383. FederationStream,
  384. TagAccountDataStream,
  385. AccountDataStream,
  386. CurrentStateDeltaStream,
  387. GroupServerStream,
  388. )
  389. }