end_to_end_keys.py 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2015, 2016 OpenMarket Ltd
  3. # Copyright 2019 New Vector Ltd
  4. # Copyright 2019 The Matrix.org Foundation C.I.C.
  5. #
  6. # Licensed under the Apache License, Version 2.0 (the "License");
  7. # you may not use this file except in compliance with the License.
  8. # You may obtain a copy of the License at
  9. #
  10. # http://www.apache.org/licenses/LICENSE-2.0
  11. #
  12. # Unless required by applicable law or agreed to in writing, software
  13. # distributed under the License is distributed on an "AS IS" BASIS,
  14. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  15. # See the License for the specific language governing permissions and
  16. # limitations under the License.
  17. from typing import Dict, List
  18. from six import iteritems
  19. from canonicaljson import encode_canonical_json, json
  20. from twisted.enterprise.adbapi import Connection
  21. from twisted.internet import defer
  22. from synapse.logging.opentracing import log_kv, set_tag, trace
  23. from synapse.storage._base import SQLBaseStore, db_to_json
  24. from synapse.util.caches.descriptors import cached, cachedList
  25. class EndToEndKeyWorkerStore(SQLBaseStore):
  26. @trace
  27. @defer.inlineCallbacks
  28. def get_e2e_device_keys(
  29. self, query_list, include_all_devices=False, include_deleted_devices=False
  30. ):
  31. """Fetch a list of device keys.
  32. Args:
  33. query_list(list): List of pairs of user_ids and device_ids.
  34. include_all_devices (bool): whether to include entries for devices
  35. that don't have device keys
  36. include_deleted_devices (bool): whether to include null entries for
  37. devices which no longer exist (but were in the query_list).
  38. This option only takes effect if include_all_devices is true.
  39. Returns:
  40. Dict mapping from user-id to dict mapping from device_id to
  41. key data. The key data will be a dict in the same format as the
  42. DeviceKeys type returned by POST /_matrix/client/r0/keys/query.
  43. """
  44. set_tag("query_list", query_list)
  45. if not query_list:
  46. return {}
  47. results = yield self.db.runInteraction(
  48. "get_e2e_device_keys",
  49. self._get_e2e_device_keys_txn,
  50. query_list,
  51. include_all_devices,
  52. include_deleted_devices,
  53. )
  54. # Build the result structure, un-jsonify the results, and add the
  55. # "unsigned" section
  56. rv = {}
  57. for user_id, device_keys in iteritems(results):
  58. rv[user_id] = {}
  59. for device_id, device_info in iteritems(device_keys):
  60. r = db_to_json(device_info.pop("key_json"))
  61. r["unsigned"] = {}
  62. display_name = device_info["device_display_name"]
  63. if display_name is not None:
  64. r["unsigned"]["device_display_name"] = display_name
  65. if "signatures" in device_info:
  66. for sig_user_id, sigs in device_info["signatures"].items():
  67. r.setdefault("signatures", {}).setdefault(
  68. sig_user_id, {}
  69. ).update(sigs)
  70. rv[user_id][device_id] = r
  71. return rv
  72. @trace
  73. def _get_e2e_device_keys_txn(
  74. self, txn, query_list, include_all_devices=False, include_deleted_devices=False
  75. ):
  76. set_tag("include_all_devices", include_all_devices)
  77. set_tag("include_deleted_devices", include_deleted_devices)
  78. query_clauses = []
  79. query_params = []
  80. signature_query_clauses = []
  81. signature_query_params = []
  82. if include_all_devices is False:
  83. include_deleted_devices = False
  84. if include_deleted_devices:
  85. deleted_devices = set(query_list)
  86. for (user_id, device_id) in query_list:
  87. query_clause = "user_id = ?"
  88. query_params.append(user_id)
  89. signature_query_clause = "target_user_id = ?"
  90. signature_query_params.append(user_id)
  91. if device_id is not None:
  92. query_clause += " AND device_id = ?"
  93. query_params.append(device_id)
  94. signature_query_clause += " AND target_device_id = ?"
  95. signature_query_params.append(device_id)
  96. signature_query_clause += " AND user_id = ?"
  97. signature_query_params.append(user_id)
  98. query_clauses.append(query_clause)
  99. signature_query_clauses.append(signature_query_clause)
  100. sql = (
  101. "SELECT user_id, device_id, "
  102. " d.display_name AS device_display_name, "
  103. " k.key_json"
  104. " FROM devices d"
  105. " %s JOIN e2e_device_keys_json k USING (user_id, device_id)"
  106. " WHERE %s AND NOT d.hidden"
  107. ) % (
  108. "LEFT" if include_all_devices else "INNER",
  109. " OR ".join("(" + q + ")" for q in query_clauses),
  110. )
  111. txn.execute(sql, query_params)
  112. rows = self.db.cursor_to_dict(txn)
  113. result = {}
  114. for row in rows:
  115. if include_deleted_devices:
  116. deleted_devices.remove((row["user_id"], row["device_id"]))
  117. result.setdefault(row["user_id"], {})[row["device_id"]] = row
  118. if include_deleted_devices:
  119. for user_id, device_id in deleted_devices:
  120. result.setdefault(user_id, {})[device_id] = None
  121. # get signatures on the device
  122. signature_sql = ("SELECT * FROM e2e_cross_signing_signatures WHERE %s") % (
  123. " OR ".join("(" + q + ")" for q in signature_query_clauses)
  124. )
  125. txn.execute(signature_sql, signature_query_params)
  126. rows = self.db.cursor_to_dict(txn)
  127. # add each cross-signing signature to the correct device in the result dict.
  128. for row in rows:
  129. signing_user_id = row["user_id"]
  130. signing_key_id = row["key_id"]
  131. target_user_id = row["target_user_id"]
  132. target_device_id = row["target_device_id"]
  133. signature = row["signature"]
  134. target_user_result = result.get(target_user_id)
  135. if not target_user_result:
  136. continue
  137. target_device_result = target_user_result.get(target_device_id)
  138. if not target_device_result:
  139. # note that target_device_result will be None for deleted devices.
  140. continue
  141. target_device_signatures = target_device_result.setdefault("signatures", {})
  142. signing_user_signatures = target_device_signatures.setdefault(
  143. signing_user_id, {}
  144. )
  145. signing_user_signatures[signing_key_id] = signature
  146. log_kv(result)
  147. return result
  148. @defer.inlineCallbacks
  149. def get_e2e_one_time_keys(self, user_id, device_id, key_ids):
  150. """Retrieve a number of one-time keys for a user
  151. Args:
  152. user_id(str): id of user to get keys for
  153. device_id(str): id of device to get keys for
  154. key_ids(list[str]): list of key ids (excluding algorithm) to
  155. retrieve
  156. Returns:
  157. deferred resolving to Dict[(str, str), str]: map from (algorithm,
  158. key_id) to json string for key
  159. """
  160. rows = yield self.db.simple_select_many_batch(
  161. table="e2e_one_time_keys_json",
  162. column="key_id",
  163. iterable=key_ids,
  164. retcols=("algorithm", "key_id", "key_json"),
  165. keyvalues={"user_id": user_id, "device_id": device_id},
  166. desc="add_e2e_one_time_keys_check",
  167. )
  168. result = {(row["algorithm"], row["key_id"]): row["key_json"] for row in rows}
  169. log_kv({"message": "Fetched one time keys for user", "one_time_keys": result})
  170. return result
  171. @defer.inlineCallbacks
  172. def add_e2e_one_time_keys(self, user_id, device_id, time_now, new_keys):
  173. """Insert some new one time keys for a device. Errors if any of the
  174. keys already exist.
  175. Args:
  176. user_id(str): id of user to get keys for
  177. device_id(str): id of device to get keys for
  178. time_now(long): insertion time to record (ms since epoch)
  179. new_keys(iterable[(str, str, str)]: keys to add - each a tuple of
  180. (algorithm, key_id, key json)
  181. """
  182. def _add_e2e_one_time_keys(txn):
  183. set_tag("user_id", user_id)
  184. set_tag("device_id", device_id)
  185. set_tag("new_keys", new_keys)
  186. # We are protected from race between lookup and insertion due to
  187. # a unique constraint. If there is a race of two calls to
  188. # `add_e2e_one_time_keys` then they'll conflict and we will only
  189. # insert one set.
  190. self.db.simple_insert_many_txn(
  191. txn,
  192. table="e2e_one_time_keys_json",
  193. values=[
  194. {
  195. "user_id": user_id,
  196. "device_id": device_id,
  197. "algorithm": algorithm,
  198. "key_id": key_id,
  199. "ts_added_ms": time_now,
  200. "key_json": json_bytes,
  201. }
  202. for algorithm, key_id, json_bytes in new_keys
  203. ],
  204. )
  205. self._invalidate_cache_and_stream(
  206. txn, self.count_e2e_one_time_keys, (user_id, device_id)
  207. )
  208. yield self.db.runInteraction(
  209. "add_e2e_one_time_keys_insert", _add_e2e_one_time_keys
  210. )
  211. @cached(max_entries=10000)
  212. def count_e2e_one_time_keys(self, user_id, device_id):
  213. """ Count the number of one time keys the server has for a device
  214. Returns:
  215. Dict mapping from algorithm to number of keys for that algorithm.
  216. """
  217. def _count_e2e_one_time_keys(txn):
  218. sql = (
  219. "SELECT algorithm, COUNT(key_id) FROM e2e_one_time_keys_json"
  220. " WHERE user_id = ? AND device_id = ?"
  221. " GROUP BY algorithm"
  222. )
  223. txn.execute(sql, (user_id, device_id))
  224. result = {}
  225. for algorithm, key_count in txn:
  226. result[algorithm] = key_count
  227. return result
  228. return self.db.runInteraction(
  229. "count_e2e_one_time_keys", _count_e2e_one_time_keys
  230. )
  231. def _get_e2e_cross_signing_key_txn(self, txn, user_id, key_type, from_user_id=None):
  232. """Returns a user's cross-signing key.
  233. Args:
  234. txn (twisted.enterprise.adbapi.Connection): db connection
  235. user_id (str): the user whose key is being requested
  236. key_type (str): the type of key that is being requested: either 'master'
  237. for a master key, 'self_signing' for a self-signing key, or
  238. 'user_signing' for a user-signing key
  239. from_user_id (str): if specified, signatures made by this user on
  240. the key will be included in the result
  241. Returns:
  242. dict of the key data or None if not found
  243. """
  244. sql = (
  245. "SELECT keydata "
  246. " FROM e2e_cross_signing_keys "
  247. " WHERE user_id = ? AND keytype = ? ORDER BY stream_id DESC LIMIT 1"
  248. )
  249. txn.execute(sql, (user_id, key_type))
  250. row = txn.fetchone()
  251. if not row:
  252. return None
  253. key = json.loads(row[0])
  254. device_id = None
  255. for k in key["keys"].values():
  256. device_id = k
  257. if from_user_id is not None:
  258. sql = (
  259. "SELECT key_id, signature "
  260. " FROM e2e_cross_signing_signatures "
  261. " WHERE user_id = ? "
  262. " AND target_user_id = ? "
  263. " AND target_device_id = ? "
  264. )
  265. txn.execute(sql, (from_user_id, user_id, device_id))
  266. row = txn.fetchone()
  267. if row:
  268. key.setdefault("signatures", {}).setdefault(from_user_id, {})[
  269. row[0]
  270. ] = row[1]
  271. return key
  272. def get_e2e_cross_signing_key(self, user_id, key_type, from_user_id=None):
  273. """Returns a user's cross-signing key.
  274. Args:
  275. user_id (str): the user whose key is being requested
  276. key_type (str): the type of key that is being requested: either 'master'
  277. for a master key, 'self_signing' for a self-signing key, or
  278. 'user_signing' for a user-signing key
  279. from_user_id (str): if specified, signatures made by this user on
  280. the self-signing key will be included in the result
  281. Returns:
  282. dict of the key data or None if not found
  283. """
  284. return self.db.runInteraction(
  285. "get_e2e_cross_signing_key",
  286. self._get_e2e_cross_signing_key_txn,
  287. user_id,
  288. key_type,
  289. from_user_id,
  290. )
  291. @cached(num_args=1)
  292. def _get_bare_e2e_cross_signing_keys(self, user_id):
  293. """Dummy function. Only used to make a cache for
  294. _get_bare_e2e_cross_signing_keys_bulk.
  295. """
  296. raise NotImplementedError()
  297. @cachedList(
  298. cached_method_name="_get_bare_e2e_cross_signing_keys",
  299. list_name="user_ids",
  300. num_args=1,
  301. )
  302. def _get_bare_e2e_cross_signing_keys_bulk(
  303. self, user_ids: List[str]
  304. ) -> Dict[str, Dict[str, dict]]:
  305. """Returns the cross-signing keys for a set of users. The output of this
  306. function should be passed to _get_e2e_cross_signing_signatures_txn if
  307. the signatures for the calling user need to be fetched.
  308. Args:
  309. user_ids (list[str]): the users whose keys are being requested
  310. Returns:
  311. dict[str, dict[str, dict]]: mapping from user ID to key type to key
  312. data. If a user's cross-signing keys were not found, either
  313. their user ID will not be in the dict, or their user ID will map
  314. to None.
  315. """
  316. return self.db.runInteraction(
  317. "get_bare_e2e_cross_signing_keys_bulk",
  318. self._get_bare_e2e_cross_signing_keys_bulk_txn,
  319. user_ids,
  320. )
  321. def _get_bare_e2e_cross_signing_keys_bulk_txn(
  322. self, txn: Connection, user_ids: List[str],
  323. ) -> Dict[str, Dict[str, dict]]:
  324. """Returns the cross-signing keys for a set of users. The output of this
  325. function should be passed to _get_e2e_cross_signing_signatures_txn if
  326. the signatures for the calling user need to be fetched.
  327. Args:
  328. txn (twisted.enterprise.adbapi.Connection): db connection
  329. user_ids (list[str]): the users whose keys are being requested
  330. Returns:
  331. dict[str, dict[str, dict]]: mapping from user ID to key type to key
  332. data. If a user's cross-signing keys were not found, their user
  333. ID will not be in the dict.
  334. """
  335. result = {}
  336. batch_size = 100
  337. chunks = [
  338. user_ids[i : i + batch_size] for i in range(0, len(user_ids), batch_size)
  339. ]
  340. for user_chunk in chunks:
  341. sql = """
  342. SELECT k.user_id, k.keytype, k.keydata, k.stream_id
  343. FROM e2e_cross_signing_keys k
  344. INNER JOIN (SELECT user_id, keytype, MAX(stream_id) AS stream_id
  345. FROM e2e_cross_signing_keys
  346. GROUP BY user_id, keytype) s
  347. USING (user_id, stream_id, keytype)
  348. WHERE k.user_id IN (%s)
  349. """ % (
  350. ",".join("?" for u in user_chunk),
  351. )
  352. query_params = []
  353. query_params.extend(user_chunk)
  354. txn.execute(sql, query_params)
  355. rows = self.db.cursor_to_dict(txn)
  356. for row in rows:
  357. user_id = row["user_id"]
  358. key_type = row["keytype"]
  359. key = json.loads(row["keydata"])
  360. user_info = result.setdefault(user_id, {})
  361. user_info[key_type] = key
  362. return result
  363. def _get_e2e_cross_signing_signatures_txn(
  364. self, txn: Connection, keys: Dict[str, Dict[str, dict]], from_user_id: str,
  365. ) -> Dict[str, Dict[str, dict]]:
  366. """Returns the cross-signing signatures made by a user on a set of keys.
  367. Args:
  368. txn (twisted.enterprise.adbapi.Connection): db connection
  369. keys (dict[str, dict[str, dict]]): a map of user ID to key type to
  370. key data. This dict will be modified to add signatures.
  371. from_user_id (str): fetch the signatures made by this user
  372. Returns:
  373. dict[str, dict[str, dict]]: mapping from user ID to key type to key
  374. data. The return value will be the same as the keys argument,
  375. with the modifications included.
  376. """
  377. # find out what cross-signing keys (a.k.a. devices) we need to get
  378. # signatures for. This is a map of (user_id, device_id) to key type
  379. # (device_id is the key's public part).
  380. devices = {}
  381. for user_id, user_info in keys.items():
  382. if user_info is None:
  383. continue
  384. for key_type, key in user_info.items():
  385. device_id = None
  386. for k in key["keys"].values():
  387. device_id = k
  388. devices[(user_id, device_id)] = key_type
  389. device_list = list(devices)
  390. # split into batches
  391. batch_size = 100
  392. chunks = [
  393. device_list[i : i + batch_size]
  394. for i in range(0, len(device_list), batch_size)
  395. ]
  396. for user_chunk in chunks:
  397. sql = """
  398. SELECT target_user_id, target_device_id, key_id, signature
  399. FROM e2e_cross_signing_signatures
  400. WHERE user_id = ?
  401. AND (%s)
  402. """ % (
  403. " OR ".join(
  404. "(target_user_id = ? AND target_device_id = ?)" for d in devices
  405. )
  406. )
  407. query_params = [from_user_id]
  408. for item in devices:
  409. # item is a (user_id, device_id) tuple
  410. query_params.extend(item)
  411. txn.execute(sql, query_params)
  412. rows = self.db.cursor_to_dict(txn)
  413. # and add the signatures to the appropriate keys
  414. for row in rows:
  415. key_id = row["key_id"]
  416. target_user_id = row["target_user_id"]
  417. target_device_id = row["target_device_id"]
  418. key_type = devices[(target_user_id, target_device_id)]
  419. # We need to copy everything, because the result may have come
  420. # from the cache. dict.copy only does a shallow copy, so we
  421. # need to recursively copy the dicts that will be modified.
  422. user_info = keys[target_user_id] = keys[target_user_id].copy()
  423. target_user_key = user_info[key_type] = user_info[key_type].copy()
  424. if "signatures" in target_user_key:
  425. signatures = target_user_key["signatures"] = target_user_key[
  426. "signatures"
  427. ].copy()
  428. if from_user_id in signatures:
  429. user_sigs = signatures[from_user_id] = signatures[from_user_id]
  430. user_sigs[key_id] = row["signature"]
  431. else:
  432. signatures[from_user_id] = {key_id: row["signature"]}
  433. else:
  434. target_user_key["signatures"] = {
  435. from_user_id: {key_id: row["signature"]}
  436. }
  437. return keys
  438. @defer.inlineCallbacks
  439. def get_e2e_cross_signing_keys_bulk(
  440. self, user_ids: List[str], from_user_id: str = None
  441. ) -> defer.Deferred:
  442. """Returns the cross-signing keys for a set of users.
  443. Args:
  444. user_ids (list[str]): the users whose keys are being requested
  445. from_user_id (str): if specified, signatures made by this user on
  446. the self-signing keys will be included in the result
  447. Returns:
  448. Deferred[dict[str, dict[str, dict]]]: map of user ID to key type to
  449. key data. If a user's cross-signing keys were not found, either
  450. their user ID will not be in the dict, or their user ID will map
  451. to None.
  452. """
  453. result = yield self._get_bare_e2e_cross_signing_keys_bulk(user_ids)
  454. if from_user_id:
  455. result = yield self.db.runInteraction(
  456. "get_e2e_cross_signing_signatures",
  457. self._get_e2e_cross_signing_signatures_txn,
  458. result,
  459. from_user_id,
  460. )
  461. return result
  462. def get_all_user_signature_changes_for_remotes(self, from_key, to_key):
  463. """Return a list of changes from the user signature stream to notify remotes.
  464. Note that the user signature stream represents when a user signs their
  465. device with their user-signing key, which is not published to other
  466. users or servers, so no `destination` is needed in the returned
  467. list. However, this is needed to poke workers.
  468. Args:
  469. from_key (int): the stream ID to start at (exclusive)
  470. to_key (int): the stream ID to end at (inclusive)
  471. Returns:
  472. Deferred[list[(int,str)]] a list of `(stream_id, user_id)`
  473. """
  474. sql = """
  475. SELECT MAX(stream_id) AS stream_id, from_user_id AS user_id
  476. FROM user_signature_stream
  477. WHERE ? < stream_id AND stream_id <= ?
  478. GROUP BY user_id
  479. """
  480. return self.db.execute(
  481. "get_all_user_signature_changes_for_remotes", None, sql, from_key, to_key
  482. )
  483. class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
  484. def set_e2e_device_keys(self, user_id, device_id, time_now, device_keys):
  485. """Stores device keys for a device. Returns whether there was a change
  486. or the keys were already in the database.
  487. """
  488. def _set_e2e_device_keys_txn(txn):
  489. set_tag("user_id", user_id)
  490. set_tag("device_id", device_id)
  491. set_tag("time_now", time_now)
  492. set_tag("device_keys", device_keys)
  493. old_key_json = self.db.simple_select_one_onecol_txn(
  494. txn,
  495. table="e2e_device_keys_json",
  496. keyvalues={"user_id": user_id, "device_id": device_id},
  497. retcol="key_json",
  498. allow_none=True,
  499. )
  500. # In py3 we need old_key_json to match new_key_json type. The DB
  501. # returns unicode while encode_canonical_json returns bytes.
  502. new_key_json = encode_canonical_json(device_keys).decode("utf-8")
  503. if old_key_json == new_key_json:
  504. log_kv({"Message": "Device key already stored."})
  505. return False
  506. self.db.simple_upsert_txn(
  507. txn,
  508. table="e2e_device_keys_json",
  509. keyvalues={"user_id": user_id, "device_id": device_id},
  510. values={"ts_added_ms": time_now, "key_json": new_key_json},
  511. )
  512. log_kv({"message": "Device keys stored."})
  513. return True
  514. return self.db.runInteraction("set_e2e_device_keys", _set_e2e_device_keys_txn)
  515. def claim_e2e_one_time_keys(self, query_list):
  516. """Take a list of one time keys out of the database"""
  517. @trace
  518. def _claim_e2e_one_time_keys(txn):
  519. sql = (
  520. "SELECT key_id, key_json FROM e2e_one_time_keys_json"
  521. " WHERE user_id = ? AND device_id = ? AND algorithm = ?"
  522. " LIMIT 1"
  523. )
  524. result = {}
  525. delete = []
  526. for user_id, device_id, algorithm in query_list:
  527. user_result = result.setdefault(user_id, {})
  528. device_result = user_result.setdefault(device_id, {})
  529. txn.execute(sql, (user_id, device_id, algorithm))
  530. for key_id, key_json in txn:
  531. device_result[algorithm + ":" + key_id] = key_json
  532. delete.append((user_id, device_id, algorithm, key_id))
  533. sql = (
  534. "DELETE FROM e2e_one_time_keys_json"
  535. " WHERE user_id = ? AND device_id = ? AND algorithm = ?"
  536. " AND key_id = ?"
  537. )
  538. for user_id, device_id, algorithm, key_id in delete:
  539. log_kv(
  540. {
  541. "message": "Executing claim e2e_one_time_keys transaction on database."
  542. }
  543. )
  544. txn.execute(sql, (user_id, device_id, algorithm, key_id))
  545. log_kv({"message": "finished executing and invalidating cache"})
  546. self._invalidate_cache_and_stream(
  547. txn, self.count_e2e_one_time_keys, (user_id, device_id)
  548. )
  549. return result
  550. return self.db.runInteraction(
  551. "claim_e2e_one_time_keys", _claim_e2e_one_time_keys
  552. )
  553. def delete_e2e_keys_by_device(self, user_id, device_id):
  554. def delete_e2e_keys_by_device_txn(txn):
  555. log_kv(
  556. {
  557. "message": "Deleting keys for device",
  558. "device_id": device_id,
  559. "user_id": user_id,
  560. }
  561. )
  562. self.db.simple_delete_txn(
  563. txn,
  564. table="e2e_device_keys_json",
  565. keyvalues={"user_id": user_id, "device_id": device_id},
  566. )
  567. self.db.simple_delete_txn(
  568. txn,
  569. table="e2e_one_time_keys_json",
  570. keyvalues={"user_id": user_id, "device_id": device_id},
  571. )
  572. self._invalidate_cache_and_stream(
  573. txn, self.count_e2e_one_time_keys, (user_id, device_id)
  574. )
  575. return self.db.runInteraction(
  576. "delete_e2e_keys_by_device", delete_e2e_keys_by_device_txn
  577. )
  578. def _set_e2e_cross_signing_key_txn(self, txn, user_id, key_type, key):
  579. """Set a user's cross-signing key.
  580. Args:
  581. txn (twisted.enterprise.adbapi.Connection): db connection
  582. user_id (str): the user to set the signing key for
  583. key_type (str): the type of key that is being set: either 'master'
  584. for a master key, 'self_signing' for a self-signing key, or
  585. 'user_signing' for a user-signing key
  586. key (dict): the key data
  587. """
  588. # the 'key' dict will look something like:
  589. # {
  590. # "user_id": "@alice:example.com",
  591. # "usage": ["self_signing"],
  592. # "keys": {
  593. # "ed25519:base64+self+signing+public+key": "base64+self+signing+public+key",
  594. # },
  595. # "signatures": {
  596. # "@alice:example.com": {
  597. # "ed25519:base64+master+public+key": "base64+signature"
  598. # }
  599. # }
  600. # }
  601. # The "keys" property must only have one entry, which will be the public
  602. # key, so we just grab the first value in there
  603. pubkey = next(iter(key["keys"].values()))
  604. # The cross-signing keys need to occupy the same namespace as devices,
  605. # since signatures are identified by device ID. So add an entry to the
  606. # device table to make sure that we don't have a collision with device
  607. # IDs.
  608. # We only need to do this for local users, since remote servers should be
  609. # responsible for checking this for their own users.
  610. if self.hs.is_mine_id(user_id):
  611. self.db.simple_insert_txn(
  612. txn,
  613. "devices",
  614. values={
  615. "user_id": user_id,
  616. "device_id": pubkey,
  617. "display_name": key_type + " signing key",
  618. "hidden": True,
  619. },
  620. )
  621. # and finally, store the key itself
  622. with self._cross_signing_id_gen.get_next() as stream_id:
  623. self.db.simple_insert_txn(
  624. txn,
  625. "e2e_cross_signing_keys",
  626. values={
  627. "user_id": user_id,
  628. "keytype": key_type,
  629. "keydata": json.dumps(key),
  630. "stream_id": stream_id,
  631. },
  632. )
  633. self._invalidate_cache_and_stream(
  634. txn, self._get_bare_e2e_cross_signing_keys, (user_id,)
  635. )
  636. def set_e2e_cross_signing_key(self, user_id, key_type, key):
  637. """Set a user's cross-signing key.
  638. Args:
  639. user_id (str): the user to set the user-signing key for
  640. key_type (str): the type of cross-signing key to set
  641. key (dict): the key data
  642. """
  643. return self.db.runInteraction(
  644. "add_e2e_cross_signing_key",
  645. self._set_e2e_cross_signing_key_txn,
  646. user_id,
  647. key_type,
  648. key,
  649. )
  650. def store_e2e_cross_signing_signatures(self, user_id, signatures):
  651. """Stores cross-signing signatures.
  652. Args:
  653. user_id (str): the user who made the signatures
  654. signatures (iterable[SignatureListItem]): signatures to add
  655. """
  656. return self.db.simple_insert_many(
  657. "e2e_cross_signing_signatures",
  658. [
  659. {
  660. "user_id": user_id,
  661. "key_id": item.signing_key_id,
  662. "target_user_id": item.target_user_id,
  663. "target_device_id": item.target_device_id,
  664. "signature": item.signature,
  665. }
  666. for item in signatures
  667. ],
  668. "add_e2e_signing_key",
  669. )