appservice.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2015, 2016 OpenMarket Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. from twisted.internet import defer
  16. from six import itervalues
  17. import synapse
  18. from synapse.api.constants import EventTypes
  19. from synapse.util.metrics import Measure
  20. from synapse.util.logcontext import (
  21. make_deferred_yieldable, run_in_background,
  22. )
  23. from prometheus_client import Counter
  24. import logging
  25. logger = logging.getLogger(__name__)
  26. events_processed_counter = Counter("synapse_handlers_appservice_events_processed", "")
  27. def log_failure(failure):
  28. logger.error(
  29. "Application Services Failure",
  30. exc_info=(
  31. failure.type,
  32. failure.value,
  33. failure.getTracebackObject()
  34. )
  35. )
  36. class ApplicationServicesHandler(object):
  37. def __init__(self, hs):
  38. self.store = hs.get_datastore()
  39. self.is_mine_id = hs.is_mine_id
  40. self.appservice_api = hs.get_application_service_api()
  41. self.scheduler = hs.get_application_service_scheduler()
  42. self.started_scheduler = False
  43. self.clock = hs.get_clock()
  44. self.notify_appservices = hs.config.notify_appservices
  45. self.current_max = 0
  46. self.is_processing = False
  47. @defer.inlineCallbacks
  48. def notify_interested_services(self, current_id):
  49. """Notifies (pushes) all application services interested in this event.
  50. Pushing is done asynchronously, so this method won't block for any
  51. prolonged length of time.
  52. Args:
  53. current_id(int): The current maximum ID.
  54. """
  55. services = self.store.get_app_services()
  56. if not services or not self.notify_appservices:
  57. return
  58. self.current_max = max(self.current_max, current_id)
  59. if self.is_processing:
  60. return
  61. with Measure(self.clock, "notify_interested_services"):
  62. self.is_processing = True
  63. try:
  64. limit = 100
  65. while True:
  66. upper_bound, events = yield self.store.get_new_events_for_appservice(
  67. self.current_max, limit
  68. )
  69. if not events:
  70. break
  71. events_by_room = {}
  72. for event in events:
  73. events_by_room.setdefault(event.room_id, []).append(event)
  74. @defer.inlineCallbacks
  75. def handle_event(event):
  76. # Gather interested services
  77. services = yield self._get_services_for_event(event)
  78. if len(services) == 0:
  79. return # no services need notifying
  80. # Do we know this user exists? If not, poke the user
  81. # query API for all services which match that user regex.
  82. # This needs to block as these user queries need to be
  83. # made BEFORE pushing the event.
  84. yield self._check_user_exists(event.sender)
  85. if event.type == EventTypes.Member:
  86. yield self._check_user_exists(event.state_key)
  87. if not self.started_scheduler:
  88. self.scheduler.start().addErrback(log_failure)
  89. self.started_scheduler = True
  90. # Fork off pushes to these services
  91. for service in services:
  92. self.scheduler.submit_event_for_as(service, event)
  93. @defer.inlineCallbacks
  94. def handle_room_events(events):
  95. for event in events:
  96. yield handle_event(event)
  97. yield make_deferred_yieldable(defer.gatherResults([
  98. run_in_background(handle_room_events, evs)
  99. for evs in itervalues(events_by_room)
  100. ], consumeErrors=True))
  101. yield self.store.set_appservice_last_pos(upper_bound)
  102. now = self.clock.time_msec()
  103. ts = yield self.store.get_received_ts(events[-1].event_id)
  104. synapse.metrics.event_processing_positions.labels(
  105. "appservice_sender").set(upper_bound)
  106. events_processed_counter.inc(len(events))
  107. synapse.metrics.event_processing_lag.labels(
  108. "appservice_sender").set(now - ts)
  109. synapse.metrics.event_processing_last_ts.labels(
  110. "appservice_sender").set(ts)
  111. finally:
  112. self.is_processing = False
  113. @defer.inlineCallbacks
  114. def query_user_exists(self, user_id):
  115. """Check if any application service knows this user_id exists.
  116. Args:
  117. user_id(str): The user to query if they exist on any AS.
  118. Returns:
  119. True if this user exists on at least one application service.
  120. """
  121. user_query_services = yield self._get_services_for_user(
  122. user_id=user_id
  123. )
  124. for user_service in user_query_services:
  125. is_known_user = yield self.appservice_api.query_user(
  126. user_service, user_id
  127. )
  128. if is_known_user:
  129. defer.returnValue(True)
  130. defer.returnValue(False)
  131. @defer.inlineCallbacks
  132. def query_room_alias_exists(self, room_alias):
  133. """Check if an application service knows this room alias exists.
  134. Args:
  135. room_alias(RoomAlias): The room alias to query.
  136. Returns:
  137. namedtuple: with keys "room_id" and "servers" or None if no
  138. association can be found.
  139. """
  140. room_alias_str = room_alias.to_string()
  141. services = self.store.get_app_services()
  142. alias_query_services = [
  143. s for s in services if (
  144. s.is_interested_in_alias(room_alias_str)
  145. )
  146. ]
  147. for alias_service in alias_query_services:
  148. is_known_alias = yield self.appservice_api.query_alias(
  149. alias_service, room_alias_str
  150. )
  151. if is_known_alias:
  152. # the alias exists now so don't query more ASes.
  153. result = yield self.store.get_association_from_room_alias(
  154. room_alias
  155. )
  156. defer.returnValue(result)
  157. @defer.inlineCallbacks
  158. def query_3pe(self, kind, protocol, fields):
  159. services = yield self._get_services_for_3pn(protocol)
  160. results = yield make_deferred_yieldable(defer.DeferredList([
  161. run_in_background(
  162. self.appservice_api.query_3pe,
  163. service, kind, protocol, fields,
  164. )
  165. for service in services
  166. ], consumeErrors=True))
  167. ret = []
  168. for (success, result) in results:
  169. if success:
  170. ret.extend(result)
  171. defer.returnValue(ret)
  172. @defer.inlineCallbacks
  173. def get_3pe_protocols(self, only_protocol=None):
  174. services = self.store.get_app_services()
  175. protocols = {}
  176. # Collect up all the individual protocol responses out of the ASes
  177. for s in services:
  178. for p in s.protocols:
  179. if only_protocol is not None and p != only_protocol:
  180. continue
  181. if p not in protocols:
  182. protocols[p] = []
  183. info = yield self.appservice_api.get_3pe_protocol(s, p)
  184. if info is not None:
  185. protocols[p].append(info)
  186. def _merge_instances(infos):
  187. if not infos:
  188. return {}
  189. # Merge the 'instances' lists of multiple results, but just take
  190. # the other fields from the first as they ought to be identical
  191. # copy the result so as not to corrupt the cached one
  192. combined = dict(infos[0])
  193. combined["instances"] = list(combined["instances"])
  194. for info in infos[1:]:
  195. combined["instances"].extend(info["instances"])
  196. return combined
  197. for p in protocols.keys():
  198. protocols[p] = _merge_instances(protocols[p])
  199. defer.returnValue(protocols)
  200. @defer.inlineCallbacks
  201. def _get_services_for_event(self, event):
  202. """Retrieve a list of application services interested in this event.
  203. Args:
  204. event(Event): The event to check. Can be None if alias_list is not.
  205. Returns:
  206. list<ApplicationService>: A list of services interested in this
  207. event based on the service regex.
  208. """
  209. services = self.store.get_app_services()
  210. # we can't use a list comprehension here. Since python 3, list
  211. # comprehensions use a generator internally. This means you can't yield
  212. # inside of a list comprehension anymore.
  213. interested_list = []
  214. for s in services:
  215. if (yield s.is_interested(event, self.store)):
  216. interested_list.append(s)
  217. defer.returnValue(interested_list)
  218. def _get_services_for_user(self, user_id):
  219. services = self.store.get_app_services()
  220. interested_list = [
  221. s for s in services if (
  222. s.is_interested_in_user(user_id)
  223. )
  224. ]
  225. return defer.succeed(interested_list)
  226. def _get_services_for_3pn(self, protocol):
  227. services = self.store.get_app_services()
  228. interested_list = [
  229. s for s in services if s.is_interested_in_protocol(protocol)
  230. ]
  231. return defer.succeed(interested_list)
  232. @defer.inlineCallbacks
  233. def _is_unknown_user(self, user_id):
  234. if not self.is_mine_id(user_id):
  235. # we don't know if they are unknown or not since it isn't one of our
  236. # users. We can't poke ASes.
  237. defer.returnValue(False)
  238. return
  239. user_info = yield self.store.get_user_by_id(user_id)
  240. if user_info:
  241. defer.returnValue(False)
  242. return
  243. # user not found; could be the AS though, so check.
  244. services = self.store.get_app_services()
  245. service_list = [s for s in services if s.sender == user_id]
  246. defer.returnValue(len(service_list) == 0)
  247. @defer.inlineCallbacks
  248. def _check_user_exists(self, user_id):
  249. unknown_user = yield self._is_unknown_user(user_id)
  250. if unknown_user:
  251. exists = yield self.query_user_exists(user_id)
  252. defer.returnValue(exists)
  253. defer.returnValue(True)