|
@@ -1,5 +1,6 @@
|
|
|
# -*- coding: utf-8 -*-
|
|
|
# Copyright 2018, 2019 New Vector Ltd
|
|
|
+# Copyright 2019 The Matrix.org Foundation C.I.C.
|
|
|
#
|
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
|
# you may not use this file except in compliance with the License.
|
|
@@ -14,17 +15,22 @@
|
|
|
# limitations under the License.
|
|
|
|
|
|
import logging
|
|
|
+from itertools import chain
|
|
|
|
|
|
from twisted.internet import defer
|
|
|
+from twisted.internet.defer import DeferredLock
|
|
|
|
|
|
from synapse.api.constants import EventTypes, Membership
|
|
|
-from synapse.storage.prepare_database import get_statements
|
|
|
+from synapse.storage import PostgresEngine
|
|
|
from synapse.storage.state_deltas import StateDeltasStore
|
|
|
from synapse.util.caches.descriptors import cached
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
# these fields track absolutes (e.g. total number of rooms on the server)
|
|
|
+# You can think of these as Prometheus Gauges.
|
|
|
+# You can draw these stats on a line graph.
|
|
|
+# Example: number of users in a room
|
|
|
ABSOLUTE_STATS_FIELDS = {
|
|
|
"room": (
|
|
|
"current_state_events",
|
|
@@ -32,14 +38,23 @@ ABSOLUTE_STATS_FIELDS = {
|
|
|
"invited_members",
|
|
|
"left_members",
|
|
|
"banned_members",
|
|
|
- "state_events",
|
|
|
+ "local_users_in_room",
|
|
|
),
|
|
|
- "user": ("public_rooms", "private_rooms"),
|
|
|
+ "user": ("joined_rooms",),
|
|
|
}
|
|
|
|
|
|
-TYPE_TO_ROOM = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")}
|
|
|
+# these fields are per-timeslice and so should be reset to 0 upon a new slice
|
|
|
+# You can draw these stats on a histogram.
|
|
|
+# Example: number of events sent locally during a time slice
|
|
|
+PER_SLICE_FIELDS = {
|
|
|
+ "room": ("total_events", "total_event_bytes"),
|
|
|
+ "user": ("invites_sent", "rooms_created", "total_events", "total_event_bytes"),
|
|
|
+}
|
|
|
+
|
|
|
+TYPE_TO_TABLE = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")}
|
|
|
|
|
|
-TEMP_TABLE = "_temp_populate_stats"
|
|
|
+# these are the tables (& ID columns) which contain our actual subjects
|
|
|
+TYPE_TO_ORIGIN_TABLE = {"room": ("rooms", "room_id"), "user": ("users", "name")}
|
|
|
|
|
|
|
|
|
class StatsStore(StateDeltasStore):
|
|
@@ -51,136 +66,102 @@ class StatsStore(StateDeltasStore):
|
|
|
self.stats_enabled = hs.config.stats_enabled
|
|
|
self.stats_bucket_size = hs.config.stats_bucket_size
|
|
|
|
|
|
- self.register_background_update_handler(
|
|
|
- "populate_stats_createtables", self._populate_stats_createtables
|
|
|
- )
|
|
|
+ self.stats_delta_processing_lock = DeferredLock()
|
|
|
+
|
|
|
self.register_background_update_handler(
|
|
|
"populate_stats_process_rooms", self._populate_stats_process_rooms
|
|
|
)
|
|
|
self.register_background_update_handler(
|
|
|
- "populate_stats_cleanup", self._populate_stats_cleanup
|
|
|
+ "populate_stats_process_users", self._populate_stats_process_users
|
|
|
)
|
|
|
+ # we no longer need to perform clean-up, but we will give ourselves
|
|
|
+ # the potential to reintroduce it in the future – so documentation
|
|
|
+ # will still encourage the use of this no-op handler.
|
|
|
+ self.register_noop_background_update("populate_stats_cleanup")
|
|
|
+ self.register_noop_background_update("populate_stats_prepare")
|
|
|
|
|
|
- @defer.inlineCallbacks
|
|
|
- def _populate_stats_createtables(self, progress, batch_size):
|
|
|
-
|
|
|
- if not self.stats_enabled:
|
|
|
- yield self._end_background_update("populate_stats_createtables")
|
|
|
- return 1
|
|
|
-
|
|
|
- # Get all the rooms that we want to process.
|
|
|
- def _make_staging_area(txn):
|
|
|
- # Create the temporary tables
|
|
|
- stmts = get_statements(
|
|
|
- """
|
|
|
- -- We just recreate the table, we'll be reinserting the
|
|
|
- -- correct entries again later anyway.
|
|
|
- DROP TABLE IF EXISTS {temp}_rooms;
|
|
|
-
|
|
|
- CREATE TABLE IF NOT EXISTS {temp}_rooms(
|
|
|
- room_id TEXT NOT NULL,
|
|
|
- events BIGINT NOT NULL
|
|
|
- );
|
|
|
-
|
|
|
- CREATE INDEX {temp}_rooms_events
|
|
|
- ON {temp}_rooms(events);
|
|
|
- CREATE INDEX {temp}_rooms_id
|
|
|
- ON {temp}_rooms(room_id);
|
|
|
- """.format(
|
|
|
- temp=TEMP_TABLE
|
|
|
- ).splitlines()
|
|
|
- )
|
|
|
-
|
|
|
- for statement in stmts:
|
|
|
- txn.execute(statement)
|
|
|
-
|
|
|
- sql = (
|
|
|
- "CREATE TABLE IF NOT EXISTS "
|
|
|
- + TEMP_TABLE
|
|
|
- + "_position(position TEXT NOT NULL)"
|
|
|
- )
|
|
|
- txn.execute(sql)
|
|
|
-
|
|
|
- # Get rooms we want to process from the database, only adding
|
|
|
- # those that we haven't (i.e. those not in room_stats_earliest_token)
|
|
|
- sql = """
|
|
|
- INSERT INTO %s_rooms (room_id, events)
|
|
|
- SELECT c.room_id, count(*) FROM current_state_events AS c
|
|
|
- LEFT JOIN room_stats_earliest_token AS t USING (room_id)
|
|
|
- WHERE t.room_id IS NULL
|
|
|
- GROUP BY c.room_id
|
|
|
- """ % (
|
|
|
- TEMP_TABLE,
|
|
|
- )
|
|
|
- txn.execute(sql)
|
|
|
+ def quantise_stats_time(self, ts):
|
|
|
+ """
|
|
|
+ Quantises a timestamp to be a multiple of the bucket size.
|
|
|
|
|
|
- new_pos = yield self.get_max_stream_id_in_current_state_deltas()
|
|
|
- yield self.runInteraction("populate_stats_temp_build", _make_staging_area)
|
|
|
- yield self._simple_insert(TEMP_TABLE + "_position", {"position": new_pos})
|
|
|
- self.get_earliest_token_for_room_stats.invalidate_all()
|
|
|
+ Args:
|
|
|
+ ts (int): the timestamp to quantise, in milliseconds since the Unix
|
|
|
+ Epoch
|
|
|
|
|
|
- yield self._end_background_update("populate_stats_createtables")
|
|
|
- return 1
|
|
|
+ Returns:
|
|
|
+ int: a timestamp which
|
|
|
+ - is divisible by the bucket size;
|
|
|
+ - is no later than `ts`; and
|
|
|
+ - is the largest such timestamp.
|
|
|
+ """
|
|
|
+ return (ts // self.stats_bucket_size) * self.stats_bucket_size
|
|
|
|
|
|
@defer.inlineCallbacks
|
|
|
- def _populate_stats_cleanup(self, progress, batch_size):
|
|
|
+ def _populate_stats_process_users(self, progress, batch_size):
|
|
|
"""
|
|
|
- Update the user directory stream position, then clean up the old tables.
|
|
|
+ This is a background update which regenerates statistics for users.
|
|
|
"""
|
|
|
if not self.stats_enabled:
|
|
|
- yield self._end_background_update("populate_stats_cleanup")
|
|
|
+ yield self._end_background_update("populate_stats_process_users")
|
|
|
return 1
|
|
|
|
|
|
- position = yield self._simple_select_one_onecol(
|
|
|
- TEMP_TABLE + "_position", None, "position"
|
|
|
+ last_user_id = progress.get("last_user_id", "")
|
|
|
+
|
|
|
+ def _get_next_batch(txn):
|
|
|
+ sql = """
|
|
|
+ SELECT DISTINCT name FROM users
|
|
|
+ WHERE name > ?
|
|
|
+ ORDER BY name ASC
|
|
|
+ LIMIT ?
|
|
|
+ """
|
|
|
+ txn.execute(sql, (last_user_id, batch_size))
|
|
|
+ return [r for r, in txn]
|
|
|
+
|
|
|
+ users_to_work_on = yield self.runInteraction(
|
|
|
+ "_populate_stats_process_users", _get_next_batch
|
|
|
)
|
|
|
- yield self.update_stats_stream_pos(position)
|
|
|
|
|
|
- def _delete_staging_area(txn):
|
|
|
- txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_rooms")
|
|
|
- txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_position")
|
|
|
+ # No more rooms -- complete the transaction.
|
|
|
+ if not users_to_work_on:
|
|
|
+ yield self._end_background_update("populate_stats_process_users")
|
|
|
+ return 1
|
|
|
|
|
|
- yield self.runInteraction("populate_stats_cleanup", _delete_staging_area)
|
|
|
+ for user_id in users_to_work_on:
|
|
|
+ yield self._calculate_and_set_initial_state_for_user(user_id)
|
|
|
+ progress["last_user_id"] = user_id
|
|
|
|
|
|
- yield self._end_background_update("populate_stats_cleanup")
|
|
|
- return 1
|
|
|
+ yield self.runInteraction(
|
|
|
+ "populate_stats_process_users",
|
|
|
+ self._background_update_progress_txn,
|
|
|
+ "populate_stats_process_users",
|
|
|
+ progress,
|
|
|
+ )
|
|
|
+
|
|
|
+ return len(users_to_work_on)
|
|
|
|
|
|
@defer.inlineCallbacks
|
|
|
def _populate_stats_process_rooms(self, progress, batch_size):
|
|
|
-
|
|
|
+ """
|
|
|
+ This is a background update which regenerates statistics for rooms.
|
|
|
+ """
|
|
|
if not self.stats_enabled:
|
|
|
yield self._end_background_update("populate_stats_process_rooms")
|
|
|
return 1
|
|
|
|
|
|
- # If we don't have progress filed, delete everything.
|
|
|
- if not progress:
|
|
|
- yield self.delete_all_stats()
|
|
|
+ last_room_id = progress.get("last_room_id", "")
|
|
|
|
|
|
def _get_next_batch(txn):
|
|
|
- # Only fetch 250 rooms, so we don't fetch too many at once, even
|
|
|
- # if those 250 rooms have less than batch_size state events.
|
|
|
sql = """
|
|
|
- SELECT room_id, events FROM %s_rooms
|
|
|
- ORDER BY events DESC
|
|
|
- LIMIT 250
|
|
|
- """ % (
|
|
|
- TEMP_TABLE,
|
|
|
- )
|
|
|
- txn.execute(sql)
|
|
|
- rooms_to_work_on = txn.fetchall()
|
|
|
-
|
|
|
- if not rooms_to_work_on:
|
|
|
- return None
|
|
|
-
|
|
|
- # Get how many are left to process, so we can give status on how
|
|
|
- # far we are in processing
|
|
|
- txn.execute("SELECT COUNT(*) FROM " + TEMP_TABLE + "_rooms")
|
|
|
- progress["remaining"] = txn.fetchone()[0]
|
|
|
-
|
|
|
- return rooms_to_work_on
|
|
|
+ SELECT DISTINCT room_id FROM current_state_events
|
|
|
+ WHERE room_id > ?
|
|
|
+ ORDER BY room_id ASC
|
|
|
+ LIMIT ?
|
|
|
+ """
|
|
|
+ txn.execute(sql, (last_room_id, batch_size))
|
|
|
+ return [r for r, in txn]
|
|
|
|
|
|
rooms_to_work_on = yield self.runInteraction(
|
|
|
- "populate_stats_temp_read", _get_next_batch
|
|
|
+ "populate_stats_rooms_get_batch", _get_next_batch
|
|
|
)
|
|
|
|
|
|
# No more rooms -- complete the transaction.
|
|
@@ -188,154 +169,28 @@ class StatsStore(StateDeltasStore):
|
|
|
yield self._end_background_update("populate_stats_process_rooms")
|
|
|
return 1
|
|
|
|
|
|
- logger.info(
|
|
|
- "Processing the next %d rooms of %d remaining",
|
|
|
- len(rooms_to_work_on),
|
|
|
- progress["remaining"],
|
|
|
- )
|
|
|
-
|
|
|
- # Number of state events we've processed by going through each room
|
|
|
- processed_event_count = 0
|
|
|
-
|
|
|
- for room_id, event_count in rooms_to_work_on:
|
|
|
-
|
|
|
- current_state_ids = yield self.get_current_state_ids(room_id)
|
|
|
-
|
|
|
- join_rules_id = current_state_ids.get((EventTypes.JoinRules, ""))
|
|
|
- history_visibility_id = current_state_ids.get(
|
|
|
- (EventTypes.RoomHistoryVisibility, "")
|
|
|
- )
|
|
|
- encryption_id = current_state_ids.get((EventTypes.RoomEncryption, ""))
|
|
|
- name_id = current_state_ids.get((EventTypes.Name, ""))
|
|
|
- topic_id = current_state_ids.get((EventTypes.Topic, ""))
|
|
|
- avatar_id = current_state_ids.get((EventTypes.RoomAvatar, ""))
|
|
|
- canonical_alias_id = current_state_ids.get((EventTypes.CanonicalAlias, ""))
|
|
|
-
|
|
|
- event_ids = [
|
|
|
- join_rules_id,
|
|
|
- history_visibility_id,
|
|
|
- encryption_id,
|
|
|
- name_id,
|
|
|
- topic_id,
|
|
|
- avatar_id,
|
|
|
- canonical_alias_id,
|
|
|
- ]
|
|
|
-
|
|
|
- state_events = yield self.get_events(
|
|
|
- [ev for ev in event_ids if ev is not None]
|
|
|
- )
|
|
|
-
|
|
|
- def _get_or_none(event_id, arg):
|
|
|
- event = state_events.get(event_id)
|
|
|
- if event:
|
|
|
- return event.content.get(arg)
|
|
|
- return None
|
|
|
-
|
|
|
- yield self.update_room_state(
|
|
|
- room_id,
|
|
|
- {
|
|
|
- "join_rules": _get_or_none(join_rules_id, "join_rule"),
|
|
|
- "history_visibility": _get_or_none(
|
|
|
- history_visibility_id, "history_visibility"
|
|
|
- ),
|
|
|
- "encryption": _get_or_none(encryption_id, "algorithm"),
|
|
|
- "name": _get_or_none(name_id, "name"),
|
|
|
- "topic": _get_or_none(topic_id, "topic"),
|
|
|
- "avatar": _get_or_none(avatar_id, "url"),
|
|
|
- "canonical_alias": _get_or_none(canonical_alias_id, "alias"),
|
|
|
- },
|
|
|
- )
|
|
|
+ for room_id in rooms_to_work_on:
|
|
|
+ yield self._calculate_and_set_initial_state_for_room(room_id)
|
|
|
+ progress["last_room_id"] = room_id
|
|
|
|
|
|
- now = self.hs.get_reactor().seconds()
|
|
|
-
|
|
|
- # quantise time to the nearest bucket
|
|
|
- now = (now // self.stats_bucket_size) * self.stats_bucket_size
|
|
|
-
|
|
|
- def _fetch_data(txn):
|
|
|
-
|
|
|
- # Get the current token of the room
|
|
|
- current_token = self._get_max_stream_id_in_current_state_deltas_txn(txn)
|
|
|
-
|
|
|
- current_state_events = len(current_state_ids)
|
|
|
-
|
|
|
- membership_counts = self._get_user_counts_in_room_txn(txn, room_id)
|
|
|
-
|
|
|
- total_state_events = self._get_total_state_event_counts_txn(
|
|
|
- txn, room_id
|
|
|
- )
|
|
|
-
|
|
|
- self._update_stats_txn(
|
|
|
- txn,
|
|
|
- "room",
|
|
|
- room_id,
|
|
|
- now,
|
|
|
- {
|
|
|
- "bucket_size": self.stats_bucket_size,
|
|
|
- "current_state_events": current_state_events,
|
|
|
- "joined_members": membership_counts.get(Membership.JOIN, 0),
|
|
|
- "invited_members": membership_counts.get(Membership.INVITE, 0),
|
|
|
- "left_members": membership_counts.get(Membership.LEAVE, 0),
|
|
|
- "banned_members": membership_counts.get(Membership.BAN, 0),
|
|
|
- "state_events": total_state_events,
|
|
|
- },
|
|
|
- )
|
|
|
- self._simple_insert_txn(
|
|
|
- txn,
|
|
|
- "room_stats_earliest_token",
|
|
|
- {"room_id": room_id, "token": current_token},
|
|
|
- )
|
|
|
-
|
|
|
- # We've finished a room. Delete it from the table.
|
|
|
- self._simple_delete_one_txn(
|
|
|
- txn, TEMP_TABLE + "_rooms", {"room_id": room_id}
|
|
|
- )
|
|
|
-
|
|
|
- yield self.runInteraction("update_room_stats", _fetch_data)
|
|
|
-
|
|
|
- # Update the remaining counter.
|
|
|
- progress["remaining"] -= 1
|
|
|
- yield self.runInteraction(
|
|
|
- "populate_stats",
|
|
|
- self._background_update_progress_txn,
|
|
|
- "populate_stats_process_rooms",
|
|
|
- progress,
|
|
|
- )
|
|
|
-
|
|
|
- processed_event_count += event_count
|
|
|
-
|
|
|
- if processed_event_count > batch_size:
|
|
|
- # Don't process any more rooms, we've hit our batch size.
|
|
|
- return processed_event_count
|
|
|
+ yield self.runInteraction(
|
|
|
+ "_populate_stats_process_rooms",
|
|
|
+ self._background_update_progress_txn,
|
|
|
+ "populate_stats_process_rooms",
|
|
|
+ progress,
|
|
|
+ )
|
|
|
|
|
|
- return processed_event_count
|
|
|
+ return len(rooms_to_work_on)
|
|
|
|
|
|
- def delete_all_stats(self):
|
|
|
+ def get_stats_positions(self):
|
|
|
"""
|
|
|
- Delete all statistics records.
|
|
|
+ Returns the stats processor positions.
|
|
|
"""
|
|
|
-
|
|
|
- def _delete_all_stats_txn(txn):
|
|
|
- txn.execute("DELETE FROM room_state")
|
|
|
- txn.execute("DELETE FROM room_stats")
|
|
|
- txn.execute("DELETE FROM room_stats_earliest_token")
|
|
|
- txn.execute("DELETE FROM user_stats")
|
|
|
-
|
|
|
- return self.runInteraction("delete_all_stats", _delete_all_stats_txn)
|
|
|
-
|
|
|
- def get_stats_stream_pos(self):
|
|
|
return self._simple_select_one_onecol(
|
|
|
- table="stats_stream_pos",
|
|
|
+ table="stats_incremental_position",
|
|
|
keyvalues={},
|
|
|
retcol="stream_id",
|
|
|
- desc="stats_stream_pos",
|
|
|
- )
|
|
|
-
|
|
|
- def update_stats_stream_pos(self, stream_id):
|
|
|
- return self._simple_update_one(
|
|
|
- table="stats_stream_pos",
|
|
|
- keyvalues={},
|
|
|
- updatevalues={"stream_id": stream_id},
|
|
|
- desc="update_stats_stream_pos",
|
|
|
+ desc="stats_incremental_position",
|
|
|
)
|
|
|
|
|
|
def update_room_state(self, room_id, fields):
|
|
@@ -361,42 +216,87 @@ class StatsStore(StateDeltasStore):
|
|
|
fields[col] = None
|
|
|
|
|
|
return self._simple_upsert(
|
|
|
- table="room_state",
|
|
|
+ table="room_stats_state",
|
|
|
keyvalues={"room_id": room_id},
|
|
|
values=fields,
|
|
|
desc="update_room_state",
|
|
|
)
|
|
|
|
|
|
- def get_deltas_for_room(self, room_id, start, size=100):
|
|
|
+ def get_statistics_for_subject(self, stats_type, stats_id, start, size=100):
|
|
|
"""
|
|
|
- Get statistics deltas for a given room.
|
|
|
+ Get statistics for a given subject.
|
|
|
|
|
|
Args:
|
|
|
- room_id (str)
|
|
|
+ stats_type (str): The type of subject
|
|
|
+ stats_id (str): The ID of the subject (e.g. room_id or user_id)
|
|
|
start (int): Pagination start. Number of entries, not timestamp.
|
|
|
size (int): How many entries to return.
|
|
|
|
|
|
Returns:
|
|
|
Deferred[list[dict]], where the dict has the keys of
|
|
|
- ABSOLUTE_STATS_FIELDS["room"] and "ts".
|
|
|
+ ABSOLUTE_STATS_FIELDS[stats_type], and "bucket_size" and "end_ts".
|
|
|
"""
|
|
|
- return self._simple_select_list_paginate(
|
|
|
- "room_stats",
|
|
|
- {"room_id": room_id},
|
|
|
- "ts",
|
|
|
+ return self.runInteraction(
|
|
|
+ "get_statistics_for_subject",
|
|
|
+ self._get_statistics_for_subject_txn,
|
|
|
+ stats_type,
|
|
|
+ stats_id,
|
|
|
+ start,
|
|
|
+ size,
|
|
|
+ )
|
|
|
+
|
|
|
+ def _get_statistics_for_subject_txn(
|
|
|
+ self, txn, stats_type, stats_id, start, size=100
|
|
|
+ ):
|
|
|
+ """
|
|
|
+ Transaction-bound version of L{get_statistics_for_subject}.
|
|
|
+ """
|
|
|
+
|
|
|
+ table, id_col = TYPE_TO_TABLE[stats_type]
|
|
|
+ selected_columns = list(
|
|
|
+ ABSOLUTE_STATS_FIELDS[stats_type] + PER_SLICE_FIELDS[stats_type]
|
|
|
+ )
|
|
|
+
|
|
|
+ slice_list = self._simple_select_list_paginate_txn(
|
|
|
+ txn,
|
|
|
+ table + "_historical",
|
|
|
+ {id_col: stats_id},
|
|
|
+ "end_ts",
|
|
|
start,
|
|
|
size,
|
|
|
- retcols=(list(ABSOLUTE_STATS_FIELDS["room"]) + ["ts"]),
|
|
|
+ retcols=selected_columns + ["bucket_size", "end_ts"],
|
|
|
order_direction="DESC",
|
|
|
)
|
|
|
|
|
|
- def get_all_room_state(self):
|
|
|
- return self._simple_select_list(
|
|
|
- "room_state", None, retcols=("name", "topic", "canonical_alias")
|
|
|
+ return slice_list
|
|
|
+
|
|
|
+ def get_room_stats_state(self, room_id):
|
|
|
+ """
|
|
|
+ Returns the current room_stats_state for a room.
|
|
|
+
|
|
|
+ Args:
|
|
|
+ room_id (str): The ID of the room to return state for.
|
|
|
+
|
|
|
+ Returns (dict):
|
|
|
+ Dictionary containing these keys:
|
|
|
+ "name", "topic", "canonical_alias", "avatar", "join_rules",
|
|
|
+ "history_visibility"
|
|
|
+ """
|
|
|
+ return self._simple_select_one(
|
|
|
+ "room_stats_state",
|
|
|
+ {"room_id": room_id},
|
|
|
+ retcols=(
|
|
|
+ "name",
|
|
|
+ "topic",
|
|
|
+ "canonical_alias",
|
|
|
+ "avatar",
|
|
|
+ "join_rules",
|
|
|
+ "history_visibility",
|
|
|
+ ),
|
|
|
)
|
|
|
|
|
|
@cached()
|
|
|
- def get_earliest_token_for_room_stats(self, room_id):
|
|
|
+ def get_earliest_token_for_stats(self, stats_type, id):
|
|
|
"""
|
|
|
Fetch the "earliest token". This is used by the room stats delta
|
|
|
processor to ignore deltas that have been processed between the
|
|
@@ -406,79 +306,571 @@ class StatsStore(StateDeltasStore):
|
|
|
Returns:
|
|
|
Deferred[int]
|
|
|
"""
|
|
|
+ table, id_col = TYPE_TO_TABLE[stats_type]
|
|
|
+
|
|
|
return self._simple_select_one_onecol(
|
|
|
- "room_stats_earliest_token",
|
|
|
- {"room_id": room_id},
|
|
|
- retcol="token",
|
|
|
+ "%s_current" % (table,),
|
|
|
+ keyvalues={id_col: id},
|
|
|
+ retcol="completed_delta_stream_id",
|
|
|
allow_none=True,
|
|
|
)
|
|
|
|
|
|
- def update_stats(self, stats_type, stats_id, ts, fields):
|
|
|
- table, id_col = TYPE_TO_ROOM[stats_type]
|
|
|
- return self._simple_upsert(
|
|
|
- table=table,
|
|
|
- keyvalues={id_col: stats_id, "ts": ts},
|
|
|
- values=fields,
|
|
|
- desc="update_stats",
|
|
|
+ def bulk_update_stats_delta(self, ts, updates, stream_id):
|
|
|
+ """Bulk update stats tables for a given stream_id and updates the stats
|
|
|
+ incremental position.
|
|
|
+
|
|
|
+ Args:
|
|
|
+ ts (int): Current timestamp in ms
|
|
|
+ updates(dict[str, dict[str, dict[str, Counter]]]): The updates to
|
|
|
+ commit as a mapping stats_type -> stats_id -> field -> delta.
|
|
|
+ stream_id (int): Current position.
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ Deferred
|
|
|
+ """
|
|
|
+
|
|
|
+ def _bulk_update_stats_delta_txn(txn):
|
|
|
+ for stats_type, stats_updates in updates.items():
|
|
|
+ for stats_id, fields in stats_updates.items():
|
|
|
+ self._update_stats_delta_txn(
|
|
|
+ txn,
|
|
|
+ ts=ts,
|
|
|
+ stats_type=stats_type,
|
|
|
+ stats_id=stats_id,
|
|
|
+ fields=fields,
|
|
|
+ complete_with_stream_id=stream_id,
|
|
|
+ )
|
|
|
+
|
|
|
+ self._simple_update_one_txn(
|
|
|
+ txn,
|
|
|
+ table="stats_incremental_position",
|
|
|
+ keyvalues={},
|
|
|
+ updatevalues={"stream_id": stream_id},
|
|
|
+ )
|
|
|
+
|
|
|
+ return self.runInteraction(
|
|
|
+ "bulk_update_stats_delta", _bulk_update_stats_delta_txn
|
|
|
)
|
|
|
|
|
|
- def _update_stats_txn(self, txn, stats_type, stats_id, ts, fields):
|
|
|
- table, id_col = TYPE_TO_ROOM[stats_type]
|
|
|
- return self._simple_upsert_txn(
|
|
|
- txn, table=table, keyvalues={id_col: stats_id, "ts": ts}, values=fields
|
|
|
+ def update_stats_delta(
|
|
|
+ self,
|
|
|
+ ts,
|
|
|
+ stats_type,
|
|
|
+ stats_id,
|
|
|
+ fields,
|
|
|
+ complete_with_stream_id,
|
|
|
+ absolute_field_overrides=None,
|
|
|
+ ):
|
|
|
+ """
|
|
|
+ Updates the statistics for a subject, with a delta (difference/relative
|
|
|
+ change).
|
|
|
+
|
|
|
+ Args:
|
|
|
+ ts (int): timestamp of the change
|
|
|
+ stats_type (str): "room" or "user" – the kind of subject
|
|
|
+ stats_id (str): the subject's ID (room ID or user ID)
|
|
|
+ fields (dict[str, int]): Deltas of stats values.
|
|
|
+ complete_with_stream_id (int, optional):
|
|
|
+ If supplied, converts an incomplete row into a complete row,
|
|
|
+ with the supplied stream_id marked as the stream_id where the
|
|
|
+ row was completed.
|
|
|
+ absolute_field_overrides (dict[str, int]): Current stats values
|
|
|
+ (i.e. not deltas) of absolute fields.
|
|
|
+ Does not work with per-slice fields.
|
|
|
+ """
|
|
|
+
|
|
|
+ return self.runInteraction(
|
|
|
+ "update_stats_delta",
|
|
|
+ self._update_stats_delta_txn,
|
|
|
+ ts,
|
|
|
+ stats_type,
|
|
|
+ stats_id,
|
|
|
+ fields,
|
|
|
+ complete_with_stream_id=complete_with_stream_id,
|
|
|
+ absolute_field_overrides=absolute_field_overrides,
|
|
|
)
|
|
|
|
|
|
- def update_stats_delta(self, ts, stats_type, stats_id, field, value):
|
|
|
- def _update_stats_delta(txn):
|
|
|
- table, id_col = TYPE_TO_ROOM[stats_type]
|
|
|
-
|
|
|
- sql = (
|
|
|
- "SELECT * FROM %s"
|
|
|
- " WHERE %s=? and ts=("
|
|
|
- " SELECT MAX(ts) FROM %s"
|
|
|
- " WHERE %s=?"
|
|
|
- ")"
|
|
|
- ) % (table, id_col, table, id_col)
|
|
|
- txn.execute(sql, (stats_id, stats_id))
|
|
|
- rows = self.cursor_to_dict(txn)
|
|
|
- if len(rows) == 0:
|
|
|
- # silently skip as we don't have anything to apply a delta to yet.
|
|
|
- # this tries to minimise any race between the initial sync and
|
|
|
- # subsequent deltas arriving.
|
|
|
- return
|
|
|
-
|
|
|
- current_ts = ts
|
|
|
- latest_ts = rows[0]["ts"]
|
|
|
- if current_ts < latest_ts:
|
|
|
- # This one is in the past, but we're just encountering it now.
|
|
|
- # Mark it as part of the current bucket.
|
|
|
- current_ts = latest_ts
|
|
|
- elif ts != latest_ts:
|
|
|
- # we have to copy our absolute counters over to the new entry.
|
|
|
- values = {
|
|
|
- key: rows[0][key] for key in ABSOLUTE_STATS_FIELDS[stats_type]
|
|
|
- }
|
|
|
- values[id_col] = stats_id
|
|
|
- values["ts"] = ts
|
|
|
- values["bucket_size"] = self.stats_bucket_size
|
|
|
-
|
|
|
- self._simple_insert_txn(txn, table=table, values=values)
|
|
|
-
|
|
|
- # actually update the new value
|
|
|
- if stats_type in ABSOLUTE_STATS_FIELDS[stats_type]:
|
|
|
- self._simple_update_txn(
|
|
|
- txn,
|
|
|
- table=table,
|
|
|
- keyvalues={id_col: stats_id, "ts": current_ts},
|
|
|
- updatevalues={field: value},
|
|
|
+ def _update_stats_delta_txn(
|
|
|
+ self,
|
|
|
+ txn,
|
|
|
+ ts,
|
|
|
+ stats_type,
|
|
|
+ stats_id,
|
|
|
+ fields,
|
|
|
+ complete_with_stream_id,
|
|
|
+ absolute_field_overrides=None,
|
|
|
+ ):
|
|
|
+ if absolute_field_overrides is None:
|
|
|
+ absolute_field_overrides = {}
|
|
|
+
|
|
|
+ table, id_col = TYPE_TO_TABLE[stats_type]
|
|
|
+
|
|
|
+ quantised_ts = self.quantise_stats_time(int(ts))
|
|
|
+ end_ts = quantised_ts + self.stats_bucket_size
|
|
|
+
|
|
|
+ # Lets be paranoid and check that all the given field names are known
|
|
|
+ abs_field_names = ABSOLUTE_STATS_FIELDS[stats_type]
|
|
|
+ slice_field_names = PER_SLICE_FIELDS[stats_type]
|
|
|
+ for field in chain(fields.keys(), absolute_field_overrides.keys()):
|
|
|
+ if field not in abs_field_names and field not in slice_field_names:
|
|
|
+ # guard against potential SQL injection dodginess
|
|
|
+ raise ValueError(
|
|
|
+ "%s is not a recognised field"
|
|
|
+ " for stats type %s" % (field, stats_type)
|
|
|
)
|
|
|
+
|
|
|
+ # Per slice fields do not get added to the _current table
|
|
|
+
|
|
|
+ # This calculates the deltas (`field = field + ?` values)
|
|
|
+ # for absolute fields,
|
|
|
+ # * defaulting to 0 if not specified
|
|
|
+ # (required for the INSERT part of upserting to work)
|
|
|
+ # * omitting overrides specified in `absolute_field_overrides`
|
|
|
+ deltas_of_absolute_fields = {
|
|
|
+ key: fields.get(key, 0)
|
|
|
+ for key in abs_field_names
|
|
|
+ if key not in absolute_field_overrides
|
|
|
+ }
|
|
|
+
|
|
|
+ # Keep the delta stream ID field up to date
|
|
|
+ absolute_field_overrides = absolute_field_overrides.copy()
|
|
|
+ absolute_field_overrides["completed_delta_stream_id"] = complete_with_stream_id
|
|
|
+
|
|
|
+ # first upsert the `_current` table
|
|
|
+ self._upsert_with_additive_relatives_txn(
|
|
|
+ txn=txn,
|
|
|
+ table=table + "_current",
|
|
|
+ keyvalues={id_col: stats_id},
|
|
|
+ absolutes=absolute_field_overrides,
|
|
|
+ additive_relatives=deltas_of_absolute_fields,
|
|
|
+ )
|
|
|
+
|
|
|
+ per_slice_additive_relatives = {
|
|
|
+ key: fields.get(key, 0) for key in slice_field_names
|
|
|
+ }
|
|
|
+ self._upsert_copy_from_table_with_additive_relatives_txn(
|
|
|
+ txn=txn,
|
|
|
+ into_table=table + "_historical",
|
|
|
+ keyvalues={id_col: stats_id},
|
|
|
+ extra_dst_insvalues={"bucket_size": self.stats_bucket_size},
|
|
|
+ extra_dst_keyvalues={"end_ts": end_ts},
|
|
|
+ additive_relatives=per_slice_additive_relatives,
|
|
|
+ src_table=table + "_current",
|
|
|
+ copy_columns=abs_field_names,
|
|
|
+ )
|
|
|
+
|
|
|
+ def _upsert_with_additive_relatives_txn(
|
|
|
+ self, txn, table, keyvalues, absolutes, additive_relatives
|
|
|
+ ):
|
|
|
+ """Used to update values in the stats tables.
|
|
|
+
|
|
|
+ This is basically a slightly convoluted upsert that *adds* to any
|
|
|
+ existing rows.
|
|
|
+
|
|
|
+ Args:
|
|
|
+ txn
|
|
|
+ table (str): Table name
|
|
|
+ keyvalues (dict[str, any]): Row-identifying key values
|
|
|
+ absolutes (dict[str, any]): Absolute (set) fields
|
|
|
+ additive_relatives (dict[str, int]): Fields that will be added onto
|
|
|
+ if existing row present.
|
|
|
+ """
|
|
|
+ if self.database_engine.can_native_upsert:
|
|
|
+ absolute_updates = [
|
|
|
+ "%(field)s = EXCLUDED.%(field)s" % {"field": field}
|
|
|
+ for field in absolutes.keys()
|
|
|
+ ]
|
|
|
+
|
|
|
+ relative_updates = [
|
|
|
+ "%(field)s = EXCLUDED.%(field)s + %(table)s.%(field)s"
|
|
|
+ % {"table": table, "field": field}
|
|
|
+ for field in additive_relatives.keys()
|
|
|
+ ]
|
|
|
+
|
|
|
+ insert_cols = []
|
|
|
+ qargs = []
|
|
|
+
|
|
|
+ for (key, val) in chain(
|
|
|
+ keyvalues.items(), absolutes.items(), additive_relatives.items()
|
|
|
+ ):
|
|
|
+ insert_cols.append(key)
|
|
|
+ qargs.append(val)
|
|
|
+
|
|
|
+ sql = """
|
|
|
+ INSERT INTO %(table)s (%(insert_cols_cs)s)
|
|
|
+ VALUES (%(insert_vals_qs)s)
|
|
|
+ ON CONFLICT (%(key_columns)s) DO UPDATE SET %(updates)s
|
|
|
+ """ % {
|
|
|
+ "table": table,
|
|
|
+ "insert_cols_cs": ", ".join(insert_cols),
|
|
|
+ "insert_vals_qs": ", ".join(
|
|
|
+ ["?"] * (len(keyvalues) + len(absolutes) + len(additive_relatives))
|
|
|
+ ),
|
|
|
+ "key_columns": ", ".join(keyvalues),
|
|
|
+ "updates": ", ".join(chain(absolute_updates, relative_updates)),
|
|
|
+ }
|
|
|
+
|
|
|
+ txn.execute(sql, qargs)
|
|
|
+ else:
|
|
|
+ self.database_engine.lock_table(txn, table)
|
|
|
+ retcols = list(chain(absolutes.keys(), additive_relatives.keys()))
|
|
|
+ current_row = self._simple_select_one_txn(
|
|
|
+ txn, table, keyvalues, retcols, allow_none=True
|
|
|
+ )
|
|
|
+ if current_row is None:
|
|
|
+ merged_dict = {**keyvalues, **absolutes, **additive_relatives}
|
|
|
+ self._simple_insert_txn(txn, table, merged_dict)
|
|
|
+ else:
|
|
|
+ for (key, val) in additive_relatives.items():
|
|
|
+ current_row[key] += val
|
|
|
+ current_row.update(absolutes)
|
|
|
+ self._simple_update_one_txn(txn, table, keyvalues, current_row)
|
|
|
+
|
|
|
+ def _upsert_copy_from_table_with_additive_relatives_txn(
|
|
|
+ self,
|
|
|
+ txn,
|
|
|
+ into_table,
|
|
|
+ keyvalues,
|
|
|
+ extra_dst_keyvalues,
|
|
|
+ extra_dst_insvalues,
|
|
|
+ additive_relatives,
|
|
|
+ src_table,
|
|
|
+ copy_columns,
|
|
|
+ ):
|
|
|
+ """Updates the historic stats table with latest updates.
|
|
|
+
|
|
|
+ This involves copying "absolute" fields from the `_current` table, and
|
|
|
+ adding relative fields to any existing values.
|
|
|
+
|
|
|
+ Args:
|
|
|
+ txn: Transaction
|
|
|
+ into_table (str): The destination table to UPSERT the row into
|
|
|
+ keyvalues (dict[str, any]): Row-identifying key values
|
|
|
+ extra_dst_keyvalues (dict[str, any]): Additional keyvalues
|
|
|
+ for `into_table`.
|
|
|
+ extra_dst_insvalues (dict[str, any]): Additional values to insert
|
|
|
+ on new row creation for `into_table`.
|
|
|
+ additive_relatives (dict[str, any]): Fields that will be added onto
|
|
|
+ if existing row present. (Must be disjoint from copy_columns.)
|
|
|
+ src_table (str): The source table to copy from
|
|
|
+ copy_columns (iterable[str]): The list of columns to copy
|
|
|
+ """
|
|
|
+ if self.database_engine.can_native_upsert:
|
|
|
+ ins_columns = chain(
|
|
|
+ keyvalues,
|
|
|
+ copy_columns,
|
|
|
+ additive_relatives,
|
|
|
+ extra_dst_keyvalues,
|
|
|
+ extra_dst_insvalues,
|
|
|
+ )
|
|
|
+ sel_exprs = chain(
|
|
|
+ keyvalues,
|
|
|
+ copy_columns,
|
|
|
+ (
|
|
|
+ "?"
|
|
|
+ for _ in chain(
|
|
|
+ additive_relatives, extra_dst_keyvalues, extra_dst_insvalues
|
|
|
+ )
|
|
|
+ ),
|
|
|
+ )
|
|
|
+ keyvalues_where = ("%s = ?" % f for f in keyvalues)
|
|
|
+
|
|
|
+ sets_cc = ("%s = EXCLUDED.%s" % (f, f) for f in copy_columns)
|
|
|
+ sets_ar = (
|
|
|
+ "%s = EXCLUDED.%s + %s.%s" % (f, f, into_table, f)
|
|
|
+ for f in additive_relatives
|
|
|
+ )
|
|
|
+
|
|
|
+ sql = """
|
|
|
+ INSERT INTO %(into_table)s (%(ins_columns)s)
|
|
|
+ SELECT %(sel_exprs)s
|
|
|
+ FROM %(src_table)s
|
|
|
+ WHERE %(keyvalues_where)s
|
|
|
+ ON CONFLICT (%(keyvalues)s)
|
|
|
+ DO UPDATE SET %(sets)s
|
|
|
+ """ % {
|
|
|
+ "into_table": into_table,
|
|
|
+ "ins_columns": ", ".join(ins_columns),
|
|
|
+ "sel_exprs": ", ".join(sel_exprs),
|
|
|
+ "keyvalues_where": " AND ".join(keyvalues_where),
|
|
|
+ "src_table": src_table,
|
|
|
+ "keyvalues": ", ".join(
|
|
|
+ chain(keyvalues.keys(), extra_dst_keyvalues.keys())
|
|
|
+ ),
|
|
|
+ "sets": ", ".join(chain(sets_cc, sets_ar)),
|
|
|
+ }
|
|
|
+
|
|
|
+ qargs = list(
|
|
|
+ chain(
|
|
|
+ additive_relatives.values(),
|
|
|
+ extra_dst_keyvalues.values(),
|
|
|
+ extra_dst_insvalues.values(),
|
|
|
+ keyvalues.values(),
|
|
|
+ )
|
|
|
+ )
|
|
|
+ txn.execute(sql, qargs)
|
|
|
+ else:
|
|
|
+ self.database_engine.lock_table(txn, into_table)
|
|
|
+ src_row = self._simple_select_one_txn(
|
|
|
+ txn, src_table, keyvalues, copy_columns
|
|
|
+ )
|
|
|
+ all_dest_keyvalues = {**keyvalues, **extra_dst_keyvalues}
|
|
|
+ dest_current_row = self._simple_select_one_txn(
|
|
|
+ txn,
|
|
|
+ into_table,
|
|
|
+ keyvalues=all_dest_keyvalues,
|
|
|
+ retcols=list(chain(additive_relatives.keys(), copy_columns)),
|
|
|
+ allow_none=True,
|
|
|
+ )
|
|
|
+
|
|
|
+ if dest_current_row is None:
|
|
|
+ merged_dict = {
|
|
|
+ **keyvalues,
|
|
|
+ **extra_dst_keyvalues,
|
|
|
+ **extra_dst_insvalues,
|
|
|
+ **src_row,
|
|
|
+ **additive_relatives,
|
|
|
+ }
|
|
|
+ self._simple_insert_txn(txn, into_table, merged_dict)
|
|
|
else:
|
|
|
- sql = ("UPDATE %s SET %s=%s+? WHERE %s=? AND ts=?") % (
|
|
|
- table,
|
|
|
- field,
|
|
|
- field,
|
|
|
- id_col,
|
|
|
+ for (key, val) in additive_relatives.items():
|
|
|
+ src_row[key] = dest_current_row[key] + val
|
|
|
+ self._simple_update_txn(txn, into_table, all_dest_keyvalues, src_row)
|
|
|
+
|
|
|
+ def get_changes_room_total_events_and_bytes(self, min_pos, max_pos):
|
|
|
+ """Fetches the counts of events in the given range of stream IDs.
|
|
|
+
|
|
|
+ Args:
|
|
|
+ min_pos (int)
|
|
|
+ max_pos (int)
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ Deferred[dict[str, dict[str, int]]]: Mapping of room ID to field
|
|
|
+ changes.
|
|
|
+ """
|
|
|
+
|
|
|
+ return self.runInteraction(
|
|
|
+ "stats_incremental_total_events_and_bytes",
|
|
|
+ self.get_changes_room_total_events_and_bytes_txn,
|
|
|
+ min_pos,
|
|
|
+ max_pos,
|
|
|
+ )
|
|
|
+
|
|
|
+ def get_changes_room_total_events_and_bytes_txn(self, txn, low_pos, high_pos):
|
|
|
+ """Gets the total_events and total_event_bytes counts for rooms and
|
|
|
+ senders, in a range of stream_orderings (including backfilled events).
|
|
|
+
|
|
|
+ Args:
|
|
|
+ txn
|
|
|
+ low_pos (int): Low stream ordering
|
|
|
+ high_pos (int): High stream ordering
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ tuple[dict[str, dict[str, int]], dict[str, dict[str, int]]]: The
|
|
|
+ room and user deltas for total_events/total_event_bytes in the
|
|
|
+ format of `stats_id` -> fields
|
|
|
+ """
|
|
|
+
|
|
|
+ if low_pos >= high_pos:
|
|
|
+ # nothing to do here.
|
|
|
+ return {}, {}
|
|
|
+
|
|
|
+ if isinstance(self.database_engine, PostgresEngine):
|
|
|
+ new_bytes_expression = "OCTET_LENGTH(json)"
|
|
|
+ else:
|
|
|
+ new_bytes_expression = "LENGTH(CAST(json AS BLOB))"
|
|
|
+
|
|
|
+ sql = """
|
|
|
+ SELECT events.room_id, COUNT(*) AS new_events, SUM(%s) AS new_bytes
|
|
|
+ FROM events INNER JOIN event_json USING (event_id)
|
|
|
+ WHERE (? < stream_ordering AND stream_ordering <= ?)
|
|
|
+ OR (? <= stream_ordering AND stream_ordering <= ?)
|
|
|
+ GROUP BY events.room_id
|
|
|
+ """ % (
|
|
|
+ new_bytes_expression,
|
|
|
+ )
|
|
|
+
|
|
|
+ txn.execute(sql, (low_pos, high_pos, -high_pos, -low_pos))
|
|
|
+
|
|
|
+ room_deltas = {
|
|
|
+ room_id: {"total_events": new_events, "total_event_bytes": new_bytes}
|
|
|
+ for room_id, new_events, new_bytes in txn
|
|
|
+ }
|
|
|
+
|
|
|
+ sql = """
|
|
|
+ SELECT events.sender, COUNT(*) AS new_events, SUM(%s) AS new_bytes
|
|
|
+ FROM events INNER JOIN event_json USING (event_id)
|
|
|
+ WHERE (? < stream_ordering AND stream_ordering <= ?)
|
|
|
+ OR (? <= stream_ordering AND stream_ordering <= ?)
|
|
|
+ GROUP BY events.sender
|
|
|
+ """ % (
|
|
|
+ new_bytes_expression,
|
|
|
+ )
|
|
|
+
|
|
|
+ txn.execute(sql, (low_pos, high_pos, -high_pos, -low_pos))
|
|
|
+
|
|
|
+ user_deltas = {
|
|
|
+ user_id: {"total_events": new_events, "total_event_bytes": new_bytes}
|
|
|
+ for user_id, new_events, new_bytes in txn
|
|
|
+ if self.hs.is_mine_id(user_id)
|
|
|
+ }
|
|
|
+
|
|
|
+ return room_deltas, user_deltas
|
|
|
+
|
|
|
+ @defer.inlineCallbacks
|
|
|
+ def _calculate_and_set_initial_state_for_room(self, room_id):
|
|
|
+ """Calculate and insert an entry into room_stats_current.
|
|
|
+
|
|
|
+ Args:
|
|
|
+ room_id (str)
|
|
|
+
|
|
|
+ Returns:
|
|
|
+ Deferred[tuple[dict, dict, int]]: A tuple of room state, membership
|
|
|
+ counts and stream position.
|
|
|
+ """
|
|
|
+
|
|
|
+ def _fetch_current_state_stats(txn):
|
|
|
+ pos = self.get_room_max_stream_ordering()
|
|
|
+
|
|
|
+ rows = self._simple_select_many_txn(
|
|
|
+ txn,
|
|
|
+ table="current_state_events",
|
|
|
+ column="type",
|
|
|
+ iterable=[
|
|
|
+ EventTypes.Create,
|
|
|
+ EventTypes.JoinRules,
|
|
|
+ EventTypes.RoomHistoryVisibility,
|
|
|
+ EventTypes.Encryption,
|
|
|
+ EventTypes.Name,
|
|
|
+ EventTypes.Topic,
|
|
|
+ EventTypes.RoomAvatar,
|
|
|
+ EventTypes.CanonicalAlias,
|
|
|
+ ],
|
|
|
+ keyvalues={"room_id": room_id, "state_key": ""},
|
|
|
+ retcols=["event_id"],
|
|
|
+ )
|
|
|
+
|
|
|
+ event_ids = [row["event_id"] for row in rows]
|
|
|
+
|
|
|
+ txn.execute(
|
|
|
+ """
|
|
|
+ SELECT membership, count(*) FROM current_state_events
|
|
|
+ WHERE room_id = ? AND type = 'm.room.member'
|
|
|
+ GROUP BY membership
|
|
|
+ """,
|
|
|
+ (room_id,),
|
|
|
+ )
|
|
|
+ membership_counts = {membership: cnt for membership, cnt in txn}
|
|
|
+
|
|
|
+ txn.execute(
|
|
|
+ """
|
|
|
+ SELECT COALESCE(count(*), 0) FROM current_state_events
|
|
|
+ WHERE room_id = ?
|
|
|
+ """,
|
|
|
+ (room_id,),
|
|
|
+ )
|
|
|
+
|
|
|
+ current_state_events_count, = txn.fetchone()
|
|
|
+
|
|
|
+ users_in_room = self.get_users_in_room_txn(txn, room_id)
|
|
|
+
|
|
|
+ return (
|
|
|
+ event_ids,
|
|
|
+ membership_counts,
|
|
|
+ current_state_events_count,
|
|
|
+ users_in_room,
|
|
|
+ pos,
|
|
|
+ )
|
|
|
+
|
|
|
+ (
|
|
|
+ event_ids,
|
|
|
+ membership_counts,
|
|
|
+ current_state_events_count,
|
|
|
+ users_in_room,
|
|
|
+ pos,
|
|
|
+ ) = yield self.runInteraction(
|
|
|
+ "get_initial_state_for_room", _fetch_current_state_stats
|
|
|
+ )
|
|
|
+
|
|
|
+ state_event_map = yield self.get_events(event_ids, get_prev_content=False)
|
|
|
+
|
|
|
+ room_state = {
|
|
|
+ "join_rules": None,
|
|
|
+ "history_visibility": None,
|
|
|
+ "encryption": None,
|
|
|
+ "name": None,
|
|
|
+ "topic": None,
|
|
|
+ "avatar": None,
|
|
|
+ "canonical_alias": None,
|
|
|
+ "is_federatable": True,
|
|
|
+ }
|
|
|
+
|
|
|
+ for event in state_event_map.values():
|
|
|
+ if event.type == EventTypes.JoinRules:
|
|
|
+ room_state["join_rules"] = event.content.get("join_rule")
|
|
|
+ elif event.type == EventTypes.RoomHistoryVisibility:
|
|
|
+ room_state["history_visibility"] = event.content.get(
|
|
|
+ "history_visibility"
|
|
|
)
|
|
|
- txn.execute(sql, (value, stats_id, current_ts))
|
|
|
+ elif event.type == EventTypes.Encryption:
|
|
|
+ room_state["encryption"] = event.content.get("algorithm")
|
|
|
+ elif event.type == EventTypes.Name:
|
|
|
+ room_state["name"] = event.content.get("name")
|
|
|
+ elif event.type == EventTypes.Topic:
|
|
|
+ room_state["topic"] = event.content.get("topic")
|
|
|
+ elif event.type == EventTypes.RoomAvatar:
|
|
|
+ room_state["avatar"] = event.content.get("url")
|
|
|
+ elif event.type == EventTypes.CanonicalAlias:
|
|
|
+ room_state["canonical_alias"] = event.content.get("alias")
|
|
|
+ elif event.type == EventTypes.Create:
|
|
|
+ room_state["is_federatable"] = event.content.get("m.federate", True)
|
|
|
+
|
|
|
+ yield self.update_room_state(room_id, room_state)
|
|
|
+
|
|
|
+ local_users_in_room = [u for u in users_in_room if self.hs.is_mine_id(u)]
|
|
|
+
|
|
|
+ yield self.update_stats_delta(
|
|
|
+ ts=self.clock.time_msec(),
|
|
|
+ stats_type="room",
|
|
|
+ stats_id=room_id,
|
|
|
+ fields={},
|
|
|
+ complete_with_stream_id=pos,
|
|
|
+ absolute_field_overrides={
|
|
|
+ "current_state_events": current_state_events_count,
|
|
|
+ "joined_members": membership_counts.get(Membership.JOIN, 0),
|
|
|
+ "invited_members": membership_counts.get(Membership.INVITE, 0),
|
|
|
+ "left_members": membership_counts.get(Membership.LEAVE, 0),
|
|
|
+ "banned_members": membership_counts.get(Membership.BAN, 0),
|
|
|
+ "local_users_in_room": len(local_users_in_room),
|
|
|
+ },
|
|
|
+ )
|
|
|
+
|
|
|
+ @defer.inlineCallbacks
|
|
|
+ def _calculate_and_set_initial_state_for_user(self, user_id):
|
|
|
+ def _calculate_and_set_initial_state_for_user_txn(txn):
|
|
|
+ pos = self._get_max_stream_id_in_current_state_deltas_txn(txn)
|
|
|
|
|
|
- return self.runInteraction("update_stats_delta", _update_stats_delta)
|
|
|
+ txn.execute(
|
|
|
+ """
|
|
|
+ SELECT COUNT(distinct room_id) FROM current_state_events
|
|
|
+ WHERE type = 'm.room.member' AND state_key = ?
|
|
|
+ AND membership = 'join'
|
|
|
+ """,
|
|
|
+ (user_id,),
|
|
|
+ )
|
|
|
+ count, = txn.fetchone()
|
|
|
+ return count, pos
|
|
|
+
|
|
|
+ joined_rooms, pos = yield self.runInteraction(
|
|
|
+ "calculate_and_set_initial_state_for_user",
|
|
|
+ _calculate_and_set_initial_state_for_user_txn,
|
|
|
+ )
|
|
|
+
|
|
|
+ yield self.update_stats_delta(
|
|
|
+ ts=self.clock.time_msec(),
|
|
|
+ stats_type="user",
|
|
|
+ stats_id=user_id,
|
|
|
+ fields={},
|
|
|
+ complete_with_stream_id=pos,
|
|
|
+ absolute_field_overrides={"joined_rooms": joined_rooms},
|
|
|
+ )
|