api.py 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2015, 2016 OpenMarket Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. import logging
  16. from six.moves import urllib
  17. from prometheus_client import Counter
  18. from twisted.internet import defer
  19. from synapse.api.constants import ThirdPartyEntityKind
  20. from synapse.api.errors import CodeMessageException
  21. from synapse.events.utils import serialize_event
  22. from synapse.http.client import SimpleHttpClient
  23. from synapse.types import ThirdPartyInstanceID
  24. from synapse.util.caches.response_cache import ResponseCache
  25. logger = logging.getLogger(__name__)
  26. sent_transactions_counter = Counter(
  27. "synapse_appservice_api_sent_transactions",
  28. "Number of /transactions/ requests sent",
  29. ["service"],
  30. )
  31. failed_transactions_counter = Counter(
  32. "synapse_appservice_api_failed_transactions",
  33. "Number of /transactions/ requests that failed to send",
  34. ["service"],
  35. )
  36. sent_events_counter = Counter(
  37. "synapse_appservice_api_sent_events", "Number of events sent to the AS", ["service"]
  38. )
  39. HOUR_IN_MS = 60 * 60 * 1000
  40. APP_SERVICE_PREFIX = "/_matrix/app/unstable"
  41. def _is_valid_3pe_metadata(info):
  42. if "instances" not in info:
  43. return False
  44. if not isinstance(info["instances"], list):
  45. return False
  46. return True
  47. def _is_valid_3pe_result(r, field):
  48. if not isinstance(r, dict):
  49. return False
  50. for k in (field, "protocol"):
  51. if k not in r:
  52. return False
  53. if not isinstance(r[k], str):
  54. return False
  55. if "fields" not in r:
  56. return False
  57. fields = r["fields"]
  58. if not isinstance(fields, dict):
  59. return False
  60. for k in fields.keys():
  61. if not isinstance(fields[k], str):
  62. return False
  63. return True
  64. class ApplicationServiceApi(SimpleHttpClient):
  65. """This class manages HS -> AS communications, including querying and
  66. pushing.
  67. """
  68. def __init__(self, hs):
  69. super(ApplicationServiceApi, self).__init__(hs)
  70. self.clock = hs.get_clock()
  71. self.protocol_meta_cache = ResponseCache(
  72. hs, "as_protocol_meta", timeout_ms=HOUR_IN_MS
  73. )
  74. @defer.inlineCallbacks
  75. def query_user(self, service, user_id):
  76. if service.url is None:
  77. return False
  78. uri = service.url + ("/users/%s" % urllib.parse.quote(user_id))
  79. response = None
  80. try:
  81. response = yield self.get_json(uri, {"access_token": service.hs_token})
  82. if response is not None: # just an empty json object
  83. return True
  84. except CodeMessageException as e:
  85. if e.code == 404:
  86. return False
  87. logger.warning("query_user to %s received %s", uri, e.code)
  88. except Exception as ex:
  89. logger.warning("query_user to %s threw exception %s", uri, ex)
  90. return False
  91. @defer.inlineCallbacks
  92. def query_alias(self, service, alias):
  93. if service.url is None:
  94. return False
  95. uri = service.url + ("/rooms/%s" % urllib.parse.quote(alias))
  96. response = None
  97. try:
  98. response = yield self.get_json(uri, {"access_token": service.hs_token})
  99. if response is not None: # just an empty json object
  100. return True
  101. except CodeMessageException as e:
  102. logger.warning("query_alias to %s received %s", uri, e.code)
  103. if e.code == 404:
  104. return False
  105. except Exception as ex:
  106. logger.warning("query_alias to %s threw exception %s", uri, ex)
  107. return False
  108. @defer.inlineCallbacks
  109. def query_3pe(self, service, kind, protocol, fields):
  110. if kind == ThirdPartyEntityKind.USER:
  111. required_field = "userid"
  112. elif kind == ThirdPartyEntityKind.LOCATION:
  113. required_field = "alias"
  114. else:
  115. raise ValueError("Unrecognised 'kind' argument %r to query_3pe()", kind)
  116. if service.url is None:
  117. return []
  118. uri = "%s%s/thirdparty/%s/%s" % (
  119. service.url,
  120. APP_SERVICE_PREFIX,
  121. kind,
  122. urllib.parse.quote(protocol),
  123. )
  124. try:
  125. response = yield self.get_json(uri, fields)
  126. if not isinstance(response, list):
  127. logger.warning(
  128. "query_3pe to %s returned an invalid response %r", uri, response
  129. )
  130. return []
  131. ret = []
  132. for r in response:
  133. if _is_valid_3pe_result(r, field=required_field):
  134. ret.append(r)
  135. else:
  136. logger.warning(
  137. "query_3pe to %s returned an invalid result %r", uri, r
  138. )
  139. return ret
  140. except Exception as ex:
  141. logger.warning("query_3pe to %s threw exception %s", uri, ex)
  142. return []
  143. def get_3pe_protocol(self, service, protocol):
  144. if service.url is None:
  145. return {}
  146. @defer.inlineCallbacks
  147. def _get():
  148. uri = "%s%s/thirdparty/protocol/%s" % (
  149. service.url,
  150. APP_SERVICE_PREFIX,
  151. urllib.parse.quote(protocol),
  152. )
  153. try:
  154. info = yield self.get_json(uri, {})
  155. if not _is_valid_3pe_metadata(info):
  156. logger.warning(
  157. "query_3pe_protocol to %s did not return a valid result", uri
  158. )
  159. return None
  160. for instance in info.get("instances", []):
  161. network_id = instance.get("network_id", None)
  162. if network_id is not None:
  163. instance["instance_id"] = ThirdPartyInstanceID(
  164. service.id, network_id
  165. ).to_string()
  166. return info
  167. except Exception as ex:
  168. logger.warning("query_3pe_protocol to %s threw exception %s", uri, ex)
  169. return None
  170. key = (service.id, protocol)
  171. return self.protocol_meta_cache.wrap(key, _get)
  172. @defer.inlineCallbacks
  173. def push_bulk(self, service, events, txn_id=None):
  174. if service.url is None:
  175. return True
  176. events = self._serialize(events)
  177. if txn_id is None:
  178. logger.warning(
  179. "push_bulk: Missing txn ID sending events to %s", service.url
  180. )
  181. txn_id = str(0)
  182. txn_id = str(txn_id)
  183. uri = service.url + ("/transactions/%s" % urllib.parse.quote(txn_id))
  184. try:
  185. yield self.put_json(
  186. uri=uri,
  187. json_body={"events": events},
  188. args={"access_token": service.hs_token},
  189. )
  190. sent_transactions_counter.labels(service.id).inc()
  191. sent_events_counter.labels(service.id).inc(len(events))
  192. return True
  193. except CodeMessageException as e:
  194. logger.warning("push_bulk to %s received %s", uri, e.code)
  195. except Exception as ex:
  196. logger.warning("push_bulk to %s threw exception %s", uri, ex)
  197. failed_transactions_counter.labels(service.id).inc()
  198. return False
  199. def _serialize(self, events):
  200. time_now = self.clock.time_msec()
  201. return [serialize_event(e, time_now, as_client_event=True) for e in events]