# monthly_active_users.py (13 KB)
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2018 New Vector
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. import logging
  16. from twisted.internet import defer
  17. from synapse.storage._base import SQLBaseStore
  18. from synapse.storage.database import Database
  19. from synapse.util.caches.descriptors import cached
logger = logging.getLogger(__name__)

# Granularity, in milliseconds, at which the monthly_active_users timestamp
# is stored (60 * 60 * 1000 ms, i.e. one hour). A user's row only needs
# refreshing once more than this much time has passed since the stored
# timestamp, so it is not necessary to update the table on every request.
LAST_SEEN_GRANULARITY = 60 * 60 * 1000
  24. class MonthlyActiveUsersWorkerStore(SQLBaseStore):
  25. def __init__(self, database: Database, db_conn, hs):
  26. super(MonthlyActiveUsersWorkerStore, self).__init__(database, db_conn, hs)
  27. self._clock = hs.get_clock()
  28. self.hs = hs
  29. @cached(num_args=0)
  30. def get_monthly_active_count(self):
  31. """Generates current count of monthly active users
  32. Returns:
  33. Defered[int]: Number of current monthly active users
  34. """
  35. def _count_users(txn):
  36. sql = "SELECT COALESCE(count(*), 0) FROM monthly_active_users"
  37. txn.execute(sql)
  38. (count,) = txn.fetchone()
  39. return count
  40. return self.db.runInteraction("count_users", _count_users)
  41. @defer.inlineCallbacks
  42. def get_registered_reserved_users(self):
  43. """Of the reserved threepids defined in config, which are associated
  44. with registered users?
  45. Returns:
  46. Defered[list]: Real reserved users
  47. """
  48. users = []
  49. for tp in self.hs.config.mau_limits_reserved_threepids[
  50. : self.hs.config.max_mau_value
  51. ]:
  52. user_id = yield self.hs.get_datastore().get_user_id_by_threepid(
  53. tp["medium"], tp["address"]
  54. )
  55. if user_id:
  56. users.append(user_id)
  57. return users
  58. @cached(num_args=1)
  59. def user_last_seen_monthly_active(self, user_id):
  60. """
  61. Checks if a given user is part of the monthly active user group
  62. Arguments:
  63. user_id (str): user to add/update
  64. Return:
  65. Deferred[int] : timestamp since last seen, None if never seen
  66. """
  67. return self.db.simple_select_one_onecol(
  68. table="monthly_active_users",
  69. keyvalues={"user_id": user_id},
  70. retcol="timestamp",
  71. allow_none=True,
  72. desc="user_last_seen_monthly_active",
  73. )
  74. class MonthlyActiveUsersStore(MonthlyActiveUsersWorkerStore):
  75. def __init__(self, database: Database, db_conn, hs):
  76. super(MonthlyActiveUsersStore, self).__init__(database, db_conn, hs)
  77. # Do not add more reserved users than the total allowable number
  78. # cur = LoggingTransaction(
  79. self.db.new_transaction(
  80. db_conn,
  81. "initialise_mau_threepids",
  82. [],
  83. [],
  84. self._initialise_reserved_users,
  85. hs.config.mau_limits_reserved_threepids[: self.hs.config.max_mau_value],
  86. )
  87. def _initialise_reserved_users(self, txn, threepids):
  88. """Ensures that reserved threepids are accounted for in the MAU table, should
  89. be called on start up.
  90. Args:
  91. txn (cursor):
  92. threepids (list[dict]): List of threepid dicts to reserve
  93. """
  94. for tp in threepids:
  95. user_id = self.get_user_id_by_threepid_txn(txn, tp["medium"], tp["address"])
  96. if user_id:
  97. is_support = self.is_support_user_txn(txn, user_id)
  98. if not is_support:
  99. # We do this manually here to avoid hitting #6791
  100. self.db.simple_upsert_txn(
  101. txn,
  102. table="monthly_active_users",
  103. keyvalues={"user_id": user_id},
  104. values={"timestamp": int(self._clock.time_msec())},
  105. )
  106. else:
  107. logger.warning("mau limit reserved threepid %s not found in db" % tp)
  108. @defer.inlineCallbacks
  109. def reap_monthly_active_users(self):
  110. """Cleans out monthly active user table to ensure that no stale
  111. entries exist.
  112. Returns:
  113. Deferred[]
  114. """
  115. def _reap_users(txn, reserved_users):
  116. """
  117. Args:
  118. reserved_users (tuple): reserved users to preserve
  119. """
  120. thirty_days_ago = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24 * 30)
  121. query_args = [thirty_days_ago]
  122. base_sql = "DELETE FROM monthly_active_users WHERE timestamp < ?"
  123. # Need if/else since 'AND user_id NOT IN ({})' fails on Postgres
  124. # when len(reserved_users) == 0. Works fine on sqlite.
  125. if len(reserved_users) > 0:
  126. # questionmarks is a hack to overcome sqlite not supporting
  127. # tuples in 'WHERE IN %s'
  128. question_marks = ",".join("?" * len(reserved_users))
  129. query_args.extend(reserved_users)
  130. sql = base_sql + " AND user_id NOT IN ({})".format(question_marks)
  131. else:
  132. sql = base_sql
  133. txn.execute(sql, query_args)
  134. max_mau_value = self.hs.config.max_mau_value
  135. if self.hs.config.limit_usage_by_mau:
  136. # If MAU user count still exceeds the MAU threshold, then delete on
  137. # a least recently active basis.
  138. # Note it is not possible to write this query using OFFSET due to
  139. # incompatibilities in how sqlite and postgres support the feature.
  140. # sqlite requires 'LIMIT -1 OFFSET ?', the LIMIT must be present
  141. # While Postgres does not require 'LIMIT', but also does not support
  142. # negative LIMIT values. So there is no way to write it that both can
  143. # support
  144. if len(reserved_users) == 0:
  145. sql = """
  146. DELETE FROM monthly_active_users
  147. WHERE user_id NOT IN (
  148. SELECT user_id FROM monthly_active_users
  149. ORDER BY timestamp DESC
  150. LIMIT ?
  151. )
  152. """
  153. txn.execute(sql, (max_mau_value,))
  154. # Need if/else since 'AND user_id NOT IN ({})' fails on Postgres
  155. # when len(reserved_users) == 0. Works fine on sqlite.
  156. else:
  157. # Must be >= 0 for postgres
  158. num_of_non_reserved_users_to_remove = max(
  159. max_mau_value - len(reserved_users), 0
  160. )
  161. # It is important to filter reserved users twice to guard
  162. # against the case where the reserved user is present in the
  163. # SELECT, meaning that a legitmate mau is deleted.
  164. sql = """
  165. DELETE FROM monthly_active_users
  166. WHERE user_id NOT IN (
  167. SELECT user_id FROM monthly_active_users
  168. WHERE user_id NOT IN ({})
  169. ORDER BY timestamp DESC
  170. LIMIT ?
  171. )
  172. AND user_id NOT IN ({})
  173. """.format(
  174. question_marks, question_marks
  175. )
  176. query_args = [
  177. *reserved_users,
  178. num_of_non_reserved_users_to_remove,
  179. *reserved_users,
  180. ]
  181. txn.execute(sql, query_args)
  182. # It seems poor to invalidate the whole cache, Postgres supports
  183. # 'Returning' which would allow me to invalidate only the
  184. # specific users, but sqlite has no way to do this and instead
  185. # I would need to SELECT and the DELETE which without locking
  186. # is racy.
  187. # Have resolved to invalidate the whole cache for now and do
  188. # something about it if and when the perf becomes significant
  189. self._invalidate_all_cache_and_stream(
  190. txn, self.user_last_seen_monthly_active
  191. )
  192. self._invalidate_cache_and_stream(txn, self.get_monthly_active_count, ())
  193. reserved_users = yield self.get_registered_reserved_users()
  194. yield self.db.runInteraction(
  195. "reap_monthly_active_users", _reap_users, reserved_users
  196. )
  197. @defer.inlineCallbacks
  198. def upsert_monthly_active_user(self, user_id):
  199. """Updates or inserts the user into the monthly active user table, which
  200. is used to track the current MAU usage of the server
  201. Args:
  202. user_id (str): user to add/update
  203. """
  204. # Support user never to be included in MAU stats. Note I can't easily call this
  205. # from upsert_monthly_active_user_txn because then I need a _txn form of
  206. # is_support_user which is complicated because I want to cache the result.
  207. # Therefore I call it here and ignore the case where
  208. # upsert_monthly_active_user_txn is called directly from
  209. # _initialise_reserved_users reasoning that it would be very strange to
  210. # include a support user in this context.
  211. is_support = yield self.is_support_user(user_id)
  212. if is_support:
  213. return
  214. yield self.db.runInteraction(
  215. "upsert_monthly_active_user", self.upsert_monthly_active_user_txn, user_id
  216. )
  217. def upsert_monthly_active_user_txn(self, txn, user_id):
  218. """Updates or inserts monthly active user member
  219. We consciously do not call is_support_txn from this method because it
  220. is not possible to cache the response. is_support_txn will be false in
  221. almost all cases, so it seems reasonable to call it only for
  222. upsert_monthly_active_user and to call is_support_txn manually
  223. for cases where upsert_monthly_active_user_txn is called directly,
  224. like _initialise_reserved_users
  225. In short, don't call this method with support users. (Support users
  226. should not appear in the MAU stats).
  227. Args:
  228. txn (cursor):
  229. user_id (str): user to add/update
  230. Returns:
  231. bool: True if a new entry was created, False if an
  232. existing one was updated.
  233. """
  234. # Am consciously deciding to lock the table on the basis that is ought
  235. # never be a big table and alternative approaches (batching multiple
  236. # upserts into a single txn) introduced a lot of extra complexity.
  237. # See https://github.com/matrix-org/synapse/issues/3854 for more
  238. is_insert = self.db.simple_upsert_txn(
  239. txn,
  240. table="monthly_active_users",
  241. keyvalues={"user_id": user_id},
  242. values={"timestamp": int(self._clock.time_msec())},
  243. )
  244. self._invalidate_cache_and_stream(txn, self.get_monthly_active_count, ())
  245. self._invalidate_cache_and_stream(
  246. txn, self.user_last_seen_monthly_active, (user_id,)
  247. )
  248. return is_insert
  249. @defer.inlineCallbacks
  250. def populate_monthly_active_users(self, user_id):
  251. """Checks on the state of monthly active user limits and optionally
  252. add the user to the monthly active tables
  253. Args:
  254. user_id(str): the user_id to query
  255. """
  256. if self.hs.config.limit_usage_by_mau or self.hs.config.mau_stats_only:
  257. # Trial users and guests should not be included as part of MAU group
  258. is_guest = yield self.is_guest(user_id)
  259. if is_guest:
  260. return
  261. is_trial = yield self.is_trial_user(user_id)
  262. if is_trial:
  263. return
  264. last_seen_timestamp = yield self.user_last_seen_monthly_active(user_id)
  265. now = self.hs.get_clock().time_msec()
  266. # We want to reduce to the total number of db writes, and are happy
  267. # to trade accuracy of timestamp in order to lighten load. This means
  268. # We always insert new users (where MAU threshold has not been reached),
  269. # but only update if we have not previously seen the user for
  270. # LAST_SEEN_GRANULARITY ms
  271. if last_seen_timestamp is None:
  272. # In the case where mau_stats_only is True and limit_usage_by_mau is
  273. # False, there is no point in checking get_monthly_active_count - it
  274. # adds no value and will break the logic if max_mau_value is exceeded.
  275. if not self.hs.config.limit_usage_by_mau:
  276. yield self.upsert_monthly_active_user(user_id)
  277. else:
  278. count = yield self.get_monthly_active_count()
  279. if count < self.hs.config.max_mau_value:
  280. yield self.upsert_monthly_active_user(user_id)
  281. elif now - last_seen_timestamp > LAST_SEEN_GRANULARITY:
  282. yield self.upsert_monthly_active_user(user_id)