api.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357
  1. # Copyright 2015, 2016 OpenMarket Ltd
  2. # Copyright 2022 The Matrix.org Foundation C.I.C.
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. import logging
  16. import urllib.parse
  17. from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Mapping, Optional, Tuple
  18. from prometheus_client import Counter
  19. from typing_extensions import TypeGuard
  20. from synapse.api.constants import EventTypes, Membership, ThirdPartyEntityKind
  21. from synapse.api.errors import CodeMessageException
  22. from synapse.appservice import (
  23. ApplicationService,
  24. TransactionOneTimeKeyCounts,
  25. TransactionUnusedFallbackKeys,
  26. )
  27. from synapse.events import EventBase
  28. from synapse.events.utils import SerializeEventConfig, serialize_event
  29. from synapse.http.client import SimpleHttpClient
  30. from synapse.types import DeviceListUpdates, JsonDict, ThirdPartyInstanceID
  31. from synapse.util.caches.response_cache import ResponseCache
  32. if TYPE_CHECKING:
  33. from synapse.server import HomeServer
logger = logging.getLogger(__name__)

# Prometheus counters describing traffic pushed to application services.
# Each counter carries a single "service" label; push_bulk() fills it with the
# application service's ID.
sent_transactions_counter = Counter(
    "synapse_appservice_api_sent_transactions",
    "Number of /transactions/ requests sent",
    ["service"],
)

failed_transactions_counter = Counter(
    "synapse_appservice_api_failed_transactions",
    "Number of /transactions/ requests that failed to send",
    ["service"],
)

sent_events_counter = Counter(
    "synapse_appservice_api_sent_events", "Number of events sent to the AS", ["service"]
)

# One hour expressed in milliseconds; used as the timeout of the protocol
# metadata cache.
HOUR_IN_MS = 60 * 60 * 1000

# Path prefix inserted between the application service's base URL and the
# third-party lookup endpoints.
APP_SERVICE_PREFIX = "/_matrix/app/unstable"
  50. def _is_valid_3pe_metadata(info: JsonDict) -> bool:
  51. if "instances" not in info:
  52. return False
  53. if not isinstance(info["instances"], list):
  54. return False
  55. return True
  56. def _is_valid_3pe_result(r: object, field: str) -> TypeGuard[JsonDict]:
  57. if not isinstance(r, dict):
  58. return False
  59. for k in (field, "protocol"):
  60. if k not in r:
  61. return False
  62. if not isinstance(r[k], str):
  63. return False
  64. if "fields" not in r:
  65. return False
  66. fields = r["fields"]
  67. if not isinstance(fields, dict):
  68. return False
  69. return True
  70. class ApplicationServiceApi(SimpleHttpClient):
  71. """This class manages HS -> AS communications, including querying and
  72. pushing.
  73. """
  74. def __init__(self, hs: "HomeServer"):
  75. super().__init__(hs)
  76. self.clock = hs.get_clock()
  77. self.protocol_meta_cache: ResponseCache[Tuple[str, str]] = ResponseCache(
  78. hs.get_clock(), "as_protocol_meta", timeout_ms=HOUR_IN_MS
  79. )
  80. async def query_user(self, service: "ApplicationService", user_id: str) -> bool:
  81. if service.url is None:
  82. return False
  83. # This is required by the configuration.
  84. assert service.hs_token is not None
  85. uri = service.url + ("/users/%s" % urllib.parse.quote(user_id))
  86. try:
  87. response = await self.get_json(uri, {"access_token": service.hs_token})
  88. if response is not None: # just an empty json object
  89. return True
  90. except CodeMessageException as e:
  91. if e.code == 404:
  92. return False
  93. logger.warning("query_user to %s received %s", uri, e.code)
  94. except Exception as ex:
  95. logger.warning("query_user to %s threw exception %s", uri, ex)
  96. return False
  97. async def query_alias(self, service: "ApplicationService", alias: str) -> bool:
  98. if service.url is None:
  99. return False
  100. # This is required by the configuration.
  101. assert service.hs_token is not None
  102. uri = service.url + ("/rooms/%s" % urllib.parse.quote(alias))
  103. try:
  104. response = await self.get_json(uri, {"access_token": service.hs_token})
  105. if response is not None: # just an empty json object
  106. return True
  107. except CodeMessageException as e:
  108. logger.warning("query_alias to %s received %s", uri, e.code)
  109. if e.code == 404:
  110. return False
  111. except Exception as ex:
  112. logger.warning("query_alias to %s threw exception %s", uri, ex)
  113. return False
  114. async def query_3pe(
  115. self,
  116. service: "ApplicationService",
  117. kind: str,
  118. protocol: str,
  119. fields: Dict[bytes, List[bytes]],
  120. ) -> List[JsonDict]:
  121. if kind == ThirdPartyEntityKind.USER:
  122. required_field = "userid"
  123. elif kind == ThirdPartyEntityKind.LOCATION:
  124. required_field = "alias"
  125. else:
  126. raise ValueError("Unrecognised 'kind' argument %r to query_3pe()", kind)
  127. if service.url is None:
  128. return []
  129. # This is required by the configuration.
  130. assert service.hs_token is not None
  131. uri = "%s%s/thirdparty/%s/%s" % (
  132. service.url,
  133. APP_SERVICE_PREFIX,
  134. kind,
  135. urllib.parse.quote(protocol),
  136. )
  137. try:
  138. args: Mapping[Any, Any] = {
  139. **fields,
  140. b"access_token": service.hs_token,
  141. }
  142. response = await self.get_json(uri, args=args)
  143. if not isinstance(response, list):
  144. logger.warning(
  145. "query_3pe to %s returned an invalid response %r", uri, response
  146. )
  147. return []
  148. ret = []
  149. for r in response:
  150. if _is_valid_3pe_result(r, field=required_field):
  151. ret.append(r)
  152. else:
  153. logger.warning(
  154. "query_3pe to %s returned an invalid result %r", uri, r
  155. )
  156. return ret
  157. except Exception as ex:
  158. logger.warning("query_3pe to %s threw exception %s", uri, ex)
  159. return []
  160. async def get_3pe_protocol(
  161. self, service: "ApplicationService", protocol: str
  162. ) -> Optional[JsonDict]:
  163. if service.url is None:
  164. return {}
  165. async def _get() -> Optional[JsonDict]:
  166. # This is required by the configuration.
  167. assert service.hs_token is not None
  168. uri = "%s%s/thirdparty/protocol/%s" % (
  169. service.url,
  170. APP_SERVICE_PREFIX,
  171. urllib.parse.quote(protocol),
  172. )
  173. try:
  174. info = await self.get_json(uri, {"access_token": service.hs_token})
  175. if not _is_valid_3pe_metadata(info):
  176. logger.warning(
  177. "query_3pe_protocol to %s did not return a valid result", uri
  178. )
  179. return None
  180. for instance in info.get("instances", []):
  181. network_id = instance.get("network_id", None)
  182. if network_id is not None:
  183. instance["instance_id"] = ThirdPartyInstanceID(
  184. service.id, network_id
  185. ).to_string()
  186. return info
  187. except Exception as ex:
  188. logger.warning("query_3pe_protocol to %s threw exception %s", uri, ex)
  189. return None
  190. key = (service.id, protocol)
  191. return await self.protocol_meta_cache.wrap(key, _get)
  192. async def push_bulk(
  193. self,
  194. service: "ApplicationService",
  195. events: List[EventBase],
  196. ephemeral: List[JsonDict],
  197. to_device_messages: List[JsonDict],
  198. one_time_key_counts: TransactionOneTimeKeyCounts,
  199. unused_fallback_keys: TransactionUnusedFallbackKeys,
  200. device_list_summary: DeviceListUpdates,
  201. txn_id: Optional[int] = None,
  202. ) -> bool:
  203. """
  204. Push data to an application service.
  205. Args:
  206. service: The application service to send to.
  207. events: The persistent events to send.
  208. ephemeral: The ephemeral events to send.
  209. to_device_messages: The to-device messages to send.
  210. txn_id: An unique ID to assign to this transaction. Application services should
  211. deduplicate transactions received with identitical IDs.
  212. Returns:
  213. True if the task succeeded, False if it failed.
  214. """
  215. if service.url is None:
  216. return True
  217. # This is required by the configuration.
  218. assert service.hs_token is not None
  219. serialized_events = self._serialize(service, events)
  220. if txn_id is None:
  221. logger.warning(
  222. "push_bulk: Missing txn ID sending events to %s", service.url
  223. )
  224. txn_id = 0
  225. uri = service.url + ("/transactions/%s" % urllib.parse.quote(str(txn_id)))
  226. # Never send ephemeral events to appservices that do not support it
  227. body: JsonDict = {"events": serialized_events}
  228. if service.supports_ephemeral:
  229. body.update(
  230. {
  231. # TODO: Update to stable prefixes once MSC2409 completes FCP merge.
  232. "de.sorunome.msc2409.ephemeral": ephemeral,
  233. "de.sorunome.msc2409.to_device": to_device_messages,
  234. }
  235. )
  236. # TODO: Update to stable prefixes once MSC3202 completes FCP merge
  237. if service.msc3202_transaction_extensions:
  238. if one_time_key_counts:
  239. body[
  240. "org.matrix.msc3202.device_one_time_key_counts"
  241. ] = one_time_key_counts
  242. if unused_fallback_keys:
  243. body[
  244. "org.matrix.msc3202.device_unused_fallback_key_types"
  245. ] = unused_fallback_keys
  246. if device_list_summary:
  247. body["org.matrix.msc3202.device_lists"] = {
  248. "changed": list(device_list_summary.changed),
  249. "left": list(device_list_summary.left),
  250. }
  251. try:
  252. await self.put_json(
  253. uri=uri,
  254. json_body=body,
  255. args={"access_token": service.hs_token},
  256. )
  257. if logger.isEnabledFor(logging.DEBUG):
  258. logger.debug(
  259. "push_bulk to %s succeeded! events=%s",
  260. uri,
  261. [event.get("event_id") for event in events],
  262. )
  263. sent_transactions_counter.labels(service.id).inc()
  264. sent_events_counter.labels(service.id).inc(len(serialized_events))
  265. return True
  266. except CodeMessageException as e:
  267. logger.warning(
  268. "push_bulk to %s received code=%s msg=%s",
  269. uri,
  270. e.code,
  271. e.msg,
  272. exc_info=logger.isEnabledFor(logging.DEBUG),
  273. )
  274. except Exception as ex:
  275. logger.warning(
  276. "push_bulk to %s threw exception(%s) %s args=%s",
  277. uri,
  278. type(ex).__name__,
  279. ex,
  280. ex.args,
  281. exc_info=logger.isEnabledFor(logging.DEBUG),
  282. )
  283. failed_transactions_counter.labels(service.id).inc()
  284. return False
  285. def _serialize(
  286. self, service: "ApplicationService", events: Iterable[EventBase]
  287. ) -> List[JsonDict]:
  288. time_now = self.clock.time_msec()
  289. return [
  290. serialize_event(
  291. e,
  292. time_now,
  293. config=SerializeEventConfig(
  294. as_client_event=True,
  295. # If this is an invite or a knock membership event, and we're interested
  296. # in this user, then include any stripped state alongside the event.
  297. include_stripped_room_state=(
  298. e.type == EventTypes.Member
  299. and (
  300. e.membership == Membership.INVITE
  301. or e.membership == Membership.KNOCK
  302. )
  303. and service.is_interested_in_user(e.state_key)
  304. ),
  305. ),
  306. )
  307. for e in events
  308. ]