stats.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
  15. import logging
  16. from twisted.internet import defer
  17. from synapse.api.constants import EventTypes, JoinRules, Membership
  18. from synapse.handlers.state_deltas import StateDeltasHandler
  19. from synapse.metrics import event_processing_positions
  20. from synapse.metrics.background_process_metrics import run_as_background_process
  21. from synapse.types import UserID
  22. from synapse.util.metrics import Measure
  23. logger = logging.getLogger(__name__)
  24. class StatsHandler(StateDeltasHandler):
  25. """Handles keeping the *_stats tables updated with a simple time-series of
  26. information about the users, rooms and media on the server, such that admins
  27. have some idea of who is consuming their resources.
  28. Heavily derived from UserDirectoryHandler
  29. """
  30. def __init__(self, hs):
  31. super(StatsHandler, self).__init__(hs)
  32. self.hs = hs
  33. self.store = hs.get_datastore()
  34. self.state = hs.get_state_handler()
  35. self.server_name = hs.hostname
  36. self.clock = hs.get_clock()
  37. self.notifier = hs.get_notifier()
  38. self.is_mine_id = hs.is_mine_id
  39. self.stats_bucket_size = hs.config.stats_bucket_size
  40. # The current position in the current_state_delta stream
  41. self.pos = None
  42. # Guard to ensure we only process deltas one at a time
  43. self._is_processing = False
  44. if hs.config.stats_enabled:
  45. self.notifier.add_replication_callback(self.notify_new_event)
  46. # We kick this off so that we don't have to wait for a change before
  47. # we start populating stats
  48. self.clock.call_later(0, self.notify_new_event)
  49. def notify_new_event(self):
  50. """Called when there may be more deltas to process
  51. """
  52. if not self.hs.config.stats_enabled:
  53. return
  54. if self._is_processing:
  55. return
  56. @defer.inlineCallbacks
  57. def process():
  58. try:
  59. yield self._unsafe_process()
  60. finally:
  61. self._is_processing = False
  62. self._is_processing = True
  63. run_as_background_process("stats.notify_new_event", process)
  64. @defer.inlineCallbacks
  65. def _unsafe_process(self):
  66. # If self.pos is None then means we haven't fetched it from DB
  67. if self.pos is None:
  68. self.pos = yield self.store.get_stats_stream_pos()
  69. # If still None then the initial background update hasn't happened yet
  70. if self.pos is None:
  71. return None
  72. # Loop round handling deltas until we're up to date
  73. while True:
  74. with Measure(self.clock, "stats_delta"):
  75. deltas = yield self.store.get_current_state_deltas(self.pos)
  76. if not deltas:
  77. return
  78. logger.info("Handling %d state deltas", len(deltas))
  79. yield self._handle_deltas(deltas)
  80. self.pos = deltas[-1]["stream_id"]
  81. yield self.store.update_stats_stream_pos(self.pos)
  82. event_processing_positions.labels("stats").set(self.pos)
  83. @defer.inlineCallbacks
  84. def _handle_deltas(self, deltas):
  85. """
  86. Called with the state deltas to process
  87. """
  88. for delta in deltas:
  89. typ = delta["type"]
  90. state_key = delta["state_key"]
  91. room_id = delta["room_id"]
  92. event_id = delta["event_id"]
  93. stream_id = delta["stream_id"]
  94. prev_event_id = delta["prev_event_id"]
  95. stream_pos = delta["stream_id"]
  96. logger.debug("Handling: %r %r, %s", typ, state_key, event_id)
  97. token = yield self.store.get_earliest_token_for_room_stats(room_id)
  98. # If the earliest token to begin from is larger than our current
  99. # stream ID, skip processing this delta.
  100. if token is not None and token >= stream_id:
  101. logger.debug(
  102. "Ignoring: %s as earlier than this room's initial ingestion event",
  103. event_id,
  104. )
  105. continue
  106. if event_id is None and prev_event_id is None:
  107. # Errr...
  108. continue
  109. event_content = {}
  110. if event_id is not None:
  111. event = yield self.store.get_event(event_id, allow_none=True)
  112. if event:
  113. event_content = event.content or {}
  114. # We use stream_pos here rather than fetch by event_id as event_id
  115. # may be None
  116. now = yield self.store.get_received_ts_by_stream_pos(stream_pos)
  117. # quantise time to the nearest bucket
  118. now = (now // 1000 // self.stats_bucket_size) * self.stats_bucket_size
  119. if typ == EventTypes.Member:
  120. # we could use _get_key_change here but it's a bit inefficient
  121. # given we're not testing for a specific result; might as well
  122. # just grab the prev_membership and membership strings and
  123. # compare them.
  124. prev_event_content = {}
  125. if prev_event_id is not None:
  126. prev_event = yield self.store.get_event(
  127. prev_event_id, allow_none=True
  128. )
  129. if prev_event:
  130. prev_event_content = prev_event.content
  131. membership = event_content.get("membership", Membership.LEAVE)
  132. prev_membership = prev_event_content.get("membership", Membership.LEAVE)
  133. if prev_membership == membership:
  134. continue
  135. if prev_membership == Membership.JOIN:
  136. yield self.store.update_stats_delta(
  137. now, "room", room_id, "joined_members", -1
  138. )
  139. elif prev_membership == Membership.INVITE:
  140. yield self.store.update_stats_delta(
  141. now, "room", room_id, "invited_members", -1
  142. )
  143. elif prev_membership == Membership.LEAVE:
  144. yield self.store.update_stats_delta(
  145. now, "room", room_id, "left_members", -1
  146. )
  147. elif prev_membership == Membership.BAN:
  148. yield self.store.update_stats_delta(
  149. now, "room", room_id, "banned_members", -1
  150. )
  151. else:
  152. err = "%s is not a valid prev_membership" % (repr(prev_membership),)
  153. logger.error(err)
  154. raise ValueError(err)
  155. if membership == Membership.JOIN:
  156. yield self.store.update_stats_delta(
  157. now, "room", room_id, "joined_members", +1
  158. )
  159. elif membership == Membership.INVITE:
  160. yield self.store.update_stats_delta(
  161. now, "room", room_id, "invited_members", +1
  162. )
  163. elif membership == Membership.LEAVE:
  164. yield self.store.update_stats_delta(
  165. now, "room", room_id, "left_members", +1
  166. )
  167. elif membership == Membership.BAN:
  168. yield self.store.update_stats_delta(
  169. now, "room", room_id, "banned_members", +1
  170. )
  171. else:
  172. err = "%s is not a valid membership" % (repr(membership),)
  173. logger.error(err)
  174. raise ValueError(err)
  175. user_id = state_key
  176. if self.is_mine_id(user_id):
  177. # update user_stats as it's one of our users
  178. public = yield self._is_public_room(room_id)
  179. if membership == Membership.LEAVE:
  180. yield self.store.update_stats_delta(
  181. now,
  182. "user",
  183. user_id,
  184. "public_rooms" if public else "private_rooms",
  185. -1,
  186. )
  187. elif membership == Membership.JOIN:
  188. yield self.store.update_stats_delta(
  189. now,
  190. "user",
  191. user_id,
  192. "public_rooms" if public else "private_rooms",
  193. +1,
  194. )
  195. elif typ == EventTypes.Create:
  196. # Newly created room. Add it with all blank portions.
  197. yield self.store.update_room_state(
  198. room_id,
  199. {
  200. "join_rules": None,
  201. "history_visibility": None,
  202. "encryption": None,
  203. "name": None,
  204. "topic": None,
  205. "avatar": None,
  206. "canonical_alias": None,
  207. },
  208. )
  209. elif typ == EventTypes.JoinRules:
  210. yield self.store.update_room_state(
  211. room_id, {"join_rules": event_content.get("join_rule")}
  212. )
  213. is_public = yield self._get_key_change(
  214. prev_event_id, event_id, "join_rule", JoinRules.PUBLIC
  215. )
  216. if is_public is not None:
  217. yield self.update_public_room_stats(now, room_id, is_public)
  218. elif typ == EventTypes.RoomHistoryVisibility:
  219. yield self.store.update_room_state(
  220. room_id,
  221. {"history_visibility": event_content.get("history_visibility")},
  222. )
  223. is_public = yield self._get_key_change(
  224. prev_event_id, event_id, "history_visibility", "world_readable"
  225. )
  226. if is_public is not None:
  227. yield self.update_public_room_stats(now, room_id, is_public)
  228. elif typ == EventTypes.Encryption:
  229. yield self.store.update_room_state(
  230. room_id, {"encryption": event_content.get("algorithm")}
  231. )
  232. elif typ == EventTypes.Name:
  233. yield self.store.update_room_state(
  234. room_id, {"name": event_content.get("name")}
  235. )
  236. elif typ == EventTypes.Topic:
  237. yield self.store.update_room_state(
  238. room_id, {"topic": event_content.get("topic")}
  239. )
  240. elif typ == EventTypes.RoomAvatar:
  241. yield self.store.update_room_state(
  242. room_id, {"avatar": event_content.get("url")}
  243. )
  244. elif typ == EventTypes.CanonicalAlias:
  245. yield self.store.update_room_state(
  246. room_id, {"canonical_alias": event_content.get("alias")}
  247. )
  248. @defer.inlineCallbacks
  249. def update_public_room_stats(self, ts, room_id, is_public):
  250. """
  251. Increment/decrement a user's number of public rooms when a room they are
  252. in changes to/from public visibility.
  253. Args:
  254. ts (int): Timestamp in seconds
  255. room_id (str)
  256. is_public (bool)
  257. """
  258. # For now, blindly iterate over all local users in the room so that
  259. # we can handle the whole problem of copying buckets over as needed
  260. user_ids = yield self.store.get_users_in_room(room_id)
  261. for user_id in user_ids:
  262. if self.hs.is_mine(UserID.from_string(user_id)):
  263. yield self.store.update_stats_delta(
  264. ts, "user", user_id, "public_rooms", +1 if is_public else -1
  265. )
  266. yield self.store.update_stats_delta(
  267. ts, "user", user_id, "private_rooms", -1 if is_public else +1
  268. )
  269. @defer.inlineCallbacks
  270. def _is_public_room(self, room_id):
  271. join_rules = yield self.state.get_current_state(room_id, EventTypes.JoinRules)
  272. history_visibility = yield self.state.get_current_state(
  273. room_id, EventTypes.RoomHistoryVisibility
  274. )
  275. if (join_rules and join_rules.content.get("join_rule") == JoinRules.PUBLIC) or (
  276. (
  277. history_visibility
  278. and history_visibility.content.get("history_visibility")
  279. == "world_readable"
  280. )
  281. ):
  282. return True
  283. else:
  284. return False