api.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339
  1. # Copyright 2015, 2016 OpenMarket Ltd
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import logging
  15. import urllib.parse
  16. from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Tuple
  17. from prometheus_client import Counter
  18. from synapse.api.constants import EventTypes, Membership, ThirdPartyEntityKind
  19. from synapse.api.errors import CodeMessageException
  20. from synapse.appservice import (
  21. ApplicationService,
  22. TransactionOneTimeKeyCounts,
  23. TransactionUnusedFallbackKeys,
  24. )
  25. from synapse.events import EventBase
  26. from synapse.events.utils import SerializeEventConfig, serialize_event
  27. from synapse.http.client import SimpleHttpClient
  28. from synapse.types import JsonDict, ThirdPartyInstanceID
  29. from synapse.util.caches.response_cache import ResponseCache
  30. if TYPE_CHECKING:
  31. from synapse.server import HomeServer
  32. logger = logging.getLogger(__name__)
  33. sent_transactions_counter = Counter(
  34. "synapse_appservice_api_sent_transactions",
  35. "Number of /transactions/ requests sent",
  36. ["service"],
  37. )
  38. failed_transactions_counter = Counter(
  39. "synapse_appservice_api_failed_transactions",
  40. "Number of /transactions/ requests that failed to send",
  41. ["service"],
  42. )
  43. sent_events_counter = Counter(
  44. "synapse_appservice_api_sent_events", "Number of events sent to the AS", ["service"]
  45. )
  46. HOUR_IN_MS = 60 * 60 * 1000
  47. APP_SERVICE_PREFIX = "/_matrix/app/unstable"
  48. def _is_valid_3pe_metadata(info: JsonDict) -> bool:
  49. if "instances" not in info:
  50. return False
  51. if not isinstance(info["instances"], list):
  52. return False
  53. return True
  54. def _is_valid_3pe_result(r: JsonDict, field: str) -> bool:
  55. if not isinstance(r, dict):
  56. return False
  57. for k in (field, "protocol"):
  58. if k not in r:
  59. return False
  60. if not isinstance(r[k], str):
  61. return False
  62. if "fields" not in r:
  63. return False
  64. fields = r["fields"]
  65. if not isinstance(fields, dict):
  66. return False
  67. return True
  68. class ApplicationServiceApi(SimpleHttpClient):
  69. """This class manages HS -> AS communications, including querying and
  70. pushing.
  71. """
  72. def __init__(self, hs: "HomeServer"):
  73. super().__init__(hs)
  74. self.clock = hs.get_clock()
  75. self.protocol_meta_cache: ResponseCache[Tuple[str, str]] = ResponseCache(
  76. hs.get_clock(), "as_protocol_meta", timeout_ms=HOUR_IN_MS
  77. )
  78. async def query_user(self, service: "ApplicationService", user_id: str) -> bool:
  79. if service.url is None:
  80. return False
  81. # This is required by the configuration.
  82. assert service.hs_token is not None
  83. uri = service.url + ("/users/%s" % urllib.parse.quote(user_id))
  84. try:
  85. response = await self.get_json(uri, {"access_token": service.hs_token})
  86. if response is not None: # just an empty json object
  87. return True
  88. except CodeMessageException as e:
  89. if e.code == 404:
  90. return False
  91. logger.warning("query_user to %s received %s", uri, e.code)
  92. except Exception as ex:
  93. logger.warning("query_user to %s threw exception %s", uri, ex)
  94. return False
  95. async def query_alias(self, service: "ApplicationService", alias: str) -> bool:
  96. if service.url is None:
  97. return False
  98. # This is required by the configuration.
  99. assert service.hs_token is not None
  100. uri = service.url + ("/rooms/%s" % urllib.parse.quote(alias))
  101. try:
  102. response = await self.get_json(uri, {"access_token": service.hs_token})
  103. if response is not None: # just an empty json object
  104. return True
  105. except CodeMessageException as e:
  106. logger.warning("query_alias to %s received %s", uri, e.code)
  107. if e.code == 404:
  108. return False
  109. except Exception as ex:
  110. logger.warning("query_alias to %s threw exception %s", uri, ex)
  111. return False
  112. async def query_3pe(
  113. self,
  114. service: "ApplicationService",
  115. kind: str,
  116. protocol: str,
  117. fields: Dict[bytes, List[bytes]],
  118. ) -> List[JsonDict]:
  119. if kind == ThirdPartyEntityKind.USER:
  120. required_field = "userid"
  121. elif kind == ThirdPartyEntityKind.LOCATION:
  122. required_field = "alias"
  123. else:
  124. raise ValueError("Unrecognised 'kind' argument %r to query_3pe()", kind)
  125. if service.url is None:
  126. return []
  127. uri = "%s%s/thirdparty/%s/%s" % (
  128. service.url,
  129. APP_SERVICE_PREFIX,
  130. kind,
  131. urllib.parse.quote(protocol),
  132. )
  133. try:
  134. response = await self.get_json(uri, fields)
  135. if not isinstance(response, list):
  136. logger.warning(
  137. "query_3pe to %s returned an invalid response %r", uri, response
  138. )
  139. return []
  140. ret = []
  141. for r in response:
  142. if _is_valid_3pe_result(r, field=required_field):
  143. ret.append(r)
  144. else:
  145. logger.warning(
  146. "query_3pe to %s returned an invalid result %r", uri, r
  147. )
  148. return ret
  149. except Exception as ex:
  150. logger.warning("query_3pe to %s threw exception %s", uri, ex)
  151. return []
  152. async def get_3pe_protocol(
  153. self, service: "ApplicationService", protocol: str
  154. ) -> Optional[JsonDict]:
  155. if service.url is None:
  156. return {}
  157. async def _get() -> Optional[JsonDict]:
  158. uri = "%s%s/thirdparty/protocol/%s" % (
  159. service.url,
  160. APP_SERVICE_PREFIX,
  161. urllib.parse.quote(protocol),
  162. )
  163. try:
  164. info = await self.get_json(uri)
  165. if not _is_valid_3pe_metadata(info):
  166. logger.warning(
  167. "query_3pe_protocol to %s did not return a valid result", uri
  168. )
  169. return None
  170. for instance in info.get("instances", []):
  171. network_id = instance.get("network_id", None)
  172. if network_id is not None:
  173. instance["instance_id"] = ThirdPartyInstanceID(
  174. service.id, network_id
  175. ).to_string()
  176. return info
  177. except Exception as ex:
  178. logger.warning("query_3pe_protocol to %s threw exception %s", uri, ex)
  179. return None
  180. key = (service.id, protocol)
  181. return await self.protocol_meta_cache.wrap(key, _get)
  182. async def push_bulk(
  183. self,
  184. service: "ApplicationService",
  185. events: List[EventBase],
  186. ephemeral: List[JsonDict],
  187. to_device_messages: List[JsonDict],
  188. one_time_key_counts: TransactionOneTimeKeyCounts,
  189. unused_fallback_keys: TransactionUnusedFallbackKeys,
  190. txn_id: Optional[int] = None,
  191. ) -> bool:
  192. """
  193. Push data to an application service.
  194. Args:
  195. service: The application service to send to.
  196. events: The persistent events to send.
  197. ephemeral: The ephemeral events to send.
  198. to_device_messages: The to-device messages to send.
  199. txn_id: An unique ID to assign to this transaction. Application services should
  200. deduplicate transactions received with identitical IDs.
  201. Returns:
  202. True if the task succeeded, False if it failed.
  203. """
  204. if service.url is None:
  205. return True
  206. # This is required by the configuration.
  207. assert service.hs_token is not None
  208. serialized_events = self._serialize(service, events)
  209. if txn_id is None:
  210. logger.warning(
  211. "push_bulk: Missing txn ID sending events to %s", service.url
  212. )
  213. txn_id = 0
  214. uri = service.url + ("/transactions/%s" % urllib.parse.quote(str(txn_id)))
  215. # Never send ephemeral events to appservices that do not support it
  216. body: JsonDict = {"events": serialized_events}
  217. if service.supports_ephemeral:
  218. body.update(
  219. {
  220. # TODO: Update to stable prefixes once MSC2409 completes FCP merge.
  221. "de.sorunome.msc2409.ephemeral": ephemeral,
  222. "de.sorunome.msc2409.to_device": to_device_messages,
  223. }
  224. )
  225. if service.msc3202_transaction_extensions:
  226. if one_time_key_counts:
  227. body[
  228. "org.matrix.msc3202.device_one_time_key_counts"
  229. ] = one_time_key_counts
  230. if unused_fallback_keys:
  231. body[
  232. "org.matrix.msc3202.device_unused_fallback_keys"
  233. ] = unused_fallback_keys
  234. try:
  235. await self.put_json(
  236. uri=uri,
  237. json_body=body,
  238. args={"access_token": service.hs_token},
  239. )
  240. if logger.isEnabledFor(logging.DEBUG):
  241. logger.debug(
  242. "push_bulk to %s succeeded! events=%s",
  243. uri,
  244. [event.get("event_id") for event in events],
  245. )
  246. sent_transactions_counter.labels(service.id).inc()
  247. sent_events_counter.labels(service.id).inc(len(serialized_events))
  248. return True
  249. except CodeMessageException as e:
  250. logger.warning(
  251. "push_bulk to %s received code=%s msg=%s",
  252. uri,
  253. e.code,
  254. e.msg,
  255. exc_info=logger.isEnabledFor(logging.DEBUG),
  256. )
  257. except Exception as ex:
  258. logger.warning(
  259. "push_bulk to %s threw exception(%s) %s args=%s",
  260. uri,
  261. type(ex).__name__,
  262. ex,
  263. ex.args,
  264. exc_info=logger.isEnabledFor(logging.DEBUG),
  265. )
  266. failed_transactions_counter.labels(service.id).inc()
  267. return False
  268. def _serialize(
  269. self, service: "ApplicationService", events: Iterable[EventBase]
  270. ) -> List[JsonDict]:
  271. time_now = self.clock.time_msec()
  272. return [
  273. serialize_event(
  274. e,
  275. time_now,
  276. config=SerializeEventConfig(
  277. as_client_event=True,
  278. # If this is an invite or a knock membership event, and we're interested
  279. # in this user, then include any stripped state alongside the event.
  280. include_stripped_room_state=(
  281. e.type == EventTypes.Member
  282. and (
  283. e.membership == Membership.INVITE
  284. or e.membership == Membership.KNOCK
  285. )
  286. and service.is_interested_in_user(e.state_key)
  287. ),
  288. ),
  289. )
  290. for e in events
  291. ]