monthly_active_users.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2018 New Vector
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. import logging
  16. from twisted.internet import defer
  17. from synapse.util.caches.descriptors import cached
  18. from ._base import SQLBaseStore
  19. logger = logging.getLogger(__name__)
  20. # Number of msec of granularity to store the monthly_active_user timestamp
  21. # This means it is not necessary to update the table on every request
  22. LAST_SEEN_GRANULARITY = 60 * 60 * 1000
  23. class MonthlyActiveUsersStore(SQLBaseStore):
  24. def __init__(self, dbconn, hs):
  25. super(MonthlyActiveUsersStore, self).__init__(None, hs)
  26. self._clock = hs.get_clock()
  27. self.hs = hs
  28. self.reserved_users = ()
  29. # Do not add more reserved users than the total allowable number
  30. self._new_transaction(
  31. dbconn, "initialise_mau_threepids", [], [],
  32. self._initialise_reserved_users,
  33. hs.config.mau_limits_reserved_threepids[:self.hs.config.max_mau_value],
  34. )
  35. def _initialise_reserved_users(self, txn, threepids):
  36. """Ensures that reserved threepids are accounted for in the MAU table, should
  37. be called on start up.
  38. Args:
  39. txn (cursor):
  40. threepids (list[dict]): List of threepid dicts to reserve
  41. """
  42. reserved_user_list = []
  43. for tp in threepids:
  44. user_id = self.get_user_id_by_threepid_txn(
  45. txn,
  46. tp["medium"], tp["address"]
  47. )
  48. if user_id:
  49. is_support = self.is_support_user_txn(txn, user_id)
  50. if not is_support:
  51. self.upsert_monthly_active_user_txn(txn, user_id)
  52. reserved_user_list.append(user_id)
  53. else:
  54. logger.warning(
  55. "mau limit reserved threepid %s not found in db" % tp
  56. )
  57. self.reserved_users = tuple(reserved_user_list)
  58. @defer.inlineCallbacks
  59. def reap_monthly_active_users(self):
  60. """Cleans out monthly active user table to ensure that no stale
  61. entries exist.
  62. Returns:
  63. Deferred[]
  64. """
  65. def _reap_users(txn):
  66. # Purge stale users
  67. thirty_days_ago = (
  68. int(self._clock.time_msec()) - (1000 * 60 * 60 * 24 * 30)
  69. )
  70. query_args = [thirty_days_ago]
  71. base_sql = "DELETE FROM monthly_active_users WHERE timestamp < ?"
  72. # Need if/else since 'AND user_id NOT IN ({})' fails on Postgres
  73. # when len(reserved_users) == 0. Works fine on sqlite.
  74. if len(self.reserved_users) > 0:
  75. # questionmarks is a hack to overcome sqlite not supporting
  76. # tuples in 'WHERE IN %s'
  77. questionmarks = '?' * len(self.reserved_users)
  78. query_args.extend(self.reserved_users)
  79. sql = base_sql + """ AND user_id NOT IN ({})""".format(
  80. ','.join(questionmarks)
  81. )
  82. else:
  83. sql = base_sql
  84. txn.execute(sql, query_args)
  85. if self.hs.config.limit_usage_by_mau:
  86. # If MAU user count still exceeds the MAU threshold, then delete on
  87. # a least recently active basis.
  88. # Note it is not possible to write this query using OFFSET due to
  89. # incompatibilities in how sqlite and postgres support the feature.
  90. # sqlite requires 'LIMIT -1 OFFSET ?', the LIMIT must be present
  91. # While Postgres does not require 'LIMIT', but also does not support
  92. # negative LIMIT values. So there is no way to write it that both can
  93. # support
  94. safe_guard = self.hs.config.max_mau_value - len(self.reserved_users)
  95. # Must be greater than zero for postgres
  96. safe_guard = safe_guard if safe_guard > 0 else 0
  97. query_args = [safe_guard]
  98. base_sql = """
  99. DELETE FROM monthly_active_users
  100. WHERE user_id NOT IN (
  101. SELECT user_id FROM monthly_active_users
  102. ORDER BY timestamp DESC
  103. LIMIT ?
  104. )
  105. """
  106. # Need if/else since 'AND user_id NOT IN ({})' fails on Postgres
  107. # when len(reserved_users) == 0. Works fine on sqlite.
  108. if len(self.reserved_users) > 0:
  109. query_args.extend(self.reserved_users)
  110. sql = base_sql + """ AND user_id NOT IN ({})""".format(
  111. ','.join(questionmarks)
  112. )
  113. else:
  114. sql = base_sql
  115. txn.execute(sql, query_args)
  116. yield self.runInteraction("reap_monthly_active_users", _reap_users)
  117. # It seems poor to invalidate the whole cache, Postgres supports
  118. # 'Returning' which would allow me to invalidate only the
  119. # specific users, but sqlite has no way to do this and instead
  120. # I would need to SELECT and the DELETE which without locking
  121. # is racy.
  122. # Have resolved to invalidate the whole cache for now and do
  123. # something about it if and when the perf becomes significant
  124. self.user_last_seen_monthly_active.invalidate_all()
  125. self.get_monthly_active_count.invalidate_all()
  126. @cached(num_args=0)
  127. def get_monthly_active_count(self):
  128. """Generates current count of monthly active users
  129. Returns:
  130. Defered[int]: Number of current monthly active users
  131. """
  132. def _count_users(txn):
  133. sql = "SELECT COALESCE(count(*), 0) FROM monthly_active_users"
  134. txn.execute(sql)
  135. count, = txn.fetchone()
  136. return count
  137. return self.runInteraction("count_users", _count_users)
  138. @defer.inlineCallbacks
  139. def get_registered_reserved_users_count(self):
  140. """Of the reserved threepids defined in config, how many are associated
  141. with registered users?
  142. Returns:
  143. Defered[int]: Number of real reserved users
  144. """
  145. count = 0
  146. for tp in self.hs.config.mau_limits_reserved_threepids:
  147. user_id = yield self.hs.get_datastore().get_user_id_by_threepid(
  148. tp["medium"], tp["address"]
  149. )
  150. if user_id:
  151. count = count + 1
  152. defer.returnValue(count)
  153. @defer.inlineCallbacks
  154. def upsert_monthly_active_user(self, user_id):
  155. """Updates or inserts the user into the monthly active user table, which
  156. is used to track the current MAU usage of the server
  157. Args:
  158. user_id (str): user to add/update
  159. """
  160. # Support user never to be included in MAU stats. Note I can't easily call this
  161. # from upsert_monthly_active_user_txn because then I need a _txn form of
  162. # is_support_user which is complicated because I want to cache the result.
  163. # Therefore I call it here and ignore the case where
  164. # upsert_monthly_active_user_txn is called directly from
  165. # _initialise_reserved_users reasoning that it would be very strange to
  166. # include a support user in this context.
  167. is_support = yield self.is_support_user(user_id)
  168. if is_support:
  169. return
  170. is_insert = yield self.runInteraction(
  171. "upsert_monthly_active_user", self.upsert_monthly_active_user_txn,
  172. user_id
  173. )
  174. if is_insert:
  175. self.user_last_seen_monthly_active.invalidate((user_id,))
  176. self.get_monthly_active_count.invalidate(())
  177. def upsert_monthly_active_user_txn(self, txn, user_id):
  178. """Updates or inserts monthly active user member
  179. Note that, after calling this method, it will generally be necessary
  180. to invalidate the caches on user_last_seen_monthly_active and
  181. get_monthly_active_count. We can't do that here, because we are running
  182. in a database thread rather than the main thread, and we can't call
  183. txn.call_after because txn may not be a LoggingTransaction.
  184. We consciously do not call is_support_txn from this method because it
  185. is not possible to cache the response. is_support_txn will be false in
  186. almost all cases, so it seems reasonable to call it only for
  187. upsert_monthly_active_user and to call is_support_txn manually
  188. for cases where upsert_monthly_active_user_txn is called directly,
  189. like _initialise_reserved_users
  190. In short, don't call this method with support users. (Support users
  191. should not appear in the MAU stats).
  192. Args:
  193. txn (cursor):
  194. user_id (str): user to add/update
  195. Returns:
  196. bool: True if a new entry was created, False if an
  197. existing one was updated.
  198. """
  199. # Am consciously deciding to lock the table on the basis that is ought
  200. # never be a big table and alternative approaches (batching multiple
  201. # upserts into a single txn) introduced a lot of extra complexity.
  202. # See https://github.com/matrix-org/synapse/issues/3854 for more
  203. is_insert = self._simple_upsert_txn(
  204. txn,
  205. table="monthly_active_users",
  206. keyvalues={
  207. "user_id": user_id,
  208. },
  209. values={
  210. "timestamp": int(self._clock.time_msec()),
  211. },
  212. )
  213. return is_insert
  214. @cached(num_args=1)
  215. def user_last_seen_monthly_active(self, user_id):
  216. """
  217. Checks if a given user is part of the monthly active user group
  218. Arguments:
  219. user_id (str): user to add/update
  220. Return:
  221. Deferred[int] : timestamp since last seen, None if never seen
  222. """
  223. return(self._simple_select_one_onecol(
  224. table="monthly_active_users",
  225. keyvalues={
  226. "user_id": user_id,
  227. },
  228. retcol="timestamp",
  229. allow_none=True,
  230. desc="user_last_seen_monthly_active",
  231. ))
  232. @defer.inlineCallbacks
  233. def populate_monthly_active_users(self, user_id):
  234. """Checks on the state of monthly active user limits and optionally
  235. add the user to the monthly active tables
  236. Args:
  237. user_id(str): the user_id to query
  238. """
  239. if self.hs.config.limit_usage_by_mau or self.hs.config.mau_stats_only:
  240. # Trial users and guests should not be included as part of MAU group
  241. is_guest = yield self.is_guest(user_id)
  242. if is_guest:
  243. return
  244. is_trial = yield self.is_trial_user(user_id)
  245. if is_trial:
  246. return
  247. last_seen_timestamp = yield self.user_last_seen_monthly_active(user_id)
  248. now = self.hs.get_clock().time_msec()
  249. # We want to reduce to the total number of db writes, and are happy
  250. # to trade accuracy of timestamp in order to lighten load. This means
  251. # We always insert new users (where MAU threshold has not been reached),
  252. # but only update if we have not previously seen the user for
  253. # LAST_SEEN_GRANULARITY ms
  254. if last_seen_timestamp is None:
  255. # In the case where mau_stats_only is True and limit_usage_by_mau is
  256. # False, there is no point in checking get_monthly_active_count - it
  257. # adds no value and will break the logic if max_mau_value is exceeded.
  258. if not self.hs.config.limit_usage_by_mau:
  259. yield self.upsert_monthly_active_user(user_id)
  260. else:
  261. count = yield self.get_monthly_active_count()
  262. if count < self.hs.config.max_mau_value:
  263. yield self.upsert_monthly_active_user(user_id)
  264. elif now - last_seen_timestamp > LAST_SEEN_GRANULARITY:
  265. yield self.upsert_monthly_active_user(user_id)