api.py 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2015, 2016 OpenMarket Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  import logging
  import urllib
  import urllib.parse
  from typing import TYPE_CHECKING, List, Optional, Tuple

  from prometheus_client import Counter

  from synapse.api.constants import EventTypes, ThirdPartyEntityKind
  from synapse.api.errors import CodeMessageException
  from synapse.events import EventBase
  from synapse.events.utils import serialize_event
  from synapse.http.client import SimpleHttpClient
  from synapse.types import JsonDict, ThirdPartyInstanceID
  from synapse.util.caches.response_cache import ResponseCache

  if TYPE_CHECKING:
      from synapse.appservice import ApplicationService
  logger = logging.getLogger(__name__)

  # Prometheus counters. Each one carries a single "service" label.
  sent_transactions_counter = Counter(
      "synapse_appservice_api_sent_transactions",
      "Number of /transactions/ requests sent",
      labelnames=["service"],
  )

  failed_transactions_counter = Counter(
      "synapse_appservice_api_failed_transactions",
      "Number of /transactions/ requests that failed to send",
      labelnames=["service"],
  )

  sent_events_counter = Counter(
      "synapse_appservice_api_sent_events",
      "Number of events sent to the AS",
      labelnames=["service"],
  )

  # One hour, expressed in milliseconds.
  HOUR_IN_MS = 1000 * 60 * 60

  # Path prefix inserted between the service URL and the third-party endpoints.
  APP_SERVICE_PREFIX = "/_matrix/app/unstable"
  44. def _is_valid_3pe_metadata(info):
  45. if "instances" not in info:
  46. return False
  47. if not isinstance(info["instances"], list):
  48. return False
  49. return True
  50. def _is_valid_3pe_result(r, field):
  51. if not isinstance(r, dict):
  52. return False
  53. for k in (field, "protocol"):
  54. if k not in r:
  55. return False
  56. if not isinstance(r[k], str):
  57. return False
  58. if "fields" not in r:
  59. return False
  60. fields = r["fields"]
  61. if not isinstance(fields, dict):
  62. return False
  63. for k in fields.keys():
  64. if not isinstance(fields[k], str):
  65. return False
  66. return True
  class ApplicationServiceApi(SimpleHttpClient):
      """This class manages HS -> AS communications, including querying and
      pushing.
      """

      def __init__(self, hs):
          super().__init__(hs)
          self.clock = hs.get_clock()

          # Cache used by get_3pe_protocol, keyed on (service ID, protocol) and
          # configured with a one-hour timeout.
          self.protocol_meta_cache = ResponseCache(
              hs, "as_protocol_meta", timeout_ms=HOUR_IN_MS
          )  # type: ResponseCache[Tuple[str, str]]

      async def query_user(self, service, user_id):
          """Ask an application service whether a user ID exists.

          Args:
              service: The application service to query; its `url` and
                  `hs_token` are used.
              user_id: The user ID to look up (URL-quoted into the path).

          Returns:
              True if the service replied with a non-None JSON body. False if
              the service has no URL, replied 404 or another error code, or
              the request raised.
          """
          if service.url is None:
              return False
          uri = service.url + ("/users/%s" % urllib.parse.quote(user_id))
          try:
              response = await self.get_json(uri, {"access_token": service.hs_token})
              if response is not None:  # just an empty json object
                  return True
          except CodeMessageException as e:
              # A 404 is the ordinary "no such user" answer, so it is not logged.
              if e.code == 404:
                  return False
              logger.warning("query_user to %s received %s", uri, e.code)
          except Exception as ex:
              logger.warning("query_user to %s threw exception %s", uri, ex)
          return False

      async def query_alias(self, service, alias):
          """Ask an application service whether a room alias exists.

          Args:
              service: The application service to query; its `url` and
                  `hs_token` are used.
              alias: The room alias to look up (URL-quoted into the path).

          Returns:
              True if the service replied with a non-None JSON body. False if
              the service has no URL, replied 404 or another error code, or
              the request raised.
          """
          if service.url is None:
              return False
          uri = service.url + ("/rooms/%s" % urllib.parse.quote(alias))
          try:
              response = await self.get_json(uri, {"access_token": service.hs_token})
              if response is not None:  # just an empty json object
                  return True
          except CodeMessageException as e:
              # Mirror query_user: a 404 is the ordinary "no such alias" answer
              # and is returned without a warning; other codes are logged.
              if e.code == 404:
                  return False
              logger.warning("query_alias to %s received %s", uri, e.code)
          except Exception as ex:
              logger.warning("query_alias to %s threw exception %s", uri, ex)
          return False

      async def query_3pe(self, service, kind, protocol, fields):
          """Query an application service for third-party users or locations.

          Args:
              service: The application service to query; its `url` is used.
              kind: ThirdPartyEntityKind.USER or ThirdPartyEntityKind.LOCATION.
              protocol: The protocol name (URL-quoted into the path).
              fields: Query arguments passed through to the GET request.

          Returns:
              The list of results that pass validation. An empty list if the
              service has no URL, the response is not a list, or the request
              raised.

          Raises:
              ValueError: If `kind` is neither USER nor LOCATION.
          """
          if kind == ThirdPartyEntityKind.USER:
              required_field = "userid"
          elif kind == ThirdPartyEntityKind.LOCATION:
              required_field = "alias"
          else:
              # Interpolate here: exceptions do not apply %-formatting to their
              # arguments the way logging calls do.
              raise ValueError(
                  "Unrecognised 'kind' argument %r to query_3pe()" % (kind,)
              )
          if service.url is None:
              return []

          uri = "%s%s/thirdparty/%s/%s" % (
              service.url,
              APP_SERVICE_PREFIX,
              kind,
              urllib.parse.quote(protocol),
          )
          try:
              response = await self.get_json(uri, fields)
              if not isinstance(response, list):
                  logger.warning(
                      "query_3pe to %s returned an invalid response %r", uri, response
                  )
                  return []

              # Keep the valid entries and log (but otherwise ignore) the rest.
              ret = []
              for r in response:
                  if _is_valid_3pe_result(r, field=required_field):
                      ret.append(r)
                  else:
                      logger.warning(
                          "query_3pe to %s returned an invalid result %r", uri, r
                      )

              return ret
          except Exception as ex:
              logger.warning("query_3pe to %s threw exception %s", uri, ex)
              return []

      async def get_3pe_protocol(
          self, service: "ApplicationService", protocol: str
      ) -> Optional[JsonDict]:
          """Fetch the metadata an application service publishes for a protocol.

          Args:
              service: The application service to query; its `url` and `id`
                  are used.
              protocol: The protocol name (URL-quoted into the path).

          Returns:
              An empty dict if the service has no URL. Otherwise the metadata
              dict, with an "instance_id" added to every instance that has a
              "network_id", or None if the response was invalid or the request
              raised. The lookup goes through `protocol_meta_cache`.
          """
          if service.url is None:
              return {}

          async def _get() -> Optional[JsonDict]:
              uri = "%s%s/thirdparty/protocol/%s" % (
                  service.url,
                  APP_SERVICE_PREFIX,
                  urllib.parse.quote(protocol),
              )
              try:
                  info = await self.get_json(uri)

                  if not _is_valid_3pe_metadata(info):
                      logger.warning(
                          "query_3pe_protocol to %s did not return a valid result", uri
                      )
                      return None

                  # Derive an instance ID from the service ID and the network
                  # ID for each instance that declares a network ID.
                  for instance in info.get("instances", []):
                      network_id = instance.get("network_id", None)
                      if network_id is not None:
                          instance["instance_id"] = ThirdPartyInstanceID(
                              service.id, network_id
                          ).to_string()

                  return info
              except Exception as ex:
                  logger.warning("query_3pe_protocol to %s threw exception %s", uri, ex)
                  return None

          # NOTE(review): ResponseCache.wrap presumably shares one result per
          # key for the cache timeout - confirm against ResponseCache.
          key = (service.id, protocol)
          return await self.protocol_meta_cache.wrap(key, _get)

      async def push_bulk(
          self,
          service: "ApplicationService",
          events: List[EventBase],
          ephemeral: List[JsonDict],
          txn_id: Optional[int] = None,
      ) -> bool:
          """Send a transaction of events to an application service.

          Args:
              service: The destination application service.
              events: The events to serialize and send.
              ephemeral: Ephemeral events; only included in the body when
                  `service.supports_ephemeral` is true.
              txn_id: The transaction ID used in the request path. If None, a
                  warning is logged and 0 is used.

          Returns:
              True if the service has no URL or the PUT succeeded; False if the
              PUT raised, in which case the failure counter is incremented.
          """
          if service.url is None:
              return True

          # Use a separate name so the `events` parameter keeps its declared
          # List[EventBase] type.
          serialized_events = self._serialize(service, events)

          if txn_id is None:
              logger.warning(
                  "push_bulk: Missing txn ID sending events to %s", service.url
              )
              txn_id = 0

          uri = service.url + ("/transactions/%s" % urllib.parse.quote(str(txn_id)))

          # Never send ephemeral events to appservices that do not support it
          if service.supports_ephemeral:
              body = {
                  "events": serialized_events,
                  "de.sorunome.msc2409.ephemeral": ephemeral,
              }
          else:
              body = {"events": serialized_events}

          try:
              await self.put_json(
                  uri=uri, json_body=body, args={"access_token": service.hs_token},
              )
              sent_transactions_counter.labels(service.id).inc()
              sent_events_counter.labels(service.id).inc(len(serialized_events))
              return True
          except CodeMessageException as e:
              logger.warning("push_bulk to %s received %s", uri, e.code)
          except Exception as ex:
              logger.warning("push_bulk to %s threw exception %s", uri, ex)
          # Reached only when one of the handlers above ran.
          failed_transactions_counter.labels(service.id).inc()
          return False

      def _serialize(self, service, events):
          """Serialize events into client-format dicts for a given service.

          Args:
              service: The application service the events are destined for.
              events: The events to serialize.

          Returns:
              A list with one serialized event per input event.
          """
          time_now = self.clock.time_msec()
          return [
              serialize_event(
                  e,
                  time_now,
                  as_client_event=True,
                  # Flag as an invite only for membership invites whose target
                  # (the state key) is a user this service is interested in.
                  is_invite=(
                      e.type == EventTypes.Member
                      and e.membership == "invite"
                      and service.is_interested_in_user(e.state_key)
                  ),
              )
              for e in events
          ]