sync.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2015, 2016 OpenMarket Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. from twisted.internet import defer
  16. from synapse.http.servlet import (
  17. RestServlet, parse_string, parse_integer, parse_boolean
  18. )
  19. from synapse.handlers.presence import format_user_presence_state
  20. from synapse.handlers.sync import SyncConfig
  21. from synapse.types import StreamToken
  22. from synapse.events.utils import (
  23. serialize_event, format_event_for_client_v2_without_room_id,
  24. )
  25. from synapse.api.filtering import FilterCollection, DEFAULT_FILTER_COLLECTION
  26. from synapse.api.errors import SynapseError
  27. from synapse.api.constants import PresenceState
  28. from ._base import client_v2_patterns
  29. from ._base import set_timeline_upper_limit
  30. import itertools
  31. import logging
  32. import ujson as json
  33. logger = logging.getLogger(__name__)
  34. class SyncRestServlet(RestServlet):
  35. """
  36. GET parameters::
  37. timeout(int): How long to wait for new events in milliseconds.
  38. since(batch_token): Batch token when asking for incremental deltas.
  39. set_presence(str): What state the device presence should be set to.
  40. default is "online".
  41. filter(filter_id): A filter to apply to the events returned.
  42. Response JSON::
  43. {
  44. "next_batch": // batch token for the next /sync
  45. "presence": // presence data for the user.
  46. "rooms": {
  47. "join": { // Joined rooms being updated.
  48. "${room_id}": { // Id of the room being updated
  49. "event_map": // Map of EventID -> event JSON.
  50. "timeline": { // The recent events in the room if gap is "true"
  51. "limited": // Was the per-room event limit exceeded?
  52. // otherwise the next events in the room.
  53. "events": [] // list of EventIDs in the "event_map".
  54. "prev_batch": // back token for getting previous events.
  55. }
  56. "state": {"events": []} // list of EventIDs updating the
  57. // current state to be what it should
  58. // be at the end of the batch.
  59. "ephemeral": {"events": []} // list of event objects
  60. }
  61. },
  62. "invite": {}, // Invited rooms being updated.
  63. "leave": {} // Archived rooms being updated.
  64. }
  65. }
  66. """
  67. PATTERNS = client_v2_patterns("/sync$")
  68. ALLOWED_PRESENCE = set(["online", "offline"])
  69. def __init__(self, hs):
  70. super(SyncRestServlet, self).__init__()
  71. self.hs = hs
  72. self.auth = hs.get_auth()
  73. self.sync_handler = hs.get_sync_handler()
  74. self.clock = hs.get_clock()
  75. self.filtering = hs.get_filtering()
  76. self.presence_handler = hs.get_presence_handler()
  77. @defer.inlineCallbacks
  78. def on_GET(self, request):
  79. if "from" in request.args:
  80. # /events used to use 'from', but /sync uses 'since'.
  81. # Lets be helpful and whine if we see a 'from'.
  82. raise SynapseError(
  83. 400, "'from' is not a valid query parameter. Did you mean 'since'?"
  84. )
  85. requester = yield self.auth.get_user_by_req(
  86. request, allow_guest=True
  87. )
  88. user = requester.user
  89. device_id = requester.device_id
  90. timeout = parse_integer(request, "timeout", default=0)
  91. since = parse_string(request, "since")
  92. set_presence = parse_string(
  93. request, "set_presence", default="online",
  94. allowed_values=self.ALLOWED_PRESENCE
  95. )
  96. filter_id = parse_string(request, "filter", default=None)
  97. full_state = parse_boolean(request, "full_state", default=False)
  98. logger.debug(
  99. "/sync: user=%r, timeout=%r, since=%r,"
  100. " set_presence=%r, filter_id=%r, device_id=%r" % (
  101. user, timeout, since, set_presence, filter_id, device_id
  102. )
  103. )
  104. request_key = (user, timeout, since, filter_id, full_state, device_id)
  105. if filter_id:
  106. if filter_id.startswith('{'):
  107. try:
  108. filter_object = json.loads(filter_id)
  109. set_timeline_upper_limit(filter_object,
  110. self.hs.config.filter_timeline_limit)
  111. except:
  112. raise SynapseError(400, "Invalid filter JSON")
  113. self.filtering.check_valid_filter(filter_object)
  114. filter = FilterCollection(filter_object)
  115. else:
  116. filter = yield self.filtering.get_user_filter(
  117. user.localpart, filter_id
  118. )
  119. else:
  120. filter = DEFAULT_FILTER_COLLECTION
  121. sync_config = SyncConfig(
  122. user=user,
  123. filter_collection=filter,
  124. is_guest=requester.is_guest,
  125. request_key=request_key,
  126. device_id=device_id,
  127. )
  128. if since is not None:
  129. since_token = StreamToken.from_string(since)
  130. else:
  131. since_token = None
  132. affect_presence = set_presence != PresenceState.OFFLINE
  133. if affect_presence:
  134. yield self.presence_handler.set_state(user, {"presence": set_presence}, True)
  135. context = yield self.presence_handler.user_syncing(
  136. user.to_string(), affect_presence=affect_presence,
  137. )
  138. with context:
  139. sync_result = yield self.sync_handler.wait_for_sync_for_user(
  140. sync_config, since_token=since_token, timeout=timeout,
  141. full_state=full_state
  142. )
  143. time_now = self.clock.time_msec()
  144. response_content = self.encode_response(
  145. time_now, sync_result, requester.access_token_id, filter
  146. )
  147. defer.returnValue((200, response_content))
  148. @staticmethod
  149. def encode_response(time_now, sync_result, access_token_id, filter):
  150. joined = SyncRestServlet.encode_joined(
  151. sync_result.joined, time_now, access_token_id, filter.event_fields
  152. )
  153. invited = SyncRestServlet.encode_invited(
  154. sync_result.invited, time_now, access_token_id,
  155. )
  156. archived = SyncRestServlet.encode_archived(
  157. sync_result.archived, time_now, access_token_id,
  158. filter.event_fields,
  159. )
  160. return {
  161. "account_data": {"events": sync_result.account_data},
  162. "to_device": {"events": sync_result.to_device},
  163. "device_lists": {
  164. "changed": list(sync_result.device_lists.changed),
  165. "left": list(sync_result.device_lists.left),
  166. },
  167. "presence": SyncRestServlet.encode_presence(
  168. sync_result.presence, time_now
  169. ),
  170. "rooms": {
  171. "join": joined,
  172. "invite": invited,
  173. "leave": archived,
  174. },
  175. "groups": {
  176. "join": sync_result.groups.join,
  177. "invite": sync_result.groups.invite,
  178. "leave": sync_result.groups.leave,
  179. },
  180. "device_one_time_keys_count": sync_result.device_one_time_keys_count,
  181. "next_batch": sync_result.next_batch.to_string(),
  182. }
  183. @staticmethod
  184. def encode_presence(events, time_now):
  185. return {
  186. "events": [
  187. {
  188. "type": "m.presence",
  189. "sender": event.user_id,
  190. "content": format_user_presence_state(
  191. event, time_now, include_user_id=False
  192. ),
  193. }
  194. for event in events
  195. ]
  196. }
  197. @staticmethod
  198. def encode_joined(rooms, time_now, token_id, event_fields):
  199. """
  200. Encode the joined rooms in a sync result
  201. Args:
  202. rooms(list[synapse.handlers.sync.JoinedSyncResult]): list of sync
  203. results for rooms this user is joined to
  204. time_now(int): current time - used as a baseline for age
  205. calculations
  206. token_id(int): ID of the user's auth token - used for namespacing
  207. of transaction IDs
  208. event_fields(list<str>): List of event fields to include. If empty,
  209. all fields will be returned.
  210. Returns:
  211. dict[str, dict[str, object]]: the joined rooms list, in our
  212. response format
  213. """
  214. joined = {}
  215. for room in rooms:
  216. joined[room.room_id] = SyncRestServlet.encode_room(
  217. room, time_now, token_id, only_fields=event_fields
  218. )
  219. return joined
  220. @staticmethod
  221. def encode_invited(rooms, time_now, token_id):
  222. """
  223. Encode the invited rooms in a sync result
  224. Args:
  225. rooms(list[synapse.handlers.sync.InvitedSyncResult]): list of
  226. sync results for rooms this user is joined to
  227. time_now(int): current time - used as a baseline for age
  228. calculations
  229. token_id(int): ID of the user's auth token - used for namespacing
  230. of transaction IDs
  231. Returns:
  232. dict[str, dict[str, object]]: the invited rooms list, in our
  233. response format
  234. """
  235. invited = {}
  236. for room in rooms:
  237. invite = serialize_event(
  238. room.invite, time_now, token_id=token_id,
  239. event_format=format_event_for_client_v2_without_room_id,
  240. is_invite=True,
  241. )
  242. unsigned = dict(invite.get("unsigned", {}))
  243. invite["unsigned"] = unsigned
  244. invited_state = list(unsigned.pop("invite_room_state", []))
  245. invited_state.append(invite)
  246. invited[room.room_id] = {
  247. "invite_state": {"events": invited_state}
  248. }
  249. return invited
  250. @staticmethod
  251. def encode_archived(rooms, time_now, token_id, event_fields):
  252. """
  253. Encode the archived rooms in a sync result
  254. Args:
  255. rooms (list[synapse.handlers.sync.ArchivedSyncResult]): list of
  256. sync results for rooms this user is joined to
  257. time_now(int): current time - used as a baseline for age
  258. calculations
  259. token_id(int): ID of the user's auth token - used for namespacing
  260. of transaction IDs
  261. event_fields(list<str>): List of event fields to include. If empty,
  262. all fields will be returned.
  263. Returns:
  264. dict[str, dict[str, object]]: The invited rooms list, in our
  265. response format
  266. """
  267. joined = {}
  268. for room in rooms:
  269. joined[room.room_id] = SyncRestServlet.encode_room(
  270. room, time_now, token_id, joined=False, only_fields=event_fields
  271. )
  272. return joined
  273. @staticmethod
  274. def encode_room(room, time_now, token_id, joined=True, only_fields=None):
  275. """
  276. Args:
  277. room (JoinedSyncResult|ArchivedSyncResult): sync result for a
  278. single room
  279. time_now (int): current time - used as a baseline for age
  280. calculations
  281. token_id (int): ID of the user's auth token - used for namespacing
  282. of transaction IDs
  283. joined (bool): True if the user is joined to this room - will mean
  284. we handle ephemeral events
  285. only_fields(list<str>): Optional. The list of event fields to include.
  286. Returns:
  287. dict[str, object]: the room, encoded in our response format
  288. """
  289. def serialize(event):
  290. # TODO(mjark): Respect formatting requirements in the filter.
  291. return serialize_event(
  292. event, time_now, token_id=token_id,
  293. event_format=format_event_for_client_v2_without_room_id,
  294. only_event_fields=only_fields,
  295. )
  296. state_dict = room.state
  297. timeline_events = room.timeline.events
  298. state_events = state_dict.values()
  299. for event in itertools.chain(state_events, timeline_events):
  300. # We've had bug reports that events were coming down under the
  301. # wrong room.
  302. if event.room_id != room.room_id:
  303. logger.warn(
  304. "Event %r is under room %r instead of %r",
  305. event.event_id, room.room_id, event.room_id,
  306. )
  307. serialized_state = [serialize(e) for e in state_events]
  308. serialized_timeline = [serialize(e) for e in timeline_events]
  309. account_data = room.account_data
  310. result = {
  311. "timeline": {
  312. "events": serialized_timeline,
  313. "prev_batch": room.timeline.prev_batch.to_string(),
  314. "limited": room.timeline.limited,
  315. },
  316. "state": {"events": serialized_state},
  317. "account_data": {"events": account_data},
  318. }
  319. if joined:
  320. ephemeral_events = room.ephemeral
  321. result["ephemeral"] = {"events": ephemeral_events}
  322. result["unread_notifications"] = room.unread_notifications
  323. return result
  324. def register_servlets(hs, http_server):
  325. SyncRestServlet(hs).register(http_server)