sync.py 96 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177
71778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991199219931994199519961997199819992000200120022003200420052006200720082009201020112012201320142015201620172018201920202021202220232024202520262027202820292030203120322033203420352036203720382039204020412042204320442045204620472048204920502051205220532054205520562057205820592060206120622063206420652066206720682069207020712072207320742075207620772078207920802081208220832084208520862087208820892090209120922093209420952096209720982099210021012102210321042105210621072108210921102111211221132114211521162117211821192120212121222123212421252126212721282129213021312132213321342135213621372138213921402141214221432144214521462147214821492150215121522153215421552156215721582159216021612162216321642165216621672168216921702171217221732174217521762177217821792180218121822183218421852186218721882189219021912192219321942195219621972198219922002201220222032204220522062207220822092210221122122213221422152216221722182219222022212222222322242225222622272228222922302231223222332234223522362237223822392240224122422243224422452246224722482249225022512252225322542255225622572258225922602261226222632264226522662267226822692270227122722273227422752276227
7227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415
  1. # Copyright 2015-2021 The Matrix.org Foundation C.I.C.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import itertools
  15. import logging
  16. from typing import TYPE_CHECKING, Any, Dict, FrozenSet, List, Optional, Set, Tuple
  17. import attr
  18. from prometheus_client import Counter
  19. from synapse.api.constants import EventTypes, Membership, ReceiptTypes
  20. from synapse.api.filtering import FilterCollection
  21. from synapse.api.presence import UserPresenceState
  22. from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
  23. from synapse.events import EventBase
  24. from synapse.handlers.relations import BundledAggregations
  25. from synapse.logging.context import current_context
  26. from synapse.logging.opentracing import SynapseTags, log_kv, set_tag, start_active_span
  27. from synapse.push.clientformat import format_push_rules_for_user
  28. from synapse.storage.databases.main.event_push_actions import NotifCounts
  29. from synapse.storage.roommember import MemberSummary
  30. from synapse.storage.state import StateFilter
  31. from synapse.types import (
  32. DeviceListUpdates,
  33. JsonDict,
  34. MutableStateMap,
  35. Requester,
  36. RoomStreamToken,
  37. StateMap,
  38. StreamKeyType,
  39. StreamToken,
  40. UserID,
  41. )
  42. from synapse.util.async_helpers import concurrently_execute
  43. from synapse.util.caches.expiringcache import ExpiringCache
  44. from synapse.util.caches.lrucache import LruCache
  45. from synapse.util.caches.response_cache import ResponseCache, ResponseCacheContext
  46. from synapse.util.metrics import Measure, measure_func
  47. from synapse.visibility import filter_events_for_client
  48. if TYPE_CHECKING:
  49. from synapse.server import HomeServer
  50. logger = logging.getLogger(__name__)
  51. # Counts the number of times we returned a non-empty sync. `type` is one of
  52. # "initial_sync", "full_state_sync" or "incremental_sync", `lazy_loaded` is
  53. # "true" or "false" depending on if the request asked for lazy loaded members or
  54. # not.
  55. non_empty_sync_counter = Counter(
  56. "synapse_handlers_sync_nonempty_total",
  57. "Count of non empty sync responses. type is initial_sync/full_state_sync"
  58. "/incremental_sync. lazy_loaded indicates if lazy loaded members were "
  59. "enabled for that request.",
  60. ["type", "lazy_loaded"],
  61. )
  62. # Store the cache that tracks which lazy-loaded members have been sent to a given
  63. # client for no more than 30 minutes.
  64. LAZY_LOADED_MEMBERS_CACHE_MAX_AGE = 30 * 60 * 1000
  65. # Remember the last 100 members we sent to a client for the purposes of
  66. # avoiding redundantly sending the same lazy-loaded members to the client
  67. LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE = 100
  68. SyncRequestKey = Tuple[Any, ...]
  69. @attr.s(slots=True, frozen=True, auto_attribs=True)
  70. class SyncConfig:
  71. user: UserID
  72. filter_collection: FilterCollection
  73. is_guest: bool
  74. request_key: SyncRequestKey
  75. device_id: Optional[str]
  76. @attr.s(slots=True, frozen=True, auto_attribs=True)
  77. class TimelineBatch:
  78. prev_batch: StreamToken
  79. events: List[EventBase]
  80. limited: bool
  81. # A mapping of event ID to the bundled aggregations for the above events.
  82. # This is only calculated if limited is true.
  83. bundled_aggregations: Optional[Dict[str, BundledAggregations]] = None
  84. def __bool__(self) -> bool:
  85. """Make the result appear empty if there are no updates. This is used
  86. to tell if room needs to be part of the sync result.
  87. """
  88. return bool(self.events)
  89. # We can't freeze this class, because we need to update it after it's instantiated to
  90. # update its unread count. This is because we calculate the unread count for a room only
  91. # if there are updates for it, which we check after the instance has been created.
  92. # This should not be a big deal because we update the notification counts afterwards as
  93. # well anyway.
  94. @attr.s(slots=True, auto_attribs=True)
  95. class JoinedSyncResult:
  96. room_id: str
  97. timeline: TimelineBatch
  98. state: StateMap[EventBase]
  99. ephemeral: List[JsonDict]
  100. account_data: List[JsonDict]
  101. unread_notifications: JsonDict
  102. summary: Optional[JsonDict]
  103. unread_count: int
  104. def __bool__(self) -> bool:
  105. """Make the result appear empty if there are no updates. This is used
  106. to tell if room needs to be part of the sync result.
  107. """
  108. return bool(
  109. self.timeline
  110. or self.state
  111. or self.ephemeral
  112. or self.account_data
  113. # nb the notification count does not, er, count: if there's nothing
  114. # else in the result, we don't need to send it.
  115. )
  116. @attr.s(slots=True, frozen=True, auto_attribs=True)
  117. class ArchivedSyncResult:
  118. room_id: str
  119. timeline: TimelineBatch
  120. state: StateMap[EventBase]
  121. account_data: List[JsonDict]
  122. def __bool__(self) -> bool:
  123. """Make the result appear empty if there are no updates. This is used
  124. to tell if room needs to be part of the sync result.
  125. """
  126. return bool(self.timeline or self.state or self.account_data)
  127. @attr.s(slots=True, frozen=True, auto_attribs=True)
  128. class InvitedSyncResult:
  129. room_id: str
  130. invite: EventBase
  131. def __bool__(self) -> bool:
  132. """Invited rooms should always be reported to the client"""
  133. return True
  134. @attr.s(slots=True, frozen=True, auto_attribs=True)
  135. class KnockedSyncResult:
  136. room_id: str
  137. knock: EventBase
  138. def __bool__(self) -> bool:
  139. """Knocked rooms should always be reported to the client"""
  140. return True
  141. @attr.s(slots=True, frozen=True, auto_attribs=True)
  142. class GroupsSyncResult:
  143. join: JsonDict
  144. invite: JsonDict
  145. leave: JsonDict
  146. def __bool__(self) -> bool:
  147. return bool(self.join or self.invite or self.leave)
  148. @attr.s(slots=True, auto_attribs=True)
  149. class _RoomChanges:
  150. """The set of room entries to include in the sync, plus the set of joined
  151. and left room IDs since last sync.
  152. """
  153. room_entries: List["RoomSyncResultBuilder"]
  154. invited: List[InvitedSyncResult]
  155. knocked: List[KnockedSyncResult]
  156. newly_joined_rooms: List[str]
  157. newly_left_rooms: List[str]
  158. @attr.s(slots=True, frozen=True, auto_attribs=True)
  159. class SyncResult:
  160. """
  161. Attributes:
  162. next_batch: Token for the next sync
  163. presence: List of presence events for the user.
  164. account_data: List of account_data events for the user.
  165. joined: JoinedSyncResult for each joined room.
  166. invited: InvitedSyncResult for each invited room.
  167. knocked: KnockedSyncResult for each knocked on room.
  168. archived: ArchivedSyncResult for each archived room.
  169. to_device: List of direct messages for the device.
  170. device_lists: List of user_ids whose devices have changed
  171. device_one_time_keys_count: Dict of algorithm to count for one time keys
  172. for this device
  173. device_unused_fallback_key_types: List of key types that have an unused fallback
  174. key
  175. groups: Group updates, if any
  176. """
  177. next_batch: StreamToken
  178. presence: List[UserPresenceState]
  179. account_data: List[JsonDict]
  180. joined: List[JoinedSyncResult]
  181. invited: List[InvitedSyncResult]
  182. knocked: List[KnockedSyncResult]
  183. archived: List[ArchivedSyncResult]
  184. to_device: List[JsonDict]
  185. device_lists: DeviceListUpdates
  186. device_one_time_keys_count: JsonDict
  187. device_unused_fallback_key_types: List[str]
  188. groups: Optional[GroupsSyncResult]
  189. def __bool__(self) -> bool:
  190. """Make the result appear empty if there are no updates. This is used
  191. to tell if the notifier needs to wait for more events when polling for
  192. events.
  193. """
  194. return bool(
  195. self.presence
  196. or self.joined
  197. or self.invited
  198. or self.knocked
  199. or self.archived
  200. or self.account_data
  201. or self.to_device
  202. or self.device_lists
  203. or self.groups
  204. )
  205. class SyncHandler:
  206. def __init__(self, hs: "HomeServer"):
  207. self.hs_config = hs.config
  208. self.store = hs.get_datastores().main
  209. self.notifier = hs.get_notifier()
  210. self.presence_handler = hs.get_presence_handler()
  211. self._relations_handler = hs.get_relations_handler()
  212. self.event_sources = hs.get_event_sources()
  213. self.clock = hs.get_clock()
  214. self.state = hs.get_state_handler()
  215. self.auth = hs.get_auth()
  216. self.storage = hs.get_storage()
  217. self.state_store = self.storage.state
  218. # TODO: flush cache entries on subsequent sync request.
  219. # Once we get the next /sync request (ie, one with the same access token
  220. # that sets 'since' to 'next_batch'), we know that device won't need a
  221. # cached result any more, and we could flush the entry from the cache to save
  222. # memory.
  223. self.response_cache: ResponseCache[SyncRequestKey] = ResponseCache(
  224. hs.get_clock(),
  225. "sync",
  226. timeout_ms=hs.config.caches.sync_response_cache_duration,
  227. )
  228. # ExpiringCache((User, Device)) -> LruCache(user_id => event_id)
  229. self.lazy_loaded_members_cache: ExpiringCache[
  230. Tuple[str, Optional[str]], LruCache[str, str]
  231. ] = ExpiringCache(
  232. "lazy_loaded_members_cache",
  233. self.clock,
  234. max_len=0,
  235. expiry_ms=LAZY_LOADED_MEMBERS_CACHE_MAX_AGE,
  236. )
  237. self.rooms_to_exclude = hs.config.server.rooms_to_exclude_from_sync
  238. async def wait_for_sync_for_user(
  239. self,
  240. requester: Requester,
  241. sync_config: SyncConfig,
  242. since_token: Optional[StreamToken] = None,
  243. timeout: int = 0,
  244. full_state: bool = False,
  245. ) -> SyncResult:
  246. """Get the sync for a client if we have new data for it now. Otherwise
  247. wait for new data to arrive on the server. If the timeout expires, then
  248. return an empty sync result.
  249. """
  250. # If the user is not part of the mau group, then check that limits have
  251. # not been exceeded (if not part of the group by this point, almost certain
  252. # auth_blocking will occur)
  253. user_id = sync_config.user.to_string()
  254. await self.auth.check_auth_blocking(requester=requester)
  255. res = await self.response_cache.wrap(
  256. sync_config.request_key,
  257. self._wait_for_sync_for_user,
  258. sync_config,
  259. since_token,
  260. timeout,
  261. full_state,
  262. cache_context=True,
  263. )
  264. logger.debug("Returning sync response for %s", user_id)
  265. return res
  266. async def _wait_for_sync_for_user(
  267. self,
  268. sync_config: SyncConfig,
  269. since_token: Optional[StreamToken],
  270. timeout: int,
  271. full_state: bool,
  272. cache_context: ResponseCacheContext[SyncRequestKey],
  273. ) -> SyncResult:
  274. """The start of the machinery that produces a /sync response.
  275. See https://spec.matrix.org/v1.1/client-server-api/#syncing for full details.
  276. This method does high-level bookkeeping:
  277. - tracking the kind of sync in the logging context
  278. - deleting any to_device messages whose delivery has been acknowledged.
  279. - deciding if we should dispatch an instant or delayed response
  280. - marking the sync as being lazily loaded, if appropriate
  281. Computing the body of the response begins in the next method,
  282. `current_sync_for_user`.
  283. """
  284. if since_token is None:
  285. sync_type = "initial_sync"
  286. elif full_state:
  287. sync_type = "full_state_sync"
  288. else:
  289. sync_type = "incremental_sync"
  290. context = current_context()
  291. if context:
  292. context.tag = sync_type
  293. # if we have a since token, delete any to-device messages before that token
  294. # (since we now know that the device has received them)
  295. if since_token is not None:
  296. since_stream_id = since_token.to_device_key
  297. deleted = await self.store.delete_messages_for_device(
  298. sync_config.user.to_string(), sync_config.device_id, since_stream_id
  299. )
  300. logger.debug(
  301. "Deleted %d to-device messages up to %d", deleted, since_stream_id
  302. )
  303. if timeout == 0 or since_token is None or full_state:
  304. # we are going to return immediately, so don't bother calling
  305. # notifier.wait_for_events.
  306. result: SyncResult = await self.current_sync_for_user(
  307. sync_config, since_token, full_state=full_state
  308. )
  309. else:
  310. # Otherwise, we wait for something to happen and report it to the user.
  311. async def current_sync_callback(
  312. before_token: StreamToken, after_token: StreamToken
  313. ) -> SyncResult:
  314. return await self.current_sync_for_user(sync_config, since_token)
  315. result = await self.notifier.wait_for_events(
  316. sync_config.user.to_string(),
  317. timeout,
  318. current_sync_callback,
  319. from_token=since_token,
  320. )
  321. # if nothing has happened in any of the users' rooms since /sync was called,
  322. # the resultant next_batch will be the same as since_token (since the result
  323. # is generated when wait_for_events is first called, and not regenerated
  324. # when wait_for_events times out).
  325. #
  326. # If that happens, we mustn't cache it, so that when the client comes back
  327. # with the same cache token, we don't immediately return the same empty
  328. # result, causing a tightloop. (#8518)
  329. if result.next_batch == since_token:
  330. cache_context.should_cache = False
  331. if result:
  332. if sync_config.filter_collection.lazy_load_members():
  333. lazy_loaded = "true"
  334. else:
  335. lazy_loaded = "false"
  336. non_empty_sync_counter.labels(sync_type, lazy_loaded).inc()
  337. return result
  338. async def current_sync_for_user(
  339. self,
  340. sync_config: SyncConfig,
  341. since_token: Optional[StreamToken] = None,
  342. full_state: bool = False,
  343. ) -> SyncResult:
  344. """Generates the response body of a sync result, represented as a SyncResult.
  345. This is a wrapper around `generate_sync_result` which starts an open tracing
  346. span to track the sync. See `generate_sync_result` for the next part of your
  347. indoctrination.
  348. """
  349. with start_active_span("sync.current_sync_for_user"):
  350. log_kv({"since_token": since_token})
  351. sync_result = await self.generate_sync_result(
  352. sync_config, since_token, full_state
  353. )
  354. set_tag(SynapseTags.SYNC_RESULT, bool(sync_result))
  355. return sync_result
  356. async def push_rules_for_user(self, user: UserID) -> JsonDict:
  357. user_id = user.to_string()
  358. rules = await self.store.get_push_rules_for_user(user_id)
  359. rules = format_push_rules_for_user(user, rules)
  360. return rules
  361. async def ephemeral_by_room(
  362. self,
  363. sync_result_builder: "SyncResultBuilder",
  364. now_token: StreamToken,
  365. since_token: Optional[StreamToken] = None,
  366. ) -> Tuple[StreamToken, Dict[str, List[JsonDict]]]:
  367. """Get the ephemeral events for each room the user is in
  368. Args:
  369. sync_result_builder
  370. now_token: Where the server is currently up to.
  371. since_token: Where the server was when the client
  372. last synced.
  373. Returns:
  374. A tuple of the now StreamToken, updated to reflect the which typing
  375. events are included, and a dict mapping from room_id to a list of
  376. typing events for that room.
  377. """
  378. sync_config = sync_result_builder.sync_config
  379. with Measure(self.clock, "ephemeral_by_room"):
  380. typing_key = since_token.typing_key if since_token else 0
  381. room_ids = sync_result_builder.joined_room_ids
  382. typing_source = self.event_sources.sources.typing
  383. typing, typing_key = await typing_source.get_new_events(
  384. user=sync_config.user,
  385. from_key=typing_key,
  386. limit=sync_config.filter_collection.ephemeral_limit(),
  387. room_ids=room_ids,
  388. is_guest=sync_config.is_guest,
  389. )
  390. now_token = now_token.copy_and_replace(StreamKeyType.TYPING, typing_key)
  391. ephemeral_by_room: JsonDict = {}
  392. for event in typing:
  393. # we want to exclude the room_id from the event, but modifying the
  394. # result returned by the event source is poor form (it might cache
  395. # the object)
  396. room_id = event["room_id"]
  397. event_copy = {k: v for (k, v) in event.items() if k != "room_id"}
  398. ephemeral_by_room.setdefault(room_id, []).append(event_copy)
  399. receipt_key = since_token.receipt_key if since_token else 0
  400. receipt_source = self.event_sources.sources.receipt
  401. receipts, receipt_key = await receipt_source.get_new_events(
  402. user=sync_config.user,
  403. from_key=receipt_key,
  404. limit=sync_config.filter_collection.ephemeral_limit(),
  405. room_ids=room_ids,
  406. is_guest=sync_config.is_guest,
  407. )
  408. now_token = now_token.copy_and_replace(StreamKeyType.RECEIPT, receipt_key)
  409. for event in receipts:
  410. room_id = event["room_id"]
  411. # exclude room id, as above
  412. event_copy = {k: v for (k, v) in event.items() if k != "room_id"}
  413. ephemeral_by_room.setdefault(room_id, []).append(event_copy)
  414. return now_token, ephemeral_by_room
    async def _load_filtered_recents(
        self,
        room_id: str,
        sync_config: SyncConfig,
        now_token: StreamToken,
        since_token: Optional[StreamToken] = None,
        potential_recents: Optional[List[EventBase]] = None,
        newly_joined_room: bool = False,
    ) -> TimelineBatch:
        """Build the timeline batch for a room, filtered for the syncing user.

        Candidate events are passed through the sync filter
        (`filter_room_timeline`) and then through `filter_events_for_client`.
        If the timeline is considered limited, further events are loaded from
        the store (for a bounded number of rounds) until enough events survive
        the filtering.

        Args:
            room_id: the room to build the timeline for.
            sync_config: the parameters of this sync request.
            now_token: the stream position the response is being built up to.
            since_token: the position the client last synced from, if any.
            potential_recents: candidate events already known to the caller,
                if any. `None` forces the timeline to be treated as limited.
            newly_joined_room: whether the room was newly joined. If so the
                timeline is treated as limited and `since_token` is ignored
                when loading more events.

        Returns:
            The batch of events, its `prev_batch` token, the `limited` flag
            and, when limited or newly joined, the bundled aggregations.
        """
        with Measure(self.clock, "load_filtered_recents"):
            timeline_limit = sync_config.filter_collection.timeline_limit()
            block_all_timeline = (
                sync_config.filter_collection.blocks_all_room_timeline()
            )

            # The timeline is limited if we were given no candidates, if the
            # room is newly joined, or if there are more candidates than fit.
            if (
                potential_recents is None
                or newly_joined_room
                or timeline_limit < len(potential_recents)
            ):
                limited = True
            else:
                limited = False

            log_kv({"limited": limited})

            if potential_recents:
                recents = await sync_config.filter_collection.filter_room_timeline(
                    potential_recents
                )
                log_kv({"recents_after_sync_filtering": len(recents)})

                # We check if there are any state events, if there are then we pass
                # all current state events to the filter_events function. This is to
                # ensure that we always include current state in the timeline
                current_state_ids: FrozenSet[str] = frozenset()
                if any(e.is_state() for e in recents):
                    current_state_ids_map = await self.store.get_current_state_ids(
                        room_id
                    )
                    current_state_ids = frozenset(current_state_ids_map.values())

                recents = await filter_events_for_client(
                    self.storage,
                    sync_config.user.to_string(),
                    recents,
                    always_include_ids=current_state_ids,
                )
                log_kv({"recents_after_visibility_filtering": len(recents)})
            else:
                recents = []

            # Nothing more to load: either the candidates are the complete
            # timeline, or the filter blocks the whole room timeline.
            if not limited or block_all_timeline:
                prev_batch_token = now_token
                if recents:
                    # Paginating backwards starts just before the oldest
                    # event we are returning.
                    room_key = recents[0].internal_metadata.before
                    prev_batch_token = now_token.copy_and_replace(
                        StreamKeyType.ROOM, room_key
                    )

                return TimelineBatch(
                    events=recents, prev_batch=prev_batch_token, limited=False
                )

            # Over-fetch by this factor (with a floor of 10 events), since
            # some of the loaded events will be dropped by the filters.
            filtering_factor = 2
            load_limit = max(timeline_limit * filtering_factor, 10)
            max_repeat = 5  # Only try a few times per room, then give up.
            room_key = now_token.room_key
            end_key = room_key

            since_key = None
            if since_token and not newly_joined_room:
                since_key = since_token.room_key

            while limited and len(recents) < timeline_limit and max_repeat:
                # If we have a since_key then we are trying to get any events
                # that have happened since `since_key` up to `end_key`, so we
                # can just use `get_room_events_stream_for_room`.
                # Otherwise, we want to return the last N events in the room
                # in topological ordering.
                if since_key:
                    events, end_key = await self.store.get_room_events_stream_for_room(
                        room_id,
                        limit=load_limit + 1,
                        from_key=since_key,
                        to_key=end_key,
                    )
                else:
                    events, end_key = await self.store.get_recent_events_for_room(
                        room_id, limit=load_limit + 1, end_token=end_key
                    )

                log_kv({"loaded_recents": len(events)})

                loaded_recents = (
                    await sync_config.filter_collection.filter_room_timeline(events)
                )

                log_kv({"loaded_recents_after_sync_filtering": len(loaded_recents)})

                # We check if there are any state events, if there are then we pass
                # all current state events to the filter_events function. This is to
                # ensure that we always include current state in the timeline
                current_state_ids = frozenset()
                if any(e.is_state() for e in loaded_recents):
                    current_state_ids_map = await self.store.get_current_state_ids(
                        room_id
                    )
                    current_state_ids = frozenset(current_state_ids_map.values())

                loaded_recents = await filter_events_for_client(
                    self.storage,
                    sync_config.user.to_string(),
                    loaded_recents,
                    always_include_ids=current_state_ids,
                )

                log_kv({"loaded_recents_after_client_filtering": len(loaded_recents)})

                # The newly loaded events go in front of what we already have.
                loaded_recents.extend(recents)
                recents = loaded_recents

                # We asked for load_limit + 1 events; getting no more than
                # load_limit back means the range is exhausted.
                if len(events) <= load_limit:
                    limited = False
                    break
                max_repeat -= 1

            if len(recents) > timeline_limit:
                limited = True
                # NOTE(review): if timeline_limit can be 0 here, `recents[-0:]`
                # is the whole list rather than an empty one - confirm whether
                # a zero limit can reach this point.
                recents = recents[-timeline_limit:]
                room_key = recents[0].internal_metadata.before

            prev_batch_token = now_token.copy_and_replace(StreamKeyType.ROOM, room_key)

        # Don't bother to bundle aggregations if the timeline is unlimited,
        # as clients will have all the necessary information.
        bundled_aggregations = None
        if limited or newly_joined_room:
            bundled_aggregations = (
                await self._relations_handler.get_bundled_aggregations(
                    recents, sync_config.user.to_string()
                )
            )

        return TimelineBatch(
            events=recents,
            prev_batch=prev_batch_token,
            limited=limited or newly_joined_room,
            bundled_aggregations=bundled_aggregations,
        )
  async def get_state_after_event(
      self, event: EventBase, state_filter: Optional[StateFilter] = None
  ) -> StateMap[str]:
      """Return the room state map as it stands once `event` has been applied.

      Args:
          event: event of interest
          state_filter: The state filter used to fetch state from the
              database. When falsy, `StateFilter.all()` is used instead.

      Returns:
          The state map fetched from the state store for this event. If
          `event` is itself a state event, a copy of that map is returned in
          which the event's own `(type, state_key)` entry points at the event.
      """
      effective_filter = state_filter or StateFilter.all()
      fetched_state = await self.state_store.get_state_ids_for_event(
          event.event_id, state_filter=effective_filter
      )

      if not event.is_state():
          return fetched_state

      # Overlay the event itself on a copy, so the map handed back by the
      # store is never mutated.
      state_after = dict(fetched_state)
      state_after[(event.type, event.state_key)] = event.event_id
      return state_after
  async def get_state_at(
      self,
      room_id: str,
      stream_position: StreamToken,
      state_filter: Optional[StateFilter] = None,
  ) -> StateMap[str]:
      """Look up the room state at a given position in the event stream.

      Args:
          room_id: room for which to get state
          stream_position: point at which to get state
          state_filter: The state filter used to fetch state from the
              database. When falsy, `StateFilter.all()` is used instead.

      Returns:
          The state after the last event found in the room before
          `stream_position.room_key`, or an empty dict if no such event exists.
      """
      # FIXME: This gets the state at the latest event before the stream ordering,
      # which might not be the same as the "current state" of the room at the time
      # of the stream token if there were multiple forward extremities at the time.
      last_event = await self.store.get_last_event_in_room_before_stream_ordering(
          room_id,
          end_token=stream_position.room_key,
      )

      if not last_event:
          # No events in this room - so presumably no state.
          #
          # (erikj) This should be rarely hit, but we've had some reports that
          # we get more state down gappy syncs than we should, so let's add
          # some logging.
          logger.info(
              "Failed to find any events in room %s at %s",
              room_id,
              stream_position.room_key,
          )
          return {}

      return await self.get_state_after_event(
          last_event, state_filter=state_filter or StateFilter.all()
      )
  async def compute_summary(
      self,
      room_id: str,
      sync_config: SyncConfig,
      batch: TimelineBatch,
      state: MutableStateMap[EventBase],
      now_token: StreamToken,
  ) -> Optional[JsonDict]:
      """Build the room summary block for a room.

      The summary carries the joined/invited member counts and, when the room
      has neither a non-empty name nor a non-empty canonical alias, up to five
      'hero' user IDs so that clients can consistently name the room. When the
      client lazy-loads members, membership events needed to describe the
      heroes are added to `state` (which is mutated in place).

      Args:
          room_id: The room to summarise.
          sync_config: Config of the sync request being served.
          batch: The timeline batch for the room that will be sent to the user.
          state: State as returned by compute_state_delta
          now_token: Token of the end of the current batch.

      Returns:
          The summary dict, or None if no recent event could be found for the
          room up to `now_token`.
      """
      # FIXME: we could/should get this from room_stats when matthew/stats lands

      # FIXME: this promulgates https://github.com/matrix-org/synapse/issues/3305
      last_events, _ = await self.store.get_recent_event_ids_for_room(
          room_id, end_token=now_token.room_key, limit=1
      )
      if not last_events:
          return None

      name_key = (EventTypes.Name, "")
      alias_key = (EventTypes.CanonicalAlias, "")
      state_ids = await self.state_store.get_state_ids_for_event(
          last_events[-1].event_id,
          state_filter=StateFilter.from_types([name_key, alias_key]),
      )

      # this is heavily cached, thus: fast.
      details = await self.store.get_room_summary(room_id)

      # Stand-in used for any membership type missing from `details`.
      empty_ms = MemberSummary([], 0)

      # TODO: only send these when they change.
      summary: JsonDict = {
          "m.joined_member_count": details.get(Membership.JOIN, empty_ms).count,
          "m.invited_member_count": details.get(Membership.INVITE, empty_ms).count,
      }

      # A room with a non-empty name or canonical alias needs no heroes.
      # Empty strings are falsey, so an empty "name"/"alias" value does not
      # count as being set.
      name_id = state_ids.get(name_key)
      if name_id:
          name = await self.store.get_event(name_id, allow_none=True)
          if name and name.content.get("name"):
              return summary

      canonical_alias_id = state_ids.get(alias_key)
      if canonical_alias_id:
          canonical_alias = await self.store.get_event(
              canonical_alias_id, allow_none=True
          )
          if canonical_alias and canonical_alias.content.get("alias"):
              return summary

      me = sync_config.user.to_string()

      def other_users_with(membership):
          # User IDs with the given membership, excluding the syncing user.
          return [
              r[0] for r in details.get(membership, empty_ms).members if r[0] != me
          ]

      joined_user_ids = other_users_with(Membership.JOIN)
      invited_user_ids = other_users_with(Membership.INVITE)
      gone_user_ids = other_users_with(Membership.LEAVE) + other_users_with(
          Membership.BAN
      )

      # FIXME: only build up a member_ids list for our heroes
      # Later membership types overwrite earlier ones for the same user.
      member_ids = {
          user_id: event_id
          for membership in (
              Membership.JOIN,
              Membership.INVITE,
              Membership.LEAVE,
              Membership.BAN,
          )
          for user_id, event_id in details.get(membership, empty_ms).members
      }

      # FIXME: order by stream ordering rather than as returned by SQL
      # Prefer users still joined/invited; fall back to those who have gone.
      hero_candidates = joined_user_ids + invited_user_ids
      if not hero_candidates:
          hero_candidates = gone_user_ids
      summary["m.heroes"] = sorted(hero_candidates)[0:5]

      if not sync_config.filter_collection.lazy_load_members():
          return summary

      # ensure we send membership events for heroes if needed
      cache = self.get_lazy_loaded_members_cache((me, sync_config.device_id))

      # track which members the client should already know about via LL:
      # Ones which are already in state...
      existing_members = {
          user_id for (typ, user_id) in state.keys() if typ == EventTypes.Member
      }
      # ...or ones which are in the timeline...
      existing_members.update(
          ev.state_key for ev in batch.events if ev.type == EventTypes.Member
      )

      # ...and then ensure any missing ones get included in state.
      missing_hero_event_ids = []
      for hero_id in summary["m.heroes"]:
          if (
              cache.get(hero_id) != member_ids[hero_id]
              and hero_id not in existing_members
          ):
              missing_hero_event_ids.append(member_ids[hero_id])

      missing_hero_state = await self.store.get_events(missing_hero_event_ids)
      for hero_event in missing_hero_state.values():
          cache.set(hero_event.state_key, hero_event.event_id)
          state[(EventTypes.Member, hero_event.state_key)] = hero_event

      return summary
  def get_lazy_loaded_members_cache(
      self, cache_key: Tuple[str, Optional[str]]
  ) -> LruCache[str, str]:
      """Fetch the lazy-loaded-members LruCache for `cache_key`, creating it if absent.

      Args:
          cache_key: Key into `self.lazy_loaded_members_cache`. Callers in this
              file pass a `(user ID, device ID)` pair.

      Returns:
          The existing cache for the key, or a new one (sized by
          `LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE`) that has been stored under it.
      """
      existing: Optional[LruCache[str, str]] = self.lazy_loaded_members_cache.get(
          cache_key
      )
      # Compare against None explicitly rather than relying on truthiness.
      if existing is not None:
          logger.debug("found LruCache for %r", cache_key)
          return existing

      logger.debug("creating LruCache for %r", cache_key)
      created: LruCache[str, str] = LruCache(LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE)
      self.lazy_loaded_members_cache[cache_key] = created
      return created
  async def compute_state_delta(
      self,
      room_id: str,
      batch: TimelineBatch,
      sync_config: SyncConfig,
      since_token: Optional[StreamToken],
      now_token: StreamToken,
      full_state: bool,
  ) -> MutableStateMap[EventBase]:
      """Works out the difference in state between the start of the timeline
      and the previous sync.

      Three cases are handled: a full-state sync, a limited ("gappy")
      incremental sync, and a non-limited incremental sync (where state is
      only returned for lazy-loaded members of timeline senders).

      Args:
          room_id: The room to compute the state delta for.
          batch: The timeline batch for the room that will be sent to the user.
          sync_config: Config of the sync request being served.
          since_token: Token of the end of the previous batch. May be None.
          now_token: Token of the end of the current batch.
          full_state: Whether to force returning the full state.

      Returns:
          A map from `(type, state_key)` to the state events to send, after
          applying the request's room-state filter and dropping
          `EventTypes.Aliases` events.
      """
      # TODO(mjark) Check if the state events were received by the server
      # after the previous sync, since we need to include those state
      # updates even if they occurred logically before the previous event.
      # TODO(mjark) Check for new redactions in the state events.

      with Measure(self.clock, "compute_state_delta"):

          members_to_fetch = None

          lazy_load_members = sync_config.filter_collection.lazy_load_members()
          include_redundant_members = (
              sync_config.filter_collection.include_redundant_members()
          )

          if lazy_load_members:
              # We only request state for the members needed to display the
              # timeline:

              members_to_fetch = {
                  event.sender  # FIXME: we also care about invite targets etc.
                  for event in batch.events
              }

              if full_state:
                  # always make sure we LL ourselves so we know we're in the room
                  # (if we are) to fix https://github.com/vector-im/riot-web/issues/7209
                  # We only need apply this on full state syncs given we disabled
                  # LL for incr syncs in #3840.
                  members_to_fetch.add(sync_config.user.to_string())

              state_filter = StateFilter.from_lazy_load_member_list(members_to_fetch)
          else:
              state_filter = StateFilter.all()

          # State events that appear in the timeline itself, keyed by
          # (type, state_key).
          timeline_state = {
              (event.type, event.state_key): event.event_id
              for event in batch.events
              if event.is_state()
          }

          if full_state:
              # NOTE(review): `if batch` relies on TimelineBatch's own
              # truthiness - presumably "has events"; confirm against the class.
              if batch:
                  current_state_ids = await self.state_store.get_state_ids_for_event(
                      batch.events[-1].event_id, state_filter=state_filter
                  )

                  state_ids = await self.state_store.get_state_ids_for_event(
                      batch.events[0].event_id, state_filter=state_filter
                  )

              else:
                  current_state_ids = await self.get_state_at(
                      room_id, stream_position=now_token, state_filter=state_filter
                  )

                  state_ids = current_state_ids

              # Nothing was sent previously, hence the empty `previous` map.
              state_ids = _calculate_state(
                  timeline_contains=timeline_state,
                  timeline_start=state_ids,
                  previous={},
                  current=current_state_ids,
                  lazy_load_members=lazy_load_members,
              )
          elif batch.limited:
              if batch:
                  state_at_timeline_start = (
                      await self.state_store.get_state_ids_for_event(
                          batch.events[0].event_id, state_filter=state_filter
                      )
                  )
              else:
                  # We can get here if the user has ignored the senders of all
                  # the recent events.
                  state_at_timeline_start = await self.get_state_at(
                      room_id, stream_position=now_token, state_filter=state_filter
                  )

              # for now, we disable LL for gappy syncs - see
              # https://github.com/vector-im/riot-web/issues/7211#issuecomment-419976346
              # N.B. this slows down incr syncs as we are now processing way
              # more state in the server than if we were LLing.
              #
              # We still have to filter timeline_start to LL entries (above) in order
              # for _calculate_state's LL logic to work, as we have to include LL
              # members for timeline senders in case they weren't loaded in the initial
              # sync. We do this (counterintuitively) by filtering timeline_start
              # members to just be ones which were timeline senders, which then ensures
              # all of the rest get included in the state block (if we need to know
              # about them).
              #
              # From here on in this branch, every state lookup is unfiltered.
              state_filter = StateFilter.all()

              # If this is an initial sync then full_state should be set, and
              # that case is handled above. We assert here to ensure that this
              # is indeed the case.
              assert since_token is not None
              state_at_previous_sync = await self.get_state_at(
                  room_id, stream_position=since_token, state_filter=state_filter
              )

              if batch:
                  current_state_ids = await self.state_store.get_state_ids_for_event(
                      batch.events[-1].event_id, state_filter=state_filter
                  )
              else:
                  # It's not clear how we get here, but empirically we do
                  # (#5407). Logging has been added elsewhere to try and
                  # figure out where this state comes from.
                  current_state_ids = await self.get_state_at(
                      room_id, stream_position=now_token, state_filter=state_filter
                  )

              state_ids = _calculate_state(
                  timeline_contains=timeline_state,
                  timeline_start=state_at_timeline_start,
                  previous=state_at_previous_sync,
                  current=current_state_ids,
                  # we have to include LL members in case LL initial sync missed them
                  lazy_load_members=lazy_load_members,
              )
          else:
              state_ids = {}
              if lazy_load_members:
                  if members_to_fetch and batch.events:
                      # We're returning an incremental sync, with no
                      # "gap" since the previous sync, so normally there would be
                      # no state to return.
                      # But we're lazy-loading, so the client might need some more
                      # member events to understand the events in this timeline.
                      # So we fish out all the member events corresponding to the
                      # timeline here, and then dedupe any redundant ones below.

                      state_ids = await self.state_store.get_state_ids_for_event(
                          batch.events[0].event_id,
                          # we only want members!
                          state_filter=StateFilter.from_types(
                              (EventTypes.Member, member)
                              for member in members_to_fetch
                          ),
                      )

          if lazy_load_members and not include_redundant_members:
              # The cache is per (user ID, device ID) and records which
              # membership event was last sent for each member.
              cache_key = (sync_config.user.to_string(), sync_config.device_id)
              cache = self.get_lazy_loaded_members_cache(cache_key)

              # if it's a new sync sequence, then assume the client has had
              # amnesia and doesn't want any recent lazy-loaded members
              # de-duplicated.
              if since_token is None:
                  logger.debug("clearing LruCache for %r", cache_key)
                  cache.clear()
              else:
                  # only send members which aren't in our LruCache (either
                  # because they're new to this client or have been pushed out
                  # of the cache)
                  logger.debug("filtering state from %r...", state_ids)
                  state_ids = {
                      t: event_id
                      for t, event_id in state_ids.items()
                      if cache.get(t[1]) != event_id
                  }
                  logger.debug("...to %r", state_ids)

              # add any member IDs we are about to send into our LruCache
              for t, event_id in itertools.chain(
                  state_ids.items(), timeline_state.items()
              ):
                  if t[0] == EventTypes.Member:
                      cache.set(t[1], event_id)

      # Resolve the selected event IDs into events (skipping the lookup when
      # there is nothing to fetch).
      state: Dict[str, EventBase] = {}
      if state_ids:
          state = await self.store.get_events(list(state_ids.values()))

      return {
          (e.type, e.state_key): e
          for e in await sync_config.filter_collection.filter_room_state(
              list(state.values())
          )
          if e.type != EventTypes.Aliases  # until MSC2261 or alternative solution
      }
  async def unread_notifs_for_room_id(
      self, room_id: str, sync_config: SyncConfig
  ) -> NotifCounts:
      """Fetch the syncing user's unread notification counts for a room.

      The counts are taken relative to the event ID the store reports for
      the user's last `READ` / `READ_PRIVATE` receipt in the room.

      Args:
          room_id: The room to count notifications in.
          sync_config: Config of the sync request; supplies the user.

      Returns:
          Whatever `get_unread_event_push_actions_by_room_for_user` returns
          for that room, user and receipt event ID.
      """
      with Measure(self.clock, "unread_notifs_for_room_id"):
          syncing_user_id = sync_config.user.to_string()

          last_receipt_event_id = await self.store.get_last_receipt_event_id_for_user(
              user_id=syncing_user_id,
              room_id=room_id,
              receipt_types=(ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE),
          )

          notif_counts = (
              await self.store.get_unread_event_push_actions_by_room_for_user(
                  room_id, syncing_user_id, last_receipt_event_id
              )
          )
          return notif_counts
  async def generate_sync_result(
      self,
      sync_config: SyncConfig,
      since_token: Optional[StreamToken] = None,
      full_state: bool = False,
  ) -> SyncResult:
      """Generates the response body of a sync result.

      The response is a `SyncResult` struct, assembled piece by piece through
      a `SyncResultBuilder`. See also
          https://spec.matrix.org/v1.1/client-server-api/#get_matrixclientv3sync

      The `sync_result_builder` is handed as a mutable ("inout") parameter to
      the various helper functions. Each helper retrieves and processes part
      of the data for the sync body, often storing its output on the builder.

      Once every helper has run, the data is copied from the builder into a
      new `SyncResult` instance, signifying that the calculation is complete.

      Raises:
          NotImplementedError: if the syncing user belongs to an application
              service.
      """
      # NB: The now_token gets changed by some of the generate_sync_* methods,
      # this is due to some of the underlying streams not supporting the ability
      # to query up to a given point.
      # Always use the `now_token` in `SyncResultBuilder`
      now_token = self.event_sources.get_current_token()
      log_kv({"now_token": now_token})

      logger.debug(
          "Calculating sync response for %r between %s and %s",
          sync_config.user,
          since_token,
          now_token,
      )

      user_id = sync_config.user.to_string()
      if self.store.get_app_service_by_user_id(user_id):
          # We no longer support AS users using /sync directly.
          # See https://github.com/matrix-org/matrix-doc/issues/1144
          raise NotImplementedError()

      joined_room_ids = await self.get_rooms_for_user_at(user_id, now_token.room_key)
      sync_result_builder = SyncResultBuilder(
          sync_config,
          full_state,
          since_token=since_token,
          now_token=now_token,
          joined_room_ids=joined_room_ids,
      )

      logger.debug("Fetching account data")
      account_data_by_room = await self._generate_sync_entry_for_account_data(
          sync_result_builder
      )

      logger.debug("Fetching room data")
      (
          newly_joined_rooms,
          newly_joined_or_invited_or_knocked_users,
          newly_left_rooms,
          newly_left_users,
      ) = await self._generate_sync_entry_for_rooms(
          sync_result_builder, account_data_by_room
      )

      # Presence is skipped when it is disabled server-wide, or when this is
      # an initial sync whose filter blocks all presence.
      block_all_presence_data = (
          since_token is None and sync_config.filter_collection.blocks_all_presence()
      )
      if self.hs_config.server.use_presence and not block_all_presence_data:
          logger.debug("Fetching presence data")
          await self._generate_sync_entry_for_presence(
              sync_result_builder,
              newly_joined_rooms,
              newly_joined_or_invited_or_knocked_users,
          )

      logger.debug("Fetching to-device data")
      await self._generate_sync_entry_for_to_device(sync_result_builder)

      device_lists = await self._generate_sync_entry_for_device_list(
          sync_result_builder,
          newly_joined_rooms=newly_joined_rooms,
          newly_joined_or_invited_or_knocked_users=newly_joined_or_invited_or_knocked_users,
          newly_left_rooms=newly_left_rooms,
          newly_left_users=newly_left_users,
      )

      logger.debug("Fetching OTK data")
      device_id = sync_config.device_id
      one_time_key_counts: JsonDict = {}
      unused_fallback_key_types: List[str] = []
      if device_id:
          # TODO: We should have a way to let clients differentiate between the states of:
          #   * no change in OTK count since the provided since token
          #   * the server has zero OTKs left for this device
          #  Spec issue: https://github.com/matrix-org/matrix-doc/issues/3298
          one_time_key_counts = await self.store.count_e2e_one_time_keys(
              user_id, device_id
          )
          unused_fallback_key_types = (
              await self.store.get_e2e_unused_fallback_key_types(user_id, device_id)
          )

      if self.hs_config.experimental.groups_enabled:
          logger.debug("Fetching group data")
          await self._generate_sync_entry_for_groups(sync_result_builder)

      # debug for https://github.com/matrix-org/synapse/issues/9424
      joined_results = sync_result_builder.joined
      num_events = sum(len(room.timeline.events) for room in joined_results)
      log_kv(
          {
              "joined_rooms_in_result": len(sync_result_builder.joined),
              "events_in_result": num_events,
          }
      )

      logger.debug("Sync response calculation complete")
      return SyncResult(
          presence=sync_result_builder.presence,
          account_data=sync_result_builder.account_data,
          joined=sync_result_builder.joined,
          invited=sync_result_builder.invited,
          knocked=sync_result_builder.knocked,
          archived=sync_result_builder.archived,
          to_device=sync_result_builder.to_device,
          device_lists=device_lists,
          groups=sync_result_builder.groups,
          device_one_time_keys_count=one_time_key_counts,
          device_unused_fallback_key_types=unused_fallback_key_types,
          next_batch=sync_result_builder.now_token,
      )
  @measure_func("_generate_sync_entry_for_groups")
  async def _generate_sync_entry_for_groups(
      self, sync_result_builder: "SyncResultBuilder"
  ) -> None:
      """Generates the groups portion of the sync response. Populates
      `sync_result_builder.groups` with a `GroupsSyncResult`.

      When the builder has a since token with a truthy `groups_key`, only the
      group changes between that key and the current one are fetched;
      otherwise all of the user's groups are fetched.

      Args:
          sync_result_builder: builder for the sync response being assembled.
      """
      user_id = sync_result_builder.sync_config.user.to_string()
      since_token = sync_result_builder.since_token
      now_token = sync_result_builder.now_token

      if since_token and since_token.groups_key:
          results = await self.store.get_groups_changes_for_user(
              user_id, since_token.groups_key, now_token.groups_key
          )
      else:
          results = await self.store.get_all_groups_for_user(
              user_id, now_token.groups_key
          )

      invited = {}
      joined = {}
      left = {}
      for result in results:
          membership = result["membership"]
          group_id = result["group_id"]
          gtype = result["type"]
          content = result["content"]

          is_membership_entry = gtype == "membership"

          if membership == "join":
              if is_membership_entry:
                  # TODO: Add profile
                  content.pop("membership", None)
                  joined[group_id] = content["content"]
              else:
                  # Other entry types for a joined group are collected per type.
                  joined.setdefault(group_id, {})[gtype] = content
              continue

          # For invites and everything else, only membership entries matter.
          if not is_membership_entry:
              continue

          if membership == "invite":
              content.pop("membership", None)
              invited[group_id] = content["content"]
          else:
              left[group_id] = content["content"]

      sync_result_builder.groups = GroupsSyncResult(
          join=joined, invite=invited, leave=left
      )
  @measure_func("_generate_sync_entry_for_device_list")
  async def _generate_sync_entry_for_device_list(
      self,
      sync_result_builder: "SyncResultBuilder",
      newly_joined_rooms: Set[str],
      newly_joined_or_invited_or_knocked_users: Set[str],
      newly_left_rooms: Set[str],
      newly_left_users: Set[str],
  ) -> DeviceListUpdates:
      """Generate the DeviceListUpdates section of sync

      Args:
          sync_result_builder: builder for the sync response being assembled.
          newly_joined_rooms: Set of rooms user has joined since previous sync
          newly_joined_or_invited_or_knocked_users: Set of users that have joined,
              been invited to a room or are knocking on a room since
              previous sync.
          newly_left_rooms: Set of rooms user has left since previous sync
          newly_left_users: Set of users that have left a room we're in since
              previous sync

      Returns:
          The users whose device lists changed and the users no longer
          tracked. An empty `DeviceListUpdates` is returned when there is no
          since token or its `device_list_key` is falsy.
      """
      user_id = sync_result_builder.sync_config.user.to_string()
      since_token = sync_result_builder.since_token

      # Both sets are mutated below, so work on private copies rather than
      # assuming the caller will not use its own sets again.
      newly_joined_or_invited_or_knocked_users = set(
          newly_joined_or_invited_or_knocked_users
      )
      newly_left_users = set(newly_left_users)

      if not (since_token and since_token.device_list_key):
          return DeviceListUpdates()

      # We want to figure out what user IDs the client should refetch
      # device keys for, and which users we aren't going to track changes
      # for anymore.
      #
      # For the first step we check:
      #   a. if any users we share a room with have updated their devices,
      #      and
      #   b. we also check if we've joined any new rooms, or if a user has
      #      joined a room we're in.
      #
      # For the second step we just find any users we no longer share a
      # room with by looking at all users that have left a room plus users
      # that were in a room we've left.

      users_that_have_changed = set()

      joined_rooms = sync_result_builder.joined_room_ids

      # Step 1a, check for changes in devices of users we share a room
      # with
      #
      # We do this in two different ways depending on what we have cached.
      # If we already have a list of all the user that have changed since
      # the last sync then it's likely more efficient to compare the rooms
      # they're in with the rooms the syncing user is in.
      #
      # If we don't have that info cached then we get all the users that
      # share a room with our user and check if those users have changed.
      changed_users = self.store.get_cached_device_list_changes(
          since_token.device_list_key
      )
      if changed_users is not None:
          rooms_by_changed_user = (
              await self.store.get_rooms_for_users_with_stream_ordering(
                  changed_users
              )
          )

          # Keep a changed user if they share any room with the syncing
          # user, or if they are the syncing user (we always want to include
          # device list updates of their own devices).
          users_that_have_changed.update(
              changed_user_id
              for changed_user_id, entries in rooms_by_changed_user.items()
              if user_id == changed_user_id
              or any(e.room_id in joined_rooms for e in entries)
          )
      else:
          users_who_share_room = (
              await self.store.get_users_who_share_room_with_user(user_id)
          )

          # Always tell the user about their own devices. We check as the user
          # ID is almost certainly already included (unless they're not in any
          # rooms) and taking a copy of the set is relatively expensive.
          if user_id not in users_who_share_room:
              users_who_share_room = set(users_who_share_room)
              users_who_share_room.add(user_id)

          tracked_users = users_who_share_room
          users_that_have_changed = (
              await self.store.get_users_whose_devices_changed(
                  since_token.device_list_key, tracked_users
              )
          )

      # Step 1b, check for newly joined rooms
      for room_id in newly_joined_rooms:
          newly_joined_or_invited_or_knocked_users.update(
              await self.store.get_users_in_room(room_id)
          )

      # TODO: Check that these users are actually new, i.e. either they
      # weren't in the previous sync *or* they left and rejoined.
      users_that_have_changed.update(newly_joined_or_invited_or_knocked_users)

      user_signatures_changed = await self.store.get_users_whose_signatures_changed(
          user_id, since_token.device_list_key
      )
      users_that_have_changed.update(user_signatures_changed)

      # Now find users that we no longer track
      for room_id in newly_left_rooms:
          newly_left_users.update(await self.store.get_users_in_room(room_id))

      # Remove any users that we still share a room with.
      left_users_rooms = await self.store.get_rooms_for_users_with_stream_ordering(
          newly_left_users
      )
      for departed_user_id, entries in left_users_rooms.items():
          if any(e.room_id in joined_rooms for e in entries):
              newly_left_users.discard(departed_user_id)

      return DeviceListUpdates(
          changed=users_that_have_changed, left=newly_left_users
      )
  async def _generate_sync_entry_for_to_device(
      self, sync_result_builder: "SyncResultBuilder"
  ) -> None:
      """Generates the to-device portion of the sync response. Populates
      `sync_result_builder` with the result.

      `sync_result_builder.to_device` is always set. When messages are
      fetched, the to-device key of `sync_result_builder.now_token` is also
      replaced with the stream ID returned by the store.

      Args:
          sync_result_builder: builder for the sync response being assembled.
      """
      user_id = sync_result_builder.sync_config.user.to_string()
      device_id = sync_result_builder.sync_config.device_id
      now_token = sync_result_builder.now_token

      previous_token = sync_result_builder.since_token
      since_stream_id = (
          0 if previous_token is None else int(previous_token.to_device_key)
      )

      # No device, or the to-device stream has not moved: nothing to fetch.
      if device_id is None or since_stream_id == int(now_token.to_device_key):
          sync_result_builder.to_device = []
          return

      messages, stream_id = await self.store.get_messages_for_device(
          user_id, device_id, since_stream_id, now_token.to_device_key
      )

      for message in messages:
          # We pop here as we shouldn't be sending the message ID down
          # `/sync`
          message_id = message.pop("message_id", None)
          if message_id:
              set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id)

      logger.debug(
          "Returning %d to-device messages between %d and %d (current token: %d)",
          len(messages),
          since_stream_id,
          stream_id,
          now_token.to_device_key,
      )
      sync_result_builder.now_token = now_token.copy_and_replace(
          StreamKeyType.TO_DEVICE, stream_id
      )
      sync_result_builder.to_device = messages
  async def _generate_sync_entry_for_account_data(
      self, sync_result_builder: "SyncResultBuilder"
  ) -> Dict[str, Dict[str, JsonDict]]:
      """Generates the account data portion of the sync response.

      Account data (called "Client Config" in the spec) can be set either
      globally or for a specific room. Account data consists of a list of
      events which accumulate state, much like a room.

      Both kinds are retrieved here. The global account data is filtered and
      written to the given `sync_result_builder`; the per-room account data is
      returned directly, to be written to the `sync_result_builder` later on a
      room-by-room basis.

      For an incremental, non-full-state sync only account data updated since
      the since token is fetched, and `m.push_rules` is included only if the
      push rules changed. Otherwise everything is fetched and `m.push_rules`
      is always included.

      Args:
          sync_result_builder: builder for the sync response being assembled.

      Returns:
          A dictionary whose keys (room ids) map to the per room account data
          for that room.
      """
      sync_config = sync_result_builder.sync_config
      user_id = sync_config.user.to_string()
      since_token = sync_result_builder.since_token

      if since_token and not sync_result_builder.full_state:
          (
              global_account_data,
              account_data_by_room,
          ) = await self.store.get_updated_account_data_for_user(
              user_id, since_token.account_data_key
          )
          include_push_rules = await self.store.have_push_rules_changed_for_user(
              user_id, int(since_token.push_rules_key)
          )
      else:
          (
              global_account_data,
              account_data_by_room,
          ) = await self.store.get_account_data_for_user(user_id)
          include_push_rules = True

      if include_push_rules:
          global_account_data["m.push_rules"] = await self.push_rules_for_user(
              sync_config.user
          )

      # Re-shape the {type: content} mapping into a list of event-like dicts
      # for the filter.
      global_events = [
          {"type": account_data_type, "content": content}
          for account_data_type, content in global_account_data.items()
      ]
      sync_result_builder.account_data = (
          await sync_config.filter_collection.filter_account_data(global_events)
      )

      return account_data_by_room
  async def _generate_sync_entry_for_presence(
      self,
      sync_result_builder: "SyncResultBuilder",
      newly_joined_rooms: Set[str],
      newly_joined_or_invited_users: Set[str],
  ) -> None:
      """Generates the presence portion of the sync response. Populates the
      `sync_result_builder` with the result.

      Besides setting `sync_result_builder.presence`, the presence key of
      `sync_result_builder.now_token` is replaced with the key returned by
      the presence source.

      Args:
          sync_result_builder: builder for the sync response being assembled.
          newly_joined_rooms: Set of rooms that the user has joined since
              the last sync (or empty if an initial sync)
          newly_joined_or_invited_users: Set of users that have joined or
              been invited to rooms since the last sync (or empty if an
              initial sync)
      """
      now_token = sync_result_builder.now_token
      sync_config = sync_result_builder.sync_config
      user = sync_result_builder.sync_config.user

      presence_source = self.event_sources.sources.presence

      since_token = sync_result_builder.since_token
      # Only an incremental, non-full-state sync resumes from the previous
      # presence key, and only then are offline states requested.
      if since_token and not sync_result_builder.full_state:
          from_key, include_offline = since_token.presence_key, True
      else:
          from_key, include_offline = None, False

      presence, presence_key = await presence_source.get_new_events(
          user=user,
          from_key=from_key,
          is_guest=sync_config.is_guest,
          include_offline=include_offline,
      )
      assert presence_key
      sync_result_builder.now_token = now_token.copy_and_replace(
          StreamKeyType.PRESENCE, presence_key
      )

      # Users whose current presence is fetched explicitly: the newly
      # joined/invited users plus everyone in rooms the user has just joined,
      # but never the syncing user themselves.
      extra_users_ids = set(newly_joined_or_invited_users)
      for room_id in newly_joined_rooms:
          extra_users_ids.update(await self.store.get_users_in_room(room_id))
      extra_users_ids.discard(user.to_string())

      if extra_users_ids:
          extra_states = await self.presence_handler.get_states(extra_users_ids)
          presence.extend(extra_states)

          # Deduplicate the presence entries so that there's at most one per user
          latest_by_user = {p.user_id: p for p in presence}
          presence = list(latest_by_user.values())

      presence = await sync_config.filter_collection.filter_presence(presence)

      sync_result_builder.presence = presence
  1311. async def _generate_sync_entry_for_rooms(
  1312. self,
  1313. sync_result_builder: "SyncResultBuilder",
  1314. account_data_by_room: Dict[str, Dict[str, JsonDict]],
  1315. ) -> Tuple[Set[str], Set[str], Set[str], Set[str]]:
  1316. """Generates the rooms portion of the sync response. Populates the
  1317. `sync_result_builder` with the result.
  1318. In the response that reaches the client, rooms are divided into four categories:
  1319. `invite`, `join`, `knock`, `leave`. These aren't the same as the four sets of
  1320. room ids returned by this function.
  1321. Args:
  1322. sync_result_builder
  1323. account_data_by_room: Dictionary of per room account data
  1324. Returns:
  1325. Returns a 4-tuple describing rooms the user has joined or left, and users who've
  1326. joined or left rooms any rooms the user is in. This gets used later in
  1327. `_generate_sync_entry_for_device_list`.
  1328. Its entries are:
  1329. - newly_joined_rooms
  1330. - newly_joined_or_invited_or_knocked_users
  1331. - newly_left_rooms
  1332. - newly_left_users
  1333. """
  1334. since_token = sync_result_builder.since_token
  1335. # 1. Start by fetching all ephemeral events in rooms we've joined (if required).
  1336. user_id = sync_result_builder.sync_config.user.to_string()
  1337. block_all_room_ephemeral = (
  1338. since_token is None
  1339. and sync_result_builder.sync_config.filter_collection.blocks_all_room_ephemeral()
  1340. )
  1341. if block_all_room_ephemeral:
  1342. ephemeral_by_room: Dict[str, List[JsonDict]] = {}
  1343. else:
  1344. now_token, ephemeral_by_room = await self.ephemeral_by_room(
  1345. sync_result_builder,
  1346. now_token=sync_result_builder.now_token,
  1347. since_token=sync_result_builder.since_token,
  1348. )
  1349. sync_result_builder.now_token = now_token
  1350. # 2. We check up front if anything has changed, if it hasn't then there is
  1351. # no point in going further.
  1352. if not sync_result_builder.full_state:
  1353. if since_token and not ephemeral_by_room and not account_data_by_room:
  1354. have_changed = await self._have_rooms_changed(sync_result_builder)
  1355. log_kv({"rooms_have_changed": have_changed})
  1356. if not have_changed:
  1357. tags_by_room = await self.store.get_updated_tags(
  1358. user_id, since_token.account_data_key
  1359. )
  1360. if not tags_by_room:
  1361. logger.debug("no-oping sync")
  1362. return set(), set(), set(), set()
  1363. # 3. Work out which rooms need reporting in the sync response.
  1364. ignored_users = await self.store.ignored_users(user_id)
  1365. if since_token:
  1366. room_changes = await self._get_rooms_changed(
  1367. sync_result_builder, ignored_users, self.rooms_to_exclude
  1368. )
  1369. tags_by_room = await self.store.get_updated_tags(
  1370. user_id, since_token.account_data_key
  1371. )
  1372. else:
  1373. room_changes = await self._get_all_rooms(
  1374. sync_result_builder, ignored_users, self.rooms_to_exclude
  1375. )
  1376. tags_by_room = await self.store.get_tags_for_user(user_id)
  1377. log_kv({"rooms_changed": len(room_changes.room_entries)})
  1378. room_entries = room_changes.room_entries
  1379. invited = room_changes.invited
  1380. knocked = room_changes.knocked
  1381. newly_joined_rooms = room_changes.newly_joined_rooms
  1382. newly_left_rooms = room_changes.newly_left_rooms
  1383. # 4. We need to apply further processing to `room_entries` (rooms considered
  1384. # joined or archived).
  1385. async def handle_room_entries(room_entry: "RoomSyncResultBuilder") -> None:
  1386. logger.debug("Generating room entry for %s", room_entry.room_id)
  1387. await self._generate_room_entry(
  1388. sync_result_builder,
  1389. room_entry,
  1390. ephemeral=ephemeral_by_room.get(room_entry.room_id, []),
  1391. tags=tags_by_room.get(room_entry.room_id),
  1392. account_data=account_data_by_room.get(room_entry.room_id, {}),
  1393. always_include=sync_result_builder.full_state,
  1394. )
  1395. logger.debug("Generated room entry for %s", room_entry.room_id)
  1396. with start_active_span("sync.generate_room_entries"):
  1397. await concurrently_execute(handle_room_entries, room_entries, 10)
  1398. sync_result_builder.invited.extend(invited)
  1399. sync_result_builder.knocked.extend(knocked)
  1400. # 5. Work out which users have joined or left rooms we're in. We use this
  1401. # to build the device_list part of the sync response in
  1402. # `_generate_sync_entry_for_device_list`.
  1403. (
  1404. newly_joined_or_invited_or_knocked_users,
  1405. newly_left_users,
  1406. ) = sync_result_builder.calculate_user_changes()
  1407. return (
  1408. set(newly_joined_rooms),
  1409. newly_joined_or_invited_or_knocked_users,
  1410. set(newly_left_rooms),
  1411. newly_left_users,
  1412. )
  1413. async def _have_rooms_changed(
  1414. self, sync_result_builder: "SyncResultBuilder"
  1415. ) -> bool:
  1416. """Returns whether there may be any new events that should be sent down
  1417. the sync. Returns True if there are.
  1418. Does not modify the `sync_result_builder`.
  1419. """
  1420. user_id = sync_result_builder.sync_config.user.to_string()
  1421. since_token = sync_result_builder.since_token
  1422. now_token = sync_result_builder.now_token
  1423. assert since_token
  1424. # Get a list of membership change events that have happened to the user
  1425. # requesting the sync.
  1426. membership_changes = await self.store.get_membership_changes_for_user(
  1427. user_id, since_token.room_key, now_token.room_key
  1428. )
  1429. if membership_changes:
  1430. return True
  1431. stream_id = since_token.room_key.stream
  1432. for room_id in sync_result_builder.joined_room_ids:
  1433. if self.store.has_room_changed_since(room_id, stream_id):
  1434. return True
  1435. return False
  1436. async def _get_rooms_changed(
  1437. self,
  1438. sync_result_builder: "SyncResultBuilder",
  1439. ignored_users: FrozenSet[str],
  1440. excluded_rooms: List[str],
  1441. ) -> _RoomChanges:
  1442. """Determine the changes in rooms to report to the user.
  1443. This function is a first pass at generating the rooms part of the sync response.
  1444. It determines which rooms have changed during the sync period, and categorises
  1445. them into four buckets: "knock", "invite", "join" and "leave".
  1446. 1. Finds all membership changes for the user in the sync period (from
  1447. `since_token` up to `now_token`).
  1448. 2. Uses those to place the room in one of the four categories above.
  1449. 3. Builds a `_RoomChanges` struct to record this, and return that struct.
  1450. For rooms classified as "knock", "invite" or "leave", we just need to report
  1451. a single membership event in the eventual /sync response. For "join" we need
  1452. to fetch additional non-membership events, e.g. messages in the room. That is
  1453. more complicated, so instead we report an intermediary `RoomSyncResultBuilder`
  1454. struct, and leave the additional work to `_generate_room_entry`.
  1455. The sync_result_builder is not modified by this function.
  1456. """
  1457. user_id = sync_result_builder.sync_config.user.to_string()
  1458. since_token = sync_result_builder.since_token
  1459. now_token = sync_result_builder.now_token
  1460. sync_config = sync_result_builder.sync_config
  1461. assert since_token
  1462. # TODO: we've already called this function and ran this query in
  1463. # _have_rooms_changed. We could keep the results in memory to avoid a
  1464. # second query, at the cost of more complicated source code.
  1465. membership_change_events = await self.store.get_membership_changes_for_user(
  1466. user_id, since_token.room_key, now_token.room_key, excluded_rooms
  1467. )
  1468. mem_change_events_by_room_id: Dict[str, List[EventBase]] = {}
  1469. for event in membership_change_events:
  1470. mem_change_events_by_room_id.setdefault(event.room_id, []).append(event)
  1471. newly_joined_rooms: List[str] = []
  1472. newly_left_rooms: List[str] = []
  1473. room_entries: List[RoomSyncResultBuilder] = []
  1474. invited: List[InvitedSyncResult] = []
  1475. knocked: List[KnockedSyncResult] = []
  1476. for room_id, events in mem_change_events_by_room_id.items():
  1477. # The body of this loop will add this room to at least one of the five lists
  1478. # above. Things get messy if you've e.g. joined, left, joined then left the
  1479. # room all in the same sync period.
  1480. logger.debug(
  1481. "Membership changes in %s: [%s]",
  1482. room_id,
  1483. ", ".join("%s (%s)" % (e.event_id, e.membership) for e in events),
  1484. )
  1485. non_joins = [e for e in events if e.membership != Membership.JOIN]
  1486. has_join = len(non_joins) != len(events)
  1487. # We want to figure out if we joined the room at some point since
  1488. # the last sync (even if we have since left). This is to make sure
  1489. # we do send down the room, and with full state, where necessary
  1490. old_state_ids = None
  1491. if room_id in sync_result_builder.joined_room_ids and non_joins:
  1492. # Always include if the user (re)joined the room, especially
  1493. # important so that device list changes are calculated correctly.
  1494. # If there are non-join member events, but we are still in the room,
  1495. # then the user must have left and joined
  1496. newly_joined_rooms.append(room_id)
  1497. # User is in the room so we don't need to do the invite/leave checks
  1498. continue
  1499. if room_id in sync_result_builder.joined_room_ids or has_join:
  1500. old_state_ids = await self.get_state_at(room_id, since_token)
  1501. old_mem_ev_id = old_state_ids.get((EventTypes.Member, user_id), None)
  1502. old_mem_ev = None
  1503. if old_mem_ev_id:
  1504. old_mem_ev = await self.store.get_event(
  1505. old_mem_ev_id, allow_none=True
  1506. )
  1507. if not old_mem_ev or old_mem_ev.membership != Membership.JOIN:
  1508. newly_joined_rooms.append(room_id)
  1509. # If user is in the room then we don't need to do the invite/leave checks
  1510. if room_id in sync_result_builder.joined_room_ids:
  1511. continue
  1512. if not non_joins:
  1513. continue
  1514. last_non_join = non_joins[-1]
  1515. # Check if we have left the room. This can either be because we were
  1516. # joined before *or* that we since joined and then left.
  1517. if events[-1].membership != Membership.JOIN:
  1518. if has_join:
  1519. newly_left_rooms.append(room_id)
  1520. else:
  1521. if not old_state_ids:
  1522. old_state_ids = await self.get_state_at(room_id, since_token)
  1523. old_mem_ev_id = old_state_ids.get(
  1524. (EventTypes.Member, user_id), None
  1525. )
  1526. old_mem_ev = None
  1527. if old_mem_ev_id:
  1528. old_mem_ev = await self.store.get_event(
  1529. old_mem_ev_id, allow_none=True
  1530. )
  1531. if old_mem_ev and old_mem_ev.membership == Membership.JOIN:
  1532. newly_left_rooms.append(room_id)
  1533. # Only bother if we're still currently invited
  1534. should_invite = last_non_join.membership == Membership.INVITE
  1535. if should_invite:
  1536. if last_non_join.sender not in ignored_users:
  1537. invite_room_sync = InvitedSyncResult(room_id, invite=last_non_join)
  1538. if invite_room_sync:
  1539. invited.append(invite_room_sync)
  1540. # Only bother if our latest membership in the room is knock (and we haven't
  1541. # been accepted/rejected in the meantime).
  1542. should_knock = last_non_join.membership == Membership.KNOCK
  1543. if should_knock:
  1544. knock_room_sync = KnockedSyncResult(room_id, knock=last_non_join)
  1545. if knock_room_sync:
  1546. knocked.append(knock_room_sync)
  1547. # Always include leave/ban events. Just take the last one.
  1548. # TODO: How do we handle ban -> leave in same batch?
  1549. leave_events = [
  1550. e
  1551. for e in non_joins
  1552. if e.membership in (Membership.LEAVE, Membership.BAN)
  1553. ]
  1554. if leave_events:
  1555. leave_event = leave_events[-1]
  1556. leave_position = await self.store.get_position_for_event(
  1557. leave_event.event_id
  1558. )
  1559. # If the leave event happened before the since token then we
  1560. # bail.
  1561. if since_token and not leave_position.persisted_after(
  1562. since_token.room_key
  1563. ):
  1564. continue
  1565. # We can safely convert the position of the leave event into a
  1566. # stream token as it'll only be used in the context of this
  1567. # room. (c.f. the docstring of `to_room_stream_token`).
  1568. leave_token = since_token.copy_and_replace(
  1569. StreamKeyType.ROOM, leave_position.to_room_stream_token()
  1570. )
  1571. # If this is an out of band message, like a remote invite
  1572. # rejection, we include it in the recents batch. Otherwise, we
  1573. # let _load_filtered_recents handle fetching the correct
  1574. # batches.
  1575. #
  1576. # This is all screaming out for a refactor, as the logic here is
  1577. # subtle and the moving parts numerous.
  1578. if leave_event.internal_metadata.is_out_of_band_membership():
  1579. batch_events: Optional[List[EventBase]] = [leave_event]
  1580. else:
  1581. batch_events = None
  1582. room_entries.append(
  1583. RoomSyncResultBuilder(
  1584. room_id=room_id,
  1585. rtype="archived",
  1586. events=batch_events,
  1587. newly_joined=room_id in newly_joined_rooms,
  1588. full_state=False,
  1589. since_token=since_token,
  1590. upto_token=leave_token,
  1591. out_of_band=leave_event.internal_metadata.is_out_of_band_membership(),
  1592. )
  1593. )
  1594. timeline_limit = sync_config.filter_collection.timeline_limit()
  1595. # Get all events since the `from_key` in rooms we're currently joined to.
  1596. # If there are too many, we get the most recent events only. This leaves
  1597. # a "gap" in the timeline, as described by the spec for /sync.
  1598. room_to_events = await self.store.get_room_events_stream_for_rooms(
  1599. room_ids=sync_result_builder.joined_room_ids,
  1600. from_key=since_token.room_key,
  1601. to_key=now_token.room_key,
  1602. limit=timeline_limit + 1,
  1603. )
  1604. # We loop through all room ids, even if there are no new events, in case
  1605. # there are non room events that we need to notify about.
  1606. for room_id in sync_result_builder.joined_room_ids:
  1607. room_entry = room_to_events.get(room_id, None)
  1608. newly_joined = room_id in newly_joined_rooms
  1609. if room_entry:
  1610. events, start_key = room_entry
  1611. prev_batch_token = now_token.copy_and_replace(
  1612. StreamKeyType.ROOM, start_key
  1613. )
  1614. entry = RoomSyncResultBuilder(
  1615. room_id=room_id,
  1616. rtype="joined",
  1617. events=events,
  1618. newly_joined=newly_joined,
  1619. full_state=False,
  1620. since_token=None if newly_joined else since_token,
  1621. upto_token=prev_batch_token,
  1622. )
  1623. else:
  1624. entry = RoomSyncResultBuilder(
  1625. room_id=room_id,
  1626. rtype="joined",
  1627. events=[],
  1628. newly_joined=newly_joined,
  1629. full_state=False,
  1630. since_token=since_token,
  1631. upto_token=since_token,
  1632. )
  1633. room_entries.append(entry)
  1634. return _RoomChanges(
  1635. room_entries,
  1636. invited,
  1637. knocked,
  1638. newly_joined_rooms,
  1639. newly_left_rooms,
  1640. )
  1641. async def _get_all_rooms(
  1642. self,
  1643. sync_result_builder: "SyncResultBuilder",
  1644. ignored_users: FrozenSet[str],
  1645. ignored_rooms: List[str],
  1646. ) -> _RoomChanges:
  1647. """Returns entries for all rooms for the user.
  1648. Like `_get_rooms_changed`, but assumes the `since_token` is `None`.
  1649. This function does not modify the sync_result_builder.
  1650. Args:
  1651. sync_result_builder
  1652. ignored_users: Set of users ignored by user.
  1653. ignored_rooms: List of rooms to ignore.
  1654. """
  1655. user_id = sync_result_builder.sync_config.user.to_string()
  1656. since_token = sync_result_builder.since_token
  1657. now_token = sync_result_builder.now_token
  1658. sync_config = sync_result_builder.sync_config
  1659. room_list = await self.store.get_rooms_for_local_user_where_membership_is(
  1660. user_id=user_id,
  1661. membership_list=Membership.LIST,
  1662. excluded_rooms=ignored_rooms,
  1663. )
  1664. room_entries = []
  1665. invited = []
  1666. knocked = []
  1667. for event in room_list:
  1668. if event.room_version_id not in KNOWN_ROOM_VERSIONS:
  1669. continue
  1670. if event.membership == Membership.JOIN:
  1671. room_entries.append(
  1672. RoomSyncResultBuilder(
  1673. room_id=event.room_id,
  1674. rtype="joined",
  1675. events=None,
  1676. newly_joined=False,
  1677. full_state=True,
  1678. since_token=since_token,
  1679. upto_token=now_token,
  1680. )
  1681. )
  1682. elif event.membership == Membership.INVITE:
  1683. if event.sender in ignored_users:
  1684. continue
  1685. invite = await self.store.get_event(event.event_id)
  1686. invited.append(InvitedSyncResult(room_id=event.room_id, invite=invite))
  1687. elif event.membership == Membership.KNOCK:
  1688. knock = await self.store.get_event(event.event_id)
  1689. knocked.append(KnockedSyncResult(room_id=event.room_id, knock=knock))
  1690. elif event.membership in (Membership.LEAVE, Membership.BAN):
  1691. # Always send down rooms we were banned from or kicked from.
  1692. if not sync_config.filter_collection.include_leave:
  1693. if event.membership == Membership.LEAVE:
  1694. if user_id == event.sender:
  1695. continue
  1696. leave_token = now_token.copy_and_replace(
  1697. StreamKeyType.ROOM, RoomStreamToken(None, event.stream_ordering)
  1698. )
  1699. room_entries.append(
  1700. RoomSyncResultBuilder(
  1701. room_id=event.room_id,
  1702. rtype="archived",
  1703. events=None,
  1704. newly_joined=False,
  1705. full_state=True,
  1706. since_token=since_token,
  1707. upto_token=leave_token,
  1708. )
  1709. )
  1710. return _RoomChanges(room_entries, invited, knocked, [], [])
  1711. async def _generate_room_entry(
  1712. self,
  1713. sync_result_builder: "SyncResultBuilder",
  1714. room_builder: "RoomSyncResultBuilder",
  1715. ephemeral: List[JsonDict],
  1716. tags: Optional[Dict[str, Dict[str, Any]]],
  1717. account_data: Dict[str, JsonDict],
  1718. always_include: bool = False,
  1719. ) -> None:
  1720. """Populates the `joined` and `archived` section of `sync_result_builder`
  1721. based on the `room_builder`.
  1722. Ideally, we want to report all events whose stream ordering `s` lies in the
  1723. range `since_token < s <= now_token`, where the two tokens are read from the
  1724. sync_result_builder.
  1725. If there are too many events in that range to report, things get complicated.
  1726. In this situation we return a truncated list of the most recent events, and
  1727. indicate in the response that there is a "gap" of omitted events. Lots of this
  1728. is handled in `_load_filtered_recents`, but some of is handled in this method.
  1729. Additionally:
  1730. - we include a "state_delta", to describe the changes in state over the gap,
  1731. - we include all membership events applying to the user making the request,
  1732. even those in the gap.
  1733. See the spec for the rationale:
  1734. https://spec.matrix.org/v1.1/client-server-api/#syncing
  1735. Args:
  1736. sync_result_builder
  1737. room_builder
  1738. ephemeral: List of new ephemeral events for room
  1739. tags: List of *all* tags for room, or None if there has been
  1740. no change.
  1741. account_data: List of new account data for room
  1742. always_include: Always include this room in the sync response,
  1743. even if empty.
  1744. """
  1745. newly_joined = room_builder.newly_joined
  1746. full_state = (
  1747. room_builder.full_state or newly_joined or sync_result_builder.full_state
  1748. )
  1749. events = room_builder.events
  1750. # We want to shortcut out as early as possible.
  1751. if not (always_include or account_data or ephemeral or full_state):
  1752. if events == [] and tags is None:
  1753. return
  1754. now_token = sync_result_builder.now_token
  1755. sync_config = sync_result_builder.sync_config
  1756. room_id = room_builder.room_id
  1757. since_token = room_builder.since_token
  1758. upto_token = room_builder.upto_token
  1759. with start_active_span("sync.generate_room_entry"):
  1760. set_tag("room_id", room_id)
  1761. log_kv({"events": len(events or ())})
  1762. log_kv(
  1763. {
  1764. "since_token": since_token,
  1765. "upto_token": upto_token,
  1766. }
  1767. )
  1768. batch = await self._load_filtered_recents(
  1769. room_id,
  1770. sync_config,
  1771. now_token=upto_token,
  1772. since_token=since_token,
  1773. potential_recents=events,
  1774. newly_joined_room=newly_joined,
  1775. )
  1776. log_kv(
  1777. {
  1778. "batch_events": len(batch.events),
  1779. "prev_batch": batch.prev_batch,
  1780. "batch_limited": batch.limited,
  1781. }
  1782. )
  1783. # Note: `batch` can be both empty and limited here in the case where
  1784. # `_load_filtered_recents` can't find any events the user should see
  1785. # (e.g. due to having ignored the sender of the last 50 events).
  1786. # When we join the room (or the client requests full_state), we should
  1787. # send down any existing tags. Usually the user won't have tags in a
  1788. # newly joined room, unless either a) they've joined before or b) the
  1789. # tag was added by synapse e.g. for server notice rooms.
  1790. if full_state:
  1791. user_id = sync_result_builder.sync_config.user.to_string()
  1792. tags = await self.store.get_tags_for_room(user_id, room_id)
  1793. # If there aren't any tags, don't send the empty tags list down
  1794. # sync
  1795. if not tags:
  1796. tags = None
  1797. account_data_events = []
  1798. if tags is not None:
  1799. account_data_events.append({"type": "m.tag", "content": {"tags": tags}})
  1800. for account_data_type, content in account_data.items():
  1801. account_data_events.append(
  1802. {"type": account_data_type, "content": content}
  1803. )
  1804. account_data_events = (
  1805. await sync_config.filter_collection.filter_room_account_data(
  1806. account_data_events
  1807. )
  1808. )
  1809. ephemeral = await sync_config.filter_collection.filter_room_ephemeral(
  1810. ephemeral
  1811. )
  1812. if not (
  1813. always_include
  1814. or batch
  1815. or account_data_events
  1816. or ephemeral
  1817. or full_state
  1818. ):
  1819. return
  1820. if not room_builder.out_of_band:
  1821. state = await self.compute_state_delta(
  1822. room_id,
  1823. batch,
  1824. sync_config,
  1825. since_token,
  1826. now_token,
  1827. full_state=full_state,
  1828. )
  1829. else:
  1830. # An out of band room won't have any state changes.
  1831. state = {}
  1832. summary: Optional[JsonDict] = {}
  1833. # we include a summary in room responses when we're lazy loading
  1834. # members (as the client otherwise doesn't have enough info to form
  1835. # the name itself).
  1836. if (
  1837. not room_builder.out_of_band
  1838. and sync_config.filter_collection.lazy_load_members()
  1839. and (
  1840. # we recalculate the summary:
  1841. # if there are membership changes in the timeline, or
  1842. # if membership has changed during a gappy sync, or
  1843. # if this is an initial sync.
  1844. any(ev.type == EventTypes.Member for ev in batch.events)
  1845. or (
  1846. # XXX: this may include false positives in the form of LL
  1847. # members which have snuck into state
  1848. batch.limited
  1849. and any(t == EventTypes.Member for (t, k) in state)
  1850. )
  1851. or since_token is None
  1852. )
  1853. ):
  1854. summary = await self.compute_summary(
  1855. room_id, sync_config, batch, state, now_token
  1856. )
  1857. if room_builder.rtype == "joined":
  1858. unread_notifications: Dict[str, int] = {}
  1859. room_sync = JoinedSyncResult(
  1860. room_id=room_id,
  1861. timeline=batch,
  1862. state=state,
  1863. ephemeral=ephemeral,
  1864. account_data=account_data_events,
  1865. unread_notifications=unread_notifications,
  1866. summary=summary,
  1867. unread_count=0,
  1868. )
  1869. if room_sync or always_include:
  1870. notifs = await self.unread_notifs_for_room_id(room_id, sync_config)
  1871. unread_notifications["notification_count"] = notifs.notify_count
  1872. unread_notifications["highlight_count"] = notifs.highlight_count
  1873. room_sync.unread_count = notifs.unread_count
  1874. sync_result_builder.joined.append(room_sync)
  1875. if batch.limited and since_token:
  1876. user_id = sync_result_builder.sync_config.user.to_string()
  1877. logger.debug(
  1878. "Incremental gappy sync of %s for user %s with %d state events"
  1879. % (room_id, user_id, len(state))
  1880. )
  1881. elif room_builder.rtype == "archived":
  1882. archived_room_sync = ArchivedSyncResult(
  1883. room_id=room_id,
  1884. timeline=batch,
  1885. state=state,
  1886. account_data=account_data_events,
  1887. )
  1888. if archived_room_sync or always_include:
  1889. sync_result_builder.archived.append(archived_room_sync)
  1890. else:
  1891. raise Exception("Unrecognized rtype: %r", room_builder.rtype)
  1892. async def get_rooms_for_user_at(
  1893. self, user_id: str, room_key: RoomStreamToken
  1894. ) -> FrozenSet[str]:
  1895. """Get set of joined rooms for a user at the given stream ordering.
  1896. The stream ordering *must* be recent, otherwise this may throw an
  1897. exception if older than a month. (This function is called with the
  1898. current token, which should be perfectly fine).
  1899. Args:
  1900. user_id
  1901. stream_ordering
  1902. ReturnValue:
  1903. Set of room_ids the user is in at given stream_ordering.
  1904. """
  1905. joined_rooms = await self.store.get_rooms_for_user_with_stream_ordering(user_id)
  1906. joined_room_ids = set()
  1907. # We need to check that the stream ordering of the join for each room
  1908. # is before the stream_ordering asked for. This might not be the case
  1909. # if the user joins a room between us getting the current token and
  1910. # calling `get_rooms_for_user_with_stream_ordering`.
  1911. # If the membership's stream ordering is after the given stream
  1912. # ordering, we need to go and work out if the user was in the room
  1913. # before.
  1914. for joined_room in joined_rooms:
  1915. if not joined_room.event_pos.persisted_after(room_key):
  1916. joined_room_ids.add(joined_room.room_id)
  1917. continue
  1918. logger.info("User joined room after current token: %s", joined_room.room_id)
  1919. extrems = (
  1920. await self.store.get_forward_extremities_for_room_at_stream_ordering(
  1921. joined_room.room_id, joined_room.event_pos.stream
  1922. )
  1923. )
  1924. users_in_room = await self.state.get_current_users_in_room(
  1925. joined_room.room_id, extrems
  1926. )
  1927. if user_id in users_in_room:
  1928. joined_room_ids.add(joined_room.room_id)
  1929. return frozenset(joined_room_ids)
  1930. def _action_has_highlight(actions: List[JsonDict]) -> bool:
  1931. for action in actions:
  1932. try:
  1933. if action.get("set_tweak", None) == "highlight":
  1934. return action.get("value", True)
  1935. except AttributeError:
  1936. pass
  1937. return False
  1938. def _calculate_state(
  1939. timeline_contains: StateMap[str],
  1940. timeline_start: StateMap[str],
  1941. previous: StateMap[str],
  1942. current: StateMap[str],
  1943. lazy_load_members: bool,
  1944. ) -> StateMap[str]:
  1945. """Works out what state to include in a sync response.
  1946. Args:
  1947. timeline_contains: state in the timeline
  1948. timeline_start: state at the start of the timeline
  1949. previous: state at the end of the previous sync (or empty dict
  1950. if this is an initial sync)
  1951. current: state at the end of the timeline
  1952. lazy_load_members: whether to return members from timeline_start
  1953. or not. assumes that timeline_start has already been filtered to
  1954. include only the members the client needs to know about.
  1955. """
  1956. event_id_to_key = {
  1957. e: key
  1958. for key, e in itertools.chain(
  1959. timeline_contains.items(),
  1960. previous.items(),
  1961. timeline_start.items(),
  1962. current.items(),
  1963. )
  1964. }
  1965. c_ids = set(current.values())
  1966. ts_ids = set(timeline_start.values())
  1967. p_ids = set(previous.values())
  1968. tc_ids = set(timeline_contains.values())
  1969. # If we are lazyloading room members, we explicitly add the membership events
  1970. # for the senders in the timeline into the state block returned by /sync,
  1971. # as we may not have sent them to the client before. We find these membership
  1972. # events by filtering them out of timeline_start, which has already been filtered
  1973. # to only include membership events for the senders in the timeline.
  1974. # In practice, we can do this by removing them from the p_ids list,
  1975. # which is the list of relevant state we know we have already sent to the client.
  1976. # see https://github.com/matrix-org/synapse/pull/2970/files/efcdacad7d1b7f52f879179701c7e0d9b763511f#r204732809
  1977. if lazy_load_members:
  1978. p_ids.difference_update(
  1979. e for t, e in timeline_start.items() if t[0] == EventTypes.Member
  1980. )
  1981. state_ids = ((c_ids | ts_ids) - p_ids) - tc_ids
  1982. return {event_id_to_key[e]: e for e in state_ids}
  1983. @attr.s(slots=True, auto_attribs=True)
  1984. class SyncResultBuilder:
  1985. """Used to help build up a new SyncResult for a user
  1986. Attributes:
  1987. sync_config
  1988. full_state: The full_state flag as specified by user
  1989. since_token: The token supplied by user, or None.
  1990. now_token: The token to sync up to.
  1991. joined_room_ids: List of rooms the user is joined to
  1992. # The following mirror the fields in a sync response
  1993. presence
  1994. account_data
  1995. joined
  1996. invited
  1997. knocked
  1998. archived
  1999. groups
  2000. to_device
  2001. """
  2002. sync_config: SyncConfig
  2003. full_state: bool
  2004. since_token: Optional[StreamToken]
  2005. now_token: StreamToken
  2006. joined_room_ids: FrozenSet[str]
  2007. presence: List[UserPresenceState] = attr.Factory(list)
  2008. account_data: List[JsonDict] = attr.Factory(list)
  2009. joined: List[JoinedSyncResult] = attr.Factory(list)
  2010. invited: List[InvitedSyncResult] = attr.Factory(list)
  2011. knocked: List[KnockedSyncResult] = attr.Factory(list)
  2012. archived: List[ArchivedSyncResult] = attr.Factory(list)
  2013. groups: Optional[GroupsSyncResult] = None
  2014. to_device: List[JsonDict] = attr.Factory(list)
  2015. def calculate_user_changes(self) -> Tuple[Set[str], Set[str]]:
  2016. """Work out which other users have joined or left rooms we are joined to.
  2017. This data only is only useful for an incremental sync.
  2018. The SyncResultBuilder is not modified by this function.
  2019. """
  2020. newly_joined_or_invited_or_knocked_users = set()
  2021. newly_left_users = set()
  2022. if self.since_token:
  2023. for joined_sync in self.joined:
  2024. it = itertools.chain(
  2025. joined_sync.timeline.events, joined_sync.state.values()
  2026. )
  2027. for event in it:
  2028. if event.type == EventTypes.Member:
  2029. if (
  2030. event.membership == Membership.JOIN
  2031. or event.membership == Membership.INVITE
  2032. or event.membership == Membership.KNOCK
  2033. ):
  2034. newly_joined_or_invited_or_knocked_users.add(
  2035. event.state_key
  2036. )
  2037. else:
  2038. prev_content = event.unsigned.get("prev_content", {})
  2039. prev_membership = prev_content.get("membership", None)
  2040. if prev_membership == Membership.JOIN:
  2041. newly_left_users.add(event.state_key)
  2042. newly_left_users -= newly_joined_or_invited_or_knocked_users
  2043. return newly_joined_or_invited_or_knocked_users, newly_left_users
  2044. @attr.s(slots=True, auto_attribs=True)
  2045. class RoomSyncResultBuilder:
  2046. """Stores information needed to create either a `JoinedSyncResult` or
  2047. `ArchivedSyncResult`.
  2048. Attributes:
  2049. room_id
  2050. rtype: One of `"joined"` or `"archived"`
  2051. events: List of events to include in the room (more events may be added
  2052. when generating result).
  2053. newly_joined: If the user has newly joined the room
  2054. full_state: Whether the full state should be sent in result
  2055. since_token: Earliest point to return events from, or None
  2056. upto_token: Latest point to return events from.
  2057. out_of_band: whether the events in the room are "out of band" events
  2058. and the server isn't in the room.
  2059. """
  2060. room_id: str
  2061. rtype: str
  2062. events: Optional[List[EventBase]]
  2063. newly_joined: bool
  2064. full_state: bool
  2065. since_token: Optional[StreamToken]
  2066. upto_token: StreamToken
  2067. out_of_band: bool = False