federation_client.py 51 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461
  1. # Copyright 2015-2021 The Matrix.org Foundation C.I.C.
  2. # Copyright 2020 Sorunome
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. import copy
  16. import itertools
  17. import logging
  18. from typing import (
  19. TYPE_CHECKING,
  20. Awaitable,
  21. Callable,
  22. Collection,
  23. Container,
  24. Dict,
  25. Iterable,
  26. List,
  27. Mapping,
  28. Optional,
  29. Sequence,
  30. Tuple,
  31. TypeVar,
  32. Union,
  33. )
  34. import attr
  35. from prometheus_client import Counter
  36. from synapse.api.constants import EventTypes, Membership
  37. from synapse.api.errors import (
  38. CodeMessageException,
  39. Codes,
  40. FederationDeniedError,
  41. HttpResponseException,
  42. SynapseError,
  43. UnsupportedRoomVersionError,
  44. )
  45. from synapse.api.room_versions import (
  46. KNOWN_ROOM_VERSIONS,
  47. EventFormatVersions,
  48. RoomVersion,
  49. RoomVersions,
  50. )
  51. from synapse.events import EventBase, builder
  52. from synapse.federation.federation_base import FederationBase, event_from_pdu_json
  53. from synapse.federation.transport.client import SendJoinResponse
  54. from synapse.logging.utils import log_function
  55. from synapse.types import JsonDict, get_domain_from_id
  56. from synapse.util.async_helpers import concurrently_execute
  57. from synapse.util.caches.expiringcache import ExpiringCache
  58. from synapse.util.retryutils import NotRetryingDestination
  59. if TYPE_CHECKING:
  60. from synapse.server import HomeServer
  61. logger = logging.getLogger(__name__)
  62. sent_queries_counter = Counter("synapse_federation_client_sent_queries", "", ["type"])
  63. PDU_RETRY_TIME_MS = 1 * 60 * 1000
  64. T = TypeVar("T")
  65. class InvalidResponseError(RuntimeError):
  66. """Helper for _try_destination_list: indicates that the server returned a response
  67. we couldn't parse
  68. """
  69. @attr.s(slots=True, frozen=True, auto_attribs=True)
  70. class SendJoinResult:
  71. # The event to persist.
  72. event: EventBase
  73. # A string giving the server the event was sent to.
  74. origin: str
  75. state: List[EventBase]
  76. auth_chain: List[EventBase]
  77. class FederationClient(FederationBase):
  78. def __init__(self, hs: "HomeServer"):
  79. super().__init__(hs)
  80. self.pdu_destination_tried: Dict[str, Dict[str, int]] = {}
  81. self._clock.looping_call(self._clear_tried_cache, 60 * 1000)
  82. self.state = hs.get_state_handler()
  83. self.transport_layer = hs.get_federation_transport_client()
  84. self.hostname = hs.hostname
  85. self.signing_key = hs.signing_key
  86. self._get_pdu_cache: ExpiringCache[str, EventBase] = ExpiringCache(
  87. cache_name="get_pdu_cache",
  88. clock=self._clock,
  89. max_len=1000,
  90. expiry_ms=120 * 1000,
  91. reset_expiry_on_get=False,
  92. )
  93. def _clear_tried_cache(self):
  94. """Clear pdu_destination_tried cache"""
  95. now = self._clock.time_msec()
  96. old_dict = self.pdu_destination_tried
  97. self.pdu_destination_tried = {}
  98. for event_id, destination_dict in old_dict.items():
  99. destination_dict = {
  100. dest: time
  101. for dest, time in destination_dict.items()
  102. if time + PDU_RETRY_TIME_MS > now
  103. }
  104. if destination_dict:
  105. self.pdu_destination_tried[event_id] = destination_dict
  106. @log_function
  107. async def make_query(
  108. self,
  109. destination: str,
  110. query_type: str,
  111. args: dict,
  112. retry_on_dns_fail: bool = False,
  113. ignore_backoff: bool = False,
  114. ) -> JsonDict:
  115. """Sends a federation Query to a remote homeserver of the given type
  116. and arguments.
  117. Args:
  118. destination: Domain name of the remote homeserver
  119. query_type: Category of the query type; should match the
  120. handler name used in register_query_handler().
  121. args: Mapping of strings to strings containing the details
  122. of the query request.
  123. ignore_backoff: true to ignore the historical backoff data
  124. and try the request anyway.
  125. Returns:
  126. The JSON object from the response
  127. """
  128. sent_queries_counter.labels(query_type).inc()
  129. return await self.transport_layer.make_query(
  130. destination,
  131. query_type,
  132. args,
  133. retry_on_dns_fail=retry_on_dns_fail,
  134. ignore_backoff=ignore_backoff,
  135. )
  136. @log_function
  137. async def query_client_keys(
  138. self, destination: str, content: JsonDict, timeout: int
  139. ) -> JsonDict:
  140. """Query device keys for a device hosted on a remote server.
  141. Args:
  142. destination: Domain name of the remote homeserver
  143. content: The query content.
  144. Returns:
  145. The JSON object from the response
  146. """
  147. sent_queries_counter.labels("client_device_keys").inc()
  148. return await self.transport_layer.query_client_keys(
  149. destination, content, timeout
  150. )
  151. @log_function
  152. async def query_user_devices(
  153. self, destination: str, user_id: str, timeout: int = 30000
  154. ) -> JsonDict:
  155. """Query the device keys for a list of user ids hosted on a remote
  156. server.
  157. """
  158. sent_queries_counter.labels("user_devices").inc()
  159. return await self.transport_layer.query_user_devices(
  160. destination, user_id, timeout
  161. )
  162. @log_function
  163. async def claim_client_keys(
  164. self, destination: str, content: JsonDict, timeout: int
  165. ) -> JsonDict:
  166. """Claims one-time keys for a device hosted on a remote server.
  167. Args:
  168. destination: Domain name of the remote homeserver
  169. content: The query content.
  170. Returns:
  171. The JSON object from the response
  172. """
  173. sent_queries_counter.labels("client_one_time_keys").inc()
  174. return await self.transport_layer.claim_client_keys(
  175. destination, content, timeout
  176. )
  177. async def backfill(
  178. self, dest: str, room_id: str, limit: int, extremities: Iterable[str]
  179. ) -> Optional[List[EventBase]]:
  180. """Requests some more historic PDUs for the given room from the
  181. given destination server.
  182. Args:
  183. dest: The remote homeserver to ask.
  184. room_id: The room_id to backfill.
  185. limit: The maximum number of events to return.
  186. extremities: our current backwards extremities, to backfill from
  187. """
  188. logger.debug("backfill extrem=%s", extremities)
  189. # If there are no extremities then we've (probably) reached the start.
  190. if not extremities:
  191. return None
  192. transaction_data = await self.transport_layer.backfill(
  193. dest, room_id, extremities, limit
  194. )
  195. logger.debug("backfill transaction_data=%r", transaction_data)
  196. room_version = await self.store.get_room_version(room_id)
  197. pdus = [
  198. event_from_pdu_json(p, room_version, outlier=False)
  199. for p in transaction_data["pdus"]
  200. ]
  201. # Check signatures and hash of pdus, removing any from the list that fail checks
  202. pdus[:] = await self._check_sigs_and_hash_and_fetch(
  203. dest, pdus, outlier=True, room_version=room_version
  204. )
  205. return pdus
  206. async def get_pdu(
  207. self,
  208. destinations: Iterable[str],
  209. event_id: str,
  210. room_version: RoomVersion,
  211. outlier: bool = False,
  212. timeout: Optional[int] = None,
  213. ) -> Optional[EventBase]:
  214. """Requests the PDU with given origin and ID from the remote home
  215. servers.
  216. Will attempt to get the PDU from each destination in the list until
  217. one succeeds.
  218. Args:
  219. destinations: Which homeservers to query
  220. event_id: event to fetch
  221. room_version: version of the room
  222. outlier: Indicates whether the PDU is an `outlier`, i.e. if
  223. it's from an arbitrary point in the context as opposed to part
  224. of the current block of PDUs. Defaults to `False`
  225. timeout: How long to try (in ms) each destination for before
  226. moving to the next destination. None indicates no timeout.
  227. Returns:
  228. The requested PDU, or None if we were unable to find it.
  229. """
  230. # TODO: Rate limit the number of times we try and get the same event.
  231. ev = self._get_pdu_cache.get(event_id)
  232. if ev:
  233. return ev
  234. pdu_attempts = self.pdu_destination_tried.setdefault(event_id, {})
  235. signed_pdu = None
  236. for destination in destinations:
  237. now = self._clock.time_msec()
  238. last_attempt = pdu_attempts.get(destination, 0)
  239. if last_attempt + PDU_RETRY_TIME_MS > now:
  240. continue
  241. try:
  242. transaction_data = await self.transport_layer.get_event(
  243. destination, event_id, timeout=timeout
  244. )
  245. logger.debug(
  246. "retrieved event id %s from %s: %r",
  247. event_id,
  248. destination,
  249. transaction_data,
  250. )
  251. pdu_list: List[EventBase] = [
  252. event_from_pdu_json(p, room_version, outlier=outlier)
  253. for p in transaction_data["pdus"]
  254. ]
  255. if pdu_list and pdu_list[0]:
  256. pdu = pdu_list[0]
  257. # Check signatures are correct.
  258. signed_pdu = await self._check_sigs_and_hash(room_version, pdu)
  259. break
  260. pdu_attempts[destination] = now
  261. except SynapseError as e:
  262. logger.info(
  263. "Failed to get PDU %s from %s because %s", event_id, destination, e
  264. )
  265. continue
  266. except NotRetryingDestination as e:
  267. logger.info(str(e))
  268. continue
  269. except FederationDeniedError as e:
  270. logger.info(str(e))
  271. continue
  272. except Exception as e:
  273. pdu_attempts[destination] = now
  274. logger.info(
  275. "Failed to get PDU %s from %s because %s", event_id, destination, e
  276. )
  277. continue
  278. if signed_pdu:
  279. self._get_pdu_cache[event_id] = signed_pdu
  280. return signed_pdu
  281. async def get_room_state_ids(
  282. self, destination: str, room_id: str, event_id: str
  283. ) -> Tuple[List[str], List[str]]:
  284. """Calls the /state_ids endpoint to fetch the state at a particular point
  285. in the room, and the auth events for the given event
  286. Returns:
  287. a tuple of (state event_ids, auth event_ids)
  288. """
  289. result = await self.transport_layer.get_room_state_ids(
  290. destination, room_id, event_id=event_id
  291. )
  292. state_event_ids = result["pdu_ids"]
  293. auth_event_ids = result.get("auth_chain_ids", [])
  294. if not isinstance(state_event_ids, list) or not isinstance(
  295. auth_event_ids, list
  296. ):
  297. raise Exception("invalid response from /state_ids")
  298. return state_event_ids, auth_event_ids
  299. async def _check_sigs_and_hash_and_fetch(
  300. self,
  301. origin: str,
  302. pdus: Collection[EventBase],
  303. room_version: RoomVersion,
  304. outlier: bool = False,
  305. ) -> List[EventBase]:
  306. """Takes a list of PDUs and checks the signatures and hashes of each
  307. one. If a PDU fails its signature check then we check if we have it in
  308. the database and if not then request if from the originating server of
  309. that PDU.
  310. If a PDU fails its content hash check then it is redacted.
  311. The given list of PDUs are not modified, instead the function returns
  312. a new list.
  313. Args:
  314. origin
  315. pdu
  316. room_version
  317. outlier: Whether the events are outliers or not
  318. Returns:
  319. A list of PDUs that have valid signatures and hashes.
  320. """
  321. # We limit how many PDUs we check at once, as if we try to do hundreds
  322. # of thousands of PDUs at once we see large memory spikes.
  323. valid_pdus = []
  324. async def _execute(pdu: EventBase) -> None:
  325. valid_pdu = await self._check_sigs_and_hash_and_fetch_one(
  326. pdu=pdu,
  327. origin=origin,
  328. outlier=outlier,
  329. room_version=room_version,
  330. )
  331. if valid_pdu:
  332. valid_pdus.append(valid_pdu)
  333. await concurrently_execute(_execute, pdus, 10000)
  334. return valid_pdus
  335. async def _check_sigs_and_hash_and_fetch_one(
  336. self,
  337. pdu: EventBase,
  338. origin: str,
  339. room_version: RoomVersion,
  340. outlier: bool = False,
  341. ) -> Optional[EventBase]:
  342. """Takes a PDU and checks its signatures and hashes. If the PDU fails
  343. its signature check then we check if we have it in the database and if
  344. not then request if from the originating server of that PDU.
  345. If then PDU fails its content hash check then it is redacted.
  346. Args:
  347. origin
  348. pdu
  349. room_version
  350. outlier: Whether the events are outliers or not
  351. include_none: Whether to include None in the returned list
  352. for events that have failed their checks
  353. Returns:
  354. The PDU (possibly redacted) if it has valid signatures and hashes.
  355. """
  356. res = None
  357. try:
  358. res = await self._check_sigs_and_hash(room_version, pdu)
  359. except SynapseError:
  360. pass
  361. if not res:
  362. # Check local db.
  363. res = await self.store.get_event(
  364. pdu.event_id, allow_rejected=True, allow_none=True
  365. )
  366. pdu_origin = get_domain_from_id(pdu.sender)
  367. if not res and pdu_origin != origin:
  368. try:
  369. res = await self.get_pdu(
  370. destinations=[pdu_origin],
  371. event_id=pdu.event_id,
  372. room_version=room_version,
  373. outlier=outlier,
  374. timeout=10000,
  375. )
  376. except SynapseError:
  377. pass
  378. if not res:
  379. logger.warning(
  380. "Failed to find copy of %s with valid signature", pdu.event_id
  381. )
  382. return res
  383. async def get_event_auth(
  384. self, destination: str, room_id: str, event_id: str
  385. ) -> List[EventBase]:
  386. res = await self.transport_layer.get_event_auth(destination, room_id, event_id)
  387. room_version = await self.store.get_room_version(room_id)
  388. auth_chain = [
  389. event_from_pdu_json(p, room_version, outlier=True)
  390. for p in res["auth_chain"]
  391. ]
  392. signed_auth = await self._check_sigs_and_hash_and_fetch(
  393. destination, auth_chain, outlier=True, room_version=room_version
  394. )
  395. signed_auth.sort(key=lambda e: e.depth)
  396. return signed_auth
  397. def _is_unknown_endpoint(
  398. self, e: HttpResponseException, synapse_error: Optional[SynapseError] = None
  399. ) -> bool:
  400. """
  401. Returns true if the response was due to an endpoint being unimplemented.
  402. Args:
  403. e: The error response received from the remote server.
  404. synapse_error: The above error converted to a SynapseError. This is
  405. automatically generated if not provided.
  406. """
  407. if synapse_error is None:
  408. synapse_error = e.to_synapse_error()
  409. # There is no good way to detect an "unknown" endpoint.
  410. #
  411. # Dendrite returns a 404 (with no body); synapse returns a 400
  412. # with M_UNRECOGNISED.
  413. return e.code == 404 or (
  414. e.code == 400 and synapse_error.errcode == Codes.UNRECOGNIZED
  415. )
  416. async def _try_destination_list(
  417. self,
  418. description: str,
  419. destinations: Iterable[str],
  420. callback: Callable[[str], Awaitable[T]],
  421. failover_errcodes: Optional[Container[str]] = None,
  422. failover_on_unknown_endpoint: bool = False,
  423. ) -> T:
  424. """Try an operation on a series of servers, until it succeeds
  425. Args:
  426. description: description of the operation we're doing, for logging
  427. destinations: list of server_names to try
  428. callback: Function to run for each server. Passed a single
  429. argument: the server_name to try.
  430. If the callback raises a CodeMessageException with a 300/400 code or
  431. an UnsupportedRoomVersionError, attempts to perform the operation
  432. stop immediately and the exception is reraised.
  433. Otherwise, if the callback raises an Exception the error is logged and the
  434. next server tried. Normally the stacktrace is logged but this is
  435. suppressed if the exception is an InvalidResponseError.
  436. failover_errcodes: Error codes (specific to this endpoint) which should
  437. cause a failover when received as part of an HTTP 400 error.
  438. failover_on_unknown_endpoint: if True, we will try other servers if it looks
  439. like a server doesn't support the endpoint. This is typically useful
  440. if the endpoint in question is new or experimental.
  441. Returns:
  442. The result of callback, if it succeeds
  443. Raises:
  444. SynapseError if the chosen remote server returns a 300/400 code, or
  445. no servers were reachable.
  446. """
  447. if failover_errcodes is None:
  448. failover_errcodes = ()
  449. for destination in destinations:
  450. if destination == self.server_name:
  451. continue
  452. try:
  453. return await callback(destination)
  454. except InvalidResponseError as e:
  455. logger.warning("Failed to %s via %s: %s", description, destination, e)
  456. except UnsupportedRoomVersionError:
  457. raise
  458. except HttpResponseException as e:
  459. synapse_error = e.to_synapse_error()
  460. failover = False
  461. # Failover should occur:
  462. #
  463. # * On internal server errors.
  464. # * If the destination responds that it cannot complete the request.
  465. # * If the destination doesn't implemented the endpoint for some reason.
  466. if 500 <= e.code < 600:
  467. failover = True
  468. elif e.code == 400 and synapse_error.errcode in failover_errcodes:
  469. failover = True
  470. elif failover_on_unknown_endpoint and self._is_unknown_endpoint(
  471. e, synapse_error
  472. ):
  473. failover = True
  474. if not failover:
  475. raise synapse_error from e
  476. logger.warning(
  477. "Failed to %s via %s: %i %s",
  478. description,
  479. destination,
  480. e.code,
  481. e.args[0],
  482. )
  483. except Exception:
  484. logger.warning(
  485. "Failed to %s via %s", description, destination, exc_info=True
  486. )
  487. raise SynapseError(502, "Failed to %s via any server" % (description,))
  488. async def make_membership_event(
  489. self,
  490. destinations: Iterable[str],
  491. room_id: str,
  492. user_id: str,
  493. membership: str,
  494. content: dict,
  495. params: Optional[Mapping[str, Union[str, Iterable[str]]]],
  496. ) -> Tuple[str, EventBase, RoomVersion]:
  497. """
  498. Creates an m.room.member event, with context, without participating in the room.
  499. Does so by asking one of the already participating servers to create an
  500. event with proper context.
  501. Returns a fully signed and hashed event.
  502. Note that this does not append any events to any graphs.
  503. Args:
  504. destinations: Candidate homeservers which are probably
  505. participating in the room.
  506. room_id: The room in which the event will happen.
  507. user_id: The user whose membership is being evented.
  508. membership: The "membership" property of the event. Must be one of
  509. "join" or "leave".
  510. content: Any additional data to put into the content field of the
  511. event.
  512. params: Query parameters to include in the request.
  513. Returns:
  514. `(origin, event, room_version)` where origin is the remote
  515. homeserver which generated the event, and room_version is the
  516. version of the room.
  517. Raises:
  518. UnsupportedRoomVersionError: if remote responds with
  519. a room version we don't understand.
  520. SynapseError: if the chosen remote server returns a 300/400 code, or
  521. no servers successfully handle the request.
  522. """
  523. valid_memberships = {Membership.JOIN, Membership.LEAVE, Membership.KNOCK}
  524. if membership not in valid_memberships:
  525. raise RuntimeError(
  526. "make_membership_event called with membership='%s', must be one of %s"
  527. % (membership, ",".join(valid_memberships))
  528. )
  529. async def send_request(destination: str) -> Tuple[str, EventBase, RoomVersion]:
  530. ret = await self.transport_layer.make_membership_event(
  531. destination, room_id, user_id, membership, params
  532. )
  533. # Note: If not supplied, the room version may be either v1 or v2,
  534. # however either way the event format version will be v1.
  535. room_version_id = ret.get("room_version", RoomVersions.V1.identifier)
  536. room_version = KNOWN_ROOM_VERSIONS.get(room_version_id)
  537. if not room_version:
  538. raise UnsupportedRoomVersionError()
  539. if not room_version.msc2403_knocking and membership == Membership.KNOCK:
  540. raise SynapseError(
  541. 400,
  542. "This room version does not support knocking",
  543. errcode=Codes.FORBIDDEN,
  544. )
  545. pdu_dict = ret.get("event", None)
  546. if not isinstance(pdu_dict, dict):
  547. raise InvalidResponseError("Bad 'event' field in response")
  548. logger.debug("Got response to make_%s: %s", membership, pdu_dict)
  549. pdu_dict["content"].update(content)
  550. # The protoevent received over the JSON wire may not have all
  551. # the required fields. Lets just gloss over that because
  552. # there's some we never care about
  553. if "prev_state" not in pdu_dict:
  554. pdu_dict["prev_state"] = []
  555. ev = builder.create_local_event_from_event_dict(
  556. self._clock,
  557. self.hostname,
  558. self.signing_key,
  559. room_version=room_version,
  560. event_dict=pdu_dict,
  561. )
  562. return destination, ev, room_version
  563. # MSC3083 defines additional error codes for room joins. Unfortunately
  564. # we do not yet know the room version, assume these will only be returned
  565. # by valid room versions.
  566. failover_errcodes = (
  567. (Codes.UNABLE_AUTHORISE_JOIN, Codes.UNABLE_TO_GRANT_JOIN)
  568. if membership == Membership.JOIN
  569. else None
  570. )
  571. return await self._try_destination_list(
  572. "make_" + membership,
  573. destinations,
  574. send_request,
  575. failover_errcodes=failover_errcodes,
  576. )
  577. async def send_join(
  578. self, destinations: Iterable[str], pdu: EventBase, room_version: RoomVersion
  579. ) -> SendJoinResult:
  580. """Sends a join event to one of a list of homeservers.
  581. Doing so will cause the remote server to add the event to the graph,
  582. and send the event out to the rest of the federation.
  583. Args:
  584. destinations: Candidate homeservers which are probably
  585. participating in the room.
  586. pdu: event to be sent
  587. room_version: the version of the room (according to the server that
  588. did the make_join)
  589. Returns:
  590. The result of the send join request.
  591. Raises:
  592. SynapseError: if the chosen remote server returns a 300/400 code, or
  593. no servers successfully handle the request.
  594. """
  595. async def send_request(destination) -> SendJoinResult:
  596. response = await self._do_send_join(room_version, destination, pdu)
  597. # If an event was returned (and expected to be returned):
  598. #
  599. # * Ensure it has the same event ID (note that the event ID is a hash
  600. # of the event fields for versions which support MSC3083).
  601. # * Ensure the signatures are good.
  602. #
  603. # Otherwise, fallback to the provided event.
  604. if room_version.msc3083_join_rules and response.event:
  605. event = response.event
  606. valid_pdu = await self._check_sigs_and_hash_and_fetch_one(
  607. pdu=event,
  608. origin=destination,
  609. outlier=True,
  610. room_version=room_version,
  611. )
  612. if valid_pdu is None or event.event_id != pdu.event_id:
  613. raise InvalidResponseError("Returned an invalid join event")
  614. else:
  615. event = pdu
  616. state = response.state
  617. auth_chain = response.auth_events
  618. create_event = None
  619. for e in state:
  620. if (e.type, e.state_key) == (EventTypes.Create, ""):
  621. create_event = e
  622. break
  623. if create_event is None:
  624. # If the state doesn't have a create event then the room is
  625. # invalid, and it would fail auth checks anyway.
  626. raise InvalidResponseError("No create event in state")
  627. # the room version should be sane.
  628. create_room_version = create_event.content.get(
  629. "room_version", RoomVersions.V1.identifier
  630. )
  631. if create_room_version != room_version.identifier:
  632. # either the server that fulfilled the make_join, or the server that is
  633. # handling the send_join, is lying.
  634. raise InvalidResponseError(
  635. "Unexpected room version %s in create event"
  636. % (create_room_version,)
  637. )
  638. logger.info(
  639. "Processing from send_join %d events", len(state) + len(auth_chain)
  640. )
  641. # We now go and check the signatures and hashes for the event. Note
  642. # that we limit how many events we process at a time to keep the
  643. # memory overhead from exploding.
  644. valid_pdus_map: Dict[str, EventBase] = {}
  645. async def _execute(pdu: EventBase) -> None:
  646. valid_pdu = await self._check_sigs_and_hash_and_fetch_one(
  647. pdu=pdu,
  648. origin=destination,
  649. outlier=True,
  650. room_version=room_version,
  651. )
  652. if valid_pdu:
  653. valid_pdus_map[valid_pdu.event_id] = valid_pdu
  654. await concurrently_execute(
  655. _execute, itertools.chain(state, auth_chain), 10000
  656. )
  657. # NB: We *need* to copy to ensure that we don't have multiple
  658. # references being passed on, as that causes... issues.
  659. signed_state = [
  660. copy.copy(valid_pdus_map[p.event_id])
  661. for p in state
  662. if p.event_id in valid_pdus_map
  663. ]
  664. signed_auth = [
  665. valid_pdus_map[p.event_id]
  666. for p in auth_chain
  667. if p.event_id in valid_pdus_map
  668. ]
  669. # NB: We *need* to copy to ensure that we don't have multiple
  670. # references being passed on, as that causes... issues.
  671. for s in signed_state:
  672. s.internal_metadata = copy.deepcopy(s.internal_metadata)
  673. # double-check that the same create event has ended up in the auth chain
  674. auth_chain_create_events = [
  675. e.event_id
  676. for e in signed_auth
  677. if (e.type, e.state_key) == (EventTypes.Create, "")
  678. ]
  679. if auth_chain_create_events != [create_event.event_id]:
  680. raise InvalidResponseError(
  681. "Unexpected create event(s) in auth chain: %s"
  682. % (auth_chain_create_events,)
  683. )
  684. return SendJoinResult(
  685. event=event,
  686. state=signed_state,
  687. auth_chain=signed_auth,
  688. origin=destination,
  689. )
  690. # MSC3083 defines additional error codes for room joins.
  691. failover_errcodes = None
  692. if room_version.msc3083_join_rules:
  693. failover_errcodes = (
  694. Codes.UNABLE_AUTHORISE_JOIN,
  695. Codes.UNABLE_TO_GRANT_JOIN,
  696. )
  697. # If the join is being authorised via allow rules, we need to send
  698. # the /send_join back to the same server that was originally used
  699. # with /make_join.
  700. if "join_authorised_via_users_server" in pdu.content:
  701. destinations = [
  702. get_domain_from_id(pdu.content["join_authorised_via_users_server"])
  703. ]
  704. return await self._try_destination_list(
  705. "send_join", destinations, send_request, failover_errcodes=failover_errcodes
  706. )
  async def _do_send_join(
      self, room_version: RoomVersion, destination: str, pdu: EventBase
  ) -> SendJoinResponse:
      """Send a join event to a remote server, trying the v2 API before v1.

      Args:
          room_version: Forwarded to the transport layer with the request.
          destination: The homeserver to send the join to.
          pdu: The join event to send.

      Returns:
          The response from whichever transport-layer call succeeded.

      Raises:
          HttpResponseException: if the v2 request fails with an error that
              is not due to an unrecognised endpoint.
      """
      now_ms = self._clock.time_msec()

      try:
          v2_response = await self.transport_layer.send_join_v2(
              room_version=room_version,
              destination=destination,
              room_id=pdu.room_id,
              event_id=pdu.event_id,
              content=pdu.get_pdu_json(now_ms),
          )
          return v2_response
      except HttpResponseException as exc:
          # Only an unrecognised-endpoint failure justifies retrying against
          # the v1 endpoint. Anything else is a legitimate error, so it is
          # propagated unchanged.
          endpoint_is_unknown = self._is_unknown_endpoint(exc)
          if not endpoint_is_unknown:
              raise

      logger.debug("Couldn't send_join with the v2 API, falling back to the v1 API")

      v1_response = await self.transport_layer.send_join_v1(
          room_version=room_version,
          destination=destination,
          room_id=pdu.room_id,
          event_id=pdu.event_id,
          content=pdu.get_pdu_json(now_ms),
      )
      return v1_response
  async def send_invite(
      self,
      destination: str,
      room_id: str,
      event_id: str,
      pdu: EventBase,
  ) -> EventBase:
      """Send an invite event to a remote server and return the event it sends back.

      Args:
          destination: The homeserver to send the invite to.
          room_id: The room the invite is for; used to look up the room version.
          event_id: Not used by this method.
          pdu: The invite event to send.

      Returns:
          The event from the remote server's response, after it has been
          passed through `_check_sigs_and_hash`.
      """
      room_version = await self.store.get_room_version(room_id)

      response = await self._do_send_invite(destination, pdu, room_version)
      returned_pdu_json = response["event"]
      logger.debug("Got response to send_invite: %s", returned_pdu_json)

      returned_event = event_from_pdu_json(returned_pdu_json, room_version)

      # Check signatures are correct.
      # FIXME: We should handle signature failures more gracefully.
      checked_event = await self._check_sigs_and_hash(room_version, returned_event)
      return checked_event
  async def _do_send_invite(
      self, destination: str, pdu: EventBase, room_version: RoomVersion
  ) -> JsonDict:
      """Send the invite, trying the v2 API first and falling back to v1 if needed.

      Args:
          destination: The homeserver to send the invite to.
          pdu: The invite event to send.
          room_version: The version of the room the invite is for.

      Returns:
          The event as a dict as returned by the remote server

      Raises:
          SynapseError: if the remote server returns an error or if the server
              only supports the v1 endpoint and a room version other than "1"
              or "2" is requested.
      """
      now_ms = self._clock.time_msec()

      try:
          v2_payload = {
              "event": pdu.get_pdu_json(now_ms),
              "room_version": room_version.identifier,
              "invite_room_state": pdu.unsigned.get("invite_room_state", []),
          }
          return await self.transport_layer.send_invite_v2(
              destination=destination,
              room_id=pdu.room_id,
              event_id=pdu.event_id,
              content=v2_payload,
          )
      except HttpResponseException as exc:
          synapse_err = exc.to_synapse_error()

          # Anything other than an unrecognised endpoint is a legitimate
          # error from the remote server, so raise it.
          if not self._is_unknown_endpoint(exc, synapse_err):
              raise synapse_err

          # The v1 fallback is only usable when the room uses old-style
          # event IDs.
          if room_version.event_format != EventFormatVersions.V1:
              raise SynapseError(
                  400,
                  "User's homeserver does not support this room version",
                  Codes.UNSUPPORTED_ROOM_VERSION,
              )

      # The v2 endpoint is unavailable, so try the v1 API.
      # Note the v1 API returns a tuple of `(200, content)`
      _status, v1_content = await self.transport_layer.send_invite_v1(
          destination=destination,
          room_id=pdu.room_id,
          event_id=pdu.event_id,
          content=pdu.get_pdu_json(now_ms),
      )
      return v1_content
  async def send_leave(self, destinations: Iterable[str], pdu: EventBase) -> None:
      """Send a leave event to one of a list of homeservers.

      Doing so will cause the remote server to add the event to the graph,
      and send the event out to the rest of the federation.

      This is mostly useful to reject received invites.

      Args:
          destinations: Candidate homeservers which are probably
              participating in the room.
          pdu: event to be sent

      Raises:
          SynapseError: if the chosen remote server returns a 300/400 code, or
              no servers successfully handle the request.
      """

      async def send_request(destination: str) -> None:
          # The response body is only logged; nothing is returned to the caller.
          leave_response = await self._do_send_leave(destination, pdu)
          logger.debug("Got content: %s", leave_response)

      outcome = await self._try_destination_list(
          "send_leave", destinations, send_request
      )
      return outcome
  async def _do_send_leave(self, destination: str, pdu: EventBase) -> JsonDict:
      """Send a leave event to a remote server, trying the v2 API before v1.

      Args:
          destination: The homeserver to send the leave to.
          pdu: The leave event to send.

      Returns:
          The response content from the remote server.

      Raises:
          HttpResponseException: if the v2 request fails with an error that
              is not due to an unrecognised endpoint.
      """
      now_ms = self._clock.time_msec()

      try:
          v2_content = await self.transport_layer.send_leave_v2(
              destination=destination,
              room_id=pdu.room_id,
              event_id=pdu.event_id,
              content=pdu.get_pdu_json(now_ms),
          )
          return v2_content
      except HttpResponseException as exc:
          # Fall back to v1 only for an unrecognised endpoint. Every other
          # failure is a legitimate error and is re-raised.
          endpoint_is_unknown = self._is_unknown_endpoint(exc)
          if not endpoint_is_unknown:
              raise

      logger.debug("Couldn't send_leave with the v2 API, falling back to the v1 API")

      v1_response = await self.transport_layer.send_leave_v1(
          destination=destination,
          room_id=pdu.room_id,
          event_id=pdu.event_id,
          content=pdu.get_pdu_json(now_ms),
      )

      # The v1 API is expected to answer with [200, content]; hand back only
      # the content part.
      return v1_response[1]
  async def send_knock(self, destinations: List[str], pdu: EventBase) -> JsonDict:
      """Attempt to send a knock event to a given list of servers.

      Iterates through the list until one attempt succeeds.

      Doing so will cause the remote server to add the event to the graph,
      and send the event out to the rest of the federation.

      Args:
          destinations: A list of candidate homeservers which are likely to be
              participating in the room.
          pdu: The event to be sent.

      Returns:
          The remote homeserver returns some state from the room. The response
          dictionary is in the form:

          {"knock_state_events": [<state event dict>, ...]}

          The list of state events may be empty.

      Raises:
          SynapseError: If the chosen remote server returns a 3xx/4xx code.
          RuntimeError: If no servers were reachable.
      """

      async def send_request(destination: str) -> JsonDict:
          knock_response = await self._do_send_knock(destination, pdu)
          return knock_response

      result = await self._try_destination_list(
          "send_knock", destinations, send_request
      )
      return result
  async def _do_send_knock(self, destination: str, pdu: EventBase) -> JsonDict:
      """Send a knock event to a single remote homeserver via the v1 knock API.

      Args:
          destination: The homeserver to send to.
          pdu: The event to send.

      Returns:
          The remote homeserver can optionally return some state from the room.
          The response dictionary is in the form:

          {"knock_state_events": [<state event dict>, ...]}

          The list of state events may be empty.
      """
      now_ms = self._clock.time_msec()
      knock_json = pdu.get_pdu_json(now_ms)

      knock_response = await self.transport_layer.send_knock_v1(
          destination=destination,
          room_id=pdu.room_id,
          event_id=pdu.event_id,
          content=knock_json,
      )
      return knock_response
  async def get_public_rooms(
      self,
      remote_server: str,
      limit: Optional[int] = None,
      since_token: Optional[str] = None,
      search_filter: Optional[Dict] = None,
      include_all_networks: bool = False,
      third_party_instance_id: Optional[str] = None,
  ) -> JsonDict:
      """Fetch the public room list from a remote homeserver.

      All arguments are handed straight to the transport layer.

      Args:
          remote_server: The name of the remote server
          limit: Maximum amount of rooms to return
          since_token: Used for result pagination
          search_filter: A filter dictionary to send the remote homeserver
              and filter the result set
          include_all_networks: Whether to include results from all third
              party instances
          third_party_instance_id: Whether to only include results from a
              specific third party instance

      Returns:
          The response from the remote server.

      Raises:
          HttpResponseException / RequestSendFailed: There was an exception
              returned from the remote server
          SynapseException: M_FORBIDDEN when the remote server has disallowed
              publicRoom requests over federation
      """
      public_rooms = await self.transport_layer.get_public_rooms(
          remote_server,
          limit,
          since_token,
          search_filter,
          include_all_networks=include_all_networks,
          third_party_instance_id=third_party_instance_id,
      )
      return public_rooms
  async def get_missing_events(
      self,
      destination: str,
      room_id: str,
      earliest_events_ids: Iterable[str],
      latest_events: Iterable[EventBase],
      limit: int,
      min_depth: int,
      timeout: int,
  ) -> List[EventBase]:
      """Try to fetch events we are missing.

      This is called when we receive an event without having received all of
      its ancestors.

      Args:
          destination: The server to request the missing events from.
          room_id: The room the events belong to.
          earliest_events_ids: List of event ids. Effectively the
              events we expected to receive, but haven't. `get_missing_events`
              should only return events that didn't happen before these.
          latest_events: List of events we have received that we don't
              have all previous events for.
          limit: Maximum number of events to return.
          min_depth: Minimum depth of events to return.
          timeout: Max time to wait in ms

      Returns:
          The result of passing the returned events through
          `_check_sigs_and_hash_and_fetch`, or an empty list if an
          HttpResponseException with code 400 was raised along the way.
      """
      try:
          response = await self.transport_layer.get_missing_events(
              destination=destination,
              room_id=room_id,
              earliest_events=earliest_events_ids,
              latest_events=[event.event_id for event in latest_events],
              limit=limit,
              min_depth=min_depth,
              timeout=timeout,
          )

          room_version = await self.store.get_room_version(room_id)

          candidate_events = [
              event_from_pdu_json(raw_event, room_version)
              for raw_event in response.get("events", [])
          ]

          return await self._check_sigs_and_hash_and_fetch(
              destination, candidate_events, outlier=False, room_version=room_version
          )
      except HttpResponseException as exc:
          if exc.code != 400:
              raise

          # We are probably hitting an old server that doesn't support
          # get_missing_events
          return []
  async def forward_third_party_invite(
      self, destinations: Iterable[str], room_id: str, event_dict: JsonDict
  ) -> None:
      """Ask remote servers in turn to exchange a third-party invite.

      Our own server name is skipped. The first destination that handles the
      request without raising ends the loop.

      Args:
          destinations: Candidate servers to try, in order.
          room_id: The room the invite is for.
          event_dict: The event to hand to the remote server.

      Raises:
          CodeMessageException: re-raised immediately if a destination raises it.
          RuntimeError: if no destination handled the request.
      """
      remote_destinations = (
          server for server in destinations if server != self.server_name
      )

      for destination in remote_destinations:
          try:
              await self.transport_layer.exchange_third_party_invite(
                  destination=destination, room_id=room_id, event_dict=event_dict
              )
          except CodeMessageException:
              raise
          except Exception as exc:
              # Any other failure is logged and the next destination is tried.
              logger.exception(
                  "Failed to send_third_party_invite via %s: %s", destination, str(exc)
              )
          else:
              return

      raise RuntimeError("Failed to send to any server.")
  async def get_room_complexity(
      self, destination: str, room_id: str
  ) -> Optional[JsonDict]:
      """Fetch the complexity of a remote room from another server.

      Failures are logged rather than raised.

      Args:
          destination: The remote server
          room_id: The room ID to ask about.

      Returns:
          Dict contains the complexity metric versions, while None means we
          could not fetch the complexity.
      """
      # Stays None if we don't manage to find it. It's not an error if a
      # server doesn't give it to us.
      complexity: Optional[JsonDict] = None

      try:
          complexity = await self.transport_layer.get_room_complexity(
              destination=destination, room_id=room_id
          )
      except CodeMessageException as exc:
          # We didn't manage to get it -- probably a 404. We are okay if other
          # servers don't give it to us.
          logger.debug(
              "Failed to fetch room complexity via %s for %s, got a %d",
              destination,
              room_id,
              exc.code,
          )
      except Exception:
          logger.exception(
              "Failed to fetch room complexity via %s for %s", destination, room_id
          )

      return complexity
  async def get_space_summary(
      self,
      destinations: Iterable[str],
      room_id: str,
      suggested_only: bool,
      max_rooms_per_space: Optional[int],
      exclude_rooms: List[str],
  ) -> "FederationSpaceSummaryResult":
      """Call other servers to get a summary of the given space.

      Args:
          destinations: The remote servers. We will try them in turn, omitting any
              that have been blacklisted.
          room_id: ID of the space to be queried
          suggested_only: If true, ask the remote server to only return children
              with the "suggested" flag set
          max_rooms_per_space: A limit on the number of children to return for each
              space
          exclude_rooms: A list of room IDs to tell the remote server to skip

      Returns:
          a parsed FederationSpaceSummaryResult

      Raises:
          SynapseError if we were unable to get a valid summary from any of the
          remote servers
      """

      async def send_request(destination: str) -> FederationSpaceSummaryResult:
          raw_summary = await self.transport_layer.get_space_summary(
              destination=destination,
              room_id=room_id,
              suggested_only=suggested_only,
              max_rooms_per_space=max_rooms_per_space,
              exclude_rooms=exclude_rooms,
          )

          # A response that fails parsing is reported as an invalid response.
          try:
              parsed_summary = FederationSpaceSummaryResult.from_json_dict(raw_summary)
          except ValueError as exc:
              raise InvalidResponseError(str(exc))
          return parsed_summary

      summary = await self._try_destination_list(
          "fetch space summary",
          destinations,
          send_request,
          failover_on_unknown_endpoint=True,
      )
      return summary
  async def get_room_hierarchy(
      self,
      destinations: Iterable[str],
      room_id: str,
      suggested_only: bool,
  ) -> Tuple[JsonDict, Sequence[JsonDict], Sequence[str]]:
      """Call other servers to get a hierarchy of the given room.

      Performs simple data validation and parsing of the response.

      Args:
          destinations: The remote servers. We will try them in turn, omitting any
              that have been blacklisted.
          room_id: ID of the space to be queried
          suggested_only: If true, ask the remote server to only return children
              with the "suggested" flag set

      Returns:
          A tuple of:
              The room as a JSON dictionary.
              A list of children rooms, as JSON dictionaries.
              A list of inaccessible children room IDs.

      Raises:
          SynapseError if we were unable to get a valid summary from any of the
          remote servers
      """

      def _is_json_list(value: object) -> bool:
          # str and bytes are Sequences too. A JSON string would otherwise
          # pass a bare Sequence check, and every character of a str is
          # itself a str, so it would also pass the "list of str" check.
          return isinstance(value, Sequence) and not isinstance(value, (str, bytes))

      async def send_request(
          destination: str,
      ) -> Tuple[JsonDict, Sequence[JsonDict], Sequence[str]]:
          res = await self.transport_layer.get_room_hierarchy(
              destination=destination,
              room_id=room_id,
              suggested_only=suggested_only,
          )

          room = res.get("room")
          if not isinstance(room, dict):
              raise InvalidResponseError("'room' must be a dict")

          # Validate children_state of the room.
          children_state = room.get("children_state", [])
          if not _is_json_list(children_state):
              raise InvalidResponseError("'room.children_state' must be a list")
          if any(not isinstance(e, dict) for e in children_state):
              raise InvalidResponseError("Invalid event in 'children_state' list")
          try:
              # Parsed purely for validation; the parsed objects are discarded.
              for child_event in children_state:
                  FederationSpaceSummaryEventResult.from_json_dict(child_event)
          except ValueError as exc:
              raise InvalidResponseError(str(exc)) from exc

          # Validate the children rooms.
          children = res.get("children", [])
          if not _is_json_list(children):
              raise InvalidResponseError("'children' must be a list")
          if any(not isinstance(r, dict) for r in children):
              raise InvalidResponseError("Invalid room in 'children' list")

          # Validate the inaccessible children.
          inaccessible_children = res.get("inaccessible_children", [])
          if not _is_json_list(inaccessible_children):
              raise InvalidResponseError("'inaccessible_children' must be a list")
          if any(not isinstance(r, str) for r in inaccessible_children):
              raise InvalidResponseError(
                  "Invalid room ID in 'inaccessible_children' list"
              )

          return room, children, inaccessible_children

      # TODO Fallback to the old federation API and translate the results.
      return await self._try_destination_list(
          "fetch room hierarchy",
          destinations,
          send_request,
          failover_on_unknown_endpoint=True,
      )
  @attr.s(frozen=True, slots=True, auto_attribs=True)
  class FederationSpaceSummaryEventResult:
      """Represents a single event in the result of a successful get_space_summary call.

      It's essentially just a serialised event object, but we do a bit of parsing and
      validation in `from_json_dict` and store some of the validated properties in
      object attributes.
      """

      # taken from the event's "type" key
      event_type: str
      room_id: str
      state_key: str
      # taken from the "via" key of the event's content
      via: Sequence[str]

      # the raw data, including the above keys
      data: JsonDict

      @classmethod
      def from_json_dict(cls, d: JsonDict) -> "FederationSpaceSummaryEventResult":
          """Parse an event within the result of a /spaces/ request

          Args:
              d: json object to be parsed

          Returns:
              An instance holding the validated fields plus the raw object `d`.

          Raises:
              ValueError if d is not a valid event
          """

          event_type = d.get("type")
          if not isinstance(event_type, str):
              raise ValueError("Invalid event: 'event_type' must be a str")

          room_id = d.get("room_id")
          if not isinstance(room_id, str):
              raise ValueError("Invalid event: 'room_id' must be a str")

          state_key = d.get("state_key")
          if not isinstance(state_key, str):
              raise ValueError("Invalid event: 'state_key' must be a str")

          content = d.get("content")
          if not isinstance(content, dict):
              raise ValueError("Invalid event: 'content' must be a dict")

          via = content.get("via")
          # A str is itself a Sequence whose items are all str, so a JSON
          # string such as "example.org" would pass both checks below unless
          # str/bytes are excluded explicitly.
          if isinstance(via, (str, bytes)) or not isinstance(via, Sequence):
              raise ValueError("Invalid event: 'via' must be a list")
          if any(not isinstance(v, str) for v in via):
              raise ValueError("Invalid event: 'via' must be a list of strings")

          return cls(event_type, room_id, state_key, via, d)
  @attr.s(frozen=True, slots=True, auto_attribs=True)
  class FederationSpaceSummaryResult:
      """Represents the data returned by a successful get_space_summary call."""

      # the entries of the response's "rooms" list, unparsed
      rooms: Sequence[JsonDict]
      # the entries of the response's "events" list, parsed and validated
      events: Sequence[FederationSpaceSummaryEventResult]

      @classmethod
      def from_json_dict(cls, d: JsonDict) -> "FederationSpaceSummaryResult":
          """Parse the result of a /spaces/ request

          Args:
              d: json object to be parsed

          Returns:
              An instance holding the raw rooms and the parsed events.

          Raises:
              ValueError if d is not a valid /spaces/ response
          """
          rooms = d.get("rooms")
          # str and bytes are Sequences too: an empty JSON string would pass a
          # bare Sequence check and the per-item check, so exclude them.
          if isinstance(rooms, (str, bytes)) or not isinstance(rooms, Sequence):
              raise ValueError("'rooms' must be a list")
          if any(not isinstance(r, dict) for r in rooms):
              raise ValueError("Invalid room in 'rooms' list")

          events = d.get("events")
          if isinstance(events, (str, bytes)) or not isinstance(events, Sequence):
              raise ValueError("'events' must be a list")
          if any(not isinstance(e, dict) for e in events):
              raise ValueError("Invalid event in 'events' list")
          parsed_events = [
              FederationSpaceSummaryEventResult.from_json_dict(e) for e in events
          ]

          return cls(rooms, parsed_events)