monthly_active_users.py 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2018 New Vector
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. import logging
  16. from twisted.internet import defer
  17. from synapse.util.caches.descriptors import cached
  18. from ._base import SQLBaseStore
  19. logger = logging.getLogger(__name__)
  20. # Number of msec of granularity to store the monthly_active_user timestamp
  21. # This means it is not necessary to update the table on every request
  22. LAST_SEEN_GRANULARITY = 60 * 60 * 1000
  23. class MonthlyActiveUsersStore(SQLBaseStore):
  24. def __init__(self, dbconn, hs):
  25. super(MonthlyActiveUsersStore, self).__init__(None, hs)
  26. self._clock = hs.get_clock()
  27. self.hs = hs
  28. self.reserved_users = ()
  29. @defer.inlineCallbacks
  30. def initialise_reserved_users(self, threepids):
  31. # TODO Why can't I do this in init?
  32. store = self.hs.get_datastore()
  33. reserved_user_list = []
  34. # Do not add more reserved users than the total allowable number
  35. for tp in threepids[:self.hs.config.max_mau_value]:
  36. user_id = yield store.get_user_id_by_threepid(
  37. tp["medium"], tp["address"]
  38. )
  39. if user_id:
  40. yield self.upsert_monthly_active_user(user_id)
  41. reserved_user_list.append(user_id)
  42. else:
  43. logger.warning(
  44. "mau limit reserved threepid %s not found in db" % tp
  45. )
  46. self.reserved_users = tuple(reserved_user_list)
  47. @defer.inlineCallbacks
  48. def reap_monthly_active_users(self):
  49. """
  50. Cleans out monthly active user table to ensure that no stale
  51. entries exist.
  52. Returns:
  53. Deferred[]
  54. """
  55. def _reap_users(txn):
  56. # Purge stale users
  57. thirty_days_ago = (
  58. int(self._clock.time_msec()) - (1000 * 60 * 60 * 24 * 30)
  59. )
  60. query_args = [thirty_days_ago]
  61. base_sql = "DELETE FROM monthly_active_users WHERE timestamp < ?"
  62. # Need if/else since 'AND user_id NOT IN ({})' fails on Postgres
  63. # when len(reserved_users) == 0. Works fine on sqlite.
  64. if len(self.reserved_users) > 0:
  65. # questionmarks is a hack to overcome sqlite not supporting
  66. # tuples in 'WHERE IN %s'
  67. questionmarks = '?' * len(self.reserved_users)
  68. query_args.extend(self.reserved_users)
  69. sql = base_sql + """ AND user_id NOT IN ({})""".format(
  70. ','.join(questionmarks)
  71. )
  72. else:
  73. sql = base_sql
  74. txn.execute(sql, query_args)
  75. # If MAU user count still exceeds the MAU threshold, then delete on
  76. # a least recently active basis.
  77. # Note it is not possible to write this query using OFFSET due to
  78. # incompatibilities in how sqlite and postgres support the feature.
  79. # sqlite requires 'LIMIT -1 OFFSET ?', the LIMIT must be present
  80. # While Postgres does not require 'LIMIT', but also does not support
  81. # negative LIMIT values. So there is no way to write it that both can
  82. # support
  83. query_args = [self.hs.config.max_mau_value]
  84. base_sql = """
  85. DELETE FROM monthly_active_users
  86. WHERE user_id NOT IN (
  87. SELECT user_id FROM monthly_active_users
  88. ORDER BY timestamp DESC
  89. LIMIT ?
  90. )
  91. """
  92. # Need if/else since 'AND user_id NOT IN ({})' fails on Postgres
  93. # when len(reserved_users) == 0. Works fine on sqlite.
  94. if len(self.reserved_users) > 0:
  95. query_args.extend(self.reserved_users)
  96. sql = base_sql + """ AND user_id NOT IN ({})""".format(
  97. ','.join(questionmarks)
  98. )
  99. else:
  100. sql = base_sql
  101. txn.execute(sql, query_args)
  102. yield self.runInteraction("reap_monthly_active_users", _reap_users)
  103. # It seems poor to invalidate the whole cache, Postgres supports
  104. # 'Returning' which would allow me to invalidate only the
  105. # specific users, but sqlite has no way to do this and instead
  106. # I would need to SELECT and the DELETE which without locking
  107. # is racy.
  108. # Have resolved to invalidate the whole cache for now and do
  109. # something about it if and when the perf becomes significant
  110. self.user_last_seen_monthly_active.invalidate_all()
  111. self.get_monthly_active_count.invalidate_all()
  112. @cached(num_args=0)
  113. def get_monthly_active_count(self):
  114. """Generates current count of monthly active users
  115. Returns:
  116. Defered[int]: Number of current monthly active users
  117. """
  118. def _count_users(txn):
  119. sql = "SELECT COALESCE(count(*), 0) FROM monthly_active_users"
  120. txn.execute(sql)
  121. count, = txn.fetchone()
  122. return count
  123. return self.runInteraction("count_users", _count_users)
  124. def upsert_monthly_active_user(self, user_id):
  125. """
  126. Updates or inserts monthly active user member
  127. Arguments:
  128. user_id (str): user to add/update
  129. Deferred[bool]: True if a new entry was created, False if an
  130. existing one was updated.
  131. """
  132. is_insert = self._simple_upsert(
  133. desc="upsert_monthly_active_user",
  134. table="monthly_active_users",
  135. keyvalues={
  136. "user_id": user_id,
  137. },
  138. values={
  139. "timestamp": int(self._clock.time_msec()),
  140. },
  141. lock=False,
  142. )
  143. if is_insert:
  144. self.user_last_seen_monthly_active.invalidate((user_id,))
  145. self.get_monthly_active_count.invalidate(())
  146. @cached(num_args=1)
  147. def user_last_seen_monthly_active(self, user_id):
  148. """
  149. Checks if a given user is part of the monthly active user group
  150. Arguments:
  151. user_id (str): user to add/update
  152. Return:
  153. Deferred[int] : timestamp since last seen, None if never seen
  154. """
  155. return(self._simple_select_one_onecol(
  156. table="monthly_active_users",
  157. keyvalues={
  158. "user_id": user_id,
  159. },
  160. retcol="timestamp",
  161. allow_none=True,
  162. desc="user_last_seen_monthly_active",
  163. ))
  164. @defer.inlineCallbacks
  165. def populate_monthly_active_users(self, user_id):
  166. """Checks on the state of monthly active user limits and optionally
  167. add the user to the monthly active tables
  168. Args:
  169. user_id(str): the user_id to query
  170. """
  171. if self.hs.config.limit_usage_by_mau:
  172. last_seen_timestamp = yield self.user_last_seen_monthly_active(user_id)
  173. now = self.hs.get_clock().time_msec()
  174. # We want to reduce to the total number of db writes, and are happy
  175. # to trade accuracy of timestamp in order to lighten load. This means
  176. # We always insert new users (where MAU threshold has not been reached),
  177. # but only update if we have not previously seen the user for
  178. # LAST_SEEN_GRANULARITY ms
  179. if last_seen_timestamp is None:
  180. count = yield self.get_monthly_active_count()
  181. if count < self.hs.config.max_mau_value:
  182. yield self.upsert_monthly_active_user(user_id)
  183. elif now - last_seen_timestamp > LAST_SEEN_GRANULARITY:
  184. yield self.upsert_monthly_active_user(user_id)