room.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2014-2016 OpenMarket Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. from twisted.internet import defer
  16. from synapse.api.errors import StoreError
  17. from synapse.util.caches.descriptors import cached
  18. from ._base import SQLBaseStore
  19. from .engines import PostgresEngine, Sqlite3Engine
  20. import collections
  21. import logging
  22. import ujson as json
  # Module-level logger, named after this module.
  logger = logging.getLogger(__name__)

  # Record type with three fields. Presumably these are the power levels
  # needed to ban, kick and redact in a room -- TODO confirm against callers.
  OpsLevel = collections.namedtuple(
      "OpsLevel",
      ("ban_level", "kick_level", "redact_level",)
  )
  28. class RoomStore(SQLBaseStore):
  @defer.inlineCallbacks
  def store_room(self, room_id, room_creator_user_id, is_public):
      """Persist a newly created room.

      A row is inserted into ``rooms``; when the room is public, a row is
      also inserted into ``public_room_list_stream`` using a stream ID taken
      from ``_public_room_id_gen``. Both inserts share one transaction.

      Args:
          room_id (str): The desired room ID, can be None.
          room_creator_user_id (str): The user ID of the room creator.
          is_public (bool): True to indicate that this room should appear in
              public room lists.
      Raises:
          StoreError if the room could not be stored.
      """
      def store_room_txn(txn, next_id):
          room_row = {
              "room_id": room_id,
              "creator": room_creator_user_id,
              "is_public": is_public,
          }
          self._simple_insert_txn(txn, "rooms", room_row)

          # Private rooms never appear on the public room list stream.
          if not is_public:
              return

          self._simple_insert_txn(
              txn,
              table="public_room_list_stream",
              values={
                  "stream_id": next_id,
                  "room_id": room_id,
                  "visibility": is_public,
              }
          )

      try:
          with self._public_room_id_gen.get_next() as next_id:
              yield self.runInteraction(
                  "store_room_txn",
                  store_room_txn, next_id,
              )
      except Exception as e:
          # Any failure is logged and reported to callers as a 500 StoreError.
          logger.error("store_room with room_id=%s failed: %s", room_id, e)
          raise StoreError(500, "Problem creating room.")
  69. def get_room(self, room_id):
  70. """Retrieve a room.
  71. Args:
  72. room_id (str): The ID of the room to retrieve.
  73. Returns:
  74. A namedtuple containing the room information, or an empty list.
  75. """
  76. return self._simple_select_one(
  77. table="rooms",
  78. keyvalues={"room_id": room_id},
  79. retcols=("room_id", "is_public", "creator"),
  80. desc="get_room",
  81. allow_none=True,
  82. )
  @defer.inlineCallbacks
  def set_room_is_public(self, room_id, is_public):
      """Set a room's ``is_public`` flag and record the change on the main
      (non-appservice) public room list stream.

      A new ``public_room_list_stream`` row is only written when the room has
      no earlier main-list entry, or when its most recent entry has a
      different visibility. Afterwards the notifier's
      ``on_new_replication_data`` is called.

      Args:
          room_id (str): The room to update.
          is_public (bool): The new visibility of the room.
      """
      def set_room_is_public_txn(txn, next_id):
          self._simple_update_one_txn(
              txn,
              table="rooms",
              keyvalues={"room_id": room_id},
              updatevalues={"is_public": is_public},
          )

          # Existing main-list stream entries for this room, oldest first.
          history = self._simple_select_list_txn(
              txn,
              table="public_room_list_stream",
              keyvalues={
                  "room_id": room_id,
                  "appservice_id": None,
                  "network_id": None,
              },
              retcols=("stream_id", "visibility"),
          )
          history.sort(key=lambda r: r["stream_id"])

          # Nothing to append if the latest entry already matches.
          if history and bool(history[-1]["visibility"]) == is_public:
              return

          self._simple_insert_txn(
              txn,
              table="public_room_list_stream",
              values={
                  "stream_id": next_id,
                  "room_id": room_id,
                  "visibility": is_public,
                  "appservice_id": None,
                  "network_id": None,
              }
          )

      with self._public_room_id_gen.get_next() as next_id:
          yield self.runInteraction(
              "set_room_is_public",
              set_room_is_public_txn, next_id,
          )
      self.hs.get_notifier().on_new_replication_data()
  @defer.inlineCallbacks
  def set_room_is_public_appservice(self, room_id, appservice_id, network_id,
                                    is_public):
      """Publish or unpublish a room on an appservice/network specific list.

      Each appservice can have a number of published room lists associated
      with them, keyed off of an appservice defined `network_id`, which
      basically represents a single instance of a bridge to a third party
      network.

      The ``appservice_room_list`` table is updated first. A row is then
      added to ``public_room_list_stream`` unless the latest entry for this
      room and list already has the requested visibility. Afterwards the
      notifier's ``on_new_replication_data`` is called.

      Args:
          room_id (str)
          appservice_id (str)
          network_id (str)
          is_public (bool): Whether to publish or unpublish the room from the
              list.
      """
      def set_room_is_public_appservice_txn(txn, next_id):
          list_entry = {
              "appservice_id": appservice_id,
              "network_id": network_id,
              "room_id": room_id
          }

          if not is_public:
              self._simple_delete_txn(
                  txn,
                  table="appservice_room_list",
                  keyvalues=list_entry,
              )
          else:
              try:
                  self._simple_insert_txn(
                      txn,
                      table="appservice_room_list",
                      values=list_entry,
                  )
              except self.database_engine.module.IntegrityError:
                  # We've already inserted, nothing to do.
                  return

          # Existing stream entries for this room on this list, oldest first.
          history = self._simple_select_list_txn(
              txn,
              table="public_room_list_stream",
              keyvalues={
                  "room_id": room_id,
                  "appservice_id": appservice_id,
                  "network_id": network_id,
              },
              retcols=("stream_id", "visibility"),
          )
          history.sort(key=lambda r: r["stream_id"])

          # Nothing to append if the latest entry already matches.
          if history and bool(history[-1]["visibility"]) == is_public:
              return

          self._simple_insert_txn(
              txn,
              table="public_room_list_stream",
              values={
                  "stream_id": next_id,
                  "room_id": room_id,
                  "visibility": is_public,
                  "appservice_id": appservice_id,
                  "network_id": network_id,
              }
          )

      with self._public_room_id_gen.get_next() as next_id:
          yield self.runInteraction(
              "set_room_is_public_appservice",
              set_room_is_public_appservice_txn, next_id,
          )
      self.hs.get_notifier().on_new_replication_data()
  196. def get_public_room_ids(self):
  197. return self._simple_select_onecol(
  198. table="rooms",
  199. keyvalues={
  200. "is_public": True,
  201. },
  202. retcol="room_id",
  203. desc="get_public_room_ids",
  204. )
  205. def get_room_count(self):
  206. """Retrieve a list of all rooms
  207. """
  208. def f(txn):
  209. sql = "SELECT count(*) FROM rooms"
  210. txn.execute(sql)
  211. row = txn.fetchone()
  212. return row[0] or 0
  213. return self.runInteraction(
  214. "get_rooms", f
  215. )
  216. def _store_room_topic_txn(self, txn, event):
  217. if hasattr(event, "content") and "topic" in event.content:
  218. self._simple_insert_txn(
  219. txn,
  220. "topics",
  221. {
  222. "event_id": event.event_id,
  223. "room_id": event.room_id,
  224. "topic": event.content["topic"],
  225. },
  226. )
  227. self._store_event_search_txn(
  228. txn, event, "content.topic", event.content["topic"]
  229. )
  230. def _store_room_name_txn(self, txn, event):
  231. if hasattr(event, "content") and "name" in event.content:
  232. self._simple_insert_txn(
  233. txn,
  234. "room_names",
  235. {
  236. "event_id": event.event_id,
  237. "room_id": event.room_id,
  238. "name": event.content["name"],
  239. }
  240. )
  241. self._store_event_search_txn(
  242. txn, event, "content.name", event.content["name"]
  243. )
  244. def _store_room_message_txn(self, txn, event):
  245. if hasattr(event, "content") and "body" in event.content:
  246. self._store_event_search_txn(
  247. txn, event, "content.body", event.content["body"]
  248. )
  249. def _store_history_visibility_txn(self, txn, event):
  250. self._store_content_index_txn(txn, event, "history_visibility")
  251. def _store_guest_access_txn(self, txn, event):
  252. self._store_content_index_txn(txn, event, "guest_access")
  253. def _store_content_index_txn(self, txn, event, key):
  254. if hasattr(event, "content") and key in event.content:
  255. sql = (
  256. "INSERT INTO %(key)s"
  257. " (event_id, room_id, %(key)s)"
  258. " VALUES (?, ?, ?)" % {"key": key}
  259. )
  260. txn.execute(sql, (
  261. event.event_id,
  262. event.room_id,
  263. event.content[key]
  264. ))
  def _store_event_search_txn(self, txn, event, key, value):
      """Add one searchable value of an event to the ``event_search`` table.

      The statement depends on ``self.database_engine``: for a PostgresEngine
      the value is stored as ``to_tsvector('english', ...)`` together with
      the event's stream ordering and origin timestamp; for a Sqlite3Engine
      the raw value is stored.

      Args:
          txn: Database transaction/cursor to execute against.
          event: Event supplying ``event_id`` and ``room_id`` (and, for
              Postgres, ``internal_metadata.stream_ordering`` and
              ``origin_server_ts``).
          key (str): Value for the ``key`` column, e.g. "content.body".
          value: The text to index.
      Raises:
          Exception: if the database engine is neither of the two above.
      """
      engine = self.database_engine

      if isinstance(engine, PostgresEngine):
          sql = (
              "INSERT INTO event_search"
              " (event_id, room_id, key, vector, stream_ordering, origin_server_ts)"
              " VALUES (?,?,?,to_tsvector('english', ?),?,?)"
          )
          args = (
              event.event_id, event.room_id, key, value,
              event.internal_metadata.stream_ordering,
              event.origin_server_ts,
          )
      elif isinstance(engine, Sqlite3Engine):
          sql = (
              "INSERT INTO event_search (event_id, room_id, key, value)"
              " VALUES (?,?,?,?)"
          )
          args = (event.event_id, event.room_id, key, value,)
      else:
          # This should be unreachable.
          raise Exception("Unrecognized database engine")

      txn.execute(sql, args)
  289. def add_event_report(self, room_id, event_id, user_id, reason, content,
  290. received_ts):
  291. next_id = self._event_reports_id_gen.get_next()
  292. return self._simple_insert(
  293. table="event_reports",
  294. values={
  295. "id": next_id,
  296. "received_ts": received_ts,
  297. "room_id": room_id,
  298. "event_id": event_id,
  299. "user_id": user_id,
  300. "reason": reason,
  301. "content": json.dumps(content),
  302. },
  303. desc="add_event_report"
  304. )
  305. def get_current_public_room_stream_id(self):
  306. return self._public_room_id_gen.get_current_token()
  @cached(num_args=2, max_entries=100)
  def get_public_room_ids_at_stream_id(self, stream_id, network_tuple):
      """Get public rooms for a particular list, or across all lists.

      Runs ``get_public_room_ids_at_stream_id_txn`` through
      ``runInteraction``.

      Args:
          stream_id (int)
          network_tuple (ThirdPartyInstanceID): The list to use (None, None)
              means the main list, None means all lists.
      """
      txn_func = self.get_public_room_ids_at_stream_id_txn
      return self.runInteraction(
          "get_public_room_ids_at_stream_id",
          txn_func,
          stream_id,
          network_tuple=network_tuple,
      )
  320. def get_public_room_ids_at_stream_id_txn(self, txn, stream_id,
  321. network_tuple):
  322. return {
  323. rm
  324. for rm, vis in self.get_published_at_stream_id_txn(
  325. txn, stream_id, network_tuple=network_tuple
  326. ).items()
  327. if vis
  328. }
  329. def get_published_at_stream_id_txn(self, txn, stream_id, network_tuple):
  330. if network_tuple:
  331. # We want to get from a particular list. No aggregation required.
  332. sql = ("""
  333. SELECT room_id, visibility FROM public_room_list_stream
  334. INNER JOIN (
  335. SELECT room_id, max(stream_id) AS stream_id
  336. FROM public_room_list_stream
  337. WHERE stream_id <= ? %s
  338. GROUP BY room_id
  339. ) grouped USING (room_id, stream_id)
  340. """)
  341. if network_tuple.appservice_id is not None:
  342. txn.execute(
  343. sql % ("AND appservice_id = ? AND network_id = ?",),
  344. (stream_id, network_tuple.appservice_id, network_tuple.network_id,)
  345. )
  346. else:
  347. txn.execute(
  348. sql % ("AND appservice_id IS NULL",),
  349. (stream_id,)
  350. )
  351. return dict(txn.fetchall())
  352. else:
  353. # We want to get from all lists, so we need to aggregate the results
  354. logger.info("Executing full list")
  355. sql = ("""
  356. SELECT room_id, visibility
  357. FROM public_room_list_stream
  358. INNER JOIN (
  359. SELECT
  360. room_id, max(stream_id) AS stream_id, appservice_id,
  361. network_id
  362. FROM public_room_list_stream
  363. WHERE stream_id <= ?
  364. GROUP BY room_id, appservice_id, network_id
  365. ) grouped USING (room_id, stream_id)
  366. """)
  367. txn.execute(
  368. sql,
  369. (stream_id,)
  370. )
  371. results = {}
  372. # A room is visible if its visible on any list.
  373. for room_id, visibility in txn.fetchall():
  374. results[room_id] = bool(visibility) or results.get(room_id, False)
  375. return results
  376. def get_public_room_changes(self, prev_stream_id, new_stream_id,
  377. network_tuple):
  378. def get_public_room_changes_txn(txn):
  379. then_rooms = self.get_public_room_ids_at_stream_id_txn(
  380. txn, prev_stream_id, network_tuple
  381. )
  382. now_rooms_dict = self.get_published_at_stream_id_txn(
  383. txn, new_stream_id, network_tuple
  384. )
  385. now_rooms_visible = set(
  386. rm for rm, vis in now_rooms_dict.items() if vis
  387. )
  388. now_rooms_not_visible = set(
  389. rm for rm, vis in now_rooms_dict.items() if not vis
  390. )
  391. newly_visible = now_rooms_visible - then_rooms
  392. newly_unpublished = now_rooms_not_visible & then_rooms
  393. return newly_visible, newly_unpublished
  394. return self.runInteraction(
  395. "get_public_room_changes", get_public_room_changes_txn
  396. )
  397. def get_all_new_public_rooms(self, prev_id, current_id, limit):
  398. def get_all_new_public_rooms(txn):
  399. sql = ("""
  400. SELECT stream_id, room_id, visibility, appservice_id, network_id
  401. FROM public_room_list_stream
  402. WHERE stream_id > ? AND stream_id <= ?
  403. ORDER BY stream_id ASC
  404. LIMIT ?
  405. """)
  406. txn.execute(sql, (prev_id, current_id, limit,))
  407. return txn.fetchall()
  408. if prev_id == current_id:
  409. return defer.succeed([])
  410. return self.runInteraction(
  411. "get_all_new_public_rooms", get_all_new_public_rooms
  412. )