typing.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2014-2016 OpenMarket Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. import logging
  16. from collections import namedtuple
  17. from twisted.internet import defer
  18. from synapse.api.errors import AuthError, SynapseError
  19. from synapse.logging.context import run_in_background
  20. from synapse.types import UserID, get_domain_from_id
  21. from synapse.util.caches.stream_change_cache import StreamChangeCache
  22. from synapse.util.metrics import Measure
  23. from synapse.util.wheel_timer import WheelTimer
  24. logger = logging.getLogger(__name__)
  25. # A tiny object useful for storing a user's membership in a room, as a mapping
  26. # key
  27. RoomMember = namedtuple("RoomMember", ("room_id", "user_id"))
  28. # How often we expect remote servers to resend us presence.
  29. FEDERATION_TIMEOUT = 60 * 1000
  30. # How often to resend typing across federation.
  31. FEDERATION_PING_INTERVAL = 40 * 1000
  32. class TypingHandler(object):
  33. def __init__(self, hs):
  34. self.store = hs.get_datastore()
  35. self.server_name = hs.config.server_name
  36. self.auth = hs.get_auth()
  37. self.is_mine_id = hs.is_mine_id
  38. self.notifier = hs.get_notifier()
  39. self.state = hs.get_state_handler()
  40. self.hs = hs
  41. self.clock = hs.get_clock()
  42. self.wheel_timer = WheelTimer(bucket_size=5000)
  43. self.federation = hs.get_federation_sender()
  44. hs.get_federation_registry().register_edu_handler("m.typing", self._recv_edu)
  45. hs.get_distributor().observe("user_left_room", self.user_left_room)
  46. self._member_typing_until = {} # clock time we expect to stop
  47. self._member_last_federation_poke = {}
  48. self._latest_room_serial = 0
  49. self._reset()
  50. # caches which room_ids changed at which serials
  51. self._typing_stream_change_cache = StreamChangeCache(
  52. "TypingStreamChangeCache", self._latest_room_serial
  53. )
  54. self.clock.looping_call(self._handle_timeouts, 5000)
  55. def _reset(self):
  56. """
  57. Reset the typing handler's data caches.
  58. """
  59. # map room IDs to serial numbers
  60. self._room_serials = {}
  61. # map room IDs to sets of users currently typing
  62. self._room_typing = {}
  63. def _handle_timeouts(self):
  64. logger.info("Checking for typing timeouts")
  65. now = self.clock.time_msec()
  66. members = set(self.wheel_timer.fetch(now))
  67. for member in members:
  68. if not self.is_typing(member):
  69. # Nothing to do if they're no longer typing
  70. continue
  71. until = self._member_typing_until.get(member, None)
  72. if not until or until <= now:
  73. logger.info("Timing out typing for: %s", member.user_id)
  74. self._stopped_typing(member)
  75. continue
  76. # Check if we need to resend a keep alive over federation for this
  77. # user.
  78. if self.hs.is_mine_id(member.user_id):
  79. last_fed_poke = self._member_last_federation_poke.get(member, None)
  80. if not last_fed_poke or last_fed_poke + FEDERATION_PING_INTERVAL <= now:
  81. run_in_background(self._push_remote, member=member, typing=True)
  82. # Add a paranoia timer to ensure that we always have a timer for
  83. # each person typing.
  84. self.wheel_timer.insert(now=now, obj=member, then=now + 60 * 1000)
  85. def is_typing(self, member):
  86. return member.user_id in self._room_typing.get(member.room_id, [])
  87. @defer.inlineCallbacks
  88. def started_typing(self, target_user, auth_user, room_id, timeout):
  89. target_user_id = target_user.to_string()
  90. auth_user_id = auth_user.to_string()
  91. if not self.is_mine_id(target_user_id):
  92. raise SynapseError(400, "User is not hosted on this Home Server")
  93. if target_user_id != auth_user_id:
  94. raise AuthError(400, "Cannot set another user's typing state")
  95. yield self.auth.check_joined_room(room_id, target_user_id)
  96. logger.debug("%s has started typing in %s", target_user_id, room_id)
  97. member = RoomMember(room_id=room_id, user_id=target_user_id)
  98. was_present = member.user_id in self._room_typing.get(room_id, set())
  99. now = self.clock.time_msec()
  100. self._member_typing_until[member] = now + timeout
  101. self.wheel_timer.insert(now=now, obj=member, then=now + timeout)
  102. if was_present:
  103. # No point sending another notification
  104. return None
  105. self._push_update(member=member, typing=True)
  106. @defer.inlineCallbacks
  107. def stopped_typing(self, target_user, auth_user, room_id):
  108. target_user_id = target_user.to_string()
  109. auth_user_id = auth_user.to_string()
  110. if not self.is_mine_id(target_user_id):
  111. raise SynapseError(400, "User is not hosted on this Home Server")
  112. if target_user_id != auth_user_id:
  113. raise AuthError(400, "Cannot set another user's typing state")
  114. yield self.auth.check_joined_room(room_id, target_user_id)
  115. logger.debug("%s has stopped typing in %s", target_user_id, room_id)
  116. member = RoomMember(room_id=room_id, user_id=target_user_id)
  117. self._stopped_typing(member)
  118. @defer.inlineCallbacks
  119. def user_left_room(self, user, room_id):
  120. user_id = user.to_string()
  121. if self.is_mine_id(user_id):
  122. member = RoomMember(room_id=room_id, user_id=user_id)
  123. yield self._stopped_typing(member)
  124. def _stopped_typing(self, member):
  125. if member.user_id not in self._room_typing.get(member.room_id, set()):
  126. # No point
  127. return None
  128. self._member_typing_until.pop(member, None)
  129. self._member_last_federation_poke.pop(member, None)
  130. self._push_update(member=member, typing=False)
  131. def _push_update(self, member, typing):
  132. if self.hs.is_mine_id(member.user_id):
  133. # Only send updates for changes to our own users.
  134. run_in_background(self._push_remote, member, typing)
  135. self._push_update_local(member=member, typing=typing)
  136. @defer.inlineCallbacks
  137. def _push_remote(self, member, typing):
  138. try:
  139. users = yield self.state.get_current_users_in_room(member.room_id)
  140. self._member_last_federation_poke[member] = self.clock.time_msec()
  141. now = self.clock.time_msec()
  142. self.wheel_timer.insert(
  143. now=now, obj=member, then=now + FEDERATION_PING_INTERVAL
  144. )
  145. for domain in set(get_domain_from_id(u) for u in users):
  146. if domain != self.server_name:
  147. logger.debug("sending typing update to %s", domain)
  148. self.federation.build_and_send_edu(
  149. destination=domain,
  150. edu_type="m.typing",
  151. content={
  152. "room_id": member.room_id,
  153. "user_id": member.user_id,
  154. "typing": typing,
  155. },
  156. key=member,
  157. )
  158. except Exception:
  159. logger.exception("Error pushing typing notif to remotes")
  160. @defer.inlineCallbacks
  161. def _recv_edu(self, origin, content):
  162. room_id = content["room_id"]
  163. user_id = content["user_id"]
  164. member = RoomMember(user_id=user_id, room_id=room_id)
  165. # Check that the string is a valid user id
  166. user = UserID.from_string(user_id)
  167. if user.domain != origin:
  168. logger.info(
  169. "Got typing update from %r with bad 'user_id': %r", origin, user_id
  170. )
  171. return
  172. users = yield self.state.get_current_users_in_room(room_id)
  173. domains = set(get_domain_from_id(u) for u in users)
  174. if self.server_name in domains:
  175. logger.info("Got typing update from %s: %r", user_id, content)
  176. now = self.clock.time_msec()
  177. self._member_typing_until[member] = now + FEDERATION_TIMEOUT
  178. self.wheel_timer.insert(now=now, obj=member, then=now + FEDERATION_TIMEOUT)
  179. self._push_update_local(member=member, typing=content["typing"])
  180. def _push_update_local(self, member, typing):
  181. room_set = self._room_typing.setdefault(member.room_id, set())
  182. if typing:
  183. room_set.add(member.user_id)
  184. else:
  185. room_set.discard(member.user_id)
  186. self._latest_room_serial += 1
  187. self._room_serials[member.room_id] = self._latest_room_serial
  188. self._typing_stream_change_cache.entity_has_changed(
  189. member.room_id, self._latest_room_serial
  190. )
  191. self.notifier.on_new_event(
  192. "typing_key", self._latest_room_serial, rooms=[member.room_id]
  193. )
  194. def get_all_typing_updates(self, last_id, current_id):
  195. if last_id == current_id:
  196. return []
  197. changed_rooms = self._typing_stream_change_cache.get_all_entities_changed(
  198. last_id
  199. )
  200. if changed_rooms is None:
  201. changed_rooms = self._room_serials
  202. rows = []
  203. for room_id in changed_rooms:
  204. serial = self._room_serials[room_id]
  205. if last_id < serial <= current_id:
  206. typing = self._room_typing[room_id]
  207. rows.append((serial, room_id, list(typing)))
  208. rows.sort()
  209. return rows
  210. def get_current_token(self):
  211. return self._latest_room_serial
  212. class TypingNotificationEventSource(object):
  213. def __init__(self, hs):
  214. self.hs = hs
  215. self.clock = hs.get_clock()
  216. # We can't call get_typing_handler here because there's a cycle:
  217. #
  218. # Typing -> Notifier -> TypingNotificationEventSource -> Typing
  219. #
  220. self.get_typing_handler = hs.get_typing_handler
  221. def _make_event_for(self, room_id):
  222. typing = self.get_typing_handler()._room_typing[room_id]
  223. return {
  224. "type": "m.typing",
  225. "room_id": room_id,
  226. "content": {"user_ids": list(typing)},
  227. }
  228. def get_new_events(self, from_key, room_ids, **kwargs):
  229. with Measure(self.clock, "typing.get_new_events"):
  230. from_key = int(from_key)
  231. handler = self.get_typing_handler()
  232. events = []
  233. for room_id in room_ids:
  234. if room_id not in handler._room_serials:
  235. continue
  236. if handler._room_serials[room_id] <= from_key:
  237. continue
  238. events.append(self._make_event_for(room_id))
  239. return events, handler._latest_room_serial
  240. def get_current_key(self):
  241. return self.get_typing_handler()._latest_room_serial
  242. def get_pagination_rows(self, user, pagination_config, key):
  243. return ([], pagination_config.from_key)