sync.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2015, 2016 OpenMarket Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. import itertools
  16. import logging
  17. from canonicaljson import json
  18. from synapse.api.constants import PresenceState
  19. from synapse.api.errors import Codes, StoreError, SynapseError
  20. from synapse.api.filtering import DEFAULT_FILTER_COLLECTION, FilterCollection
  21. from synapse.events.utils import (
  22. format_event_for_client_v2_without_room_id,
  23. format_event_raw,
  24. )
  25. from synapse.handlers.presence import format_user_presence_state
  26. from synapse.handlers.sync import SyncConfig
  27. from synapse.http.servlet import RestServlet, parse_boolean, parse_integer, parse_string
  28. from synapse.types import StreamToken
  29. from ._base import client_patterns, set_timeline_upper_limit
  # Module-level logger, obtained from the logging module under this module's __name__.
  30. logger = logging.getLogger(__name__)
  31. class SyncRestServlet(RestServlet):
  32. """
  33. GET parameters::
  34. timeout(int): How long to wait for new events in milliseconds.
  35. since(batch_token): Batch token when asking for incremental deltas.
  36. set_presence(str): What state the device presence should be set to.
  37. default is "online".
  38. filter(filter_id): A filter to apply to the events returned.
  39. Response JSON::
  40. {
  41. "next_batch": // batch token for the next /sync
  42. "presence": // presence data for the user.
  43. "rooms": {
  44. "join": { // Joined rooms being updated.
  45. "${room_id}": { // Id of the room being updated
  46. "event_map": // Map of EventID -> event JSON.
  47. "timeline": { // The recent events in the room if gap is "true"
  48. "limited": // Was the per-room event limit exceeded?
  49. // otherwise the next events in the room.
  50. "events": [] // list of EventIDs in the "event_map".
  51. "prev_batch": // back token for getting previous events.
  52. }
  53. "state": {"events": []} // list of EventIDs updating the
  54. // current state to be what it should
  55. // be at the end of the batch.
  56. "ephemeral": {"events": []} // list of event objects
  57. }
  58. },
  59. "invite": {}, // Invited rooms being updated.
  60. "leave": {} // Archived rooms being updated.
  61. }
  62. }
  63. """
  64. PATTERNS = client_patterns("/sync$")
  65. ALLOWED_PRESENCE = {"online", "offline", "unavailable"}
  66. def __init__(self, hs):
  67. super(SyncRestServlet, self).__init__()
  68. self.hs = hs
  69. self.auth = hs.get_auth()
  70. self.sync_handler = hs.get_sync_handler()
  71. self.clock = hs.get_clock()
  72. self.filtering = hs.get_filtering()
  73. self.presence_handler = hs.get_presence_handler()
  74. self._server_notices_sender = hs.get_server_notices_sender()
  75. self._event_serializer = hs.get_event_client_serializer()
  76. async def on_GET(self, request):
  77. if b"from" in request.args:
  78. # /events used to use 'from', but /sync uses 'since'.
  79. # Lets be helpful and whine if we see a 'from'.
  80. raise SynapseError(
  81. 400, "'from' is not a valid query parameter. Did you mean 'since'?"
  82. )
  83. requester = await self.auth.get_user_by_req(request, allow_guest=True)
  84. user = requester.user
  85. device_id = requester.device_id
  86. timeout = parse_integer(request, "timeout", default=0)
  87. since = parse_string(request, "since")
  88. set_presence = parse_string(
  89. request,
  90. "set_presence",
  91. default="online",
  92. allowed_values=self.ALLOWED_PRESENCE,
  93. )
  94. filter_id = parse_string(request, "filter", default=None)
  95. full_state = parse_boolean(request, "full_state", default=False)
  96. logger.debug(
  97. "/sync: user=%r, timeout=%r, since=%r, "
  98. "set_presence=%r, filter_id=%r, device_id=%r",
  99. user,
  100. timeout,
  101. since,
  102. set_presence,
  103. filter_id,
  104. device_id,
  105. )
  106. request_key = (user, timeout, since, filter_id, full_state, device_id)
  107. if filter_id is None:
  108. filter_collection = DEFAULT_FILTER_COLLECTION
  109. elif filter_id.startswith("{"):
  110. try:
  111. filter_object = json.loads(filter_id)
  112. set_timeline_upper_limit(
  113. filter_object, self.hs.config.filter_timeline_limit
  114. )
  115. except Exception:
  116. raise SynapseError(400, "Invalid filter JSON")
  117. self.filtering.check_valid_filter(filter_object)
  118. filter_collection = FilterCollection(filter_object)
  119. else:
  120. try:
  121. filter_collection = await self.filtering.get_user_filter(
  122. user.localpart, filter_id
  123. )
  124. except StoreError as err:
  125. if err.code != 404:
  126. raise
  127. # fix up the description and errcode to be more useful
  128. raise SynapseError(400, "No such filter", errcode=Codes.INVALID_PARAM)
  129. sync_config = SyncConfig(
  130. user=user,
  131. filter_collection=filter_collection,
  132. is_guest=requester.is_guest,
  133. request_key=request_key,
  134. device_id=device_id,
  135. )
  136. if since is not None:
  137. since_token = StreamToken.from_string(since)
  138. else:
  139. since_token = None
  140. # send any outstanding server notices to the user.
  141. await self._server_notices_sender.on_user_syncing(user.to_string())
  142. affect_presence = set_presence != PresenceState.OFFLINE
  143. if affect_presence:
  144. await self.presence_handler.set_state(
  145. user, {"presence": set_presence}, True
  146. )
  147. context = await self.presence_handler.user_syncing(
  148. user.to_string(), affect_presence=affect_presence
  149. )
  150. with context:
  151. sync_result = await self.sync_handler.wait_for_sync_for_user(
  152. sync_config,
  153. since_token=since_token,
  154. timeout=timeout,
  155. full_state=full_state,
  156. )
  157. time_now = self.clock.time_msec()
  158. response_content = await self.encode_response(
  159. time_now, sync_result, requester.access_token_id, filter_collection
  160. )
  161. return 200, response_content
  162. async def encode_response(self, time_now, sync_result, access_token_id, filter):
  163. if filter.event_format == "client":
  164. event_formatter = format_event_for_client_v2_without_room_id
  165. elif filter.event_format == "federation":
  166. event_formatter = format_event_raw
  167. else:
  168. raise Exception("Unknown event format %s" % (filter.event_format,))
  169. joined = await self.encode_joined(
  170. sync_result.joined,
  171. time_now,
  172. access_token_id,
  173. filter.event_fields,
  174. event_formatter,
  175. )
  176. invited = await self.encode_invited(
  177. sync_result.invited, time_now, access_token_id, event_formatter
  178. )
  179. archived = await self.encode_archived(
  180. sync_result.archived,
  181. time_now,
  182. access_token_id,
  183. filter.event_fields,
  184. event_formatter,
  185. )
  186. return {
  187. "account_data": {"events": sync_result.account_data},
  188. "to_device": {"events": sync_result.to_device},
  189. "device_lists": {
  190. "changed": list(sync_result.device_lists.changed),
  191. "left": list(sync_result.device_lists.left),
  192. },
  193. "presence": SyncRestServlet.encode_presence(sync_result.presence, time_now),
  194. "rooms": {"join": joined, "invite": invited, "leave": archived},
  195. "groups": {
  196. "join": sync_result.groups.join,
  197. "invite": sync_result.groups.invite,
  198. "leave": sync_result.groups.leave,
  199. },
  200. "device_one_time_keys_count": sync_result.device_one_time_keys_count,
  201. "next_batch": sync_result.next_batch.to_string(),
  202. }
  203. @staticmethod
  204. def encode_presence(events, time_now):
  205. return {
  206. "events": [
  207. {
  208. "type": "m.presence",
  209. "sender": event.user_id,
  210. "content": format_user_presence_state(
  211. event, time_now, include_user_id=False
  212. ),
  213. }
  214. for event in events
  215. ]
  216. }
  217. async def encode_joined(
  218. self, rooms, time_now, token_id, event_fields, event_formatter
  219. ):
  220. """
  221. Encode the joined rooms in a sync result
  222. Args:
  223. rooms(list[synapse.handlers.sync.JoinedSyncResult]): list of sync
  224. results for rooms this user is joined to
  225. time_now(int): current time - used as a baseline for age
  226. calculations
  227. token_id(int): ID of the user's auth token - used for namespacing
  228. of transaction IDs
  229. event_fields(list<str>): List of event fields to include. If empty,
  230. all fields will be returned.
  231. event_formatter (func[dict]): function to convert from federation format
  232. to client format
  233. Returns:
  234. dict[str, dict[str, object]]: the joined rooms list, in our
  235. response format
  236. """
  237. joined = {}
  238. for room in rooms:
  239. joined[room.room_id] = await self.encode_room(
  240. room,
  241. time_now,
  242. token_id,
  243. joined=True,
  244. only_fields=event_fields,
  245. event_formatter=event_formatter,
  246. )
  247. return joined
  248. async def encode_invited(self, rooms, time_now, token_id, event_formatter):
  249. """
  250. Encode the invited rooms in a sync result
  251. Args:
  252. rooms(list[synapse.handlers.sync.InvitedSyncResult]): list of
  253. sync results for rooms this user is joined to
  254. time_now(int): current time - used as a baseline for age
  255. calculations
  256. token_id(int): ID of the user's auth token - used for namespacing
  257. of transaction IDs
  258. event_formatter (func[dict]): function to convert from federation format
  259. to client format
  260. Returns:
  261. dict[str, dict[str, object]]: the invited rooms list, in our
  262. response format
  263. """
  264. invited = {}
  265. for room in rooms:
  266. invite = await self._event_serializer.serialize_event(
  267. room.invite,
  268. time_now,
  269. token_id=token_id,
  270. event_format=event_formatter,
  271. is_invite=True,
  272. )
  273. unsigned = dict(invite.get("unsigned", {}))
  274. invite["unsigned"] = unsigned
  275. invited_state = list(unsigned.pop("invite_room_state", []))
  276. invited_state.append(invite)
  277. invited[room.room_id] = {"invite_state": {"events": invited_state}}
  278. return invited
  279. async def encode_archived(
  280. self, rooms, time_now, token_id, event_fields, event_formatter
  281. ):
  282. """
  283. Encode the archived rooms in a sync result
  284. Args:
  285. rooms (list[synapse.handlers.sync.ArchivedSyncResult]): list of
  286. sync results for rooms this user is joined to
  287. time_now(int): current time - used as a baseline for age
  288. calculations
  289. token_id(int): ID of the user's auth token - used for namespacing
  290. of transaction IDs
  291. event_fields(list<str>): List of event fields to include. If empty,
  292. all fields will be returned.
  293. event_formatter (func[dict]): function to convert from federation format
  294. to client format
  295. Returns:
  296. dict[str, dict[str, object]]: The invited rooms list, in our
  297. response format
  298. """
  299. joined = {}
  300. for room in rooms:
  301. joined[room.room_id] = await self.encode_room(
  302. room,
  303. time_now,
  304. token_id,
  305. joined=False,
  306. only_fields=event_fields,
  307. event_formatter=event_formatter,
  308. )
  309. return joined
  310. async def encode_room(
  311. self, room, time_now, token_id, joined, only_fields, event_formatter
  312. ):
  313. """
  314. Args:
  315. room (JoinedSyncResult|ArchivedSyncResult): sync result for a
  316. single room
  317. time_now (int): current time - used as a baseline for age
  318. calculations
  319. token_id (int): ID of the user's auth token - used for namespacing
  320. of transaction IDs
  321. joined (bool): True if the user is joined to this room - will mean
  322. we handle ephemeral events
  323. only_fields(list<str>): Optional. The list of event fields to include.
  324. event_formatter (func[dict]): function to convert from federation format
  325. to client format
  326. Returns:
  327. dict[str, object]: the room, encoded in our response format
  328. """
  329. def serialize(events):
  330. return self._event_serializer.serialize_events(
  331. events,
  332. time_now=time_now,
  333. # We don't bundle "live" events, as otherwise clients
  334. # will end up double counting annotations.
  335. bundle_aggregations=False,
  336. token_id=token_id,
  337. event_format=event_formatter,
  338. only_event_fields=only_fields,
  339. )
  340. state_dict = room.state
  341. timeline_events = room.timeline.events
  342. state_events = state_dict.values()
  343. for event in itertools.chain(state_events, timeline_events):
  344. # We've had bug reports that events were coming down under the
  345. # wrong room.
  346. if event.room_id != room.room_id:
  347. logger.warning(
  348. "Event %r is under room %r instead of %r",
  349. event.event_id,
  350. room.room_id,
  351. event.room_id,
  352. )
  353. serialized_state = await serialize(state_events)
  354. serialized_timeline = await serialize(timeline_events)
  355. account_data = room.account_data
  356. result = {
  357. "timeline": {
  358. "events": serialized_timeline,
  359. "prev_batch": room.timeline.prev_batch.to_string(),
  360. "limited": room.timeline.limited,
  361. },
  362. "state": {"events": serialized_state},
  363. "account_data": {"events": account_data},
  364. }
  365. if joined:
  366. ephemeral_events = room.ephemeral
  367. result["ephemeral"] = {"events": ephemeral_events}
  368. result["unread_notifications"] = room.unread_notifications
  369. result["summary"] = room.summary
  370. return result
  371. def register_servlets(hs, http_server):
  372. SyncRestServlet(hs).register(http_server)