# (removed: non-code extraction artifacts — filename banner and a run of concatenated line numbers)
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2014-2016 OpenMarket Ltd
  3. # Copyright 2018 New Vector Ltd
  4. # Copyright 2019 The Matrix.org Foundation C.I.C.
  5. #
  6. # Licensed under the Apache License, Version 2.0 (the "License");
  7. # you may not use this file except in compliance with the License.
  8. # You may obtain a copy of the License at
  9. #
  10. # http://www.apache.org/licenses/LICENSE-2.0
  11. #
  12. # Unless required by applicable law or agreed to in writing, software
  13. # distributed under the License is distributed on an "AS IS" BASIS,
  14. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. # See the License for the specific language governing permissions and
  16. # limitations under the License.
  17. import calendar
  18. import logging
  19. import time
  20. from synapse.api.constants import PresenceState
  21. from synapse.storage.database import Database
  22. from synapse.storage.engines import PostgresEngine
  23. from synapse.storage.util.id_generators import (
  24. ChainedIdGenerator,
  25. IdGenerator,
  26. StreamIdGenerator,
  27. )
  28. from synapse.util.caches.stream_change_cache import StreamChangeCache
  29. from .account_data import AccountDataStore
  30. from .appservice import ApplicationServiceStore, ApplicationServiceTransactionStore
  31. from .cache import CacheInvalidationStore
  32. from .client_ips import ClientIpStore
  33. from .deviceinbox import DeviceInboxStore
  34. from .devices import DeviceStore
  35. from .directory import DirectoryStore
  36. from .e2e_room_keys import EndToEndRoomKeyStore
  37. from .end_to_end_keys import EndToEndKeyStore
  38. from .event_federation import EventFederationStore
  39. from .event_push_actions import EventPushActionsStore
  40. from .events import EventsStore
  41. from .events_bg_updates import EventsBackgroundUpdatesStore
  42. from .filtering import FilteringStore
  43. from .group_server import GroupServerStore
  44. from .keys import KeyStore
  45. from .media_repository import MediaRepositoryStore
  46. from .monthly_active_users import MonthlyActiveUsersStore
  47. from .openid import OpenIdStore
  48. from .presence import PresenceStore, UserPresenceState
  49. from .profile import ProfileStore
  50. from .push_rule import PushRuleStore
  51. from .pusher import PusherStore
  52. from .receipts import ReceiptsStore
  53. from .registration import RegistrationStore
  54. from .rejections import RejectionsStore
  55. from .relations import RelationsStore
  56. from .room import RoomStore
  57. from .roommember import RoomMemberStore
  58. from .search import SearchStore
  59. from .signatures import SignatureStore
  60. from .state import StateStore
  61. from .stats import StatsStore
  62. from .stream import StreamStore
  63. from .tags import TagsStore
  64. from .transactions import TransactionStore
  65. from .user_directory import UserDirectoryStore
  66. from .user_erasure_store import UserErasureStore
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
class DataStore(
    EventsBackgroundUpdatesStore,
    RoomMemberStore,
    RoomStore,
    RegistrationStore,
    StreamStore,
    ProfileStore,
    PresenceStore,
    TransactionStore,
    DirectoryStore,
    KeyStore,
    StateStore,
    SignatureStore,
    ApplicationServiceStore,
    EventsStore,
    EventFederationStore,
    MediaRepositoryStore,
    RejectionsStore,
    FilteringStore,
    PusherStore,
    PushRuleStore,
    ApplicationServiceTransactionStore,
    ReceiptsStore,
    EndToEndKeyStore,
    EndToEndRoomKeyStore,
    SearchStore,
    TagsStore,
    AccountDataStore,
    EventPushActionsStore,
    OpenIdStore,
    ClientIpStore,
    DeviceStore,
    DeviceInboxStore,
    UserDirectoryStore,
    GroupServerStore,
    UserErasureStore,
    MonthlyActiveUsersStore,
    StatsStore,
    RelationsStore,
    CacheInvalidationStore,
):
    """The main data store for the homeserver's primary database.

    Aggregates, via multiple inheritance, every per-feature store mixin
    imported above (rooms, events, devices, presence, push rules, ...).
    ``__init__`` wires up the shared stream-id generators and
    stream-change caches that those mixins rely on, then adds a handful
    of phone-home metrics helpers (daily/monthly/R30 user counts) and
    admin-facing user listing/search methods.
    """

    def __init__(self, database: Database, db_conn, hs):
        """Set up id generators and prefill stream-change caches.

        Args:
            database: the Database wrapper for the underlying connection pool
            db_conn: a raw database connection, used synchronously during
                startup to seed generators and caches
            hs: the HomeServer object
        """
        self.hs = hs
        self._clock = hs.get_clock()
        self.database_engine = database.engine

        # Refuse to start if the database contains users from a different
        # server_name: changing server_name after initial configuration is
        # unsupported.
        all_users_native = are_all_users_on_domain(
            db_conn.cursor(), database.engine, hs.hostname
        )
        if not all_users_native:
            raise Exception(
                "Found users in database not native to %s!\n"
                "You cannot changed a synapse server_name after it's been configured"
                % (hs.hostname,)
            )

        # Stream-id generators: each hands out monotonically advancing ids
        # for one replication stream, seeded from the current max value in
        # the named table/column (plus any extra_tables).
        self._stream_id_gen = StreamIdGenerator(
            db_conn,
            "events",
            "stream_ordering",
            extra_tables=[("local_invites", "stream_id")],
        )
        # Backfilled events count downwards (step=-1), so their stream
        # orderings are negative and never collide with live events.
        self._backfill_id_gen = StreamIdGenerator(
            db_conn,
            "events",
            "stream_ordering",
            step=-1,
            extra_tables=[("ex_outlier_stream", "event_stream_ordering")],
        )
        self._presence_id_gen = StreamIdGenerator(
            db_conn, "presence_stream", "stream_id"
        )
        self._device_inbox_id_gen = StreamIdGenerator(
            db_conn, "device_max_stream_id", "stream_id"
        )
        self._public_room_id_gen = StreamIdGenerator(
            db_conn, "public_room_list_stream", "stream_id"
        )
        self._device_list_id_gen = StreamIdGenerator(
            db_conn,
            "device_lists_stream",
            "stream_id",
            extra_tables=[("user_signature_stream", "stream_id")],
        )
        self._cross_signing_id_gen = StreamIdGenerator(
            db_conn, "e2e_cross_signing_keys", "stream_id"
        )

        # Plain (non-stream) id generators for simple autoincrement columns.
        self._access_tokens_id_gen = IdGenerator(db_conn, "access_tokens", "id")
        self._event_reports_id_gen = IdGenerator(db_conn, "event_reports", "id")
        self._push_rule_id_gen = IdGenerator(db_conn, "push_rules", "id")
        self._push_rules_enable_id_gen = IdGenerator(db_conn, "push_rules_enable", "id")
        # Push-rule stream ids are chained off the events stream generator.
        self._push_rules_stream_id_gen = ChainedIdGenerator(
            self._stream_id_gen, db_conn, "push_rules_stream", "stream_id"
        )
        self._pushers_id_gen = StreamIdGenerator(
            db_conn, "pushers", "id", extra_tables=[("deleted_pushers", "stream_id")]
        )
        self._group_updates_id_gen = StreamIdGenerator(
            db_conn, "local_group_updates", "stream_id"
        )

        # The cache-invalidation stream only exists on Postgres deployments;
        # on other engines no generator is needed.
        if isinstance(self.database_engine, PostgresEngine):
            self._cache_id_gen = StreamIdGenerator(
                db_conn, "cache_invalidation_stream", "stream_id"
            )
        else:
            self._cache_id_gen = None

        super(DataStore, self).__init__(database, db_conn, hs)

        # Snapshot of non-offline presence rows, handed over exactly once
        # via take_presence_startup_info().
        self._presence_on_startup = self._get_active_presence(db_conn)

        presence_cache_prefill, min_presence_val = self.db.get_cache_dict(
            db_conn,
            "presence_stream",
            entity_column="user_id",
            stream_column="stream_id",
            max_value=self._presence_id_gen.get_current_token(),
        )
        self.presence_stream_cache = StreamChangeCache(
            "PresenceStreamChangeCache",
            min_presence_val,
            prefilled_cache=presence_cache_prefill,
        )

        max_device_inbox_id = self._device_inbox_id_gen.get_current_token()
        device_inbox_prefill, min_device_inbox_id = self.db.get_cache_dict(
            db_conn,
            "device_inbox",
            entity_column="user_id",
            stream_column="stream_id",
            max_value=max_device_inbox_id,
            limit=1000,
        )
        self._device_inbox_stream_cache = StreamChangeCache(
            "DeviceInboxStreamChangeCache",
            min_device_inbox_id,
            prefilled_cache=device_inbox_prefill,
        )

        # The federation outbox and the local device inbox use the same
        # stream_id generator, hence the shared max_device_inbox_id.
        device_outbox_prefill, min_device_outbox_id = self.db.get_cache_dict(
            db_conn,
            "device_federation_outbox",
            entity_column="destination",
            stream_column="stream_id",
            max_value=max_device_inbox_id,
            limit=1000,
        )
        self._device_federation_outbox_stream_cache = StreamChangeCache(
            "DeviceFederationOutboxStreamChangeCache",
            min_device_outbox_id,
            prefilled_cache=device_outbox_prefill,
        )

        # Three caches share the device-list stream's current position.
        device_list_max = self._device_list_id_gen.get_current_token()
        self._device_list_stream_cache = StreamChangeCache(
            "DeviceListStreamChangeCache", device_list_max
        )
        self._user_signature_stream_cache = StreamChangeCache(
            "UserSignatureStreamChangeCache", device_list_max
        )
        self._device_list_federation_stream_cache = StreamChangeCache(
            "DeviceListFederationStreamChangeCache", device_list_max
        )

        events_max = self._stream_id_gen.get_current_token()
        curr_state_delta_prefill, min_curr_state_delta_id = self.db.get_cache_dict(
            db_conn,
            "current_state_delta_stream",
            entity_column="room_id",
            stream_column="stream_id",
            max_value=events_max,  # As we share the stream id with events token
            limit=1000,
        )
        self._curr_state_delta_stream_cache = StreamChangeCache(
            "_curr_state_delta_stream_cache",
            min_curr_state_delta_id,
            prefilled_cache=curr_state_delta_prefill,
        )

        _group_updates_prefill, min_group_updates_id = self.db.get_cache_dict(
            db_conn,
            "local_group_updates",
            entity_column="user_id",
            stream_column="stream_id",
            max_value=self._group_updates_id_gen.get_current_token(),
            limit=1000,
        )
        self._group_updates_stream_cache = StreamChangeCache(
            "_group_updates_stream_cache",
            min_group_updates_id,
            prefilled_cache=_group_updates_prefill,
        )

        self._stream_order_on_start = self.get_room_max_stream_ordering()
        self._min_stream_order_on_start = self.get_room_min_stream_ordering()

        # Used in _generate_user_daily_visits to keep track of progress
        self._last_user_visit_update = self._get_start_of_day()

    def take_presence_startup_info(self):
        """Hand over the presence rows loaded at startup, exactly once.

        Returns:
            list[UserPresenceState] on the first call; the snapshot is then
            cleared, so later calls return None.
        """
        active_on_startup = self._presence_on_startup
        self._presence_on_startup = None
        return active_on_startup

    def _get_active_presence(self, db_conn):
        """Fetch non-offline presence from the database so that we can register
        the appropriate time outs.

        Args:
            db_conn: a raw database connection (startup only)

        Returns:
            list[UserPresenceState]: one entry per non-offline user
        """
        sql = (
            "SELECT user_id, state, last_active_ts, last_federation_update_ts,"
            " last_user_sync_ts, status_msg, currently_active FROM presence_stream"
            " WHERE state != ?"
        )
        sql = self.database_engine.convert_param_style(sql)

        txn = db_conn.cursor()
        txn.execute(sql, (PresenceState.OFFLINE,))
        rows = self.db.cursor_to_dict(txn)
        txn.close()

        # The DB stores currently_active as an integer; coerce to bool before
        # constructing the UserPresenceState objects.
        for row in rows:
            row["currently_active"] = bool(row["currently_active"])

        return [UserPresenceState(**row) for row in rows]

    def count_daily_users(self):
        """
        Counts the number of users who used this homeserver in the last 24 hours.
        """
        yesterday = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24)
        return self.db.runInteraction("count_daily_users", self._count_users, yesterday)

    def count_monthly_users(self):
        """
        Counts the number of users who used this homeserver in the last 30 days.

        Note this method is intended for phonehome metrics only and is different
        from the mau figure in synapse.storage.monthly_active_users which,
        amongst other things, includes a 3 day grace period before a user counts.
        """
        thirty_days_ago = int(self._clock.time_msec()) - (1000 * 60 * 60 * 24 * 30)
        return self.db.runInteraction(
            "count_monthly_users", self._count_users, thirty_days_ago
        )

    def _count_users(self, txn, time_from):
        """
        Returns number of users seen in the past time_from period
        (distinct user_ids in user_ips with last_seen after the cutoff).
        """
        sql = """
            SELECT COALESCE(count(*), 0) FROM (
                SELECT user_id FROM user_ips
                WHERE last_seen > ?
                GROUP BY user_id
            ) u
        """
        txn.execute(sql, (time_from,))
        (count,) = txn.fetchone()
        return count

    def count_r30_users(self):
        """
        Counts the number of 30 day retained users, defined as:-
         * Users who have created their accounts more than 30 days ago
         * Where last seen at most 30 days ago
         * Where account creation and last_seen are > 30 days apart

        Returns counts globaly for a given user as well as breaking
        by platform
        """

        def _count_r30_users(txn):
            thirty_days_in_secs = 86400 * 30
            now = int(self._clock.time())
            thirty_days_ago_in_secs = now - thirty_days_in_secs

            # Per-platform counts: user_agent is bucketed into a platform
            # label inside the subquery. NB: users.creation_ts is in
            # seconds, user_ips.last_seen in milliseconds, hence the /1000.
            sql = """
                SELECT platform, COALESCE(count(*), 0) FROM (
                     SELECT
                        users.name, platform, users.creation_ts * 1000,
                        MAX(uip.last_seen)
                     FROM users
                     INNER JOIN (
                         SELECT
                         user_id,
                         last_seen,
                         CASE
                             WHEN user_agent LIKE '%%Android%%' THEN 'android'
                             WHEN user_agent LIKE '%%iOS%%' THEN 'ios'
                             WHEN user_agent LIKE '%%Electron%%' THEN 'electron'
                             WHEN user_agent LIKE '%%Mozilla%%' THEN 'web'
                             WHEN user_agent LIKE '%%Gecko%%' THEN 'web'
                             ELSE 'unknown'
                         END
                         AS platform
                         FROM user_ips
                     ) uip
                     ON users.name = uip.user_id
                     AND users.appservice_id is NULL
                     AND users.creation_ts < ?
                     AND uip.last_seen/1000 > ?
                     AND (uip.last_seen/1000) - users.creation_ts > 86400 * 30
                     GROUP BY users.name, platform, users.creation_ts
                ) u GROUP BY platform
            """

            results = {}
            txn.execute(sql, (thirty_days_ago_in_secs, thirty_days_ago_in_secs))

            for row in txn:
                if row[0] == "unknown":
                    # NOTE(review): this is a no-op — "unknown" rows are still
                    # recorded below. Was this meant to be `continue`? Confirm
                    # whether "unknown" should be excluded from the results.
                    pass
                results[row[0]] = row[1]

            # Global count, irrespective of platform.
            sql = """
                SELECT COALESCE(count(*), 0) FROM (
                    SELECT users.name, users.creation_ts * 1000,
                                                        MAX(uip.last_seen)
                    FROM users
                    INNER JOIN (
                        SELECT
                        user_id,
                        last_seen
                        FROM user_ips
                    ) uip
                    ON users.name = uip.user_id
                    AND appservice_id is NULL
                    AND users.creation_ts < ?
                    AND uip.last_seen/1000 > ?
                    AND (uip.last_seen/1000) - users.creation_ts > 86400 * 30
                    GROUP BY users.name, users.creation_ts
                ) u
            """

            txn.execute(sql, (thirty_days_ago_in_secs, thirty_days_ago_in_secs))

            (count,) = txn.fetchone()
            results["all"] = count

            return results

        return self.db.runInteraction("count_r30_users", _count_r30_users)

    def _get_start_of_day(self) -> int:
        """
        Returns millisecond unixtime for start of UTC day.
        """
        now = time.gmtime()
        today_start = calendar.timegm((now.tm_year, now.tm_mon, now.tm_mday, 0, 0, 0))
        return today_start * 1000

    def generate_user_daily_visits(self):
        """
        Generates daily visit data for use in cohort/ retention analysis
        """

        def _generate_user_daily_visits(txn):
            logger.info("Calling _generate_user_daily_visits")
            today_start = self._get_start_of_day()
            a_day_in_milliseconds = 24 * 60 * 60 * 1000
            # NOTE(review): reads self.clock, while __init__ sets self._clock —
            # presumably self.clock is provided by a parent store; confirm.
            now = self.clock.time_msec()

            # Insert one row per (user, device) seen in the window that does
            # not already have a user_daily_visits entry for that day.
            # Guests and appservice users are excluded.
            sql = """
                INSERT INTO user_daily_visits (user_id, device_id, timestamp)
                    SELECT u.user_id, u.device_id, ?
                    FROM user_ips AS u
                    LEFT JOIN (
                      SELECT user_id, device_id, timestamp FROM user_daily_visits
                      WHERE timestamp = ?
                    ) udv
                    ON u.user_id = udv.user_id AND u.device_id=udv.device_id
                    INNER JOIN users ON users.name=u.user_id
                    WHERE last_seen > ? AND last_seen <= ?
                    AND udv.timestamp IS NULL AND users.is_guest=0
                    AND users.appservice_id IS NULL
                    GROUP BY u.user_id, u.device_id
            """

            # This means that the day has rolled over but there could still
            # be entries from the previous day. There is an edge case
            # where if the user logs in at 23:59 and overwrites their
            # last_seen at 00:01 then they will not be counted in the
            # previous day's stats - it is important that the query is run
            # often to minimise this case.
            if today_start > self._last_user_visit_update:
                yesterday_start = today_start - a_day_in_milliseconds
                txn.execute(
                    sql,
                    (
                        yesterday_start,
                        yesterday_start,
                        self._last_user_visit_update,
                        today_start,
                    ),
                )
                self._last_user_visit_update = today_start

            txn.execute(
                sql, (today_start, today_start, self._last_user_visit_update, now)
            )
            # Update _last_user_visit_update to now. The reason to do this
            # rather just clamping to the beginning of the day is to limit
            # the size of the join - meaning that the query can be run more
            # frequently
            self._last_user_visit_update = now

        return self.db.runInteraction(
            "generate_user_daily_visits", _generate_user_daily_visits
        )

    def get_users(self):
        """Function to retrieve a list of users in users table.

        Returns:
            defer.Deferred: resolves to list[dict[str, Any]] with one dict
            per user row (name, password_hash, is_guest, admin, user_type,
            deactivated)
        """
        return self.db.simple_select_list(
            table="users",
            keyvalues={},
            retcols=[
                "name",
                "password_hash",
                "is_guest",
                "admin",
                "user_type",
                "deactivated",
            ],
            desc="get_users",
        )

    def get_users_paginate(
        self, start, limit, name=None, guests=True, deactivated=False
    ):
        """Function to retrieve a paginated list of users from
        users list. This will return a json list of users.

        Args:
            start (int): start number to begin the query from
            limit (int): number of rows to retrieve
            name (string): filter for user names (substring match)
            guests (bool): whether to include guest users
            deactivated (bool): whether to include deactivated users
        Returns:
            defer.Deferred: resolves to list[dict[str, Any]]
        """
        # Substring filter on the user name, applied via LIKE.
        name_filter = {}
        if name:
            name_filter["name"] = "%" + name + "%"

        # Exact-match filters: only added when the corresponding class of
        # user should be excluded.
        attr_filter = {}
        if not guests:
            attr_filter["is_guest"] = 0
        if not deactivated:
            attr_filter["deactivated"] = 0

        return self.db.simple_select_list_paginate(
            desc="get_users_paginate",
            table="users",
            orderby="name",
            start=start,
            limit=limit,
            filters=name_filter,
            keyvalues=attr_filter,
            retcols=[
                "name",
                "password_hash",
                "is_guest",
                "admin",
                "user_type",
                "deactivated",
            ],
        )

    def search_users(self, term):
        """Function to search users list for one or more users with
        the matched term. The search is applied to the ``name`` column.

        Args:
            term (str): search term
        Returns:
            defer.Deferred: resolves to list[dict[str, Any]]
        """
        return self.db.simple_search_list(
            table="users",
            term=term,
            col="name",
            retcols=["name", "password_hash", "is_guest", "admin", "user_type"],
            desc="search_users",
        )
  514. def are_all_users_on_domain(txn, database_engine, domain):
  515. sql = database_engine.convert_param_style(
  516. "SELECT COUNT(*) FROM users WHERE name NOT LIKE ?"
  517. )
  518. pat = "%:" + domain
  519. txn.execute(sql, (pat,))
  520. num_not_matching = txn.fetchall()[0][0]
  521. if num_not_matching == 0:
  522. return True
  523. return False