room.py 66 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
7127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748
  1. # Copyright 2016-2021 The Matrix.org Foundation C.I.C.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. """Contains functions for performing actions on rooms."""
  15. import itertools
  16. import logging
  17. import math
  18. import random
  19. import string
  20. from collections import OrderedDict
  21. from typing import (
  22. TYPE_CHECKING,
  23. Any,
  24. Awaitable,
  25. Collection,
  26. Dict,
  27. List,
  28. Optional,
  29. Tuple,
  30. )
  31. import attr
  32. from typing_extensions import TypedDict
  33. import synapse.events.snapshot
  34. from synapse.api.constants import (
  35. EventContentFields,
  36. EventTypes,
  37. GuestAccess,
  38. HistoryVisibility,
  39. JoinRules,
  40. Membership,
  41. RoomCreationPreset,
  42. RoomEncryptionAlgorithms,
  43. RoomTypes,
  44. )
  45. from synapse.api.errors import (
  46. AuthError,
  47. Codes,
  48. HttpResponseException,
  49. LimitExceededError,
  50. NotFoundError,
  51. StoreError,
  52. SynapseError,
  53. )
  54. from synapse.api.filtering import Filter
  55. from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion
  56. from synapse.event_auth import validate_event_for_room_version
  57. from synapse.events import EventBase
  58. from synapse.events.utils import copy_and_fixup_power_levels_contents
  59. from synapse.federation.federation_client import InvalidResponseError
  60. from synapse.handlers.federation import get_domains_from_state
  61. from synapse.handlers.relations import BundledAggregations
  62. from synapse.rest.admin._base import assert_user_is_admin
  63. from synapse.storage.state import StateFilter
  64. from synapse.streams import EventSource
  65. from synapse.types import (
  66. JsonDict,
  67. MutableStateMap,
  68. Requester,
  69. RoomAlias,
  70. RoomID,
  71. RoomStreamToken,
  72. StateMap,
  73. StreamKeyType,
  74. StreamToken,
  75. UserID,
  76. create_requester,
  77. )
  78. from synapse.util import stringutils
  79. from synapse.util.caches.response_cache import ResponseCache
  80. from synapse.util.stringutils import parse_and_validate_server_name
  81. from synapse.visibility import filter_events_for_client
  82. if TYPE_CHECKING:
  83. from synapse.server import HomeServer
  # Timeout, in milliseconds, handed to the room-upgrade response cache
  # (see RoomCreationHandler._upgrade_response_cache).
  FIVE_MINUTES_IN_MS = 5 * 60 * 1000

  # NOTE(review): presumably the URL scheme used when contacting identity
  # servers; it is not referenced in the visible part of this file -- confirm.
  id_server_scheme = "https://"

  # Module-level logger, named after this module.
  logger = logging.getLogger(__name__)
  @attr.s(auto_attribs=True, frozen=True, slots=True)
  class EventContext:
      """Immutable record bundling one event with its surroundings.

      Instances are frozen and slotted, so the attributes cannot be reassigned
      after construction and no extra attributes can be attached.

      NOTE(review): the per-field notes below are inferred from the field names
      and types only; confirm against the code that builds this object.
      """

      # Events surrounding `event` (presumably those preceding it).
      events_before: List[EventBase]
      # The event this context is about.
      event: EventBase
      # Events surrounding `event` (presumably those following it).
      events_after: List[EventBase]
      # State events accompanying the context.
      state: List[EventBase]
      # Bundled aggregations keyed by a string (presumably the event ID).
      aggregations: Dict[str, BundledAggregations]
      # String markers for the two ends of the window (presumably tokens).
      start: str
      end: str
  96. class RoomCreationHandler:
  def __init__(self, hs: "HomeServer"):
      """Collect the homeserver components this handler relies on.

      Args:
          hs: the homeserver object that supplies the datastore, auth, clock,
              config and the other handlers used for room creation/upgrade.
      """
      self.store = hs.get_datastores().main
      self.auth = hs.get_auth()
      self.clock = hs.get_clock()
      self.hs = hs
      self.spam_checker = hs.get_spam_checker()
      self.event_creation_handler = hs.get_event_creation_handler()
      self.room_member_handler = hs.get_room_member_handler()
      self._event_auth_handler = hs.get_event_auth_handler()
      self.config = hs.config

      self.request_ratelimiter = hs.get_request_ratelimiter()

      # Presets for which the homeserver config enables encryption by default.
      encrypted_presets = (
          self.config.room.encryption_enabled_by_default_for_room_presets
      )

      # One row per preset:
      # (preset, join rule, invitees get ops, guests may join, PL override)
      preset_rows = (
          (
              RoomCreationPreset.PRIVATE_CHAT,
              JoinRules.INVITE,
              False,
              True,
              {"invite": 0},
          ),
          (
              RoomCreationPreset.TRUSTED_PRIVATE_CHAT,
              JoinRules.INVITE,
              True,
              True,
              {"invite": 0},
          ),
          (
              RoomCreationPreset.PUBLIC_CHAT,
              JoinRules.PUBLIC,
              False,
              False,
              {},
          ),
      )

      # Room state derived from the defined presets. Every preset shares the
      # same history visibility; `encrypted` is filled in from the config.
      self._presets_dict: Dict[str, Dict[str, Any]] = {
          preset: {
              "join_rules": join_rule,
              "history_visibility": HistoryVisibility.SHARED,
              "original_invitees_have_ops": invitees_have_ops,
              "guest_can_join": guests_may_join,
              "power_level_content_override": pl_override,
              "encrypted": preset in encrypted_presets,
          }
          for (
              preset,
              join_rule,
              invitees_have_ops,
              guests_may_join,
              pl_override,
          ) in preset_rows
      }

      self._default_power_level_content_override = (
          self.config.room.default_power_level_content_override
      )

      self._replication = hs.get_replication_data_handler()

      # Keyed on (room_id, user_id): when the same user asks to upgrade the
      # same room several times in quick succession, only the first attempt is
      # processed and its result is handed to the later requests.
      self._upgrade_response_cache: ResponseCache[Tuple[str, str]] = ResponseCache(
          hs.get_clock(), "room_upgrade", timeout_ms=FIVE_MINUTES_IN_MS
      )
      self._server_notices_mxid = hs.config.servernotices.server_notices_mxid

      self.third_party_event_rules = hs.get_third_party_event_rules()
  async def upgrade_room(
      self, requester: Requester, old_room_id: str, new_version: RoomVersion
  ) -> str:
      """Replace a room with a new room of a different version.

      Ratelimits the request, refuses it when a different local user already
      has an upgrade of the same room in flight, checks that the room exists
      and that the requester is allowed to send the tombstone, then runs
      `_upgrade_room` through the upgrade response cache.

      Args:
          requester: the user requesting the upgrade
          old_room_id: the id of the room to be replaced
          new_version: the new room version to use

      Returns:
          the new room id

      Raises:
          SynapseError: (400) if another user is already upgrading the room.
          NotFoundError: if `old_room_id` is not a known room.
          ShadowBanError if the requester is shadow-banned.
      """
      await self.request_ratelimiter.ratelimit(requester)

      upgrader = requester.user.to_string()

      # Is somebody else already upgrading this room? This can only be
      # detected when both users are on this homeserver, because the cache is
      # local.
      clashing_upgrade = any(
          cached_room_id == old_room_id and cached_user_id != upgrader
          for cached_room_id, cached_user_id in self._upgrade_response_cache.keys()
      )
      if clashing_upgrade:
          raise SynapseError(
              400, "An upgrade for this room is currently in progress"
          )

      # Look the room up first so that an unknown room yields a 404; going
      # straight to the auth check would produce a 403 instead.
      room_record = await self.store.get_room(old_room_id)
      if room_record is None:
          raise NotFoundError("Unknown room id %s" % (old_room_id,))

      replacement_room_id = self._generate_room_id()

      # Build (but do not yet send) the tombstone. Authorising it below proves
      # that the requester is in the room and has the power level required to
      # carry out the upgrade.
      tombstone_dict = {
          "type": EventTypes.Tombstone,
          "state_key": "",
          "room_id": old_room_id,
          "sender": upgrader,
          "content": {
              "body": "This room has been replaced",
              "replacement_room": replacement_room_id,
          },
      }
      tombstone_event, tombstone_context = (
          await self.event_creation_handler.create_event(requester, tombstone_dict)
      )

      current_version = await self.store.get_room_version(old_room_id)
      validate_event_for_room_version(current_version, tombstone_event)
      await self._event_auth_handler.check_auth_rules_from_context(
          current_version, tombstone_event, tombstone_context
      )

      # Do the upgrade through the response cache: repeated requests from the
      # same user for the same room share the result of the first one while it
      # is still in progress. Everything after the callback is passed to
      # `_upgrade_room` as its arguments.
      return await self._upgrade_response_cache.wrap(
          (old_room_id, upgrader),
          self._upgrade_room,
          requester,
          old_room_id,
          room_record,
          replacement_room_id,
          new_version,
          tombstone_event,
          tombstone_context,
      )
  async def _upgrade_room(
      self,
      requester: Requester,
      old_room_id: str,
      old_room: Dict[str, Any],
      new_room_id: str,
      new_version: RoomVersion,
      tombstone_event: EventBase,
      tombstone_context: synapse.events.snapshot.EventContext,
  ) -> str:
      """Carry out a room upgrade that has already been authorised.

      Steps, in order: store the new room, clone the old room into it, send
      the tombstone to the old room, move aliases, transfer per-user room
      state, and finally adjust power levels in both rooms.

      Args:
          requester: the user requesting the upgrade
          old_room_id: the id of the room to be replaced
          old_room: a dict containing room information for the room to be
              replaced, as returned by `RoomWorkerStore.get_room`.
          new_room_id: the id of the replacement room
          new_version: the version to upgrade the room to
          tombstone_event: the tombstone event to send to the old room
          tombstone_context: the context for the tombstone event

      Returns:
          the id of the replacement room (`new_room_id`).

      Raises:
          ShadowBanError if the requester is shadow-banned.
      """
      upgrader = requester.user.to_string()
      assert self.hs.is_mine_id(upgrader), "User must be our own: %s" % (upgrader,)

      logger.info("Creating new room %s to replace %s", new_room_id, old_room_id)

      # Register the replacement room. In the exceedingly unlikely event of a
      # room ID collision this may raise a `StoreError`.
      await self.store.store_room(
          room_id=new_room_id,
          room_creator_user_id=upgrader,
          is_public=old_room["is_public"],
          room_version=new_version,
      )

      # Fill the replacement room from the old one.
      await self.clone_existing_room(
          requester,
          old_room_id=old_room_id,
          new_room_id=new_room_id,
          new_room_version=new_version,
          tombstone_event_id=tombstone_event.event_id,
      )

      # Only now is the tombstone actually sent to the old room.
      await self.event_creation_handler.handle_new_client_event(
          requester=requester,
          event=tombstone_event,
          context=tombstone_context,
      )

      state_before_tombstone = await tombstone_context.get_current_state_ids()

      # The tombstone is not an outlier, so it must have current state.
      assert state_before_tombstone is not None

      # Point any aliases at the replacement room.
      await self._move_aliases_to_new_room(
          requester, old_room_id, new_room_id, state_before_tombstone
      )

      # Copy over user push rules and tags, and migrate room directory state.
      await self.room_member_handler.transfer_room_state_on_room_upgrade(
          old_room_id, new_room_id
      )

      # Last of all, shut down the power levels in the old room and update
      # them in the new one.
      await self._update_upgraded_room_pls(
          requester,
          old_room_id,
          new_room_id,
          state_before_tombstone,
      )

      return new_room_id
  async def _update_upgraded_room_pls(
      self,
      requester: Requester,
      old_room_id: str,
      new_room_id: str,
      old_room_state: StateMap[str],
  ) -> None:
      """Send updated power levels to both rooms once an upgrade is done.

      In the old room the levels needed to invite and to send default events
      are raised so that regular users can no longer speak. The new room
      receives a copy of the old room's original power levels.

      If the old room has no power-levels event, a warning is logged and
      neither room is touched.

      Args:
          requester: the user requesting the upgrade
          old_room_id: the id of the room to be replaced
          new_room_id: the id of the replacement room
          old_room_state: the state map for the old room

      Raises:
          ShadowBanError if the requester is shadow-banned.
      """
      pl_event_id = old_room_state.get((EventTypes.PowerLevels, ""))
      if pl_event_id is None:
          logger.warning(
              "Not supported: upgrading a room with no PL event. Not setting PLs "
              "in old room."
          )
          return

      old_pl_event = await self.store.get_event(pl_event_id)
      sender = requester.user.to_string()

      def power_levels_event(room_id, content):
          # Event dict for a power-levels state event sent by the requester.
          return {
              "type": EventTypes.PowerLevels,
              "state_key": "",
              "room_id": room_id,
              "sender": sender,
              "content": content,
          }

      # Regular users are stopped from speaking by requiring 'Moderator' level
      # for invites and ordinary events. That is normally 50, but when the
      # room's default user level is already 50 or more the bar is put just
      # above that default.
      locked_content = copy_and_fixup_power_levels_contents(old_pl_event.content)
      users_default: int = locked_content.get("users_default", 0)  # type: ignore[assignment]
      floor = max(users_default + 1, 50)

      changed = False
      for field in ("invite", "events_default"):
          existing: int = locked_content.get(field, 0)  # type: ignore[assignment]
          if existing < floor:
              logger.debug(
                  "Setting level for %s in %s to %i (was %i)",
                  field,
                  old_room_id,
                  floor,
                  existing,
              )
              locked_content[field] = floor
              changed = True
          else:
              logger.debug("Not setting level for %s (already %i)", field, existing)

      if changed:
          # Best effort: failing to lock down the old room is logged, and the
          # new room's power levels are still sent below.
          try:
              await self.event_creation_handler.create_and_send_nonmember_event(
                  requester,
                  power_levels_event(old_room_id, locked_content),
                  ratelimit=False,
              )
          except AuthError as e:
              logger.warning("Unable to update PLs in old room: %s", e)

      # The new room gets a fresh copy of the *unmodified* old power levels.
      await self.event_creation_handler.create_and_send_nonmember_event(
          requester,
          power_levels_event(
              new_room_id,
              copy_and_fixup_power_levels_contents(old_pl_event.content),
          ),
          ratelimit=False,
      )
  async def clone_existing_room(
      self,
      requester: Requester,
      old_room_id: str,
      new_room_id: str,
      new_room_version: RoomVersion,
      tombstone_event_id: str,
  ) -> None:
      """Populate a new room based on an old room

      Copies a fixed set of state events (plus space children when the old
      room is a space) into the new room, temporarily raising the requester's
      power level in the copied power levels if that is needed to send all of
      the state, and then re-applies the old room's bans in the new room.

      Args:
          requester: the user requesting the upgrade
          old_room_id : the id of the room to be replaced
          new_room_id: the id to give the new room (should already have been
              created with _generate_room_id())
          new_room_version: the new room version to use
          tombstone_event_id: the ID of the tombstone event in the old room.

      Raises:
          SynapseError: (403) if the spam checker does not permit the
              requester to create rooms.
      """
      user_id = requester.user.to_string()

      if not await self.spam_checker.user_may_create_room(user_id):
          raise SynapseError(
              403, "You are not permitted to create rooms", Codes.FORBIDDEN
          )

      creation_content: JsonDict = {
          "room_version": new_room_version.identifier,
          "predecessor": {"room_id": old_room_id, "event_id": tombstone_event_id},
      }

      # Check if old room was non-federatable

      # Get old room's create event
      old_room_create_event = await self.store.get_create_event_for_room(old_room_id)

      # Check if the create event specified a non-federatable room
      if not old_room_create_event.content.get(EventContentFields.FEDERATE, True):
          # If so, mark the new room as non-federatable as well
          creation_content[EventContentFields.FEDERATE] = False

      initial_state = {}

      # Replicate relevant room events
      types_to_copy: List[Tuple[str, Optional[str]]] = [
          (EventTypes.JoinRules, ""),
          (EventTypes.Name, ""),
          (EventTypes.Topic, ""),
          (EventTypes.RoomHistoryVisibility, ""),
          (EventTypes.GuestAccess, ""),
          (EventTypes.RoomAvatar, ""),
          (EventTypes.RoomEncryption, ""),
          (EventTypes.ServerACL, ""),
          (EventTypes.RelatedGroups, ""),
          (EventTypes.PowerLevels, ""),
      ]

      # If the old room was a space, copy over the room type and the rooms in
      # the space.
      if (
          old_room_create_event.content.get(EventContentFields.ROOM_TYPE)
          == RoomTypes.SPACE
      ):
          creation_content[EventContentFields.ROOM_TYPE] = RoomTypes.SPACE
          types_to_copy.append((EventTypes.SpaceChild, None))

      old_room_state_ids = await self.store.get_filtered_current_state_ids(
          old_room_id, StateFilter.from_types(types_to_copy)
      )
      # map from event_id to BaseEvent
      old_room_state_events = await self.store.get_events(old_room_state_ids.values())

      for k, old_event_id in old_room_state_ids.items():
          old_event = old_room_state_events.get(old_event_id)
          if old_event:
              # If the event is a space child event with empty content, it was
              # removed from the space and should be ignored.
              if k[0] == EventTypes.SpaceChild and not old_event.content:
                  continue

              initial_state[k] = old_event.content

      # deep-copy the power-levels event before we start modifying it
      # note that if frozen_dicts are enabled, `power_levels` will be a frozen
      # dict so we can't just copy.deepcopy it.
      #
      # NOTE(review): this lookup raises KeyError when the old room has no
      # power-levels event, whereas `_update_upgraded_room_pls` tolerates that
      # case -- confirm whether such rooms need to be supported here.
      initial_state[
          (EventTypes.PowerLevels, "")
      ] = power_levels = copy_and_fixup_power_levels_contents(
          initial_state[(EventTypes.PowerLevels, "")]
      )

      # Resolve the minimum power level required to send any state event
      # We will give the upgrading user this power level temporarily (if necessary) such that
      # they are able to copy all of the state events over, then revert them back to their
      # original power level afterwards in _update_upgraded_room_pls

      # Copy over user power levels now as this will not be possible with >100PL users once
      # the room has been created
      # Calculate the minimum power level needed to clone the room
      event_power_levels = power_levels.get("events", {})
      if not isinstance(event_power_levels, dict):
          event_power_levels = {}
      state_default = power_levels.get("state_default", 50)
      try:
          state_default_int = int(state_default)  # type: ignore[arg-type]
      except (TypeError, ValueError):
          state_default_int = 50
      ban = power_levels.get("ban", 50)
      try:
          ban = int(ban)  # type: ignore[arg-type]
      except (TypeError, ValueError):
          ban = 50
      # `default=0` keeps this working when the room defines no per-event
      # power levels: `max()` over an empty sequence would otherwise raise
      # ValueError and abort the upgrade.
      needed_power_level = max(
          state_default_int, ban, max(event_power_levels.values(), default=0)
      )

      # Get the user's current power level, this matches the logic in get_user_power_level,
      # but without the entire state map.
      user_power_levels = power_levels.setdefault("users", {})
      if not isinstance(user_power_levels, dict):
          user_power_levels = {}
      users_default = power_levels.get("users_default", 0)
      current_power_level = user_power_levels.get(user_id, users_default)
      try:
          current_power_level_int = int(current_power_level)  # type: ignore[arg-type]
      except (TypeError, ValueError):
          current_power_level_int = 0

      # Raise the requester's power level in the new room if necessary
      if current_power_level_int < needed_power_level:
          user_power_levels[user_id] = needed_power_level
          # Make sure the raise lands in the content that is actually sent:
          # when `users` was malformed, `user_power_levels` is a fresh dict
          # that is not yet part of `power_levels`. For a well-formed `users`
          # this re-assigns the same dict and changes nothing.
          power_levels["users"] = user_power_levels

      await self._send_events_for_new_room(
          requester,
          new_room_id,
          # we expect to override all the presets with initial_state, so this is
          # somewhat arbitrary.
          preset_config=RoomCreationPreset.PRIVATE_CHAT,
          invite_list=[],
          initial_state=initial_state,
          creation_content=creation_content,
          ratelimit=False,
      )

      # Transfer membership events
      old_room_member_state_ids = await self.store.get_filtered_current_state_ids(
          old_room_id, StateFilter.from_types([(EventTypes.Member, None)])
      )

      # map from event_id to BaseEvent
      old_room_member_state_events = await self.store.get_events(
          old_room_member_state_ids.values()
      )
      for old_event in old_room_member_state_events.values():
          # Only transfer ban events
          if old_event.content.get("membership") == "ban":
              await self.room_member_handler.update_membership(
                  requester,
                  UserID.from_string(old_event.state_key),
                  new_room_id,
                  "ban",
                  ratelimit=False,
                  content=old_event.content,
              )

      # XXX invites/joins
      # XXX 3pid invites
  async def _move_aliases_to_new_room(
      self,
      requester: Requester,
      old_room_id: str,
      new_room_id: str,
      old_room_state: StateMap[str],
  ) -> None:
      """Point the old room's aliases at the new room after an upgrade.

      The stored alias mappings are always moved. If the old room also had a
      canonical alias event, the aliases belonging to this server are taken
      out of the old room's event and published in a canonical alias event in
      the new room. Failures to send either event are logged, not raised.

      Args:
          requester: the user requesting the upgrade
          old_room_id: the id of the room being replaced
          new_room_id: the id of the replacement room
          old_room_state: the state map for the old room
      """
      # Fetch the canonical alias event (if there is one) before the stored
      # aliases are moved.
      alias_event_id = old_room_state.get((EventTypes.CanonicalAlias, ""))
      alias_event = None
      if alias_event_id:
          alias_event = await self.store.get_event(alias_event_id)

      await self.store.update_aliases_for_room(old_room_id, new_room_id)

      if not alias_event:
          return

      # There is a canonical alias event: work out what stays in the old room
      # and what moves to the new one.
      original_content = alias_event.content
      old_content = dict(original_content)
      new_content = {}

      main_alias = original_content.get("alias")
      if main_alias and self.hs.is_mine_id(main_alias):
          new_content["alias"] = main_alias
          old_content.pop("alias", None)

      # Converted to a list because the stored value will be a tuple.
      remaining_alts = list(old_content.get("alt_aliases", []))
      if remaining_alts:
          old_content["alt_aliases"] = remaining_alts
          moved_alts = new_content.setdefault("alt_aliases", [])
          for alt in original_content.get("alt_aliases", []):
              try:
                  if self.hs.is_mine_id(alt):
                      moved_alts.append(alt)
                      remaining_alts.remove(alt)
              except Exception:
                  logger.info(
                      "Invalid alias %s in canonical alias event %s",
                      alt,
                      alias_event_id,
                  )

          # Nothing left for the old room: drop the key altogether.
          if not remaining_alts:
              old_content.pop("alt_aliases")

      sender = requester.user.to_string()

      # Update the old room first, then the new one. Neither send is expected
      # to fail, but if one does we would rather still return the new room to
      # the client, so the error is only logged.
      pending_updates = (
          (
              old_room_id,
              old_content,
              "Unable to send updated alias events in old room: %s",
          ),
          (
              new_room_id,
              new_content,
              "Unable to send updated alias events in new room: %s",
          ),
      )
      for room_id, content, failure_message in pending_updates:
          try:
              await self.event_creation_handler.create_and_send_nonmember_event(
                  requester,
                  {
                      "type": EventTypes.CanonicalAlias,
                      "state_key": "",
                      "room_id": room_id,
                      "sender": sender,
                      "content": content,
                  },
                  ratelimit=False,
              )
          except SynapseError as e:
              logger.error(failure_message, e)
  588. async def create_room(
  589. self,
  590. requester: Requester,
  591. config: JsonDict,
  592. ratelimit: bool = True,
  593. creator_join_profile: Optional[JsonDict] = None,
  594. ) -> Tuple[dict, int]:
  595. """Creates a new room.
  596. Args:
  597. requester:
  598. The user who requested the room creation.
  599. config : A dict of configuration options.
  600. ratelimit: set to False to disable the rate limiter
  601. creator_join_profile:
  602. Set to override the displayname and avatar for the creating
  603. user in this room. If unset, displayname and avatar will be
  604. derived from the user's profile. If set, should contain the
  605. values to go in the body of the 'join' event (typically
  606. `avatar_url` and/or `displayname`.
  607. Returns:
  608. First, a dict containing the keys `room_id` and, if an alias
  609. was, requested, `room_alias`. Secondly, the stream_id of the
  610. last persisted event.
  611. Raises:
  612. SynapseError if the room ID couldn't be stored, or something went
  613. horribly wrong.
  614. ResourceLimitError if server is blocked to some resource being
  615. exceeded
  616. """
  617. user_id = requester.user.to_string()
  618. await self.auth.check_auth_blocking(requester=requester)
  619. if (
  620. self._server_notices_mxid is not None
  621. and requester.user.to_string() == self._server_notices_mxid
  622. ):
  623. # allow the server notices mxid to create rooms
  624. is_requester_admin = True
  625. else:
  626. is_requester_admin = await self.auth.is_server_admin(requester.user)
  627. # Let the third party rules modify the room creation config if needed, or abort
  628. # the room creation entirely with an exception.
  629. await self.third_party_event_rules.on_create_room(
  630. requester, config, is_requester_admin=is_requester_admin
  631. )
  632. invite_3pid_list = config.get("invite_3pid", [])
  633. invite_list = config.get("invite", [])
  634. if not is_requester_admin and not (
  635. await self.spam_checker.user_may_create_room(user_id)
  636. ):
  637. raise SynapseError(
  638. 403, "You are not permitted to create rooms", Codes.FORBIDDEN
  639. )
  640. if ratelimit:
  641. await self.request_ratelimiter.ratelimit(requester)
  642. room_version_id = config.get(
  643. "room_version", self.config.server.default_room_version.identifier
  644. )
  645. if not isinstance(room_version_id, str):
  646. raise SynapseError(400, "room_version must be a string", Codes.BAD_JSON)
  647. room_version = KNOWN_ROOM_VERSIONS.get(room_version_id)
  648. if room_version is None:
  649. raise SynapseError(
  650. 400,
  651. "Your homeserver does not support this room version",
  652. Codes.UNSUPPORTED_ROOM_VERSION,
  653. )
  654. room_alias = None
  655. if "room_alias_name" in config:
  656. for wchar in string.whitespace:
  657. if wchar in config["room_alias_name"]:
  658. raise SynapseError(400, "Invalid characters in room alias")
  659. room_alias = RoomAlias(config["room_alias_name"], self.hs.hostname)
  660. mapping = await self.store.get_association_from_room_alias(room_alias)
  661. if mapping:
  662. raise SynapseError(400, "Room alias already taken", Codes.ROOM_IN_USE)
  663. for i in invite_list:
  664. try:
  665. uid = UserID.from_string(i)
  666. parse_and_validate_server_name(uid.domain)
  667. except Exception:
  668. raise SynapseError(400, "Invalid user_id: %s" % (i,))
  669. if (invite_list or invite_3pid_list) and requester.shadow_banned:
  670. # We randomly sleep a bit just to annoy the requester.
  671. await self.clock.sleep(random.randint(1, 10))
  672. # Allow the request to go through, but remove any associated invites.
  673. invite_3pid_list = []
  674. invite_list = []
  675. if invite_list or invite_3pid_list:
  676. try:
  677. # If there are invites in the request, see if the ratelimiting settings
  678. # allow that number of invites to be sent from the current user.
  679. await self.room_member_handler.ratelimit_multiple_invites(
  680. requester,
  681. room_id=None,
  682. n_invites=len(invite_list) + len(invite_3pid_list),
  683. update=False,
  684. )
  685. except LimitExceededError:
  686. raise SynapseError(400, "Cannot invite so many users at once")
  687. await self.event_creation_handler.assert_accepted_privacy_policy(requester)
  688. power_level_content_override = config.get("power_level_content_override")
  689. if (
  690. power_level_content_override
  691. and "users" in power_level_content_override
  692. and user_id not in power_level_content_override["users"]
  693. ):
  694. raise SynapseError(
  695. 400,
  696. "Not a valid power_level_content_override: 'users' did not contain %s"
  697. % (user_id,),
  698. )
  699. # The spec says rooms should default to private visibility if
  700. # `visibility` is not specified.
  701. visibility = config.get("visibility", "private")
  702. is_public = visibility == "public"
  703. room_id = await self._generate_and_create_room_id(
  704. creator_id=user_id,
  705. is_public=is_public,
  706. room_version=room_version,
  707. )
  708. # Check whether this visibility value is blocked by a third party module
  709. allowed_by_third_party_rules = await (
  710. self.third_party_event_rules.check_visibility_can_be_modified(
  711. room_id, visibility
  712. )
  713. )
  714. if not allowed_by_third_party_rules:
  715. raise SynapseError(403, "Room visibility value not allowed.")
  716. if is_public:
  717. room_aliases = []
  718. if room_alias:
  719. room_aliases.append(room_alias.to_string())
  720. if not self.config.roomdirectory.is_publishing_room_allowed(
  721. user_id, room_id, room_aliases
  722. ):
  723. # Let's just return a generic message, as there may be all sorts of
  724. # reasons why we said no. TODO: Allow configurable error messages
  725. # per alias creation rule?
  726. raise SynapseError(403, "Not allowed to publish room")
  727. directory_handler = self.hs.get_directory_handler()
  728. if room_alias:
  729. await directory_handler.create_association(
  730. requester=requester,
  731. room_id=room_id,
  732. room_alias=room_alias,
  733. servers=[self.hs.hostname],
  734. check_membership=False,
  735. )
  736. preset_config = config.get(
  737. "preset",
  738. RoomCreationPreset.PRIVATE_CHAT
  739. if visibility == "private"
  740. else RoomCreationPreset.PUBLIC_CHAT,
  741. )
  742. raw_initial_state = config.get("initial_state", [])
  743. initial_state = OrderedDict()
  744. for val in raw_initial_state:
  745. initial_state[(val["type"], val.get("state_key", ""))] = val["content"]
  746. creation_content = config.get("creation_content", {})
  747. # override any attempt to set room versions via the creation_content
  748. creation_content["room_version"] = room_version.identifier
  749. last_stream_id = await self._send_events_for_new_room(
  750. requester,
  751. room_id,
  752. preset_config=preset_config,
  753. invite_list=invite_list,
  754. initial_state=initial_state,
  755. creation_content=creation_content,
  756. room_alias=room_alias,
  757. power_level_content_override=power_level_content_override,
  758. creator_join_profile=creator_join_profile,
  759. ratelimit=ratelimit,
  760. )
  761. if "name" in config:
  762. name = config["name"]
  763. (
  764. _,
  765. last_stream_id,
  766. ) = await self.event_creation_handler.create_and_send_nonmember_event(
  767. requester,
  768. {
  769. "type": EventTypes.Name,
  770. "room_id": room_id,
  771. "sender": user_id,
  772. "state_key": "",
  773. "content": {"name": name},
  774. },
  775. ratelimit=False,
  776. )
  777. if "topic" in config:
  778. topic = config["topic"]
  779. (
  780. _,
  781. last_stream_id,
  782. ) = await self.event_creation_handler.create_and_send_nonmember_event(
  783. requester,
  784. {
  785. "type": EventTypes.Topic,
  786. "room_id": room_id,
  787. "sender": user_id,
  788. "state_key": "",
  789. "content": {"topic": topic},
  790. },
  791. ratelimit=False,
  792. )
  793. # we avoid dropping the lock between invites, as otherwise joins can
  794. # start coming in and making the createRoom slow.
  795. #
  796. # we also don't need to check the requester's shadow-ban here, as we
  797. # have already done so above (and potentially emptied invite_list).
  798. async with self.room_member_handler.member_linearizer.queue((room_id,)):
  799. content = {}
  800. is_direct = config.get("is_direct", None)
  801. if is_direct:
  802. content["is_direct"] = is_direct
  803. for invitee in invite_list:
  804. (
  805. _,
  806. last_stream_id,
  807. ) = await self.room_member_handler.update_membership_locked(
  808. requester,
  809. UserID.from_string(invitee),
  810. room_id,
  811. "invite",
  812. ratelimit=False,
  813. content=content,
  814. new_room=True,
  815. )
  816. for invite_3pid in invite_3pid_list:
  817. id_server = invite_3pid["id_server"]
  818. id_access_token = invite_3pid.get("id_access_token") # optional
  819. address = invite_3pid["address"]
  820. medium = invite_3pid["medium"]
  821. # Note that do_3pid_invite can raise a ShadowBanError, but this was
  822. # handled above by emptying invite_3pid_list.
  823. last_stream_id = await self.hs.get_room_member_handler().do_3pid_invite(
  824. room_id,
  825. requester.user,
  826. medium,
  827. address,
  828. id_server,
  829. requester,
  830. txn_id=None,
  831. id_access_token=id_access_token,
  832. )
  833. result = {"room_id": room_id}
  834. if room_alias:
  835. result["room_alias"] = room_alias.to_string()
  836. # Always wait for room creation to propagate before returning
  837. await self._replication.wait_for_stream_position(
  838. self.hs.config.worker.events_shard_config.get_instance(room_id),
  839. "events",
  840. last_stream_id,
  841. )
  842. return result, last_stream_id
  843. async def _send_events_for_new_room(
  844. self,
  845. creator: Requester,
  846. room_id: str,
  847. preset_config: str,
  848. invite_list: List[str],
  849. initial_state: MutableStateMap,
  850. creation_content: JsonDict,
  851. room_alias: Optional[RoomAlias] = None,
  852. power_level_content_override: Optional[JsonDict] = None,
  853. creator_join_profile: Optional[JsonDict] = None,
  854. ratelimit: bool = True,
  855. ) -> int:
  856. """Sends the initial events into a new room.
  857. `power_level_content_override` doesn't apply when initial state has
  858. power level state event content.
  859. Returns:
  860. The stream_id of the last event persisted.
  861. """
  862. creator_id = creator.user.to_string()
  863. event_keys = {"room_id": room_id, "sender": creator_id, "state_key": ""}
  864. def create(etype: str, content: JsonDict, **kwargs: Any) -> JsonDict:
  865. e = {"type": etype, "content": content}
  866. e.update(event_keys)
  867. e.update(kwargs)
  868. return e
  869. async def send(etype: str, content: JsonDict, **kwargs: Any) -> int:
  870. event = create(etype, content, **kwargs)
  871. logger.debug("Sending %s in new room", etype)
  872. # Allow these events to be sent even if the user is shadow-banned to
  873. # allow the room creation to complete.
  874. (
  875. _,
  876. last_stream_id,
  877. ) = await self.event_creation_handler.create_and_send_nonmember_event(
  878. creator,
  879. event,
  880. ratelimit=False,
  881. ignore_shadow_ban=True,
  882. )
  883. return last_stream_id
  884. try:
  885. config = self._presets_dict[preset_config]
  886. except KeyError:
  887. raise SynapseError(
  888. 400, f"'{preset_config}' is not a valid preset", errcode=Codes.BAD_JSON
  889. )
  890. creation_content.update({"creator": creator_id})
  891. await send(etype=EventTypes.Create, content=creation_content)
  892. logger.debug("Sending %s in new room", EventTypes.Member)
  893. await self.room_member_handler.update_membership(
  894. creator,
  895. creator.user,
  896. room_id,
  897. "join",
  898. ratelimit=ratelimit,
  899. content=creator_join_profile,
  900. new_room=True,
  901. )
  902. # We treat the power levels override specially as this needs to be one
  903. # of the first events that get sent into a room.
  904. pl_content = initial_state.pop((EventTypes.PowerLevels, ""), None)
  905. if pl_content is not None:
  906. last_sent_stream_id = await send(
  907. etype=EventTypes.PowerLevels, content=pl_content
  908. )
  909. else:
  910. power_level_content: JsonDict = {
  911. "users": {creator_id: 100},
  912. "users_default": 0,
  913. "events": {
  914. EventTypes.Name: 50,
  915. EventTypes.PowerLevels: 100,
  916. EventTypes.RoomHistoryVisibility: 100,
  917. EventTypes.CanonicalAlias: 50,
  918. EventTypes.RoomAvatar: 50,
  919. EventTypes.Tombstone: 100,
  920. EventTypes.ServerACL: 100,
  921. EventTypes.RoomEncryption: 100,
  922. },
  923. "events_default": 0,
  924. "state_default": 50,
  925. "ban": 50,
  926. "kick": 50,
  927. "redact": 50,
  928. "invite": 50,
  929. "historical": 100,
  930. }
  931. if config["original_invitees_have_ops"]:
  932. for invitee in invite_list:
  933. power_level_content["users"][invitee] = 100
  934. # If the user supplied a preset name e.g. "private_chat",
  935. # we apply that preset
  936. power_level_content.update(config["power_level_content_override"])
  937. # If the server config contains default_power_level_content_override,
  938. # and that contains information for this room preset, apply it.
  939. if self._default_power_level_content_override:
  940. override = self._default_power_level_content_override.get(preset_config)
  941. if override is not None:
  942. power_level_content.update(override)
  943. # Finally, if the user supplied specific permissions for this room,
  944. # apply those.
  945. if power_level_content_override:
  946. power_level_content.update(power_level_content_override)
  947. last_sent_stream_id = await send(
  948. etype=EventTypes.PowerLevels, content=power_level_content
  949. )
  950. if room_alias and (EventTypes.CanonicalAlias, "") not in initial_state:
  951. last_sent_stream_id = await send(
  952. etype=EventTypes.CanonicalAlias,
  953. content={"alias": room_alias.to_string()},
  954. )
  955. if (EventTypes.JoinRules, "") not in initial_state:
  956. last_sent_stream_id = await send(
  957. etype=EventTypes.JoinRules, content={"join_rule": config["join_rules"]}
  958. )
  959. if (EventTypes.RoomHistoryVisibility, "") not in initial_state:
  960. last_sent_stream_id = await send(
  961. etype=EventTypes.RoomHistoryVisibility,
  962. content={"history_visibility": config["history_visibility"]},
  963. )
  964. if config["guest_can_join"]:
  965. if (EventTypes.GuestAccess, "") not in initial_state:
  966. last_sent_stream_id = await send(
  967. etype=EventTypes.GuestAccess,
  968. content={EventContentFields.GUEST_ACCESS: GuestAccess.CAN_JOIN},
  969. )
  970. for (etype, state_key), content in initial_state.items():
  971. last_sent_stream_id = await send(
  972. etype=etype, state_key=state_key, content=content
  973. )
  974. if config["encrypted"]:
  975. last_sent_stream_id = await send(
  976. etype=EventTypes.RoomEncryption,
  977. state_key="",
  978. content={"algorithm": RoomEncryptionAlgorithms.DEFAULT},
  979. )
  980. return last_sent_stream_id
  981. def _generate_room_id(self) -> str:
  982. """Generates a random room ID.
  983. Room IDs look like "!opaque_id:domain" and are case-sensitive as per the spec
  984. at https://spec.matrix.org/v1.2/appendices/#room-ids-and-event-ids.
  985. Does not check for collisions with existing rooms or prevent future calls from
  986. returning the same room ID. To ensure the uniqueness of a new room ID, use
  987. `_generate_and_create_room_id` instead.
  988. Synapse's room IDs are 18 [a-zA-Z] characters long, which comes out to around
  989. 102 bits.
  990. Returns:
  991. A random room ID of the form "!opaque_id:domain".
  992. """
  993. random_string = stringutils.random_string(18)
  994. return RoomID(random_string, self.hs.hostname).to_string()
  995. async def _generate_and_create_room_id(
  996. self,
  997. creator_id: str,
  998. is_public: bool,
  999. room_version: RoomVersion,
  1000. ) -> str:
  1001. # autogen room IDs and try to create it. We may clash, so just
  1002. # try a few times till one goes through, giving up eventually.
  1003. attempts = 0
  1004. while attempts < 5:
  1005. try:
  1006. gen_room_id = self._generate_room_id()
  1007. await self.store.store_room(
  1008. room_id=gen_room_id,
  1009. room_creator_user_id=creator_id,
  1010. is_public=is_public,
  1011. room_version=room_version,
  1012. )
  1013. return gen_room_id
  1014. except StoreError:
  1015. attempts += 1
  1016. raise StoreError(500, "Couldn't generate a room ID.")
  1017. class RoomContextHandler:
  1018. def __init__(self, hs: "HomeServer"):
  1019. self.hs = hs
  1020. self.auth = hs.get_auth()
  1021. self.store = hs.get_datastores().main
  1022. self.storage = hs.get_storage()
  1023. self.state_store = self.storage.state
  1024. self._relations_handler = hs.get_relations_handler()
  1025. async def get_event_context(
  1026. self,
  1027. requester: Requester,
  1028. room_id: str,
  1029. event_id: str,
  1030. limit: int,
  1031. event_filter: Optional[Filter],
  1032. use_admin_priviledge: bool = False,
  1033. ) -> Optional[EventContext]:
  1034. """Retrieves events, pagination tokens and state around a given event
  1035. in a room.
  1036. Args:
  1037. requester
  1038. room_id
  1039. event_id
  1040. limit: The maximum number of events to return in total
  1041. (excluding state).
  1042. event_filter: the filter to apply to the events returned
  1043. (excluding the target event_id)
  1044. use_admin_priviledge: if `True`, return all events, regardless
  1045. of whether `user` has access to them. To be used **ONLY**
  1046. from the admin API.
  1047. Returns:
  1048. dict, or None if the event isn't found
  1049. """
  1050. user = requester.user
  1051. if use_admin_priviledge:
  1052. await assert_user_is_admin(self.auth, requester.user)
  1053. before_limit = math.floor(limit / 2.0)
  1054. after_limit = limit - before_limit
  1055. users = await self.store.get_users_in_room(room_id)
  1056. is_peeking = user.to_string() not in users
  1057. async def filter_evts(events: List[EventBase]) -> List[EventBase]:
  1058. if use_admin_priviledge:
  1059. return events
  1060. return await filter_events_for_client(
  1061. self.storage, user.to_string(), events, is_peeking=is_peeking
  1062. )
  1063. event = await self.store.get_event(
  1064. event_id, get_prev_content=True, allow_none=True
  1065. )
  1066. if not event:
  1067. return None
  1068. filtered = await filter_evts([event])
  1069. if not filtered:
  1070. raise AuthError(403, "You don't have permission to access that event.")
  1071. results = await self.store.get_events_around(
  1072. room_id, event_id, before_limit, after_limit, event_filter
  1073. )
  1074. events_before = results.events_before
  1075. events_after = results.events_after
  1076. if event_filter:
  1077. events_before = await event_filter.filter(events_before)
  1078. events_after = await event_filter.filter(events_after)
  1079. events_before = await filter_evts(events_before)
  1080. events_after = await filter_evts(events_after)
  1081. # filter_evts can return a pruned event in case the user is allowed to see that
  1082. # there's something there but not see the content, so use the event that's in
  1083. # `filtered` rather than the event we retrieved from the datastore.
  1084. event = filtered[0]
  1085. # Fetch the aggregations.
  1086. aggregations = await self._relations_handler.get_bundled_aggregations(
  1087. itertools.chain(events_before, (event,), events_after),
  1088. user.to_string(),
  1089. )
  1090. if events_after:
  1091. last_event_id = events_after[-1].event_id
  1092. else:
  1093. last_event_id = event_id
  1094. if event_filter and event_filter.lazy_load_members:
  1095. state_filter = StateFilter.from_lazy_load_member_list(
  1096. ev.sender
  1097. for ev in itertools.chain(
  1098. events_before,
  1099. (event,),
  1100. events_after,
  1101. )
  1102. )
  1103. else:
  1104. state_filter = StateFilter.all()
  1105. # XXX: why do we return the state as of the last event rather than the
  1106. # first? Shouldn't we be consistent with /sync?
  1107. # https://github.com/matrix-org/matrix-doc/issues/687
  1108. state = await self.state_store.get_state_for_events(
  1109. [last_event_id], state_filter=state_filter
  1110. )
  1111. state_events = list(state[last_event_id].values())
  1112. if event_filter:
  1113. state_events = await event_filter.filter(state_events)
  1114. # We use a dummy token here as we only care about the room portion of
  1115. # the token, which we replace.
  1116. token = StreamToken.START
  1117. return EventContext(
  1118. events_before=events_before,
  1119. event=event,
  1120. events_after=events_after,
  1121. state=await filter_evts(state_events),
  1122. aggregations=aggregations,
  1123. start=await token.copy_and_replace(
  1124. StreamKeyType.ROOM, results.start
  1125. ).to_string(self.store),
  1126. end=await token.copy_and_replace(StreamKeyType.ROOM, results.end).to_string(
  1127. self.store
  1128. ),
  1129. )
  1130. class TimestampLookupHandler:
  1131. def __init__(self, hs: "HomeServer"):
  1132. self.server_name = hs.hostname
  1133. self.store = hs.get_datastores().main
  1134. self.state_handler = hs.get_state_handler()
  1135. self.federation_client = hs.get_federation_client()
  1136. async def get_event_for_timestamp(
  1137. self,
  1138. requester: Requester,
  1139. room_id: str,
  1140. timestamp: int,
  1141. direction: str,
  1142. ) -> Tuple[str, int]:
  1143. """Find the closest event to the given timestamp in the given direction.
  1144. If we can't find an event locally or the event we have locally is next to a gap,
  1145. it will ask other federated homeservers for an event.
  1146. Args:
  1147. requester: The user making the request according to the access token
  1148. room_id: Room to fetch the event from
  1149. timestamp: The point in time (inclusive) we should navigate from in
  1150. the given direction to find the closest event.
  1151. direction: ["f"|"b"] to indicate whether we should navigate forward
  1152. or backward from the given timestamp to find the closest event.
  1153. Returns:
  1154. A tuple containing the `event_id` closest to the given timestamp in
  1155. the given direction and the `origin_server_ts`.
  1156. Raises:
  1157. SynapseError if unable to find any event locally in the given direction
  1158. """
  1159. local_event_id = await self.store.get_event_id_for_timestamp(
  1160. room_id, timestamp, direction
  1161. )
  1162. logger.debug(
  1163. "get_event_for_timestamp: locally, we found event_id=%s closest to timestamp=%s",
  1164. local_event_id,
  1165. timestamp,
  1166. )
  1167. # Check for gaps in the history where events could be hiding in between
  1168. # the timestamp given and the event we were able to find locally
  1169. is_event_next_to_backward_gap = False
  1170. is_event_next_to_forward_gap = False
  1171. if local_event_id:
  1172. local_event = await self.store.get_event(
  1173. local_event_id, allow_none=False, allow_rejected=False
  1174. )
  1175. if direction == "f":
  1176. # We only need to check for a backward gap if we're looking forwards
  1177. # to ensure there is nothing in between.
  1178. is_event_next_to_backward_gap = (
  1179. await self.store.is_event_next_to_backward_gap(local_event)
  1180. )
  1181. elif direction == "b":
  1182. # We only need to check for a forward gap if we're looking backwards
  1183. # to ensure there is nothing in between
  1184. is_event_next_to_forward_gap = (
  1185. await self.store.is_event_next_to_forward_gap(local_event)
  1186. )
  1187. # If we found a gap, we should probably ask another homeserver first
  1188. # about more history in between
  1189. if (
  1190. not local_event_id
  1191. or is_event_next_to_backward_gap
  1192. or is_event_next_to_forward_gap
  1193. ):
  1194. logger.debug(
  1195. "get_event_for_timestamp: locally, we found event_id=%s closest to timestamp=%s which is next to a gap in event history so we're asking other homeservers first",
  1196. local_event_id,
  1197. timestamp,
  1198. )
  1199. # Find other homeservers from the given state in the room
  1200. curr_state = await self.state_handler.get_current_state(room_id)
  1201. curr_domains = get_domains_from_state(curr_state)
  1202. likely_domains = [
  1203. domain for domain, depth in curr_domains if domain != self.server_name
  1204. ]
  1205. # Loop through each homeserver candidate until we get a succesful response
  1206. for domain in likely_domains:
  1207. try:
  1208. remote_response = await self.federation_client.timestamp_to_event(
  1209. domain, room_id, timestamp, direction
  1210. )
  1211. logger.debug(
  1212. "get_event_for_timestamp: response from domain(%s)=%s",
  1213. domain,
  1214. remote_response,
  1215. )
  1216. # TODO: Do we want to persist this as an extremity?
  1217. # TODO: I think ideally, we would try to backfill from
  1218. # this event and run this whole
  1219. # `get_event_for_timestamp` function again to make sure
  1220. # they didn't give us an event from their gappy history.
  1221. remote_event_id = remote_response.event_id
  1222. origin_server_ts = remote_response.origin_server_ts
  1223. # Only return the remote event if it's closer than the local event
  1224. if not local_event or (
  1225. abs(origin_server_ts - timestamp)
  1226. < abs(local_event.origin_server_ts - timestamp)
  1227. ):
  1228. return remote_event_id, origin_server_ts
  1229. except (HttpResponseException, InvalidResponseError) as ex:
  1230. # Let's not put a high priority on some other homeserver
  1231. # failing to respond or giving a random response
  1232. logger.debug(
  1233. "Failed to fetch /timestamp_to_event from %s because of exception(%s) %s args=%s",
  1234. domain,
  1235. type(ex).__name__,
  1236. ex,
  1237. ex.args,
  1238. )
  1239. except Exception as ex:
  1240. # But we do want to see some exceptions in our code
  1241. logger.warning(
  1242. "Failed to fetch /timestamp_to_event from %s because of exception(%s) %s args=%s",
  1243. domain,
  1244. type(ex).__name__,
  1245. ex,
  1246. ex.args,
  1247. )
  1248. if not local_event_id:
  1249. raise SynapseError(
  1250. 404,
  1251. "Unable to find event from %s in direction %s" % (timestamp, direction),
  1252. errcode=Codes.NOT_FOUND,
  1253. )
  1254. return local_event_id, local_event.origin_server_ts
  1255. class RoomEventSource(EventSource[RoomStreamToken, EventBase]):
  1256. def __init__(self, hs: "HomeServer"):
  1257. self.store = hs.get_datastores().main
  1258. async def get_new_events(
  1259. self,
  1260. user: UserID,
  1261. from_key: RoomStreamToken,
  1262. limit: Optional[int],
  1263. room_ids: Collection[str],
  1264. is_guest: bool,
  1265. explicit_room_id: Optional[str] = None,
  1266. ) -> Tuple[List[EventBase], RoomStreamToken]:
  1267. # We just ignore the key for now.
  1268. to_key = self.get_current_key()
  1269. if from_key.topological:
  1270. logger.warning("Stream has topological part!!!! %r", from_key)
  1271. from_key = RoomStreamToken(None, from_key.stream)
  1272. app_service = self.store.get_app_service_by_user_id(user.to_string())
  1273. if app_service:
  1274. # We no longer support AS users using /sync directly.
  1275. # See https://github.com/matrix-org/matrix-doc/issues/1144
  1276. raise NotImplementedError()
  1277. else:
  1278. room_events = await self.store.get_membership_changes_for_user(
  1279. user.to_string(), from_key, to_key
  1280. )
  1281. room_to_events = await self.store.get_room_events_stream_for_rooms(
  1282. room_ids=room_ids,
  1283. from_key=from_key,
  1284. to_key=to_key,
  1285. limit=limit or 10,
  1286. order="ASC",
  1287. )
  1288. events = list(room_events)
  1289. events.extend(e for evs, _ in room_to_events.values() for e in evs)
  1290. events.sort(key=lambda e: e.internal_metadata.order)
  1291. if limit:
  1292. events[:] = events[:limit]
  1293. if events:
  1294. end_key = events[-1].internal_metadata.after
  1295. else:
  1296. end_key = to_key
  1297. return events, end_key
  1298. def get_current_key(self) -> RoomStreamToken:
  1299. return self.store.get_room_max_token()
  1300. def get_current_key_for_room(self, room_id: str) -> Awaitable[RoomStreamToken]:
  1301. return self.store.get_current_room_stream_token_for_room_id(room_id)
class ShutdownRoomResponse(TypedDict):
    """Summary of a room shutdown, as returned by `RoomShutdownHandler.shutdown_room`.

    Attributes:
        kicked_users: An array of users (`user_id`) that were kicked.
        failed_to_kick_users:
            An array of users (`user_id`) that were not kicked.
        local_aliases:
            An array of strings representing the local aliases that were
            migrated from the old room to the new.
        new_room_id:
            A string representing the room ID of the new room, or None if
            no such room was created.
    """

    kicked_users: List[str]
    failed_to_kick_users: List[str]
    local_aliases: List[str]
    new_room_id: Optional[str]
  1317. class RoomShutdownHandler:
  1318. DEFAULT_MESSAGE = (
  1319. "Sharing illegal content on this server is not permitted and rooms in"
  1320. " violation will be blocked."
  1321. )
  1322. DEFAULT_ROOM_NAME = "Content Violation Notification"
  1323. def __init__(self, hs: "HomeServer"):
  1324. self.hs = hs
  1325. self.room_member_handler = hs.get_room_member_handler()
  1326. self._room_creation_handler = hs.get_room_creation_handler()
  1327. self._replication = hs.get_replication_data_handler()
  1328. self._third_party_rules = hs.get_third_party_event_rules()
  1329. self.event_creation_handler = hs.get_event_creation_handler()
  1330. self.store = hs.get_datastores().main
  1331. async def shutdown_room(
  1332. self,
  1333. room_id: str,
  1334. requester_user_id: str,
  1335. new_room_user_id: Optional[str] = None,
  1336. new_room_name: Optional[str] = None,
  1337. message: Optional[str] = None,
  1338. block: bool = False,
  1339. ) -> ShutdownRoomResponse:
  1340. """
  1341. Shuts down a room. Moves all local users and room aliases automatically
  1342. to a new room if `new_room_user_id` is set. Otherwise local users only
  1343. leave the room without any information.
  1344. The new room will be created with the user specified by the
  1345. `new_room_user_id` parameter as room administrator and will contain a
  1346. message explaining what happened. Users invited to the new room will
  1347. have power level `-10` by default, and thus be unable to speak.
  1348. The local server will only have the power to move local user and room
  1349. aliases to the new room. Users on other servers will be unaffected.
  1350. Args:
  1351. room_id: The ID of the room to shut down.
  1352. requester_user_id:
  1353. User who requested the action and put the room on the
  1354. blocking list.
  1355. new_room_user_id:
  1356. If set, a new room will be created with this user ID
  1357. as the creator and admin, and all users in the old room will be
  1358. moved into that room. If not set, no new room will be created
  1359. and the users will just be removed from the old room.
  1360. new_room_name:
  1361. A string representing the name of the room that new users will
  1362. be invited to. Defaults to `Content Violation Notification`
  1363. message:
  1364. A string containing the first message that will be sent as
  1365. `new_room_user_id` in the new room. Ideally this will clearly
  1366. convey why the original room was shut down.
  1367. Defaults to `Sharing illegal content on this server is not
  1368. permitted and rooms in violation will be blocked.`
  1369. block:
  1370. If set to `True`, users will be prevented from joining the old
  1371. room. This option can also be used to pre-emptively block a room,
  1372. even if it's unknown to this homeserver. In this case, the room
  1373. will be blocked, and no further action will be taken. If `False`,
  1374. attempting to delete an unknown room is invalid.
  1375. Defaults to `False`.
  1376. Returns: a dict containing the following keys:
  1377. kicked_users: An array of users (`user_id`) that were kicked.
  1378. failed_to_kick_users:
  1379. An array of users (`user_id`) that that were not kicked.
  1380. local_aliases:
  1381. An array of strings representing the local aliases that were
  1382. migrated from the old room to the new.
  1383. new_room_id:
  1384. A string representing the room ID of the new room, or None if
  1385. no such room was created.
  1386. """
  1387. if not new_room_name:
  1388. new_room_name = self.DEFAULT_ROOM_NAME
  1389. if not message:
  1390. message = self.DEFAULT_MESSAGE
  1391. if not RoomID.is_valid(room_id):
  1392. raise SynapseError(400, "%s is not a legal room ID" % (room_id,))
  1393. if not await self._third_party_rules.check_can_shutdown_room(
  1394. requester_user_id, room_id
  1395. ):
  1396. raise SynapseError(
  1397. 403, "Shutdown of this room is forbidden", Codes.FORBIDDEN
  1398. )
  1399. # Action the block first (even if the room doesn't exist yet)
  1400. if block:
  1401. # This will work even if the room is already blocked, but that is
  1402. # desirable in case the first attempt at blocking the room failed below.
  1403. await self.store.block_room(room_id, requester_user_id)
  1404. if not await self.store.get_room(room_id):
  1405. # if we don't know about the room, there is nothing left to do.
  1406. return {
  1407. "kicked_users": [],
  1408. "failed_to_kick_users": [],
  1409. "local_aliases": [],
  1410. "new_room_id": None,
  1411. }
  1412. if new_room_user_id is not None:
  1413. if not self.hs.is_mine_id(new_room_user_id):
  1414. raise SynapseError(
  1415. 400, "User must be our own: %s" % (new_room_user_id,)
  1416. )
  1417. room_creator_requester = create_requester(
  1418. new_room_user_id, authenticated_entity=requester_user_id
  1419. )
  1420. info, stream_id = await self._room_creation_handler.create_room(
  1421. room_creator_requester,
  1422. config={
  1423. "preset": RoomCreationPreset.PUBLIC_CHAT,
  1424. "name": new_room_name,
  1425. "power_level_content_override": {"users_default": -10},
  1426. },
  1427. ratelimit=False,
  1428. )
  1429. new_room_id = info["room_id"]
  1430. logger.info(
  1431. "Shutting down room %r, joining to new room: %r", room_id, new_room_id
  1432. )
  1433. # We now wait for the create room to come back in via replication so
  1434. # that we can assume that all the joins/invites have propagated before
  1435. # we try and auto join below.
  1436. await self._replication.wait_for_stream_position(
  1437. self.hs.config.worker.events_shard_config.get_instance(new_room_id),
  1438. "events",
  1439. stream_id,
  1440. )
  1441. else:
  1442. new_room_id = None
  1443. logger.info("Shutting down room %r", room_id)
  1444. users = await self.store.get_users_in_room(room_id)
  1445. kicked_users = []
  1446. failed_to_kick_users = []
  1447. for user_id in users:
  1448. if not self.hs.is_mine_id(user_id):
  1449. continue
  1450. logger.info("Kicking %r from %r...", user_id, room_id)
  1451. try:
  1452. # Kick users from room
  1453. target_requester = create_requester(
  1454. user_id, authenticated_entity=requester_user_id
  1455. )
  1456. _, stream_id = await self.room_member_handler.update_membership(
  1457. requester=target_requester,
  1458. target=target_requester.user,
  1459. room_id=room_id,
  1460. action=Membership.LEAVE,
  1461. content={},
  1462. ratelimit=False,
  1463. require_consent=False,
  1464. )
  1465. # Wait for leave to come in over replication before trying to forget.
  1466. await self._replication.wait_for_stream_position(
  1467. self.hs.config.worker.events_shard_config.get_instance(room_id),
  1468. "events",
  1469. stream_id,
  1470. )
  1471. await self.room_member_handler.forget(target_requester.user, room_id)
  1472. # Join users to new room
  1473. if new_room_user_id:
  1474. await self.room_member_handler.update_membership(
  1475. requester=target_requester,
  1476. target=target_requester.user,
  1477. room_id=new_room_id,
  1478. action=Membership.JOIN,
  1479. content={},
  1480. ratelimit=False,
  1481. require_consent=False,
  1482. )
  1483. kicked_users.append(user_id)
  1484. except Exception:
  1485. logger.exception(
  1486. "Failed to leave old room and join new room for %r", user_id
  1487. )
  1488. failed_to_kick_users.append(user_id)
  1489. # Send message in new room and move aliases
  1490. if new_room_user_id:
  1491. await self.event_creation_handler.create_and_send_nonmember_event(
  1492. room_creator_requester,
  1493. {
  1494. "type": "m.room.message",
  1495. "content": {"body": message, "msgtype": "m.text"},
  1496. "room_id": new_room_id,
  1497. "sender": new_room_user_id,
  1498. },
  1499. ratelimit=False,
  1500. )
  1501. aliases_for_room = await self.store.get_aliases_for_room(room_id)
  1502. await self.store.update_aliases_for_room(
  1503. room_id, new_room_id, requester_user_id
  1504. )
  1505. else:
  1506. aliases_for_room = []
  1507. return {
  1508. "kicked_users": kicked_users,
  1509. "failed_to_kick_users": failed_to_kick_users,
  1510. "local_aliases": aliases_for_room,
  1511. "new_room_id": new_room_id,
  1512. }