account_data.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2014-2016 OpenMarket Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. from ._base import SQLBaseStore
  16. from twisted.internet import defer
  17. from synapse.util.caches.descriptors import cached, cachedList, cachedInlineCallbacks
  18. import ujson as json
  19. import logging
  20. logger = logging.getLogger(__name__)
  21. class AccountDataStore(SQLBaseStore):
  22. @cached()
  23. def get_account_data_for_user(self, user_id):
  24. """Get all the client account_data for a user.
  25. Args:
  26. user_id(str): The user to get the account_data for.
  27. Returns:
  28. A deferred pair of a dict of global account_data and a dict
  29. mapping from room_id string to per room account_data dicts.
  30. """
  31. def get_account_data_for_user_txn(txn):
  32. rows = self._simple_select_list_txn(
  33. txn, "account_data", {"user_id": user_id},
  34. ["account_data_type", "content"]
  35. )
  36. global_account_data = {
  37. row["account_data_type"]: json.loads(row["content"]) for row in rows
  38. }
  39. rows = self._simple_select_list_txn(
  40. txn, "room_account_data", {"user_id": user_id},
  41. ["room_id", "account_data_type", "content"]
  42. )
  43. by_room = {}
  44. for row in rows:
  45. room_data = by_room.setdefault(row["room_id"], {})
  46. room_data[row["account_data_type"]] = json.loads(row["content"])
  47. return (global_account_data, by_room)
  48. return self.runInteraction(
  49. "get_account_data_for_user", get_account_data_for_user_txn
  50. )
  51. @cachedInlineCallbacks(num_args=2)
  52. def get_global_account_data_by_type_for_user(self, data_type, user_id):
  53. """
  54. Returns:
  55. Deferred: A dict
  56. """
  57. result = yield self._simple_select_one_onecol(
  58. table="account_data",
  59. keyvalues={
  60. "user_id": user_id,
  61. "account_data_type": data_type,
  62. },
  63. retcol="content",
  64. desc="get_global_account_data_by_type_for_user",
  65. allow_none=True,
  66. )
  67. if result:
  68. defer.returnValue(json.loads(result))
  69. else:
  70. defer.returnValue(None)
  71. @cachedList(cached_method_name="get_global_account_data_by_type_for_user",
  72. num_args=2, list_name="user_ids", inlineCallbacks=True)
  73. def get_global_account_data_by_type_for_users(self, data_type, user_ids):
  74. rows = yield self._simple_select_many_batch(
  75. table="account_data",
  76. column="user_id",
  77. iterable=user_ids,
  78. keyvalues={
  79. "account_data_type": data_type,
  80. },
  81. retcols=("user_id", "content",),
  82. desc="get_global_account_data_by_type_for_users",
  83. )
  84. defer.returnValue({
  85. row["user_id"]: json.loads(row["content"]) if row["content"] else None
  86. for row in rows
  87. })
  88. def get_account_data_for_room(self, user_id, room_id):
  89. """Get all the client account_data for a user for a room.
  90. Args:
  91. user_id(str): The user to get the account_data for.
  92. room_id(str): The room to get the account_data for.
  93. Returns:
  94. A deferred dict of the room account_data
  95. """
  96. def get_account_data_for_room_txn(txn):
  97. rows = self._simple_select_list_txn(
  98. txn, "room_account_data", {"user_id": user_id, "room_id": room_id},
  99. ["account_data_type", "content"]
  100. )
  101. return {
  102. row["account_data_type"]: json.loads(row["content"]) for row in rows
  103. }
  104. return self.runInteraction(
  105. "get_account_data_for_room", get_account_data_for_room_txn
  106. )
  107. def get_all_updated_account_data(self, last_global_id, last_room_id,
  108. current_id, limit):
  109. """Get all the client account_data that has changed on the server
  110. Args:
  111. last_global_id(int): The position to fetch from for top level data
  112. last_room_id(int): The position to fetch from for per room data
  113. current_id(int): The position to fetch up to.
  114. Returns:
  115. A deferred pair of lists of tuples of stream_id int, user_id string,
  116. room_id string, type string, and content string.
  117. """
  118. if last_room_id == current_id and last_global_id == current_id:
  119. return defer.succeed(([], []))
  120. def get_updated_account_data_txn(txn):
  121. sql = (
  122. "SELECT stream_id, user_id, account_data_type, content"
  123. " FROM account_data WHERE ? < stream_id AND stream_id <= ?"
  124. " ORDER BY stream_id ASC LIMIT ?"
  125. )
  126. txn.execute(sql, (last_global_id, current_id, limit))
  127. global_results = txn.fetchall()
  128. sql = (
  129. "SELECT stream_id, user_id, room_id, account_data_type, content"
  130. " FROM room_account_data WHERE ? < stream_id AND stream_id <= ?"
  131. " ORDER BY stream_id ASC LIMIT ?"
  132. )
  133. txn.execute(sql, (last_room_id, current_id, limit))
  134. room_results = txn.fetchall()
  135. return (global_results, room_results)
  136. return self.runInteraction(
  137. "get_all_updated_account_data_txn", get_updated_account_data_txn
  138. )
  139. def get_updated_account_data_for_user(self, user_id, stream_id):
  140. """Get all the client account_data for a that's changed for a user
  141. Args:
  142. user_id(str): The user to get the account_data for.
  143. stream_id(int): The point in the stream since which to get updates
  144. Returns:
  145. A deferred pair of a dict of global account_data and a dict
  146. mapping from room_id string to per room account_data dicts.
  147. """
  148. def get_updated_account_data_for_user_txn(txn):
  149. sql = (
  150. "SELECT account_data_type, content FROM account_data"
  151. " WHERE user_id = ? AND stream_id > ?"
  152. )
  153. txn.execute(sql, (user_id, stream_id))
  154. global_account_data = {
  155. row[0]: json.loads(row[1]) for row in txn.fetchall()
  156. }
  157. sql = (
  158. "SELECT room_id, account_data_type, content FROM room_account_data"
  159. " WHERE user_id = ? AND stream_id > ?"
  160. )
  161. txn.execute(sql, (user_id, stream_id))
  162. account_data_by_room = {}
  163. for row in txn.fetchall():
  164. room_account_data = account_data_by_room.setdefault(row[0], {})
  165. room_account_data[row[1]] = json.loads(row[2])
  166. return (global_account_data, account_data_by_room)
  167. changed = self._account_data_stream_cache.has_entity_changed(
  168. user_id, int(stream_id)
  169. )
  170. if not changed:
  171. return ({}, {})
  172. return self.runInteraction(
  173. "get_updated_account_data_for_user", get_updated_account_data_for_user_txn
  174. )
  175. @defer.inlineCallbacks
  176. def add_account_data_to_room(self, user_id, room_id, account_data_type, content):
  177. """Add some account_data to a room for a user.
  178. Args:
  179. user_id(str): The user to add a tag for.
  180. room_id(str): The room to add a tag for.
  181. account_data_type(str): The type of account_data to add.
  182. content(dict): A json object to associate with the tag.
  183. Returns:
  184. A deferred that completes once the account_data has been added.
  185. """
  186. content_json = json.dumps(content)
  187. def add_account_data_txn(txn, next_id):
  188. self._simple_upsert_txn(
  189. txn,
  190. table="room_account_data",
  191. keyvalues={
  192. "user_id": user_id,
  193. "room_id": room_id,
  194. "account_data_type": account_data_type,
  195. },
  196. values={
  197. "stream_id": next_id,
  198. "content": content_json,
  199. }
  200. )
  201. txn.call_after(
  202. self._account_data_stream_cache.entity_has_changed,
  203. user_id, next_id,
  204. )
  205. txn.call_after(self.get_account_data_for_user.invalidate, (user_id,))
  206. self._update_max_stream_id(txn, next_id)
  207. with self._account_data_id_gen.get_next() as next_id:
  208. yield self.runInteraction(
  209. "add_room_account_data", add_account_data_txn, next_id
  210. )
  211. result = self._account_data_id_gen.get_current_token()
  212. defer.returnValue(result)
  213. @defer.inlineCallbacks
  214. def add_account_data_for_user(self, user_id, account_data_type, content):
  215. """Add some account_data to a room for a user.
  216. Args:
  217. user_id(str): The user to add a tag for.
  218. account_data_type(str): The type of account_data to add.
  219. content(dict): A json object to associate with the tag.
  220. Returns:
  221. A deferred that completes once the account_data has been added.
  222. """
  223. content_json = json.dumps(content)
  224. def add_account_data_txn(txn, next_id):
  225. self._simple_upsert_txn(
  226. txn,
  227. table="account_data",
  228. keyvalues={
  229. "user_id": user_id,
  230. "account_data_type": account_data_type,
  231. },
  232. values={
  233. "stream_id": next_id,
  234. "content": content_json,
  235. }
  236. )
  237. txn.call_after(
  238. self._account_data_stream_cache.entity_has_changed,
  239. user_id, next_id,
  240. )
  241. txn.call_after(self.get_account_data_for_user.invalidate, (user_id,))
  242. txn.call_after(
  243. self.get_global_account_data_by_type_for_user.invalidate,
  244. (account_data_type, user_id,)
  245. )
  246. self._update_max_stream_id(txn, next_id)
  247. with self._account_data_id_gen.get_next() as next_id:
  248. yield self.runInteraction(
  249. "add_user_account_data", add_account_data_txn, next_id
  250. )
  251. result = self._account_data_id_gen.get_current_token()
  252. defer.returnValue(result)
  253. def _update_max_stream_id(self, txn, next_id):
  254. """Update the max stream_id
  255. Args:
  256. txn: The database cursor
  257. next_id(int): The the revision to advance to.
  258. """
  259. update_max_id_sql = (
  260. "UPDATE account_data_max_stream_id"
  261. " SET stream_id = ?"
  262. " WHERE stream_id < ?"
  263. )
  264. txn.execute(update_max_id_sql, (next_id, next_id))