api.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538
  1. # Copyright 2015, 2016 OpenMarket Ltd
  2. # Copyright 2022 The Matrix.org Foundation C.I.C.
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. import logging
  16. import urllib.parse
  17. from typing import (
  18. TYPE_CHECKING,
  19. Dict,
  20. Iterable,
  21. List,
  22. Mapping,
  23. Optional,
  24. Sequence,
  25. Tuple,
  26. TypeVar,
  27. Union,
  28. )
  29. from prometheus_client import Counter
  30. from typing_extensions import ParamSpec, TypeGuard
  31. from synapse.api.constants import EventTypes, Membership, ThirdPartyEntityKind
  32. from synapse.api.errors import CodeMessageException, HttpResponseException
  33. from synapse.appservice import (
  34. ApplicationService,
  35. TransactionOneTimeKeysCount,
  36. TransactionUnusedFallbackKeys,
  37. )
  38. from synapse.events import EventBase
  39. from synapse.events.utils import SerializeEventConfig, serialize_event
  40. from synapse.http.client import SimpleHttpClient, is_unknown_endpoint
  41. from synapse.types import DeviceListUpdates, JsonDict, ThirdPartyInstanceID
  42. from synapse.util.caches.response_cache import ResponseCache
  43. if TYPE_CHECKING:
  44. from synapse.server import HomeServer
  45. logger = logging.getLogger(__name__)
  46. sent_transactions_counter = Counter(
  47. "synapse_appservice_api_sent_transactions",
  48. "Number of /transactions/ requests sent",
  49. ["service"],
  50. )
  51. failed_transactions_counter = Counter(
  52. "synapse_appservice_api_failed_transactions",
  53. "Number of /transactions/ requests that failed to send",
  54. ["service"],
  55. )
  56. sent_events_counter = Counter(
  57. "synapse_appservice_api_sent_events", "Number of events sent to the AS", ["service"]
  58. )
  59. sent_ephemeral_counter = Counter(
  60. "synapse_appservice_api_sent_ephemeral",
  61. "Number of ephemeral events sent to the AS",
  62. ["service"],
  63. )
  64. sent_todevice_counter = Counter(
  65. "synapse_appservice_api_sent_todevice",
  66. "Number of todevice messages sent to the AS",
  67. ["service"],
  68. )
  69. HOUR_IN_MS = 60 * 60 * 1000
  70. APP_SERVICE_PREFIX = "/_matrix/app/v1"
  71. P = ParamSpec("P")
  72. R = TypeVar("R")
  73. def _is_valid_3pe_metadata(info: JsonDict) -> bool:
  74. if "instances" not in info:
  75. return False
  76. if not isinstance(info["instances"], list):
  77. return False
  78. return True
  79. def _is_valid_3pe_result(r: object, field: str) -> TypeGuard[JsonDict]:
  80. if not isinstance(r, dict):
  81. return False
  82. for k in (field, "protocol"):
  83. if k not in r:
  84. return False
  85. if not isinstance(r[k], str):
  86. return False
  87. if "fields" not in r:
  88. return False
  89. fields = r["fields"]
  90. if not isinstance(fields, dict):
  91. return False
  92. return True
  93. class ApplicationServiceApi(SimpleHttpClient):
  94. """This class manages HS -> AS communications, including querying and
  95. pushing.
  96. """
  97. def __init__(self, hs: "HomeServer"):
  98. super().__init__(hs)
  99. self.clock = hs.get_clock()
  100. self.config = hs.config.appservice
  101. self.protocol_meta_cache: ResponseCache[Tuple[str, str]] = ResponseCache(
  102. hs.get_clock(), "as_protocol_meta", timeout_ms=HOUR_IN_MS
  103. )
  104. async def query_user(self, service: "ApplicationService", user_id: str) -> bool:
  105. if service.url is None:
  106. return False
  107. # This is required by the configuration.
  108. assert service.hs_token is not None
  109. try:
  110. args = None
  111. if self.config.use_appservice_legacy_authorization:
  112. args = {"access_token": service.hs_token}
  113. response = await self.get_json(
  114. f"{service.url}{APP_SERVICE_PREFIX}/users/{urllib.parse.quote(user_id)}",
  115. args,
  116. headers={"Authorization": [f"Bearer {service.hs_token}"]},
  117. )
  118. if response is not None: # just an empty json object
  119. return True
  120. except CodeMessageException as e:
  121. if e.code == 404:
  122. return False
  123. logger.warning("query_user to %s received %s", service.url, e.code)
  124. except Exception as ex:
  125. logger.warning("query_user to %s threw exception %s", service.url, ex)
  126. return False
  127. async def query_alias(self, service: "ApplicationService", alias: str) -> bool:
  128. if service.url is None:
  129. return False
  130. # This is required by the configuration.
  131. assert service.hs_token is not None
  132. try:
  133. args = None
  134. if self.config.use_appservice_legacy_authorization:
  135. args = {"access_token": service.hs_token}
  136. response = await self.get_json(
  137. f"{service.url}{APP_SERVICE_PREFIX}/rooms/{urllib.parse.quote(alias)}",
  138. args,
  139. headers={"Authorization": [f"Bearer {service.hs_token}"]},
  140. )
  141. if response is not None: # just an empty json object
  142. return True
  143. except CodeMessageException as e:
  144. logger.warning("query_alias to %s received %s", service.url, e.code)
  145. if e.code == 404:
  146. return False
  147. except Exception as ex:
  148. logger.warning("query_alias to %s threw exception %s", service.url, ex)
  149. return False
  150. async def query_3pe(
  151. self,
  152. service: "ApplicationService",
  153. kind: str,
  154. protocol: str,
  155. fields: Dict[bytes, List[bytes]],
  156. ) -> List[JsonDict]:
  157. if kind == ThirdPartyEntityKind.USER:
  158. required_field = "userid"
  159. elif kind == ThirdPartyEntityKind.LOCATION:
  160. required_field = "alias"
  161. else:
  162. raise ValueError("Unrecognised 'kind' argument %r to query_3pe()", kind)
  163. if service.url is None:
  164. return []
  165. # This is required by the configuration.
  166. assert service.hs_token is not None
  167. try:
  168. args: Mapping[bytes, Union[List[bytes], str]] = fields
  169. if self.config.use_appservice_legacy_authorization:
  170. args = {
  171. **fields,
  172. b"access_token": service.hs_token,
  173. }
  174. response = await self.get_json(
  175. f"{service.url}{APP_SERVICE_PREFIX}/thirdparty/{kind}/{urllib.parse.quote(protocol)}",
  176. args=args,
  177. headers={"Authorization": [f"Bearer {service.hs_token}"]},
  178. )
  179. if not isinstance(response, list):
  180. logger.warning(
  181. "query_3pe to %s returned an invalid response %r",
  182. service.url,
  183. response,
  184. )
  185. return []
  186. ret = []
  187. for r in response:
  188. if _is_valid_3pe_result(r, field=required_field):
  189. ret.append(r)
  190. else:
  191. logger.warning(
  192. "query_3pe to %s returned an invalid result %r", service.url, r
  193. )
  194. return ret
  195. except Exception as ex:
  196. logger.warning("query_3pe to %s threw exception %s", service.url, ex)
  197. return []
  198. async def get_3pe_protocol(
  199. self, service: "ApplicationService", protocol: str
  200. ) -> Optional[JsonDict]:
  201. if service.url is None:
  202. return {}
  203. async def _get() -> Optional[JsonDict]:
  204. # This is required by the configuration.
  205. assert service.hs_token is not None
  206. try:
  207. args = None
  208. if self.config.use_appservice_legacy_authorization:
  209. args = {"access_token": service.hs_token}
  210. info = await self.get_json(
  211. f"{service.url}{APP_SERVICE_PREFIX}/thirdparty/protocol/{urllib.parse.quote(protocol)}",
  212. args,
  213. headers={"Authorization": [f"Bearer {service.hs_token}"]},
  214. )
  215. if not _is_valid_3pe_metadata(info):
  216. logger.warning(
  217. "query_3pe_protocol to %s did not return a valid result",
  218. service.url,
  219. )
  220. return None
  221. for instance in info.get("instances", []):
  222. network_id = instance.get("network_id", None)
  223. if network_id is not None:
  224. instance["instance_id"] = ThirdPartyInstanceID(
  225. service.id, network_id
  226. ).to_string()
  227. return info
  228. except Exception as ex:
  229. logger.warning(
  230. "query_3pe_protocol to %s threw exception %s", service.url, ex
  231. )
  232. return None
  233. key = (service.id, protocol)
  234. return await self.protocol_meta_cache.wrap(key, _get)
  235. async def ping(self, service: "ApplicationService", txn_id: Optional[str]) -> None:
  236. # The caller should check that url is set
  237. assert service.url is not None, "ping called without URL being set"
  238. # This is required by the configuration.
  239. assert service.hs_token is not None
  240. await self.post_json_get_json(
  241. uri=f"{service.url}{APP_SERVICE_PREFIX}/ping",
  242. post_json={"transaction_id": txn_id},
  243. headers={"Authorization": [f"Bearer {service.hs_token}"]},
  244. )
  245. async def push_bulk(
  246. self,
  247. service: "ApplicationService",
  248. events: Sequence[EventBase],
  249. ephemeral: List[JsonDict],
  250. to_device_messages: List[JsonDict],
  251. one_time_keys_count: TransactionOneTimeKeysCount,
  252. unused_fallback_keys: TransactionUnusedFallbackKeys,
  253. device_list_summary: DeviceListUpdates,
  254. txn_id: Optional[int] = None,
  255. ) -> bool:
  256. """
  257. Push data to an application service.
  258. Args:
  259. service: The application service to send to.
  260. events: The persistent events to send.
  261. ephemeral: The ephemeral events to send.
  262. to_device_messages: The to-device messages to send.
  263. txn_id: An unique ID to assign to this transaction. Application services should
  264. deduplicate transactions received with identitical IDs.
  265. Returns:
  266. True if the task succeeded, False if it failed.
  267. """
  268. if service.url is None:
  269. return True
  270. # This is required by the configuration.
  271. assert service.hs_token is not None
  272. serialized_events = self._serialize(service, events)
  273. if txn_id is None:
  274. logger.warning(
  275. "push_bulk: Missing txn ID sending events to %s", service.url
  276. )
  277. txn_id = 0
  278. # Never send ephemeral events to appservices that do not support it
  279. body: JsonDict = {"events": serialized_events}
  280. if service.supports_ephemeral:
  281. body.update(
  282. {
  283. # TODO: Update to stable prefixes once MSC2409 completes FCP merge.
  284. "de.sorunome.msc2409.ephemeral": ephemeral,
  285. "de.sorunome.msc2409.to_device": to_device_messages,
  286. }
  287. )
  288. # TODO: Update to stable prefixes once MSC3202 completes FCP merge
  289. if service.msc3202_transaction_extensions:
  290. if one_time_keys_count:
  291. body[
  292. "org.matrix.msc3202.device_one_time_key_counts"
  293. ] = one_time_keys_count
  294. body[
  295. "org.matrix.msc3202.device_one_time_keys_count"
  296. ] = one_time_keys_count
  297. if unused_fallback_keys:
  298. body[
  299. "org.matrix.msc3202.device_unused_fallback_key_types"
  300. ] = unused_fallback_keys
  301. if device_list_summary:
  302. body["org.matrix.msc3202.device_lists"] = {
  303. "changed": list(device_list_summary.changed),
  304. "left": list(device_list_summary.left),
  305. }
  306. try:
  307. args = None
  308. if self.config.use_appservice_legacy_authorization:
  309. args = {"access_token": service.hs_token}
  310. await self.put_json(
  311. f"{service.url}{APP_SERVICE_PREFIX}/transactions/{urllib.parse.quote(str(txn_id))}",
  312. json_body=body,
  313. args=args,
  314. headers={"Authorization": [f"Bearer {service.hs_token}"]},
  315. )
  316. if logger.isEnabledFor(logging.DEBUG):
  317. logger.debug(
  318. "push_bulk to %s succeeded! events=%s",
  319. service.url,
  320. [event.get("event_id") for event in events],
  321. )
  322. sent_transactions_counter.labels(service.id).inc()
  323. sent_events_counter.labels(service.id).inc(len(serialized_events))
  324. sent_ephemeral_counter.labels(service.id).inc(len(ephemeral))
  325. sent_todevice_counter.labels(service.id).inc(len(to_device_messages))
  326. return True
  327. except CodeMessageException as e:
  328. logger.warning(
  329. "push_bulk to %s received code=%s msg=%s",
  330. service.url,
  331. e.code,
  332. e.msg,
  333. exc_info=logger.isEnabledFor(logging.DEBUG),
  334. )
  335. except Exception as ex:
  336. logger.warning(
  337. "push_bulk to %s threw exception(%s) %s args=%s",
  338. service.url,
  339. type(ex).__name__,
  340. ex,
  341. ex.args,
  342. exc_info=logger.isEnabledFor(logging.DEBUG),
  343. )
  344. failed_transactions_counter.labels(service.id).inc()
  345. return False
  346. async def claim_client_keys(
  347. self, service: "ApplicationService", query: List[Tuple[str, str, str, int]]
  348. ) -> Tuple[
  349. Dict[str, Dict[str, Dict[str, JsonDict]]], List[Tuple[str, str, str, int]]
  350. ]:
  351. """Claim one time keys from an application service.
  352. Note that any error (including a timeout) is treated as the application
  353. service having no information.
  354. Args:
  355. service: The application service to query.
  356. query: An iterable of tuples of (user ID, device ID, algorithm).
  357. Returns:
  358. A tuple of:
  359. A map of user ID -> a map device ID -> a map of key ID -> JSON dict.
  360. A copy of the input which has not been fulfilled because the
  361. appservice doesn't support this endpoint or has not returned
  362. data for that tuple.
  363. """
  364. if service.url is None:
  365. return {}, query
  366. # This is required by the configuration.
  367. assert service.hs_token is not None
  368. # Create the expected payload shape.
  369. body: Dict[str, Dict[str, List[str]]] = {}
  370. for user_id, device, algorithm, count in query:
  371. body.setdefault(user_id, {}).setdefault(device, []).extend(
  372. [algorithm] * count
  373. )
  374. uri = f"{service.url}/_matrix/app/unstable/org.matrix.msc3983/keys/claim"
  375. try:
  376. response = await self.post_json_get_json(
  377. uri,
  378. body,
  379. headers={"Authorization": [f"Bearer {service.hs_token}"]},
  380. )
  381. except HttpResponseException as e:
  382. # The appservice doesn't support this endpoint.
  383. if is_unknown_endpoint(e):
  384. return {}, query
  385. logger.warning("claim_keys to %s received %s", uri, e.code)
  386. return {}, query
  387. except Exception as ex:
  388. logger.warning("claim_keys to %s threw exception %s", uri, ex)
  389. return {}, query
  390. # Check if the appservice fulfilled all of the queried user/device/algorithms
  391. # or if some are still missing.
  392. #
  393. # TODO This places a lot of faith in the response shape being correct.
  394. missing = []
  395. for user_id, device, algorithm, count in query:
  396. # Count the number of keys in the response for this algorithm by
  397. # checking which key IDs start with the algorithm. This uses that
  398. # True == 1 in Python to generate a count.
  399. response_count = sum(
  400. key_id.startswith(f"{algorithm}:")
  401. for key_id in response.get(user_id, {}).get(device, {})
  402. )
  403. count -= response_count
  404. # If the appservice responds with fewer keys than requested, then
  405. # consider the request unfulfilled.
  406. if count > 0:
  407. missing.append((user_id, device, algorithm, count))
  408. return response, missing
  409. async def query_keys(
  410. self, service: "ApplicationService", query: Dict[str, List[str]]
  411. ) -> Dict[str, Dict[str, Dict[str, JsonDict]]]:
  412. """Query the application service for keys.
  413. Note that any error (including a timeout) is treated as the application
  414. service having no information.
  415. Args:
  416. service: The application service to query.
  417. query: An iterable of tuples of (user ID, device ID, algorithm).
  418. Returns:
  419. A map of device_keys/master_keys/self_signing_keys/user_signing_keys:
  420. device_keys is a map of user ID -> a map device ID -> device info.
  421. """
  422. if service.url is None:
  423. return {}
  424. # This is required by the configuration.
  425. assert service.hs_token is not None
  426. uri = f"{service.url}/_matrix/app/unstable/org.matrix.msc3984/keys/query"
  427. try:
  428. response = await self.post_json_get_json(
  429. uri,
  430. query,
  431. headers={"Authorization": [f"Bearer {service.hs_token}"]},
  432. )
  433. except HttpResponseException as e:
  434. # The appservice doesn't support this endpoint.
  435. if is_unknown_endpoint(e):
  436. return {}
  437. logger.warning("query_keys to %s received %s", uri, e.code)
  438. return {}
  439. except Exception as ex:
  440. logger.warning("query_keys to %s threw exception %s", uri, ex)
  441. return {}
  442. return response
  443. def _serialize(
  444. self, service: "ApplicationService", events: Iterable[EventBase]
  445. ) -> List[JsonDict]:
  446. time_now = self.clock.time_msec()
  447. return [
  448. serialize_event(
  449. e,
  450. time_now,
  451. config=SerializeEventConfig(
  452. as_client_event=True,
  453. # If this is an invite or a knock membership event, and we're interested
  454. # in this user, then include any stripped state alongside the event.
  455. include_stripped_room_state=(
  456. e.type == EventTypes.Member
  457. and (
  458. e.membership == Membership.INVITE
  459. or e.membership == Membership.KNOCK
  460. )
  461. and service.is_interested_in_user(e.state_key)
  462. ),
  463. ),
  464. )
  465. for e in events
  466. ]