__init__.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2014-2016 OpenMarket Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. from twisted.internet import defer
  16. from synapse.storage.devices import DeviceStore
  17. from .appservice import (
  18. ApplicationServiceStore, ApplicationServiceTransactionStore
  19. )
  20. from ._base import LoggingTransaction
  21. from .directory import DirectoryStore
  22. from .events import EventsStore
  23. from .presence import PresenceStore, UserPresenceState
  24. from .profile import ProfileStore
  25. from .registration import RegistrationStore
  26. from .room import RoomStore
  27. from .roommember import RoomMemberStore
  28. from .stream import StreamStore
  29. from .transactions import TransactionStore
  30. from .keys import KeyStore
  31. from .event_federation import EventFederationStore
  32. from .pusher import PusherStore
  33. from .push_rule import PushRuleStore
  34. from .media_repository import MediaRepositoryStore
  35. from .rejections import RejectionsStore
  36. from .event_push_actions import EventPushActionsStore
  37. from .deviceinbox import DeviceInboxStore
  38. from .state import StateStore
  39. from .signatures import SignatureStore
  40. from .filtering import FilteringStore
  41. from .end_to_end_keys import EndToEndKeyStore
  42. from .receipts import ReceiptsStore
  43. from .search import SearchStore
  44. from .tags import TagsStore
  45. from .account_data import AccountDataStore
  46. from .openid import OpenIdStore
  47. from .client_ips import ClientIpStore
  48. from .util.id_generators import IdGenerator, StreamIdGenerator, ChainedIdGenerator
  49. from .engines import PostgresEngine
  50. from synapse.api.constants import PresenceState
  51. from synapse.util.caches.stream_change_cache import StreamChangeCache
  52. import logging
  53. logger = logging.getLogger(__name__)
  54. class DataStore(RoomMemberStore, RoomStore,
  55. RegistrationStore, StreamStore, ProfileStore,
  56. PresenceStore, TransactionStore,
  57. DirectoryStore, KeyStore, StateStore, SignatureStore,
  58. ApplicationServiceStore,
  59. EventFederationStore,
  60. MediaRepositoryStore,
  61. RejectionsStore,
  62. FilteringStore,
  63. PusherStore,
  64. PushRuleStore,
  65. ApplicationServiceTransactionStore,
  66. EventsStore,
  67. ReceiptsStore,
  68. EndToEndKeyStore,
  69. SearchStore,
  70. TagsStore,
  71. AccountDataStore,
  72. EventPushActionsStore,
  73. OpenIdStore,
  74. ClientIpStore,
  75. DeviceStore,
  76. DeviceInboxStore,
  77. ):
  78. def __init__(self, db_conn, hs):
  79. self.hs = hs
  80. self._clock = hs.get_clock()
  81. self.database_engine = hs.database_engine
  82. self._stream_id_gen = StreamIdGenerator(
  83. db_conn, "events", "stream_ordering",
  84. extra_tables=[("local_invites", "stream_id")]
  85. )
  86. self._backfill_id_gen = StreamIdGenerator(
  87. db_conn, "events", "stream_ordering", step=-1,
  88. extra_tables=[("ex_outlier_stream", "event_stream_ordering")]
  89. )
  90. self._receipts_id_gen = StreamIdGenerator(
  91. db_conn, "receipts_linearized", "stream_id"
  92. )
  93. self._account_data_id_gen = StreamIdGenerator(
  94. db_conn, "account_data_max_stream_id", "stream_id"
  95. )
  96. self._presence_id_gen = StreamIdGenerator(
  97. db_conn, "presence_stream", "stream_id"
  98. )
  99. self._device_inbox_id_gen = StreamIdGenerator(
  100. db_conn, "device_max_stream_id", "stream_id"
  101. )
  102. self._public_room_id_gen = StreamIdGenerator(
  103. db_conn, "public_room_list_stream", "stream_id"
  104. )
  105. self._transaction_id_gen = IdGenerator(db_conn, "sent_transactions", "id")
  106. self._state_groups_id_gen = IdGenerator(db_conn, "state_groups", "id")
  107. self._access_tokens_id_gen = IdGenerator(db_conn, "access_tokens", "id")
  108. self._event_reports_id_gen = IdGenerator(db_conn, "event_reports", "id")
  109. self._push_rule_id_gen = IdGenerator(db_conn, "push_rules", "id")
  110. self._push_rules_enable_id_gen = IdGenerator(db_conn, "push_rules_enable", "id")
  111. self._push_rules_stream_id_gen = ChainedIdGenerator(
  112. self._stream_id_gen, db_conn, "push_rules_stream", "stream_id"
  113. )
  114. self._pushers_id_gen = StreamIdGenerator(
  115. db_conn, "pushers", "id",
  116. extra_tables=[("deleted_pushers", "stream_id")],
  117. )
  118. if isinstance(self.database_engine, PostgresEngine):
  119. self._cache_id_gen = StreamIdGenerator(
  120. db_conn, "cache_invalidation_stream", "stream_id",
  121. )
  122. else:
  123. self._cache_id_gen = None
  124. events_max = self._stream_id_gen.get_current_token()
  125. event_cache_prefill, min_event_val = self._get_cache_dict(
  126. db_conn, "events",
  127. entity_column="room_id",
  128. stream_column="stream_ordering",
  129. max_value=events_max,
  130. )
  131. self._events_stream_cache = StreamChangeCache(
  132. "EventsRoomStreamChangeCache", min_event_val,
  133. prefilled_cache=event_cache_prefill,
  134. )
  135. self._membership_stream_cache = StreamChangeCache(
  136. "MembershipStreamChangeCache", events_max,
  137. )
  138. account_max = self._account_data_id_gen.get_current_token()
  139. self._account_data_stream_cache = StreamChangeCache(
  140. "AccountDataAndTagsChangeCache", account_max,
  141. )
  142. self._presence_on_startup = self._get_active_presence(db_conn)
  143. presence_cache_prefill, min_presence_val = self._get_cache_dict(
  144. db_conn, "presence_stream",
  145. entity_column="user_id",
  146. stream_column="stream_id",
  147. max_value=self._presence_id_gen.get_current_token(),
  148. )
  149. self.presence_stream_cache = StreamChangeCache(
  150. "PresenceStreamChangeCache", min_presence_val,
  151. prefilled_cache=presence_cache_prefill
  152. )
  153. push_rules_prefill, push_rules_id = self._get_cache_dict(
  154. db_conn, "push_rules_stream",
  155. entity_column="user_id",
  156. stream_column="stream_id",
  157. max_value=self._push_rules_stream_id_gen.get_current_token()[0],
  158. )
  159. self.push_rules_stream_cache = StreamChangeCache(
  160. "PushRulesStreamChangeCache", push_rules_id,
  161. prefilled_cache=push_rules_prefill,
  162. )
  163. max_device_inbox_id = self._device_inbox_id_gen.get_current_token()
  164. device_inbox_prefill, min_device_inbox_id = self._get_cache_dict(
  165. db_conn, "device_inbox",
  166. entity_column="user_id",
  167. stream_column="stream_id",
  168. max_value=max_device_inbox_id
  169. )
  170. self._device_inbox_stream_cache = StreamChangeCache(
  171. "DeviceInboxStreamChangeCache", min_device_inbox_id,
  172. prefilled_cache=device_inbox_prefill,
  173. )
  174. # The federation outbox and the local device inbox uses the same
  175. # stream_id generator.
  176. device_outbox_prefill, min_device_outbox_id = self._get_cache_dict(
  177. db_conn, "device_federation_outbox",
  178. entity_column="destination",
  179. stream_column="stream_id",
  180. max_value=max_device_inbox_id,
  181. )
  182. self._device_federation_outbox_stream_cache = StreamChangeCache(
  183. "DeviceFederationOutboxStreamChangeCache", min_device_outbox_id,
  184. prefilled_cache=device_outbox_prefill,
  185. )
  186. cur = LoggingTransaction(
  187. db_conn.cursor(),
  188. name="_find_stream_orderings_for_times_txn",
  189. database_engine=self.database_engine,
  190. after_callbacks=[]
  191. )
  192. self._find_stream_orderings_for_times_txn(cur)
  193. cur.close()
  194. self.find_stream_orderings_looping_call = self._clock.looping_call(
  195. self._find_stream_orderings_for_times, 60 * 60 * 1000
  196. )
  197. self._stream_order_on_start = self.get_room_max_stream_ordering()
  198. self._min_stream_order_on_start = self.get_room_min_stream_ordering()
  199. super(DataStore, self).__init__(hs)
  200. def take_presence_startup_info(self):
  201. active_on_startup = self._presence_on_startup
  202. self._presence_on_startup = None
  203. return active_on_startup
  204. def _get_active_presence(self, db_conn):
  205. """Fetch non-offline presence from the database so that we can register
  206. the appropriate time outs.
  207. """
  208. sql = (
  209. "SELECT user_id, state, last_active_ts, last_federation_update_ts,"
  210. " last_user_sync_ts, status_msg, currently_active FROM presence_stream"
  211. " WHERE state != ?"
  212. )
  213. sql = self.database_engine.convert_param_style(sql)
  214. txn = db_conn.cursor()
  215. txn.execute(sql, (PresenceState.OFFLINE,))
  216. rows = self.cursor_to_dict(txn)
  217. txn.close()
  218. for row in rows:
  219. row["currently_active"] = bool(row["currently_active"])
  220. return [UserPresenceState(**row) for row in rows]
  221. @defer.inlineCallbacks
  222. def count_daily_users(self):
  223. """
  224. Counts the number of users who used this homeserver in the last 24 hours.
  225. """
  226. def _count_users(txn):
  227. txn.execute(
  228. "SELECT COUNT(DISTINCT user_id) AS users"
  229. " FROM user_ips"
  230. " WHERE last_seen > ?",
  231. # This is close enough to a day for our purposes.
  232. (int(self._clock.time_msec()) - (1000 * 60 * 60 * 24),)
  233. )
  234. rows = self.cursor_to_dict(txn)
  235. if rows:
  236. return rows[0]["users"]
  237. return 0
  238. ret = yield self.runInteraction("count_users", _count_users)
  239. defer.returnValue(ret)
  240. def get_user_ip_and_agents(self, user):
  241. return self._simple_select_list(
  242. table="user_ips",
  243. keyvalues={"user_id": user.to_string()},
  244. retcols=[
  245. "access_token", "ip", "user_agent", "last_seen"
  246. ],
  247. desc="get_user_ip_and_agents",
  248. )
  249. def are_all_users_on_domain(txn, database_engine, domain):
  250. sql = database_engine.convert_param_style(
  251. "SELECT COUNT(*) FROM users WHERE name NOT LIKE ?"
  252. )
  253. pat = "%:" + domain
  254. txn.execute(sql, (pat,))
  255. num_not_matching = txn.fetchall()[0][0]
  256. if num_not_matching == 0:
  257. return True
  258. return False