api.py 8.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2015, 2016 OpenMarket Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. import logging
  16. import urllib
  17. from prometheus_client import Counter
  18. from twisted.internet import defer
  19. from synapse.api.constants import ThirdPartyEntityKind
  20. from synapse.api.errors import CodeMessageException
  21. from synapse.events.utils import serialize_event
  22. from synapse.http.client import SimpleHttpClient
  23. from synapse.types import ThirdPartyInstanceID
  24. from synapse.util.caches.response_cache import ResponseCache
  25. logger = logging.getLogger(__name__)
  26. sent_transactions_counter = Counter(
  27. "synapse_appservice_api_sent_transactions",
  28. "Number of /transactions/ requests sent",
  29. ["service"]
  30. )
  31. failed_transactions_counter = Counter(
  32. "synapse_appservice_api_failed_transactions",
  33. "Number of /transactions/ requests that failed to send",
  34. ["service"]
  35. )
  36. sent_events_counter = Counter(
  37. "synapse_appservice_api_sent_events",
  38. "Number of events sent to the AS",
  39. ["service"]
  40. )
  41. HOUR_IN_MS = 60 * 60 * 1000
  42. APP_SERVICE_PREFIX = "/_matrix/app/unstable"
  43. def _is_valid_3pe_metadata(info):
  44. if "instances" not in info:
  45. return False
  46. if not isinstance(info["instances"], list):
  47. return False
  48. return True
  49. def _is_valid_3pe_result(r, field):
  50. if not isinstance(r, dict):
  51. return False
  52. for k in (field, "protocol"):
  53. if k not in r:
  54. return False
  55. if not isinstance(r[k], str):
  56. return False
  57. if "fields" not in r:
  58. return False
  59. fields = r["fields"]
  60. if not isinstance(fields, dict):
  61. return False
  62. for k in fields.keys():
  63. if not isinstance(fields[k], str):
  64. return False
  65. return True
  66. class ApplicationServiceApi(SimpleHttpClient):
  67. """This class manages HS -> AS communications, including querying and
  68. pushing.
  69. """
  70. def __init__(self, hs):
  71. super(ApplicationServiceApi, self).__init__(hs)
  72. self.clock = hs.get_clock()
  73. self.protocol_meta_cache = ResponseCache(hs, "as_protocol_meta",
  74. timeout_ms=HOUR_IN_MS)
  75. @defer.inlineCallbacks
  76. def query_user(self, service, user_id):
  77. if service.url is None:
  78. defer.returnValue(False)
  79. uri = service.url + ("/users/%s" % urllib.quote(user_id))
  80. response = None
  81. try:
  82. response = yield self.get_json(uri, {
  83. "access_token": service.hs_token
  84. })
  85. if response is not None: # just an empty json object
  86. defer.returnValue(True)
  87. except CodeMessageException as e:
  88. if e.code == 404:
  89. defer.returnValue(False)
  90. return
  91. logger.warning("query_user to %s received %s", uri, e.code)
  92. except Exception as ex:
  93. logger.warning("query_user to %s threw exception %s", uri, ex)
  94. defer.returnValue(False)
  95. @defer.inlineCallbacks
  96. def query_alias(self, service, alias):
  97. if service.url is None:
  98. defer.returnValue(False)
  99. uri = service.url + ("/rooms/%s" % urllib.quote(alias))
  100. response = None
  101. try:
  102. response = yield self.get_json(uri, {
  103. "access_token": service.hs_token
  104. })
  105. if response is not None: # just an empty json object
  106. defer.returnValue(True)
  107. except CodeMessageException as e:
  108. logger.warning("query_alias to %s received %s", uri, e.code)
  109. if e.code == 404:
  110. defer.returnValue(False)
  111. return
  112. except Exception as ex:
  113. logger.warning("query_alias to %s threw exception %s", uri, ex)
  114. defer.returnValue(False)
  115. @defer.inlineCallbacks
  116. def query_3pe(self, service, kind, protocol, fields):
  117. if kind == ThirdPartyEntityKind.USER:
  118. required_field = "userid"
  119. elif kind == ThirdPartyEntityKind.LOCATION:
  120. required_field = "alias"
  121. else:
  122. raise ValueError(
  123. "Unrecognised 'kind' argument %r to query_3pe()", kind
  124. )
  125. if service.url is None:
  126. defer.returnValue([])
  127. uri = "%s%s/thirdparty/%s/%s" % (
  128. service.url,
  129. APP_SERVICE_PREFIX,
  130. kind,
  131. urllib.quote(protocol)
  132. )
  133. try:
  134. response = yield self.get_json(uri, fields)
  135. if not isinstance(response, list):
  136. logger.warning(
  137. "query_3pe to %s returned an invalid response %r",
  138. uri, response
  139. )
  140. defer.returnValue([])
  141. ret = []
  142. for r in response:
  143. if _is_valid_3pe_result(r, field=required_field):
  144. ret.append(r)
  145. else:
  146. logger.warning(
  147. "query_3pe to %s returned an invalid result %r",
  148. uri, r
  149. )
  150. defer.returnValue(ret)
  151. except Exception as ex:
  152. logger.warning("query_3pe to %s threw exception %s", uri, ex)
  153. defer.returnValue([])
  154. def get_3pe_protocol(self, service, protocol):
  155. if service.url is None:
  156. defer.returnValue({})
  157. @defer.inlineCallbacks
  158. def _get():
  159. uri = "%s%s/thirdparty/protocol/%s" % (
  160. service.url,
  161. APP_SERVICE_PREFIX,
  162. urllib.quote(protocol)
  163. )
  164. try:
  165. info = yield self.get_json(uri, {})
  166. if not _is_valid_3pe_metadata(info):
  167. logger.warning("query_3pe_protocol to %s did not return a"
  168. " valid result", uri)
  169. defer.returnValue(None)
  170. for instance in info.get("instances", []):
  171. network_id = instance.get("network_id", None)
  172. if network_id is not None:
  173. instance["instance_id"] = ThirdPartyInstanceID(
  174. service.id, network_id,
  175. ).to_string()
  176. defer.returnValue(info)
  177. except Exception as ex:
  178. logger.warning("query_3pe_protocol to %s threw exception %s",
  179. uri, ex)
  180. defer.returnValue(None)
  181. key = (service.id, protocol)
  182. return self.protocol_meta_cache.wrap(key, _get)
  183. @defer.inlineCallbacks
  184. def push_bulk(self, service, events, txn_id=None):
  185. if service.url is None:
  186. defer.returnValue(True)
  187. events = self._serialize(events)
  188. if txn_id is None:
  189. logger.warning("push_bulk: Missing txn ID sending events to %s",
  190. service.url)
  191. txn_id = str(0)
  192. txn_id = str(txn_id)
  193. uri = service.url + ("/transactions/%s" %
  194. urllib.quote(txn_id))
  195. try:
  196. yield self.put_json(
  197. uri=uri,
  198. json_body={
  199. "events": events
  200. },
  201. args={
  202. "access_token": service.hs_token
  203. })
  204. sent_transactions_counter.labels(service.id).inc()
  205. sent_events_counter.labels(service.id).inc(len(events))
  206. defer.returnValue(True)
  207. return
  208. except CodeMessageException as e:
  209. logger.warning("push_bulk to %s received %s", uri, e.code)
  210. except Exception as ex:
  211. logger.warning("push_bulk to %s threw exception %s", uri, ex)
  212. failed_transactions_counter.labels(service.id).inc()
  213. defer.returnValue(False)
  214. def _serialize(self, events):
  215. time_now = self.clock.time_msec()
  216. return [
  217. serialize_event(e, time_now, as_client_event=True) for e in events
  218. ]