sync.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520
  1. # Copyright 2015, 2016 OpenMarket Ltd
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import itertools
  15. import logging
  16. from collections import defaultdict
  17. from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union
  18. from synapse.api.constants import Membership, PresenceState
  19. from synapse.api.errors import Codes, StoreError, SynapseError
  20. from synapse.api.filtering import FilterCollection
  21. from synapse.api.presence import UserPresenceState
  22. from synapse.events.utils import (
  23. SerializeEventConfig,
  24. format_event_for_client_v2_without_room_id,
  25. format_event_raw,
  26. )
  27. from synapse.handlers.presence import format_user_presence_state
  28. from synapse.handlers.sync import (
  29. ArchivedSyncResult,
  30. InvitedSyncResult,
  31. JoinedSyncResult,
  32. KnockedSyncResult,
  33. SyncConfig,
  34. SyncResult,
  35. )
  36. from synapse.http.server import HttpServer
  37. from synapse.http.servlet import RestServlet, parse_boolean, parse_integer, parse_string
  38. from synapse.http.site import SynapseRequest
  39. from synapse.logging.opentracing import trace
  40. from synapse.types import JsonDict, StreamToken
  41. from synapse.util import json_decoder
  42. from ._base import client_patterns, set_timeline_upper_limit
  43. if TYPE_CHECKING:
  44. from synapse.server import HomeServer
  45. logger = logging.getLogger(__name__)
  46. class SyncRestServlet(RestServlet):
  47. """
  48. GET parameters::
  49. timeout(int): How long to wait for new events in milliseconds.
  50. since(batch_token): Batch token when asking for incremental deltas.
  51. set_presence(str): What state the device presence should be set to.
  52. default is "online".
  53. filter(filter_id): A filter to apply to the events returned.
  54. Response JSON::
  55. {
  56. "next_batch": // batch token for the next /sync
  57. "presence": // presence data for the user.
  58. "rooms": {
  59. "join": { // Joined rooms being updated.
  60. "${room_id}": { // Id of the room being updated
  61. "event_map": // Map of EventID -> event JSON.
  62. "timeline": { // The recent events in the room if gap is "true"
  63. "limited": // Was the per-room event limit exceeded?
  64. // otherwise the next events in the room.
  65. "events": [] // list of EventIDs in the "event_map".
  66. "prev_batch": // back token for getting previous events.
  67. }
  68. "state": {"events": []} // list of EventIDs updating the
  69. // current state to be what it should
  70. // be at the end of the batch.
  71. "ephemeral": {"events": []} // list of event objects
  72. }
  73. },
  74. "invite": {}, // Invited rooms being updated.
  75. "leave": {} // Archived rooms being updated.
  76. }
  77. }
  78. """
  79. PATTERNS = client_patterns("/sync$")
  80. ALLOWED_PRESENCE = {"online", "offline", "unavailable"}
  81. def __init__(self, hs: "HomeServer"):
  82. super().__init__()
  83. self.hs = hs
  84. self.auth = hs.get_auth()
  85. self.store = hs.get_datastores().main
  86. self.sync_handler = hs.get_sync_handler()
  87. self.clock = hs.get_clock()
  88. self.filtering = hs.get_filtering()
  89. self.presence_handler = hs.get_presence_handler()
  90. self._server_notices_sender = hs.get_server_notices_sender()
  91. self._event_serializer = hs.get_event_client_serializer()
  92. self._msc2654_enabled = hs.config.experimental.msc2654_enabled
  93. async def on_GET(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
  94. # This will always be set by the time Twisted calls us.
  95. assert request.args is not None
  96. if b"from" in request.args:
  97. # /events used to use 'from', but /sync uses 'since'.
  98. # Lets be helpful and whine if we see a 'from'.
  99. raise SynapseError(
  100. 400, "'from' is not a valid query parameter. Did you mean 'since'?"
  101. )
  102. requester = await self.auth.get_user_by_req(request, allow_guest=True)
  103. user = requester.user
  104. device_id = requester.device_id
  105. timeout = parse_integer(request, "timeout", default=0)
  106. since = parse_string(request, "since")
  107. set_presence = parse_string(
  108. request,
  109. "set_presence",
  110. default="online",
  111. allowed_values=self.ALLOWED_PRESENCE,
  112. )
  113. filter_id = parse_string(request, "filter")
  114. full_state = parse_boolean(request, "full_state", default=False)
  115. logger.debug(
  116. "/sync: user=%r, timeout=%r, since=%r, "
  117. "set_presence=%r, filter_id=%r, device_id=%r",
  118. user,
  119. timeout,
  120. since,
  121. set_presence,
  122. filter_id,
  123. device_id,
  124. )
  125. request_key = (user, timeout, since, filter_id, full_state, device_id)
  126. if filter_id is None:
  127. filter_collection = self.filtering.DEFAULT_FILTER_COLLECTION
  128. elif filter_id.startswith("{"):
  129. try:
  130. filter_object = json_decoder.decode(filter_id)
  131. set_timeline_upper_limit(
  132. filter_object, self.hs.config.server.filter_timeline_limit
  133. )
  134. except Exception:
  135. raise SynapseError(400, "Invalid filter JSON")
  136. self.filtering.check_valid_filter(filter_object)
  137. filter_collection = FilterCollection(self.hs, filter_object)
  138. else:
  139. try:
  140. filter_collection = await self.filtering.get_user_filter(
  141. user.localpart, filter_id
  142. )
  143. except StoreError as err:
  144. if err.code != 404:
  145. raise
  146. # fix up the description and errcode to be more useful
  147. raise SynapseError(400, "No such filter", errcode=Codes.INVALID_PARAM)
  148. sync_config = SyncConfig(
  149. user=user,
  150. filter_collection=filter_collection,
  151. is_guest=requester.is_guest,
  152. request_key=request_key,
  153. device_id=device_id,
  154. )
  155. since_token = None
  156. if since is not None:
  157. since_token = await StreamToken.from_string(self.store, since)
  158. # send any outstanding server notices to the user.
  159. await self._server_notices_sender.on_user_syncing(user.to_string())
  160. affect_presence = set_presence != PresenceState.OFFLINE
  161. context = await self.presence_handler.user_syncing(
  162. user.to_string(),
  163. affect_presence=affect_presence,
  164. presence_state=set_presence,
  165. )
  166. with context:
  167. sync_result = await self.sync_handler.wait_for_sync_for_user(
  168. requester,
  169. sync_config,
  170. since_token=since_token,
  171. timeout=timeout,
  172. full_state=full_state,
  173. )
  174. # the client may have disconnected by now; don't bother to serialize the
  175. # response if so.
  176. if request._disconnected:
  177. logger.info("Client has disconnected; not serializing response.")
  178. return 200, {}
  179. time_now = self.clock.time_msec()
  180. # We know that the the requester has an access token since appservices
  181. # cannot use sync.
  182. response_content = await self.encode_response(
  183. time_now, sync_result, requester.access_token_id, filter_collection
  184. )
  185. logger.debug("Event formatting complete")
  186. return 200, response_content
  187. @trace(opname="sync.encode_response")
  188. async def encode_response(
  189. self,
  190. time_now: int,
  191. sync_result: SyncResult,
  192. access_token_id: Optional[int],
  193. filter: FilterCollection,
  194. ) -> JsonDict:
  195. logger.debug("Formatting events in sync response")
  196. if filter.event_format == "client":
  197. event_formatter = format_event_for_client_v2_without_room_id
  198. elif filter.event_format == "federation":
  199. event_formatter = format_event_raw
  200. else:
  201. raise Exception("Unknown event format %s" % (filter.event_format,))
  202. serialize_options = SerializeEventConfig(
  203. event_format=event_formatter,
  204. token_id=access_token_id,
  205. only_event_fields=filter.event_fields,
  206. )
  207. stripped_serialize_options = SerializeEventConfig(
  208. event_format=event_formatter,
  209. token_id=access_token_id,
  210. include_stripped_room_state=True,
  211. )
  212. joined = await self.encode_joined(
  213. sync_result.joined, time_now, serialize_options
  214. )
  215. invited = await self.encode_invited(
  216. sync_result.invited, time_now, stripped_serialize_options
  217. )
  218. knocked = await self.encode_knocked(
  219. sync_result.knocked, time_now, stripped_serialize_options
  220. )
  221. archived = await self.encode_archived(
  222. sync_result.archived, time_now, serialize_options
  223. )
  224. logger.debug("building sync response dict")
  225. response: JsonDict = defaultdict(dict)
  226. response["next_batch"] = await sync_result.next_batch.to_string(self.store)
  227. if sync_result.account_data:
  228. response["account_data"] = {"events": sync_result.account_data}
  229. if sync_result.presence:
  230. response["presence"] = SyncRestServlet.encode_presence(
  231. sync_result.presence, time_now
  232. )
  233. if sync_result.to_device:
  234. response["to_device"] = {"events": sync_result.to_device}
  235. if sync_result.device_lists.changed:
  236. response["device_lists"]["changed"] = list(sync_result.device_lists.changed)
  237. if sync_result.device_lists.left:
  238. response["device_lists"]["left"] = list(sync_result.device_lists.left)
  239. # We always include this because https://github.com/vector-im/element-android/issues/3725
  240. # The spec isn't terribly clear on when this can be omitted and how a client would tell
  241. # the difference between "no keys present" and "nothing changed" in terms of whole field
  242. # absent / individual key type entry absent
  243. # Corresponding synapse issue: https://github.com/matrix-org/synapse/issues/10456
  244. response["device_one_time_keys_count"] = sync_result.device_one_time_keys_count
  245. # https://github.com/matrix-org/matrix-doc/blob/54255851f642f84a4f1aaf7bc063eebe3d76752b/proposals/2732-olm-fallback-keys.md
  246. # states that this field should always be included, as long as the server supports the feature.
  247. response[
  248. "org.matrix.msc2732.device_unused_fallback_key_types"
  249. ] = sync_result.device_unused_fallback_key_types
  250. response[
  251. "device_unused_fallback_key_types"
  252. ] = sync_result.device_unused_fallback_key_types
  253. if joined:
  254. response["rooms"][Membership.JOIN] = joined
  255. if invited:
  256. response["rooms"][Membership.INVITE] = invited
  257. if knocked:
  258. response["rooms"][Membership.KNOCK] = knocked
  259. if archived:
  260. response["rooms"][Membership.LEAVE] = archived
  261. return response
  262. @staticmethod
  263. def encode_presence(events: List[UserPresenceState], time_now: int) -> JsonDict:
  264. return {
  265. "events": [
  266. {
  267. "type": "m.presence",
  268. "sender": event.user_id,
  269. "content": format_user_presence_state(
  270. event, time_now, include_user_id=False
  271. ),
  272. }
  273. for event in events
  274. ]
  275. }
  276. @trace(opname="sync.encode_joined")
  277. async def encode_joined(
  278. self,
  279. rooms: List[JoinedSyncResult],
  280. time_now: int,
  281. serialize_options: SerializeEventConfig,
  282. ) -> JsonDict:
  283. """
  284. Encode the joined rooms in a sync result
  285. Args:
  286. rooms: list of sync results for rooms this user is joined to
  287. time_now: current time - used as a baseline for age calculations
  288. serialize_options: Event serializer options
  289. Returns:
  290. The joined rooms list, in our response format
  291. """
  292. joined = {}
  293. for room in rooms:
  294. joined[room.room_id] = await self.encode_room(
  295. room, time_now, joined=True, serialize_options=serialize_options
  296. )
  297. return joined
  298. @trace(opname="sync.encode_invited")
  299. async def encode_invited(
  300. self,
  301. rooms: List[InvitedSyncResult],
  302. time_now: int,
  303. serialize_options: SerializeEventConfig,
  304. ) -> JsonDict:
  305. """
  306. Encode the invited rooms in a sync result
  307. Args:
  308. rooms: list of sync results for rooms this user is invited to
  309. time_now: current time - used as a baseline for age calculations
  310. serialize_options: Event serializer options
  311. Returns:
  312. The invited rooms list, in our response format
  313. """
  314. invited = {}
  315. for room in rooms:
  316. invite = self._event_serializer.serialize_event(
  317. room.invite, time_now, config=serialize_options
  318. )
  319. unsigned = dict(invite.get("unsigned", {}))
  320. invite["unsigned"] = unsigned
  321. invited_state = list(unsigned.pop("invite_room_state", []))
  322. invited_state.append(invite)
  323. invited[room.room_id] = {"invite_state": {"events": invited_state}}
  324. return invited
  325. @trace(opname="sync.encode_knocked")
  326. async def encode_knocked(
  327. self,
  328. rooms: List[KnockedSyncResult],
  329. time_now: int,
  330. serialize_options: SerializeEventConfig,
  331. ) -> Dict[str, Dict[str, Any]]:
  332. """
  333. Encode the rooms we've knocked on in a sync result.
  334. Args:
  335. rooms: list of sync results for rooms this user is knocking on
  336. time_now: current time - used as a baseline for age calculations
  337. serialize_options: Event serializer options
  338. Returns:
  339. The list of rooms the user has knocked on, in our response format.
  340. """
  341. knocked = {}
  342. for room in rooms:
  343. knock = self._event_serializer.serialize_event(
  344. room.knock, time_now, config=serialize_options
  345. )
  346. # Extract the `unsigned` key from the knock event.
  347. # This is where we (cheekily) store the knock state events
  348. unsigned = knock.setdefault("unsigned", {})
  349. # Duplicate the dictionary in order to avoid modifying the original
  350. unsigned = dict(unsigned)
  351. # Extract the stripped room state from the unsigned dict
  352. # This is for clients to get a little bit of information about
  353. # the room they've knocked on, without revealing any sensitive information
  354. knocked_state = list(unsigned.pop("knock_room_state", []))
  355. # Append the actual knock membership event itself as well. This provides
  356. # the client with:
  357. #
  358. # * A knock state event that they can use for easier internal tracking
  359. # * The rough timestamp of when the knock occurred contained within the event
  360. knocked_state.append(knock)
  361. # Build the `knock_state` dictionary, which will contain the state of the
  362. # room that the client has knocked on
  363. knocked[room.room_id] = {"knock_state": {"events": knocked_state}}
  364. return knocked
  365. @trace(opname="sync.encode_archived")
  366. async def encode_archived(
  367. self,
  368. rooms: List[ArchivedSyncResult],
  369. time_now: int,
  370. serialize_options: SerializeEventConfig,
  371. ) -> JsonDict:
  372. """
  373. Encode the archived rooms in a sync result
  374. Args:
  375. rooms: list of sync results for rooms this user is joined to
  376. time_now: current time - used as a baseline for age calculations
  377. serialize_options: Event serializer options
  378. Returns:
  379. The archived rooms list, in our response format
  380. """
  381. joined = {}
  382. for room in rooms:
  383. joined[room.room_id] = await self.encode_room(
  384. room, time_now, joined=False, serialize_options=serialize_options
  385. )
  386. return joined
  387. async def encode_room(
  388. self,
  389. room: Union[JoinedSyncResult, ArchivedSyncResult],
  390. time_now: int,
  391. joined: bool,
  392. serialize_options: SerializeEventConfig,
  393. ) -> JsonDict:
  394. """
  395. Args:
  396. room: sync result for a single room
  397. time_now: current time - used as a baseline for age calculations
  398. token_id: ID of the user's auth token - used for namespacing
  399. of transaction IDs
  400. joined: True if the user is joined to this room - will mean
  401. we handle ephemeral events
  402. only_fields: Optional. The list of event fields to include.
  403. event_formatter: function to convert from federation format
  404. to client format
  405. Returns:
  406. The room, encoded in our response format
  407. """
  408. state_dict = room.state
  409. timeline_events = room.timeline.events
  410. state_events = state_dict.values()
  411. for event in itertools.chain(state_events, timeline_events):
  412. # We've had bug reports that events were coming down under the
  413. # wrong room.
  414. if event.room_id != room.room_id:
  415. logger.warning(
  416. "Event %r is under room %r instead of %r",
  417. event.event_id,
  418. room.room_id,
  419. event.room_id,
  420. )
  421. serialized_state = self._event_serializer.serialize_events(
  422. state_events, time_now, config=serialize_options
  423. )
  424. serialized_timeline = self._event_serializer.serialize_events(
  425. timeline_events,
  426. time_now,
  427. config=serialize_options,
  428. bundle_aggregations=room.timeline.bundled_aggregations,
  429. )
  430. account_data = room.account_data
  431. result: JsonDict = {
  432. "timeline": {
  433. "events": serialized_timeline,
  434. "prev_batch": await room.timeline.prev_batch.to_string(self.store),
  435. "limited": room.timeline.limited,
  436. },
  437. "state": {"events": serialized_state},
  438. "account_data": {"events": account_data},
  439. }
  440. if joined:
  441. assert isinstance(room, JoinedSyncResult)
  442. ephemeral_events = room.ephemeral
  443. result["ephemeral"] = {"events": ephemeral_events}
  444. result["unread_notifications"] = room.unread_notifications
  445. result["summary"] = room.summary
  446. if self._msc2654_enabled:
  447. result["org.matrix.msc2654.unread_count"] = room.unread_count
  448. return result
  449. def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
  450. SyncRestServlet(hs).register(http_server)