account_data.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
  15. from ._base import SQLBaseStore
  16. from twisted.internet import defer
  17. from synapse.util.caches.descriptors import cached, cachedList, cachedInlineCallbacks
  18. import ujson as json
  19. import logging
  20. logger = logging.getLogger(__name__)
  21. class AccountDataStore(SQLBaseStore):
  22. @cached()
  23. def get_account_data_for_user(self, user_id):
  24. """Get all the client account_data for a user.
  25. Args:
  26. user_id(str): The user to get the account_data for.
  27. Returns:
  28. A deferred pair of a dict of global account_data and a dict
  29. mapping from room_id string to per room account_data dicts.
  30. """
  31. def get_account_data_for_user_txn(txn):
  32. rows = self._simple_select_list_txn(
  33. txn, "account_data", {"user_id": user_id},
  34. ["account_data_type", "content"]
  35. )
  36. global_account_data = {
  37. row["account_data_type"]: json.loads(row["content"]) for row in rows
  38. }
  39. rows = self._simple_select_list_txn(
  40. txn, "room_account_data", {"user_id": user_id},
  41. ["room_id", "account_data_type", "content"]
  42. )
  43. by_room = {}
  44. for row in rows:
  45. room_data = by_room.setdefault(row["room_id"], {})
  46. room_data[row["account_data_type"]] = json.loads(row["content"])
  47. return (global_account_data, by_room)
  48. return self.runInteraction(
  49. "get_account_data_for_user", get_account_data_for_user_txn
  50. )
  51. @cachedInlineCallbacks(num_args=2)
  52. def get_global_account_data_by_type_for_user(self, data_type, user_id):
  53. """
  54. Returns:
  55. Deferred: A dict
  56. """
  57. result = yield self._simple_select_one_onecol(
  58. table="account_data",
  59. keyvalues={
  60. "user_id": user_id,
  61. "account_data_type": data_type,
  62. },
  63. retcol="content",
  64. desc="get_global_account_data_by_type_for_user",
  65. allow_none=True,
  66. )
  67. if result:
  68. defer.returnValue(json.loads(result))
  69. else:
  70. defer.returnValue(None)
  71. @cachedList(cached_method_name="get_global_account_data_by_type_for_user",
  72. num_args=2, list_name="user_ids", inlineCallbacks=True)
  73. def get_global_account_data_by_type_for_users(self, data_type, user_ids):
  74. rows = yield self._simple_select_many_batch(
  75. table="account_data",
  76. column="user_id",
  77. iterable=user_ids,
  78. keyvalues={
  79. "account_data_type": data_type,
  80. },
  81. retcols=("user_id", "content",),
  82. desc="get_global_account_data_by_type_for_users",
  83. )
  84. defer.returnValue({
  85. row["user_id"]: json.loads(row["content"]) if row["content"] else None
  86. for row in rows
  87. })
  88. def get_account_data_for_room(self, user_id, room_id):
  89. """Get all the client account_data for a user for a room.
  90. Args:
  91. user_id(str): The user to get the account_data for.
  92. room_id(str): The room to get the account_data for.
  93. Returns:
  94. A deferred dict of the room account_data
  95. """
  96. def get_account_data_for_room_txn(txn):
  97. rows = self._simple_select_list_txn(
  98. txn, "room_account_data", {"user_id": user_id, "room_id": room_id},
  99. ["account_data_type", "content"]
  100. )
  101. return {
  102. row["account_data_type"]: json.loads(row["content"]) for row in rows
  103. }
  104. return self.runInteraction(
  105. "get_account_data_for_room", get_account_data_for_room_txn
  106. )
  107. def get_all_updated_account_data(self, last_global_id, last_room_id,
  108. current_id, limit):
  109. """Get all the client account_data that has changed on the server
  110. Args:
  111. last_global_id(int): The position to fetch from for top level data
  112. last_room_id(int): The position to fetch from for per room data
  113. current_id(int): The position to fetch up to.
  114. Returns:
  115. A deferred pair of lists of tuples of stream_id int, user_id string,
  116. room_id string, type string, and content string.
  117. """
  118. def get_updated_account_data_txn(txn):
  119. sql = (
  120. "SELECT stream_id, user_id, account_data_type, content"
  121. " FROM account_data WHERE ? < stream_id AND stream_id <= ?"
  122. " ORDER BY stream_id ASC LIMIT ?"
  123. )
  124. txn.execute(sql, (last_global_id, current_id, limit))
  125. global_results = txn.fetchall()
  126. sql = (
  127. "SELECT stream_id, user_id, room_id, account_data_type, content"
  128. " FROM room_account_data WHERE ? < stream_id AND stream_id <= ?"
  129. " ORDER BY stream_id ASC LIMIT ?"
  130. )
  131. txn.execute(sql, (last_room_id, current_id, limit))
  132. room_results = txn.fetchall()
  133. return (global_results, room_results)
  134. return self.runInteraction(
  135. "get_all_updated_account_data_txn", get_updated_account_data_txn
  136. )
  137. def get_updated_account_data_for_user(self, user_id, stream_id):
  138. """Get all the client account_data for a that's changed for a user
  139. Args:
  140. user_id(str): The user to get the account_data for.
  141. stream_id(int): The point in the stream since which to get updates
  142. Returns:
  143. A deferred pair of a dict of global account_data and a dict
  144. mapping from room_id string to per room account_data dicts.
  145. """
  146. def get_updated_account_data_for_user_txn(txn):
  147. sql = (
  148. "SELECT account_data_type, content FROM account_data"
  149. " WHERE user_id = ? AND stream_id > ?"
  150. )
  151. txn.execute(sql, (user_id, stream_id))
  152. global_account_data = {
  153. row[0]: json.loads(row[1]) for row in txn.fetchall()
  154. }
  155. sql = (
  156. "SELECT room_id, account_data_type, content FROM room_account_data"
  157. " WHERE user_id = ? AND stream_id > ?"
  158. )
  159. txn.execute(sql, (user_id, stream_id))
  160. account_data_by_room = {}
  161. for row in txn.fetchall():
  162. room_account_data = account_data_by_room.setdefault(row[0], {})
  163. room_account_data[row[1]] = json.loads(row[2])
  164. return (global_account_data, account_data_by_room)
  165. changed = self._account_data_stream_cache.has_entity_changed(
  166. user_id, int(stream_id)
  167. )
  168. if not changed:
  169. return ({}, {})
  170. return self.runInteraction(
  171. "get_updated_account_data_for_user", get_updated_account_data_for_user_txn
  172. )
  173. @defer.inlineCallbacks
  174. def add_account_data_to_room(self, user_id, room_id, account_data_type, content):
  175. """Add some account_data to a room for a user.
  176. Args:
  177. user_id(str): The user to add a tag for.
  178. room_id(str): The room to add a tag for.
  179. account_data_type(str): The type of account_data to add.
  180. content(dict): A json object to associate with the tag.
  181. Returns:
  182. A deferred that completes once the account_data has been added.
  183. """
  184. content_json = json.dumps(content)
  185. def add_account_data_txn(txn, next_id):
  186. self._simple_upsert_txn(
  187. txn,
  188. table="room_account_data",
  189. keyvalues={
  190. "user_id": user_id,
  191. "room_id": room_id,
  192. "account_data_type": account_data_type,
  193. },
  194. values={
  195. "stream_id": next_id,
  196. "content": content_json,
  197. }
  198. )
  199. txn.call_after(
  200. self._account_data_stream_cache.entity_has_changed,
  201. user_id, next_id,
  202. )
  203. txn.call_after(self.get_account_data_for_user.invalidate, (user_id,))
  204. self._update_max_stream_id(txn, next_id)
  205. with self._account_data_id_gen.get_next() as next_id:
  206. yield self.runInteraction(
  207. "add_room_account_data", add_account_data_txn, next_id
  208. )
  209. result = self._account_data_id_gen.get_current_token()
  210. defer.returnValue(result)
  211. @defer.inlineCallbacks
  212. def add_account_data_for_user(self, user_id, account_data_type, content):
  213. """Add some account_data to a room for a user.
  214. Args:
  215. user_id(str): The user to add a tag for.
  216. account_data_type(str): The type of account_data to add.
  217. content(dict): A json object to associate with the tag.
  218. Returns:
  219. A deferred that completes once the account_data has been added.
  220. """
  221. content_json = json.dumps(content)
  222. def add_account_data_txn(txn, next_id):
  223. self._simple_upsert_txn(
  224. txn,
  225. table="account_data",
  226. keyvalues={
  227. "user_id": user_id,
  228. "account_data_type": account_data_type,
  229. },
  230. values={
  231. "stream_id": next_id,
  232. "content": content_json,
  233. }
  234. )
  235. txn.call_after(
  236. self._account_data_stream_cache.entity_has_changed,
  237. user_id, next_id,
  238. )
  239. txn.call_after(self.get_account_data_for_user.invalidate, (user_id,))
  240. txn.call_after(
  241. self.get_global_account_data_by_type_for_user.invalidate,
  242. (account_data_type, user_id,)
  243. )
  244. self._update_max_stream_id(txn, next_id)
  245. with self._account_data_id_gen.get_next() as next_id:
  246. yield self.runInteraction(
  247. "add_user_account_data", add_account_data_txn, next_id
  248. )
  249. result = self._account_data_id_gen.get_current_token()
  250. defer.returnValue(result)
  251. def _update_max_stream_id(self, txn, next_id):
  252. """Update the max stream_id
  253. Args:
  254. txn: The database cursor
  255. next_id(int): The the revision to advance to.
  256. """
  257. update_max_id_sql = (
  258. "UPDATE account_data_max_stream_id"
  259. " SET stream_id = ?"
  260. " WHERE stream_id < ?"
  261. )
  262. txn.execute(update_max_id_sql, (next_id, next_id))