end_to_end_keys.py 52 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405
  1. # Copyright 2015, 2016 OpenMarket Ltd
  2. # Copyright 2019 New Vector Ltd
  3. # Copyright 2019,2020 The Matrix.org Foundation C.I.C.
  4. #
  5. # Licensed under the Apache License, Version 2.0 (the "License");
  6. # you may not use this file except in compliance with the License.
  7. # You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing, software
  12. # distributed under the License is distributed on an "AS IS" BASIS,
  13. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. # See the License for the specific language governing permissions and
  15. # limitations under the License.
  16. import abc
  17. from typing import (
  18. TYPE_CHECKING,
  19. Collection,
  20. Dict,
  21. Iterable,
  22. List,
  23. Mapping,
  24. Optional,
  25. Sequence,
  26. Tuple,
  27. Union,
  28. cast,
  29. overload,
  30. )
  31. import attr
  32. from canonicaljson import encode_canonical_json
  33. from typing_extensions import Literal
  34. from synapse.api.constants import DeviceKeyAlgorithms
  35. from synapse.appservice import (
  36. TransactionOneTimeKeysCount,
  37. TransactionUnusedFallbackKeys,
  38. )
  39. from synapse.logging.opentracing import log_kv, set_tag, trace
  40. from synapse.storage._base import SQLBaseStore, db_to_json
  41. from synapse.storage.database import (
  42. DatabasePool,
  43. LoggingDatabaseConnection,
  44. LoggingTransaction,
  45. make_in_list_sql_clause,
  46. make_tuple_in_list_sql_clause,
  47. )
  48. from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
  49. from synapse.storage.engines import PostgresEngine
  50. from synapse.storage.util.id_generators import StreamIdGenerator
  51. from synapse.types import JsonDict
  52. from synapse.util import json_encoder
  53. from synapse.util.caches.descriptors import cached, cachedList
  54. from synapse.util.cancellation import cancellable
  55. from synapse.util.iterutils import batch_iter
  56. if TYPE_CHECKING:
  57. from synapse.handlers.e2e_keys import SignatureListItem
  58. from synapse.server import HomeServer
  59. @attr.s(slots=True, auto_attribs=True)
  60. class DeviceKeyLookupResult:
  61. """The type returned by get_e2e_device_keys_and_signatures"""
  62. display_name: Optional[str]
  63. # the key data from e2e_device_keys_json. Typically includes fields like
  64. # "algorithm", "keys" (including the curve25519 identity key and the ed25519 signing
  65. # key) and "signatures" (a map from (user id) to (key id/device_id) to signature.)
  66. keys: Optional[JsonDict]
  67. class EndToEndKeyBackgroundStore(SQLBaseStore):
  68. def __init__(
  69. self,
  70. database: DatabasePool,
  71. db_conn: LoggingDatabaseConnection,
  72. hs: "HomeServer",
  73. ):
  74. super().__init__(database, db_conn, hs)
  75. self.db_pool.updates.register_background_index_update(
  76. "e2e_cross_signing_keys_idx",
  77. index_name="e2e_cross_signing_keys_stream_idx",
  78. table="e2e_cross_signing_keys",
  79. columns=["stream_id"],
  80. unique=True,
  81. )
  82. class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorkerStore):
  83. def __init__(
  84. self,
  85. database: DatabasePool,
  86. db_conn: LoggingDatabaseConnection,
  87. hs: "HomeServer",
  88. ):
  89. super().__init__(database, db_conn, hs)
  90. self._allow_device_name_lookup_over_federation = (
  91. self.hs.config.federation.allow_device_name_lookup_over_federation
  92. )
  93. async def get_e2e_device_keys_for_federation_query(
  94. self, user_id: str
  95. ) -> Tuple[int, List[JsonDict]]:
  96. """Get all devices (with any device keys) for a user
  97. Returns:
  98. (stream_id, devices)
  99. """
  100. now_stream_id = self.get_device_stream_token()
  101. devices = await self.get_e2e_device_keys_and_signatures([(user_id, None)])
  102. if devices:
  103. user_devices = devices[user_id]
  104. results = []
  105. for device_id, device in user_devices.items():
  106. result: JsonDict = {"device_id": device_id}
  107. keys = device.keys
  108. if keys:
  109. result["keys"] = keys
  110. device_display_name = None
  111. if self._allow_device_name_lookup_over_federation:
  112. device_display_name = device.display_name
  113. if device_display_name:
  114. result["device_display_name"] = device_display_name
  115. results.append(result)
  116. return now_stream_id, results
  117. return now_stream_id, []
  118. @trace
  119. @cancellable
  120. async def get_e2e_device_keys_for_cs_api(
  121. self,
  122. query_list: Collection[Tuple[str, Optional[str]]],
  123. include_displaynames: bool = True,
  124. ) -> Dict[str, Dict[str, JsonDict]]:
  125. """Fetch a list of device keys, formatted suitably for the C/S API.
  126. Args:
  127. query_list: List of pairs of user_ids and device_ids.
  128. include_displaynames: Whether to include the displayname of returned devices
  129. (if one exists).
  130. Returns:
  131. Dict mapping from user-id to dict mapping from device_id to
  132. key data. The key data will be a dict in the same format as the
  133. DeviceKeys type returned by POST /_matrix/client/r0/keys/query.
  134. """
  135. set_tag("query_list", str(query_list))
  136. if not query_list:
  137. return {}
  138. results = await self.get_e2e_device_keys_and_signatures(query_list)
  139. # Build the result structure, un-jsonify the results, and add the
  140. # "unsigned" section
  141. rv: Dict[str, Dict[str, JsonDict]] = {}
  142. for user_id, device_keys in results.items():
  143. rv[user_id] = {}
  144. for device_id, device_info in device_keys.items():
  145. r = device_info.keys
  146. if r is None:
  147. continue
  148. r["unsigned"] = {}
  149. if include_displaynames:
  150. # Include the device's display name in the "unsigned" dictionary
  151. display_name = device_info.display_name
  152. if display_name is not None:
  153. r["unsigned"]["device_display_name"] = display_name
  154. rv[user_id][device_id] = r
  155. return rv
  156. @overload
  157. async def get_e2e_device_keys_and_signatures(
  158. self,
  159. query_list: Collection[Tuple[str, Optional[str]]],
  160. include_all_devices: Literal[False] = False,
  161. ) -> Dict[str, Dict[str, DeviceKeyLookupResult]]:
  162. ...
  163. @overload
  164. async def get_e2e_device_keys_and_signatures(
  165. self,
  166. query_list: Collection[Tuple[str, Optional[str]]],
  167. include_all_devices: bool = False,
  168. include_deleted_devices: Literal[False] = False,
  169. ) -> Dict[str, Dict[str, DeviceKeyLookupResult]]:
  170. ...
  171. @overload
  172. async def get_e2e_device_keys_and_signatures(
  173. self,
  174. query_list: Collection[Tuple[str, Optional[str]]],
  175. include_all_devices: Literal[True],
  176. include_deleted_devices: Literal[True],
  177. ) -> Dict[str, Dict[str, Optional[DeviceKeyLookupResult]]]:
  178. ...
  179. @trace
  180. @cancellable
  181. async def get_e2e_device_keys_and_signatures(
  182. self,
  183. query_list: Collection[Tuple[str, Optional[str]]],
  184. include_all_devices: bool = False,
  185. include_deleted_devices: bool = False,
  186. ) -> Union[
  187. Dict[str, Dict[str, DeviceKeyLookupResult]],
  188. Dict[str, Dict[str, Optional[DeviceKeyLookupResult]]],
  189. ]:
  190. """Fetch a list of device keys
  191. Any cross-signatures made on the keys by the owner of the device are also
  192. included.
  193. The cross-signatures are added to the `signatures` field within the `keys`
  194. object in the response.
  195. Args:
  196. query_list: List of pairs of user_ids and device_ids. Device id can be None
  197. to indicate "all devices for this user"
  198. include_all_devices: whether to return devices without device keys
  199. include_deleted_devices: whether to include null entries for
  200. devices which no longer exist (but were in the query_list).
  201. This option only takes effect if include_all_devices is true.
  202. Returns:
  203. Dict mapping from user-id to dict mapping from device_id to
  204. key data.
  205. """
  206. set_tag("include_all_devices", include_all_devices)
  207. set_tag("include_deleted_devices", include_deleted_devices)
  208. result = await self._get_e2e_device_keys(
  209. query_list,
  210. include_all_devices,
  211. include_deleted_devices,
  212. )
  213. # get the (user_id, device_id) tuples to look up cross-signatures for
  214. signature_query = (
  215. (user_id, device_id)
  216. for user_id, dev in result.items()
  217. for device_id, d in dev.items()
  218. if d is not None and d.keys is not None
  219. )
  220. for batch in batch_iter(signature_query, 50):
  221. cross_sigs_result = await self.db_pool.runInteraction(
  222. "get_e2e_cross_signing_signatures_for_devices",
  223. self._get_e2e_cross_signing_signatures_for_devices_txn,
  224. batch,
  225. )
  226. # add each cross-signing signature to the correct device in the result dict.
  227. for user_id, key_id, device_id, signature in cross_sigs_result:
  228. target_device_result = result[user_id][device_id]
  229. # We've only looked up cross-signatures for non-deleted devices with key
  230. # data.
  231. assert target_device_result is not None
  232. assert target_device_result.keys is not None
  233. target_device_signatures = target_device_result.keys.setdefault(
  234. "signatures", {}
  235. )
  236. signing_user_signatures = target_device_signatures.setdefault(
  237. user_id, {}
  238. )
  239. signing_user_signatures[key_id] = signature
  240. log_kv(result)
  241. return result
  242. async def _get_e2e_device_keys(
  243. self,
  244. query_list: Collection[Tuple[str, Optional[str]]],
  245. include_all_devices: bool = False,
  246. include_deleted_devices: bool = False,
  247. ) -> Dict[str, Dict[str, Optional[DeviceKeyLookupResult]]]:
  248. """Get information on devices from the database
  249. The results include the device's keys and self-signatures, but *not* any
  250. cross-signing signatures which have been added subsequently (for which, see
  251. get_e2e_device_keys_and_signatures)
  252. """
  253. query_clauses: List[str] = []
  254. query_params_list: List[List[object]] = []
  255. if include_all_devices is False:
  256. include_deleted_devices = False
  257. if include_deleted_devices:
  258. deleted_devices = set(query_list)
  259. # Split the query list into queries for users and queries for particular
  260. # devices.
  261. user_list = []
  262. user_device_list = []
  263. for user_id, device_id in query_list:
  264. if device_id is None:
  265. user_list.append(user_id)
  266. else:
  267. user_device_list.append((user_id, device_id))
  268. if user_list:
  269. user_id_in_list_clause, user_args = make_in_list_sql_clause(
  270. self.database_engine, "user_id", user_list
  271. )
  272. query_clauses.append(user_id_in_list_clause)
  273. query_params_list.append(user_args)
  274. if user_device_list:
  275. # Divide the device queries into batches, to avoid excessively large
  276. # queries.
  277. for user_device_batch in batch_iter(user_device_list, 1024):
  278. (
  279. user_device_id_in_list_clause,
  280. user_device_args,
  281. ) = make_tuple_in_list_sql_clause(
  282. self.database_engine, ("user_id", "device_id"), user_device_batch
  283. )
  284. query_clauses.append(user_device_id_in_list_clause)
  285. query_params_list.append(user_device_args)
  286. result: Dict[str, Dict[str, Optional[DeviceKeyLookupResult]]] = {}
  287. def get_e2e_device_keys_txn(
  288. txn: LoggingTransaction, query_clause: str, query_params: list
  289. ) -> None:
  290. sql = (
  291. "SELECT user_id, device_id, "
  292. " d.display_name, "
  293. " k.key_json"
  294. " FROM devices d"
  295. " %s JOIN e2e_device_keys_json k USING (user_id, device_id)"
  296. " WHERE %s AND NOT d.hidden"
  297. ) % (
  298. "LEFT" if include_all_devices else "INNER",
  299. query_clause,
  300. )
  301. txn.execute(sql, query_params)
  302. for user_id, device_id, display_name, key_json in txn:
  303. assert device_id is not None
  304. if include_deleted_devices:
  305. deleted_devices.remove((user_id, device_id))
  306. result.setdefault(user_id, {})[device_id] = DeviceKeyLookupResult(
  307. display_name, db_to_json(key_json) if key_json else None
  308. )
  309. for query_clause, query_params in zip(query_clauses, query_params_list):
  310. await self.db_pool.runInteraction(
  311. "_get_e2e_device_keys",
  312. get_e2e_device_keys_txn,
  313. query_clause,
  314. query_params,
  315. )
  316. if include_deleted_devices:
  317. for user_id, device_id in deleted_devices:
  318. if device_id is None:
  319. continue
  320. result.setdefault(user_id, {})[device_id] = None
  321. return result
  322. def _get_e2e_cross_signing_signatures_for_devices_txn(
  323. self, txn: LoggingTransaction, device_query: Iterable[Tuple[str, str]]
  324. ) -> List[Tuple[str, str, str, str]]:
  325. """Get cross-signing signatures for a given list of devices
  326. Returns signatures made by the owners of the devices.
  327. Returns: a list of results; each entry in the list is a tuple of
  328. (user_id, key_id, target_device_id, signature).
  329. """
  330. signature_query_clauses = []
  331. signature_query_params = []
  332. for user_id, device_id in device_query:
  333. signature_query_clauses.append(
  334. "target_user_id = ? AND target_device_id = ? AND user_id = ?"
  335. )
  336. signature_query_params.extend([user_id, device_id, user_id])
  337. signature_sql = """
  338. SELECT user_id, key_id, target_device_id, signature
  339. FROM e2e_cross_signing_signatures WHERE %s
  340. """ % (
  341. " OR ".join("(" + q + ")" for q in signature_query_clauses)
  342. )
  343. txn.execute(signature_sql, signature_query_params)
  344. return cast(
  345. List[
  346. Tuple[
  347. str,
  348. str,
  349. str,
  350. str,
  351. ]
  352. ],
  353. txn.fetchall(),
  354. )
  355. async def get_e2e_one_time_keys(
  356. self, user_id: str, device_id: str, key_ids: List[str]
  357. ) -> Dict[Tuple[str, str], str]:
  358. """Retrieve a number of one-time keys for a user
  359. Args:
  360. user_id: id of user to get keys for
  361. device_id: id of device to get keys for
  362. key_ids: list of key ids (excluding algorithm) to retrieve
  363. Returns:
  364. A map from (algorithm, key_id) to json string for key
  365. """
  366. rows = await self.db_pool.simple_select_many_batch(
  367. table="e2e_one_time_keys_json",
  368. column="key_id",
  369. iterable=key_ids,
  370. retcols=("algorithm", "key_id", "key_json"),
  371. keyvalues={"user_id": user_id, "device_id": device_id},
  372. desc="add_e2e_one_time_keys_check",
  373. )
  374. result = {(row["algorithm"], row["key_id"]): row["key_json"] for row in rows}
  375. log_kv({"message": "Fetched one time keys for user", "one_time_keys": result})
  376. return result
  377. async def add_e2e_one_time_keys(
  378. self,
  379. user_id: str,
  380. device_id: str,
  381. time_now: int,
  382. new_keys: Iterable[Tuple[str, str, str]],
  383. ) -> None:
  384. """Insert some new one time keys for a device. Errors if any of the
  385. keys already exist.
  386. Args:
  387. user_id: id of user to get keys for
  388. device_id: id of device to get keys for
  389. time_now: insertion time to record (ms since epoch)
  390. new_keys: keys to add - each a tuple of (algorithm, key_id, key json)
  391. """
  392. def _add_e2e_one_time_keys(txn: LoggingTransaction) -> None:
  393. set_tag("user_id", user_id)
  394. set_tag("device_id", device_id)
  395. set_tag("new_keys", str(new_keys))
  396. # We are protected from race between lookup and insertion due to
  397. # a unique constraint. If there is a race of two calls to
  398. # `add_e2e_one_time_keys` then they'll conflict and we will only
  399. # insert one set.
  400. self.db_pool.simple_insert_many_txn(
  401. txn,
  402. table="e2e_one_time_keys_json",
  403. keys=(
  404. "user_id",
  405. "device_id",
  406. "algorithm",
  407. "key_id",
  408. "ts_added_ms",
  409. "key_json",
  410. ),
  411. values=[
  412. (user_id, device_id, algorithm, key_id, time_now, json_bytes)
  413. for algorithm, key_id, json_bytes in new_keys
  414. ],
  415. )
  416. self._invalidate_cache_and_stream(
  417. txn, self.count_e2e_one_time_keys, (user_id, device_id)
  418. )
  419. await self.db_pool.runInteraction(
  420. "add_e2e_one_time_keys_insert", _add_e2e_one_time_keys
  421. )
  422. @cached(max_entries=10000)
  423. async def count_e2e_one_time_keys(
  424. self, user_id: str, device_id: str
  425. ) -> Dict[str, int]:
  426. """Count the number of one time keys the server has for a device
  427. Returns:
  428. A mapping from algorithm to number of keys for that algorithm.
  429. """
  430. def _count_e2e_one_time_keys(txn: LoggingTransaction) -> Dict[str, int]:
  431. sql = (
  432. "SELECT algorithm, COUNT(key_id) FROM e2e_one_time_keys_json"
  433. " WHERE user_id = ? AND device_id = ?"
  434. " GROUP BY algorithm"
  435. )
  436. txn.execute(sql, (user_id, device_id))
  437. # Initially set the key count to 0. This ensures that the client will always
  438. # receive *some count*, even if it's 0.
  439. result = {DeviceKeyAlgorithms.SIGNED_CURVE25519: 0}
  440. # Override entries with the count of any keys we pulled from the database
  441. for algorithm, key_count in txn:
  442. result[algorithm] = key_count
  443. return result
  444. return await self.db_pool.runInteraction(
  445. "count_e2e_one_time_keys", _count_e2e_one_time_keys
  446. )
  447. async def count_bulk_e2e_one_time_keys_for_as(
  448. self, user_ids: Collection[str]
  449. ) -> TransactionOneTimeKeysCount:
  450. """
  451. Counts, in bulk, the one-time keys for all the users specified.
  452. Intended to be used by application services for populating OTK counts in
  453. transactions.
  454. Return structure is of the shape:
  455. user_id -> device_id -> algorithm -> count
  456. Empty algorithm -> count dicts are created if needed to represent a
  457. lack of unused one-time keys.
  458. """
  459. def _count_bulk_e2e_one_time_keys_txn(
  460. txn: LoggingTransaction,
  461. ) -> TransactionOneTimeKeysCount:
  462. user_in_where_clause, user_parameters = make_in_list_sql_clause(
  463. self.database_engine, "user_id", user_ids
  464. )
  465. sql = f"""
  466. SELECT user_id, device_id, algorithm, COUNT(key_id)
  467. FROM devices
  468. LEFT JOIN e2e_one_time_keys_json USING (user_id, device_id)
  469. WHERE {user_in_where_clause}
  470. GROUP BY user_id, device_id, algorithm
  471. """
  472. txn.execute(sql, user_parameters)
  473. result: TransactionOneTimeKeysCount = {}
  474. for user_id, device_id, algorithm, count in txn:
  475. # We deliberately construct empty dictionaries for
  476. # users and devices without any unused one-time keys.
  477. # We *could* omit these empty dicts if there have been no
  478. # changes since the last transaction, but we currently don't
  479. # do any change tracking!
  480. device_count_by_algo = result.setdefault(user_id, {}).setdefault(
  481. device_id, {}
  482. )
  483. if algorithm is not None:
  484. # algorithm will be None if this device has no keys.
  485. device_count_by_algo[algorithm] = count
  486. return result
  487. return await self.db_pool.runInteraction(
  488. "count_bulk_e2e_one_time_keys", _count_bulk_e2e_one_time_keys_txn
  489. )
  490. async def get_e2e_bulk_unused_fallback_key_types(
  491. self, user_ids: Collection[str]
  492. ) -> TransactionUnusedFallbackKeys:
  493. """
  494. Finds, in bulk, the types of unused fallback keys for all the users specified.
  495. Intended to be used by application services for populating unused fallback
  496. keys in transactions.
  497. Return structure is of the shape:
  498. user_id -> device_id -> algorithms
  499. Empty lists are created for devices if there are no unused fallback
  500. keys. This matches the response structure of MSC3202.
  501. """
  502. if len(user_ids) == 0:
  503. return {}
  504. def _get_bulk_e2e_unused_fallback_keys_txn(
  505. txn: LoggingTransaction,
  506. ) -> TransactionUnusedFallbackKeys:
  507. user_in_where_clause, user_parameters = make_in_list_sql_clause(
  508. self.database_engine, "devices.user_id", user_ids
  509. )
  510. # We can't use USING here because we require the `.used` condition
  511. # to be part of the JOIN condition so that we generate empty lists
  512. # when all keys are used (as opposed to just when there are no keys at all).
  513. sql = f"""
  514. SELECT devices.user_id, devices.device_id, algorithm
  515. FROM devices
  516. LEFT JOIN e2e_fallback_keys_json AS fallback_keys
  517. ON devices.user_id = fallback_keys.user_id
  518. AND devices.device_id = fallback_keys.device_id
  519. AND NOT fallback_keys.used
  520. WHERE
  521. {user_in_where_clause}
  522. """
  523. txn.execute(sql, user_parameters)
  524. result: TransactionUnusedFallbackKeys = {}
  525. for user_id, device_id, algorithm in txn:
  526. # We deliberately construct empty dictionaries and lists for
  527. # users and devices without any unused fallback keys.
  528. # We *could* omit these empty dicts if there have been no
  529. # changes since the last transaction, but we currently don't
  530. # do any change tracking!
  531. device_unused_keys = result.setdefault(user_id, {}).setdefault(
  532. device_id, []
  533. )
  534. if algorithm is not None:
  535. # algorithm will be None if this device has no keys.
  536. device_unused_keys.append(algorithm)
  537. return result
  538. return await self.db_pool.runInteraction(
  539. "_get_bulk_e2e_unused_fallback_keys", _get_bulk_e2e_unused_fallback_keys_txn
  540. )
  541. async def set_e2e_fallback_keys(
  542. self, user_id: str, device_id: str, fallback_keys: JsonDict
  543. ) -> None:
  544. """Set the user's e2e fallback keys.
  545. Args:
  546. user_id: the user whose keys are being set
  547. device_id: the device whose keys are being set
  548. fallback_keys: the keys to set. This is a map from key ID (which is
  549. of the form "algorithm:id") to key data.
  550. """
  551. await self.db_pool.runInteraction(
  552. "set_e2e_fallback_keys_txn",
  553. self._set_e2e_fallback_keys_txn,
  554. user_id,
  555. device_id,
  556. fallback_keys,
  557. )
  558. await self.invalidate_cache_and_stream(
  559. "get_e2e_unused_fallback_key_types", (user_id, device_id)
  560. )
  561. def _set_e2e_fallback_keys_txn(
  562. self,
  563. txn: LoggingTransaction,
  564. user_id: str,
  565. device_id: str,
  566. fallback_keys: JsonDict,
  567. ) -> None:
  568. # fallback_keys will usually only have one item in it, so using a for
  569. # loop (as opposed to calling simple_upsert_many_txn) won't be too bad
  570. # FIXME: make sure that only one key per algorithm is uploaded
  571. for key_id, fallback_key in fallback_keys.items():
  572. algorithm, key_id = key_id.split(":", 1)
  573. old_key_json = self.db_pool.simple_select_one_onecol_txn(
  574. txn,
  575. table="e2e_fallback_keys_json",
  576. keyvalues={
  577. "user_id": user_id,
  578. "device_id": device_id,
  579. "algorithm": algorithm,
  580. },
  581. retcol="key_json",
  582. allow_none=True,
  583. )
  584. new_key_json = encode_canonical_json(fallback_key).decode("utf-8")
  585. # If the uploaded key is the same as the current fallback key,
  586. # don't do anything. This prevents marking the key as unused if it
  587. # was already used.
  588. if old_key_json != new_key_json:
  589. self.db_pool.simple_upsert_txn(
  590. txn,
  591. table="e2e_fallback_keys_json",
  592. keyvalues={
  593. "user_id": user_id,
  594. "device_id": device_id,
  595. "algorithm": algorithm,
  596. },
  597. values={
  598. "key_id": key_id,
  599. "key_json": json_encoder.encode(fallback_key),
  600. "used": False,
  601. },
  602. )
  603. @cached(max_entries=10000)
  604. async def get_e2e_unused_fallback_key_types(
  605. self, user_id: str, device_id: str
  606. ) -> Sequence[str]:
  607. """Returns the fallback key types that have an unused key.
  608. Args:
  609. user_id: the user whose keys are being queried
  610. device_id: the device whose keys are being queried
  611. Returns:
  612. a list of key types
  613. """
  614. return await self.db_pool.simple_select_onecol(
  615. "e2e_fallback_keys_json",
  616. keyvalues={"user_id": user_id, "device_id": device_id, "used": False},
  617. retcol="algorithm",
  618. desc="get_e2e_unused_fallback_key_types",
  619. )
  620. async def get_e2e_cross_signing_key(
  621. self, user_id: str, key_type: str, from_user_id: Optional[str] = None
  622. ) -> Optional[JsonDict]:
  623. """Returns a user's cross-signing key.
  624. Args:
  625. user_id: the user whose key is being requested
  626. key_type: the type of key that is being requested: either 'master'
  627. for a master key, 'self_signing' for a self-signing key, or
  628. 'user_signing' for a user-signing key
  629. from_user_id: if specified, signatures made by this user on
  630. the self-signing key will be included in the result
  631. Returns:
  632. dict of the key data or None if not found
  633. """
  634. res = await self.get_e2e_cross_signing_keys_bulk([user_id], from_user_id)
  635. user_keys = res.get(user_id)
  636. if not user_keys:
  637. return None
  638. return user_keys.get(key_type)
  639. @cached(num_args=1)
  640. def _get_bare_e2e_cross_signing_keys(self, user_id: str) -> Mapping[str, JsonDict]:
  641. """Dummy function. Only used to make a cache for
  642. _get_bare_e2e_cross_signing_keys_bulk.
  643. """
  644. raise NotImplementedError()
  645. @cachedList(
  646. cached_method_name="_get_bare_e2e_cross_signing_keys",
  647. list_name="user_ids",
  648. num_args=1,
  649. )
  650. async def _get_bare_e2e_cross_signing_keys_bulk(
  651. self, user_ids: Iterable[str]
  652. ) -> Dict[str, Optional[Mapping[str, JsonDict]]]:
  653. """Returns the cross-signing keys for a set of users. The output of this
  654. function should be passed to _get_e2e_cross_signing_signatures_txn if
  655. the signatures for the calling user need to be fetched.
  656. Args:
  657. user_ids: the users whose keys are being requested
  658. Returns:
  659. A mapping from user ID to key type to key data. If a user's cross-signing
  660. keys were not found, either their user ID will not be in the dict, or
  661. their user ID will map to None.
  662. """
  663. result = await self.db_pool.runInteraction(
  664. "get_bare_e2e_cross_signing_keys_bulk",
  665. self._get_bare_e2e_cross_signing_keys_bulk_txn,
  666. user_ids,
  667. )
  668. # The `Optional` comes from the `@cachedList` decorator.
  669. return cast(Dict[str, Optional[Mapping[str, JsonDict]]], result)
  670. def _get_bare_e2e_cross_signing_keys_bulk_txn(
  671. self,
  672. txn: LoggingTransaction,
  673. user_ids: Iterable[str],
  674. ) -> Dict[str, Dict[str, JsonDict]]:
  675. """Returns the cross-signing keys for a set of users. The output of this
  676. function should be passed to _get_e2e_cross_signing_signatures_txn if
  677. the signatures for the calling user need to be fetched.
  678. Args:
  679. txn: db connection
  680. user_ids: the users whose keys are being requested
  681. Returns:
  682. Mapping from user ID to key type to key data.
  683. If a user's cross-signing keys were not found, their user ID will not be in
  684. the dict.
  685. """
  686. result: Dict[str, Dict[str, JsonDict]] = {}
  687. for user_chunk in batch_iter(user_ids, 100):
  688. clause, params = make_in_list_sql_clause(
  689. txn.database_engine, "user_id", user_chunk
  690. )
  691. # Fetch the latest key for each type per user.
  692. if isinstance(self.database_engine, PostgresEngine):
  693. # The `DISTINCT ON` clause will pick the *first* row it
  694. # encounters, so ordering by stream ID desc will ensure we get
  695. # the latest key.
  696. sql = """
  697. SELECT DISTINCT ON (user_id, keytype) user_id, keytype, keydata, stream_id
  698. FROM e2e_cross_signing_keys
  699. WHERE %(clause)s
  700. ORDER BY user_id, keytype, stream_id DESC
  701. """ % {
  702. "clause": clause
  703. }
  704. else:
  705. # SQLite has special handling for bare columns when using
  706. # MIN/MAX with a `GROUP BY` clause where it picks the value from
  707. # a row that matches the MIN/MAX.
  708. sql = """
  709. SELECT user_id, keytype, keydata, MAX(stream_id)
  710. FROM e2e_cross_signing_keys
  711. WHERE %(clause)s
  712. GROUP BY user_id, keytype
  713. """ % {
  714. "clause": clause
  715. }
  716. txn.execute(sql, params)
  717. rows = self.db_pool.cursor_to_dict(txn)
  718. for row in rows:
  719. user_id = row["user_id"]
  720. key_type = row["keytype"]
  721. key = db_to_json(row["keydata"])
  722. user_keys = result.setdefault(user_id, {})
  723. user_keys[key_type] = key
  724. return result
  725. def _get_e2e_cross_signing_signatures_txn(
  726. self,
  727. txn: LoggingTransaction,
  728. keys: Dict[str, Optional[Dict[str, JsonDict]]],
  729. from_user_id: str,
  730. ) -> Dict[str, Optional[Dict[str, JsonDict]]]:
  731. """Returns the cross-signing signatures made by a user on a set of keys.
  732. Args:
  733. txn: db connection
  734. keys: a map of user ID to key type to key data.
  735. This dict will be modified to add signatures.
  736. from_user_id: fetch the signatures made by this user
  737. Returns:
  738. Mapping from user ID to key type to key data.
  739. The return value will be the same as the keys argument, with the
  740. modifications included.
  741. """
  742. # find out what cross-signing keys (a.k.a. devices) we need to get
  743. # signatures for. This is a map of (user_id, device_id) to key type
  744. # (device_id is the key's public part).
  745. devices: Dict[Tuple[str, str], str] = {}
  746. for user_id, user_keys in keys.items():
  747. if user_keys is None:
  748. continue
  749. for key_type, key in user_keys.items():
  750. device_id = None
  751. for k in key["keys"].values():
  752. device_id = k
  753. # `key` ought to be a `CrossSigningKey`, whose .keys property is a
  754. # dictionary with a single entry:
  755. # "algorithm:base64_public_key": "base64_public_key"
  756. # See https://spec.matrix.org/v1.1/client-server-api/#cross-signing
  757. assert isinstance(device_id, str)
  758. devices[(user_id, device_id)] = key_type
  759. for batch in batch_iter(devices.keys(), size=100):
  760. sql = """
  761. SELECT target_user_id, target_device_id, key_id, signature
  762. FROM e2e_cross_signing_signatures
  763. WHERE user_id = ?
  764. AND (%s)
  765. """ % (
  766. " OR ".join(
  767. "(target_user_id = ? AND target_device_id = ?)" for _ in batch
  768. )
  769. )
  770. query_params = [from_user_id]
  771. for item in batch:
  772. # item is a (user_id, device_id) tuple
  773. query_params.extend(item)
  774. txn.execute(sql, query_params)
  775. rows = self.db_pool.cursor_to_dict(txn)
  776. # and add the signatures to the appropriate keys
  777. for row in rows:
  778. key_id: str = row["key_id"]
  779. target_user_id: str = row["target_user_id"]
  780. target_device_id: str = row["target_device_id"]
  781. key_type = devices[(target_user_id, target_device_id)]
  782. # We need to copy everything, because the result may have come
  783. # from the cache. dict.copy only does a shallow copy, so we
  784. # need to recursively copy the dicts that will be modified.
  785. user_keys = keys[target_user_id]
  786. # `user_keys` cannot be `None` because we only fetched signatures for
  787. # users with keys
  788. assert user_keys is not None
  789. user_keys = keys[target_user_id] = user_keys.copy()
  790. target_user_key = user_keys[key_type] = user_keys[key_type].copy()
  791. if "signatures" in target_user_key:
  792. signatures = target_user_key["signatures"] = target_user_key[
  793. "signatures"
  794. ].copy()
  795. if from_user_id in signatures:
  796. user_sigs = signatures[from_user_id] = signatures[from_user_id]
  797. user_sigs[key_id] = row["signature"]
  798. else:
  799. signatures[from_user_id] = {key_id: row["signature"]}
  800. else:
  801. target_user_key["signatures"] = {
  802. from_user_id: {key_id: row["signature"]}
  803. }
  804. return keys
  805. @cancellable
  806. async def get_e2e_cross_signing_keys_bulk(
  807. self, user_ids: List[str], from_user_id: Optional[str] = None
  808. ) -> Dict[str, Optional[Mapping[str, JsonDict]]]:
  809. """Returns the cross-signing keys for a set of users.
  810. Args:
  811. user_ids: the users whose keys are being requested
  812. from_user_id: if specified, signatures made by this user on
  813. the self-signing keys will be included in the result
  814. Returns:
  815. A map of user ID to key type to key data. If a user's cross-signing
  816. keys were not found, either their user ID will not be in the dict,
  817. or their user ID will map to None.
  818. """
  819. result = await self._get_bare_e2e_cross_signing_keys_bulk(user_ids)
  820. if from_user_id:
  821. result = cast(
  822. Dict[str, Optional[Mapping[str, JsonDict]]],
  823. await self.db_pool.runInteraction(
  824. "get_e2e_cross_signing_signatures",
  825. self._get_e2e_cross_signing_signatures_txn,
  826. result,
  827. from_user_id,
  828. ),
  829. )
  830. return result
  831. async def get_all_user_signature_changes_for_remotes(
  832. self, instance_name: str, last_id: int, current_id: int, limit: int
  833. ) -> Tuple[List[Tuple[int, tuple]], int, bool]:
  834. """Get updates for groups replication stream.
  835. Note that the user signature stream represents when a user signs their
  836. device with their user-signing key, which is not published to other
  837. users or servers, so no `destination` is needed in the returned
  838. list. However, this is needed to poke workers.
  839. Args:
  840. instance_name: The writer we want to fetch updates from. Unused
  841. here since there is only ever one writer.
  842. last_id: The token to fetch updates from. Exclusive.
  843. current_id: The token to fetch updates up to. Inclusive.
  844. limit: The requested limit for the number of rows to return. The
  845. function may return more or fewer rows.
  846. Returns:
  847. A tuple consisting of: the updates, a token to use to fetch
  848. subsequent updates, and whether we returned fewer rows than exists
  849. between the requested tokens due to the limit.
  850. The token returned can be used in a subsequent call to this
  851. function to get further updatees.
  852. The updates are a list of 2-tuples of stream ID and the row data
  853. """
  854. if last_id == current_id:
  855. return [], current_id, False
  856. def _get_all_user_signature_changes_for_remotes_txn(
  857. txn: LoggingTransaction,
  858. ) -> Tuple[List[Tuple[int, tuple]], int, bool]:
  859. sql = """
  860. SELECT stream_id, from_user_id AS user_id
  861. FROM user_signature_stream
  862. WHERE ? < stream_id AND stream_id <= ?
  863. ORDER BY stream_id ASC
  864. LIMIT ?
  865. """
  866. txn.execute(sql, (last_id, current_id, limit))
  867. updates = [(row[0], (row[1:])) for row in txn]
  868. limited = False
  869. upto_token = current_id
  870. if len(updates) >= limit:
  871. upto_token = updates[-1][0]
  872. limited = True
  873. return updates, upto_token, limited
  874. return await self.db_pool.runInteraction(
  875. "get_all_user_signature_changes_for_remotes",
  876. _get_all_user_signature_changes_for_remotes_txn,
  877. )
  878. @abc.abstractmethod
  879. def get_device_stream_token(self) -> int:
  880. """Get the current stream id from the _device_list_id_gen"""
  881. ...
  882. async def claim_e2e_one_time_keys(
  883. self, query_list: Iterable[Tuple[str, str, str]]
  884. ) -> Dict[str, Dict[str, Dict[str, str]]]:
  885. """Take a list of one time keys out of the database.
  886. Args:
  887. query_list: An iterable of tuples of (user ID, device ID, algorithm).
  888. Returns:
  889. A map of user ID -> a map device ID -> a map of key ID -> JSON bytes.
  890. """
  891. @trace
  892. def _claim_e2e_one_time_key_simple(
  893. txn: LoggingTransaction, user_id: str, device_id: str, algorithm: str
  894. ) -> Optional[Tuple[str, str]]:
  895. """Claim OTK for device for DBs that don't support RETURNING.
  896. Returns:
  897. A tuple of key name (algorithm + key ID) and key JSON, if an
  898. OTK was found.
  899. """
  900. sql = """
  901. SELECT key_id, key_json FROM e2e_one_time_keys_json
  902. WHERE user_id = ? AND device_id = ? AND algorithm = ?
  903. LIMIT 1
  904. """
  905. txn.execute(sql, (user_id, device_id, algorithm))
  906. otk_row = txn.fetchone()
  907. if otk_row is None:
  908. return None
  909. key_id, key_json = otk_row
  910. self.db_pool.simple_delete_one_txn(
  911. txn,
  912. table="e2e_one_time_keys_json",
  913. keyvalues={
  914. "user_id": user_id,
  915. "device_id": device_id,
  916. "algorithm": algorithm,
  917. "key_id": key_id,
  918. },
  919. )
  920. self._invalidate_cache_and_stream(
  921. txn, self.count_e2e_one_time_keys, (user_id, device_id)
  922. )
  923. return f"{algorithm}:{key_id}", key_json
  924. @trace
  925. def _claim_e2e_one_time_key_returning(
  926. txn: LoggingTransaction, user_id: str, device_id: str, algorithm: str
  927. ) -> Optional[Tuple[str, str]]:
  928. """Claim OTK for device for DBs that support RETURNING.
  929. Returns:
  930. A tuple of key name (algorithm + key ID) and key JSON, if an
  931. OTK was found.
  932. """
  933. # We can use RETURNING to do the fetch and DELETE in once step.
  934. sql = """
  935. DELETE FROM e2e_one_time_keys_json
  936. WHERE user_id = ? AND device_id = ? AND algorithm = ?
  937. AND key_id IN (
  938. SELECT key_id FROM e2e_one_time_keys_json
  939. WHERE user_id = ? AND device_id = ? AND algorithm = ?
  940. LIMIT 1
  941. )
  942. RETURNING key_id, key_json
  943. """
  944. txn.execute(
  945. sql, (user_id, device_id, algorithm, user_id, device_id, algorithm)
  946. )
  947. otk_row = txn.fetchone()
  948. if otk_row is None:
  949. return None
  950. self._invalidate_cache_and_stream(
  951. txn, self.count_e2e_one_time_keys, (user_id, device_id)
  952. )
  953. key_id, key_json = otk_row
  954. return f"{algorithm}:{key_id}", key_json
  955. results: Dict[str, Dict[str, Dict[str, str]]] = {}
  956. for user_id, device_id, algorithm in query_list:
  957. if self.database_engine.supports_returning:
  958. # If we support RETURNING clause we can use a single query that
  959. # allows us to use autocommit mode.
  960. _claim_e2e_one_time_key = _claim_e2e_one_time_key_returning
  961. db_autocommit = True
  962. else:
  963. _claim_e2e_one_time_key = _claim_e2e_one_time_key_simple
  964. db_autocommit = False
  965. claim_row = await self.db_pool.runInteraction(
  966. "claim_e2e_one_time_keys",
  967. _claim_e2e_one_time_key,
  968. user_id,
  969. device_id,
  970. algorithm,
  971. db_autocommit=db_autocommit,
  972. )
  973. if claim_row:
  974. device_results = results.setdefault(user_id, {}).setdefault(
  975. device_id, {}
  976. )
  977. device_results[claim_row[0]] = claim_row[1]
  978. continue
  979. # No one-time key available, so see if there's a fallback
  980. # key
  981. row = await self.db_pool.simple_select_one(
  982. table="e2e_fallback_keys_json",
  983. keyvalues={
  984. "user_id": user_id,
  985. "device_id": device_id,
  986. "algorithm": algorithm,
  987. },
  988. retcols=("key_id", "key_json", "used"),
  989. desc="_get_fallback_key",
  990. allow_none=True,
  991. )
  992. if row is None:
  993. continue
  994. key_id = row["key_id"]
  995. key_json = row["key_json"]
  996. used = row["used"]
  997. # Mark fallback key as used if not already.
  998. if not used:
  999. await self.db_pool.simple_update_one(
  1000. table="e2e_fallback_keys_json",
  1001. keyvalues={
  1002. "user_id": user_id,
  1003. "device_id": device_id,
  1004. "algorithm": algorithm,
  1005. "key_id": key_id,
  1006. },
  1007. updatevalues={"used": True},
  1008. desc="_get_fallback_key_set_used",
  1009. )
  1010. await self.invalidate_cache_and_stream(
  1011. "get_e2e_unused_fallback_key_types", (user_id, device_id)
  1012. )
  1013. device_results = results.setdefault(user_id, {}).setdefault(device_id, {})
  1014. device_results[f"{algorithm}:{key_id}"] = key_json
  1015. return results
  1016. class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore):
  1017. def __init__(
  1018. self,
  1019. database: DatabasePool,
  1020. db_conn: LoggingDatabaseConnection,
  1021. hs: "HomeServer",
  1022. ):
  1023. super().__init__(database, db_conn, hs)
  1024. self._cross_signing_id_gen = StreamIdGenerator(
  1025. db_conn,
  1026. hs.get_replication_notifier(),
  1027. "e2e_cross_signing_keys",
  1028. "stream_id",
  1029. )
  1030. async def set_e2e_device_keys(
  1031. self, user_id: str, device_id: str, time_now: int, device_keys: JsonDict
  1032. ) -> bool:
  1033. """Stores device keys for a device. Returns whether there was a change
  1034. or the keys were already in the database.
  1035. """
  1036. def _set_e2e_device_keys_txn(txn: LoggingTransaction) -> bool:
  1037. set_tag("user_id", user_id)
  1038. set_tag("device_id", device_id)
  1039. set_tag("time_now", time_now)
  1040. set_tag("device_keys", str(device_keys))
  1041. old_key_json = self.db_pool.simple_select_one_onecol_txn(
  1042. txn,
  1043. table="e2e_device_keys_json",
  1044. keyvalues={"user_id": user_id, "device_id": device_id},
  1045. retcol="key_json",
  1046. allow_none=True,
  1047. )
  1048. # In py3 we need old_key_json to match new_key_json type. The DB
  1049. # returns unicode while encode_canonical_json returns bytes.
  1050. new_key_json = encode_canonical_json(device_keys).decode("utf-8")
  1051. if old_key_json == new_key_json:
  1052. log_kv({"Message": "Device key already stored."})
  1053. return False
  1054. self.db_pool.simple_upsert_txn(
  1055. txn,
  1056. table="e2e_device_keys_json",
  1057. keyvalues={"user_id": user_id, "device_id": device_id},
  1058. values={"ts_added_ms": time_now, "key_json": new_key_json},
  1059. )
  1060. log_kv({"message": "Device keys stored."})
  1061. return True
  1062. return await self.db_pool.runInteraction(
  1063. "set_e2e_device_keys", _set_e2e_device_keys_txn
  1064. )
  1065. async def delete_e2e_keys_by_device(self, user_id: str, device_id: str) -> None:
  1066. def delete_e2e_keys_by_device_txn(txn: LoggingTransaction) -> None:
  1067. log_kv(
  1068. {
  1069. "message": "Deleting keys for device",
  1070. "device_id": device_id,
  1071. "user_id": user_id,
  1072. }
  1073. )
  1074. self.db_pool.simple_delete_txn(
  1075. txn,
  1076. table="e2e_device_keys_json",
  1077. keyvalues={"user_id": user_id, "device_id": device_id},
  1078. )
  1079. self.db_pool.simple_delete_txn(
  1080. txn,
  1081. table="e2e_one_time_keys_json",
  1082. keyvalues={"user_id": user_id, "device_id": device_id},
  1083. )
  1084. self._invalidate_cache_and_stream(
  1085. txn, self.count_e2e_one_time_keys, (user_id, device_id)
  1086. )
  1087. self.db_pool.simple_delete_txn(
  1088. txn,
  1089. table="dehydrated_devices",
  1090. keyvalues={"user_id": user_id, "device_id": device_id},
  1091. )
  1092. self.db_pool.simple_delete_txn(
  1093. txn,
  1094. table="e2e_fallback_keys_json",
  1095. keyvalues={"user_id": user_id, "device_id": device_id},
  1096. )
  1097. self._invalidate_cache_and_stream(
  1098. txn, self.get_e2e_unused_fallback_key_types, (user_id, device_id)
  1099. )
  1100. await self.db_pool.runInteraction(
  1101. "delete_e2e_keys_by_device", delete_e2e_keys_by_device_txn
  1102. )
  1103. def _set_e2e_cross_signing_key_txn(
  1104. self,
  1105. txn: LoggingTransaction,
  1106. user_id: str,
  1107. key_type: str,
  1108. key: JsonDict,
  1109. stream_id: int,
  1110. ) -> None:
  1111. """Set a user's cross-signing key.
  1112. Args:
  1113. txn: db connection
  1114. user_id: the user to set the signing key for
  1115. key_type: the type of key that is being set: either 'master'
  1116. for a master key, 'self_signing' for a self-signing key, or
  1117. 'user_signing' for a user-signing key
  1118. key: the key data
  1119. stream_id
  1120. """
  1121. # the 'key' dict will look something like:
  1122. # {
  1123. # "user_id": "@alice:example.com",
  1124. # "usage": ["self_signing"],
  1125. # "keys": {
  1126. # "ed25519:base64+self+signing+public+key": "base64+self+signing+public+key",
  1127. # },
  1128. # "signatures": {
  1129. # "@alice:example.com": {
  1130. # "ed25519:base64+master+public+key": "base64+signature"
  1131. # }
  1132. # }
  1133. # }
  1134. # The "keys" property must only have one entry, which will be the public
  1135. # key, so we just grab the first value in there
  1136. pubkey = next(iter(key["keys"].values()))
  1137. # The cross-signing keys need to occupy the same namespace as devices,
  1138. # since signatures are identified by device ID. So add an entry to the
  1139. # device table to make sure that we don't have a collision with device
  1140. # IDs.
  1141. # We only need to do this for local users, since remote servers should be
  1142. # responsible for checking this for their own users.
  1143. if self.hs.is_mine_id(user_id):
  1144. self.db_pool.simple_insert_txn(
  1145. txn,
  1146. "devices",
  1147. values={
  1148. "user_id": user_id,
  1149. "device_id": pubkey,
  1150. "display_name": key_type + " signing key",
  1151. "hidden": True,
  1152. },
  1153. )
  1154. # and finally, store the key itself
  1155. self.db_pool.simple_insert_txn(
  1156. txn,
  1157. "e2e_cross_signing_keys",
  1158. values={
  1159. "user_id": user_id,
  1160. "keytype": key_type,
  1161. "keydata": json_encoder.encode(key),
  1162. "stream_id": stream_id,
  1163. },
  1164. )
  1165. self._invalidate_cache_and_stream(
  1166. txn, self._get_bare_e2e_cross_signing_keys, (user_id,)
  1167. )
  1168. async def set_e2e_cross_signing_key(
  1169. self, user_id: str, key_type: str, key: JsonDict
  1170. ) -> None:
  1171. """Set a user's cross-signing key.
  1172. Args:
  1173. user_id: the user to set the user-signing key for
  1174. key_type: the type of cross-signing key to set
  1175. key: the key data
  1176. """
  1177. async with self._cross_signing_id_gen.get_next() as stream_id:
  1178. return await self.db_pool.runInteraction(
  1179. "add_e2e_cross_signing_key",
  1180. self._set_e2e_cross_signing_key_txn,
  1181. user_id,
  1182. key_type,
  1183. key,
  1184. stream_id,
  1185. )
  1186. async def store_e2e_cross_signing_signatures(
  1187. self, user_id: str, signatures: "Iterable[SignatureListItem]"
  1188. ) -> None:
  1189. """Stores cross-signing signatures.
  1190. Args:
  1191. user_id: the user who made the signatures
  1192. signatures: signatures to add
  1193. """
  1194. await self.db_pool.simple_insert_many(
  1195. "e2e_cross_signing_signatures",
  1196. keys=(
  1197. "user_id",
  1198. "key_id",
  1199. "target_user_id",
  1200. "target_device_id",
  1201. "signature",
  1202. ),
  1203. values=[
  1204. (
  1205. user_id,
  1206. item.signing_key_id,
  1207. item.target_user_id,
  1208. item.target_device_id,
  1209. item.signature,
  1210. )
  1211. for item in signatures
  1212. ],
  1213. desc="add_e2e_signing_key",
  1214. )