client.py 32 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994
  1. # Copyright 2014-2022 The Matrix.org Foundation C.I.C.
  2. # Copyright 2020 Sorunome
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. import logging
  16. import urllib
  17. from typing import (
  18. Any,
  19. Callable,
  20. Collection,
  21. Dict,
  22. Generator,
  23. Iterable,
  24. List,
  25. Mapping,
  26. Optional,
  27. Tuple,
  28. Union,
  29. )
  30. import attr
  31. import ijson
  32. from synapse.api.constants import Membership
  33. from synapse.api.errors import Codes, HttpResponseException, SynapseError
  34. from synapse.api.room_versions import RoomVersion
  35. from synapse.api.urls import (
  36. FEDERATION_UNSTABLE_PREFIX,
  37. FEDERATION_V1_PREFIX,
  38. FEDERATION_V2_PREFIX,
  39. )
  40. from synapse.events import EventBase, make_event_from_dict
  41. from synapse.federation.units import Transaction
  42. from synapse.http.matrixfederationclient import ByteParser
  43. from synapse.http.types import QueryParams
  44. from synapse.types import JsonDict
  45. from synapse.util import ExceptionBundle
  46. logger = logging.getLogger(__name__)
  47. class TransportLayerClient:
  48. """Sends federation HTTP requests to other servers"""
  49. def __init__(self, hs):
  50. self.server_name = hs.hostname
  51. self.client = hs.get_federation_http_client()
  52. self._faster_joins_enabled = hs.config.experimental.faster_joins_enabled
  53. async def get_room_state_ids(
  54. self, destination: str, room_id: str, event_id: str
  55. ) -> JsonDict:
  56. """Requests the IDs of all state for a given room at the given event.
  57. Args:
  58. destination: The host name of the remote homeserver we want
  59. to get the state from.
  60. room_id: the room we want the state of
  61. event_id: The event we want the context at.
  62. Returns:
  63. Results in a dict received from the remote homeserver.
  64. """
  65. logger.debug("get_room_state_ids dest=%s, room=%s", destination, room_id)
  66. path = _create_v1_path("/state_ids/%s", room_id)
  67. return await self.client.get_json(
  68. destination,
  69. path=path,
  70. args={"event_id": event_id},
  71. try_trailing_slash_on_400=True,
  72. )
  73. async def get_room_state(
  74. self, room_version: RoomVersion, destination: str, room_id: str, event_id: str
  75. ) -> "StateRequestResponse":
  76. """Requests the full state for a given room at the given event.
  77. Args:
  78. room_version: the version of the room (required to build the event objects)
  79. destination: The host name of the remote homeserver we want
  80. to get the state from.
  81. room_id: the room we want the state of
  82. event_id: The event we want the context at.
  83. Returns:
  84. Results in a dict received from the remote homeserver.
  85. """
  86. path = _create_v1_path("/state/%s", room_id)
  87. return await self.client.get_json(
  88. destination,
  89. path=path,
  90. args={"event_id": event_id},
  91. parser=_StateParser(room_version),
  92. )
  93. async def get_event(
  94. self, destination: str, event_id: str, timeout: Optional[int] = None
  95. ) -> JsonDict:
  96. """Requests the pdu with give id and origin from the given server.
  97. Args:
  98. destination: The host name of the remote homeserver we want
  99. to get the state from.
  100. event_id: The id of the event being requested.
  101. timeout: How long to try (in ms) the destination for before
  102. giving up. None indicates no timeout.
  103. Returns:
  104. Results in a dict received from the remote homeserver.
  105. """
  106. logger.debug("get_pdu dest=%s, event_id=%s", destination, event_id)
  107. path = _create_v1_path("/event/%s", event_id)
  108. return await self.client.get_json(
  109. destination, path=path, timeout=timeout, try_trailing_slash_on_400=True
  110. )
  111. async def backfill(
  112. self, destination: str, room_id: str, event_tuples: Collection[str], limit: int
  113. ) -> Optional[JsonDict]:
  114. """Requests `limit` previous PDUs in a given context before list of
  115. PDUs.
  116. Args:
  117. destination
  118. room_id
  119. event_tuples:
  120. Must be a Collection that is falsy when empty.
  121. (Iterable is not enough here!)
  122. limit
  123. Returns:
  124. Results in a dict received from the remote homeserver.
  125. """
  126. logger.debug(
  127. "backfill dest=%s, room_id=%s, event_tuples=%r, limit=%s",
  128. destination,
  129. room_id,
  130. event_tuples,
  131. str(limit),
  132. )
  133. if not event_tuples:
  134. # TODO: raise?
  135. return None
  136. path = _create_v1_path("/backfill/%s", room_id)
  137. args = {"v": event_tuples, "limit": [str(limit)]}
  138. return await self.client.get_json(
  139. destination, path=path, args=args, try_trailing_slash_on_400=True
  140. )
  141. async def timestamp_to_event(
  142. self, destination: str, room_id: str, timestamp: int, direction: str
  143. ) -> Union[JsonDict, List]:
  144. """
  145. Calls a remote federating server at `destination` asking for their
  146. closest event to the given timestamp in the given direction.
  147. Args:
  148. destination: Domain name of the remote homeserver
  149. room_id: Room to fetch the event from
  150. timestamp: The point in time (inclusive) we should navigate from in
  151. the given direction to find the closest event.
  152. direction: ["f"|"b"] to indicate whether we should navigate forward
  153. or backward from the given timestamp to find the closest event.
  154. Returns:
  155. Response dict received from the remote homeserver.
  156. Raises:
  157. Various exceptions when the request fails
  158. """
  159. path = _create_v1_path(
  160. "/timestamp_to_event/%s",
  161. room_id,
  162. )
  163. args = {"ts": [str(timestamp)], "dir": [direction]}
  164. remote_response = await self.client.get_json(
  165. destination, path=path, args=args, try_trailing_slash_on_400=True
  166. )
  167. return remote_response
  168. async def send_transaction(
  169. self,
  170. transaction: Transaction,
  171. json_data_callback: Optional[Callable[[], JsonDict]] = None,
  172. ) -> JsonDict:
  173. """Sends the given Transaction to its destination
  174. Args:
  175. transaction
  176. Returns:
  177. Succeeds when we get a 2xx HTTP response. The result
  178. will be the decoded JSON body.
  179. Fails with ``HTTPRequestException`` if we get an HTTP response
  180. code >= 300.
  181. Fails with ``NotRetryingDestination`` if we are not yet ready
  182. to retry this server.
  183. Fails with ``FederationDeniedError`` if this destination
  184. is not on our federation whitelist
  185. """
  186. logger.debug(
  187. "send_data dest=%s, txid=%s",
  188. transaction.destination,
  189. transaction.transaction_id,
  190. )
  191. if transaction.destination == self.server_name:
  192. raise RuntimeError("Transport layer cannot send to itself!")
  193. # FIXME: This is only used by the tests. The actual json sent is
  194. # generated by the json_data_callback.
  195. json_data = transaction.get_dict()
  196. path = _create_v1_path("/send/%s", transaction.transaction_id)
  197. return await self.client.put_json(
  198. transaction.destination,
  199. path=path,
  200. data=json_data,
  201. json_data_callback=json_data_callback,
  202. long_retries=True,
  203. backoff_on_404=True, # If we get a 404 the other side has gone
  204. try_trailing_slash_on_400=True,
  205. )
  206. async def make_query(
  207. self,
  208. destination: str,
  209. query_type: str,
  210. args: QueryParams,
  211. retry_on_dns_fail: bool,
  212. ignore_backoff: bool = False,
  213. prefix: str = FEDERATION_V1_PREFIX,
  214. ) -> JsonDict:
  215. path = _create_path(prefix, "/query/%s", query_type)
  216. return await self.client.get_json(
  217. destination=destination,
  218. path=path,
  219. args=args,
  220. retry_on_dns_fail=retry_on_dns_fail,
  221. timeout=10000,
  222. ignore_backoff=ignore_backoff,
  223. )
  224. async def make_membership_event(
  225. self,
  226. destination: str,
  227. room_id: str,
  228. user_id: str,
  229. membership: str,
  230. params: Optional[Mapping[str, Union[str, Iterable[str]]]],
  231. ) -> JsonDict:
  232. """Asks a remote server to build and sign us a membership event
  233. Note that this does not append any events to any graphs.
  234. Args:
  235. destination: address of remote homeserver
  236. room_id: room to join/leave
  237. user_id: user to be joined/left
  238. membership: one of join/leave
  239. params: Query parameters to include in the request.
  240. Returns:
  241. Succeeds when we get a 2xx HTTP response. The result
  242. will be the decoded JSON body (ie, the new event).
  243. Fails with ``HTTPRequestException`` if we get an HTTP response
  244. code >= 300.
  245. Fails with ``NotRetryingDestination`` if we are not yet ready
  246. to retry this server.
  247. Fails with ``FederationDeniedError`` if the remote destination
  248. is not in our federation whitelist
  249. """
  250. valid_memberships = {Membership.JOIN, Membership.LEAVE, Membership.KNOCK}
  251. if membership not in valid_memberships:
  252. raise RuntimeError(
  253. "make_membership_event called with membership='%s', must be one of %s"
  254. % (membership, ",".join(valid_memberships))
  255. )
  256. path = _create_v1_path("/make_%s/%s/%s", membership, room_id, user_id)
  257. ignore_backoff = False
  258. retry_on_dns_fail = False
  259. if membership == Membership.LEAVE:
  260. # we particularly want to do our best to send leave events. The
  261. # problem is that if it fails, we won't retry it later, so if the
  262. # remote server was just having a momentary blip, the room will be
  263. # out of sync.
  264. ignore_backoff = True
  265. retry_on_dns_fail = True
  266. return await self.client.get_json(
  267. destination=destination,
  268. path=path,
  269. args=params,
  270. retry_on_dns_fail=retry_on_dns_fail,
  271. timeout=20000,
  272. ignore_backoff=ignore_backoff,
  273. )
  274. async def send_join_v1(
  275. self,
  276. room_version: RoomVersion,
  277. destination: str,
  278. room_id: str,
  279. event_id: str,
  280. content: JsonDict,
  281. ) -> "SendJoinResponse":
  282. path = _create_v1_path("/send_join/%s/%s", room_id, event_id)
  283. return await self.client.put_json(
  284. destination=destination,
  285. path=path,
  286. data=content,
  287. parser=SendJoinParser(room_version, v1_api=True),
  288. )
  289. async def send_join_v2(
  290. self,
  291. room_version: RoomVersion,
  292. destination: str,
  293. room_id: str,
  294. event_id: str,
  295. content: JsonDict,
  296. ) -> "SendJoinResponse":
  297. path = _create_v2_path("/send_join/%s/%s", room_id, event_id)
  298. query_params: Dict[str, str] = {}
  299. if self._faster_joins_enabled:
  300. # lazy-load state on join
  301. query_params["org.matrix.msc3706.partial_state"] = "true"
  302. return await self.client.put_json(
  303. destination=destination,
  304. path=path,
  305. args=query_params,
  306. data=content,
  307. parser=SendJoinParser(room_version, v1_api=False),
  308. )
  309. async def send_leave_v1(
  310. self, destination: str, room_id: str, event_id: str, content: JsonDict
  311. ) -> Tuple[int, JsonDict]:
  312. path = _create_v1_path("/send_leave/%s/%s", room_id, event_id)
  313. return await self.client.put_json(
  314. destination=destination,
  315. path=path,
  316. data=content,
  317. # we want to do our best to send this through. The problem is
  318. # that if it fails, we won't retry it later, so if the remote
  319. # server was just having a momentary blip, the room will be out of
  320. # sync.
  321. ignore_backoff=True,
  322. )
  323. async def send_leave_v2(
  324. self, destination: str, room_id: str, event_id: str, content: JsonDict
  325. ) -> JsonDict:
  326. path = _create_v2_path("/send_leave/%s/%s", room_id, event_id)
  327. return await self.client.put_json(
  328. destination=destination,
  329. path=path,
  330. data=content,
  331. # we want to do our best to send this through. The problem is
  332. # that if it fails, we won't retry it later, so if the remote
  333. # server was just having a momentary blip, the room will be out of
  334. # sync.
  335. ignore_backoff=True,
  336. )
  337. async def send_knock_v1(
  338. self,
  339. destination: str,
  340. room_id: str,
  341. event_id: str,
  342. content: JsonDict,
  343. ) -> JsonDict:
  344. """
  345. Sends a signed knock membership event to a remote server. This is the second
  346. step for knocking after make_knock.
  347. Args:
  348. destination: The remote homeserver.
  349. room_id: The ID of the room to knock on.
  350. event_id: The ID of the knock membership event that we're sending.
  351. content: The knock membership event that we're sending. Note that this is not the
  352. `content` field of the membership event, but the entire signed membership event
  353. itself represented as a JSON dict.
  354. Returns:
  355. The remote homeserver can optionally return some state from the room. The response
  356. dictionary is in the form:
  357. {"knock_state_events": [<state event dict>, ...]}
  358. The list of state events may be empty.
  359. """
  360. path = _create_v1_path("/send_knock/%s/%s", room_id, event_id)
  361. return await self.client.put_json(
  362. destination=destination, path=path, data=content
  363. )
  364. async def send_invite_v1(
  365. self, destination: str, room_id: str, event_id: str, content: JsonDict
  366. ) -> Tuple[int, JsonDict]:
  367. path = _create_v1_path("/invite/%s/%s", room_id, event_id)
  368. return await self.client.put_json(
  369. destination=destination, path=path, data=content, ignore_backoff=True
  370. )
  371. async def send_invite_v2(
  372. self, destination: str, room_id: str, event_id: str, content: JsonDict
  373. ) -> JsonDict:
  374. path = _create_v2_path("/invite/%s/%s", room_id, event_id)
  375. return await self.client.put_json(
  376. destination=destination, path=path, data=content, ignore_backoff=True
  377. )
  378. async def get_public_rooms(
  379. self,
  380. remote_server: str,
  381. limit: Optional[int] = None,
  382. since_token: Optional[str] = None,
  383. search_filter: Optional[Dict] = None,
  384. include_all_networks: bool = False,
  385. third_party_instance_id: Optional[str] = None,
  386. ) -> JsonDict:
  387. """Get the list of public rooms from a remote homeserver
  388. See synapse.federation.federation_client.FederationClient.get_public_rooms for
  389. more information.
  390. """
  391. if search_filter:
  392. # this uses MSC2197 (Search Filtering over Federation)
  393. path = _create_v1_path("/publicRooms")
  394. data: Dict[str, Any] = {
  395. "include_all_networks": "true" if include_all_networks else "false"
  396. }
  397. if third_party_instance_id:
  398. data["third_party_instance_id"] = third_party_instance_id
  399. if limit:
  400. data["limit"] = limit
  401. if since_token:
  402. data["since"] = since_token
  403. data["filter"] = search_filter
  404. try:
  405. response = await self.client.post_json(
  406. destination=remote_server, path=path, data=data, ignore_backoff=True
  407. )
  408. except HttpResponseException as e:
  409. if e.code == 403:
  410. raise SynapseError(
  411. 403,
  412. "You are not allowed to view the public rooms list of %s"
  413. % (remote_server,),
  414. errcode=Codes.FORBIDDEN,
  415. )
  416. raise
  417. else:
  418. path = _create_v1_path("/publicRooms")
  419. args: Dict[str, Union[str, Iterable[str]]] = {
  420. "include_all_networks": "true" if include_all_networks else "false"
  421. }
  422. if third_party_instance_id:
  423. args["third_party_instance_id"] = (third_party_instance_id,)
  424. if limit:
  425. args["limit"] = [str(limit)]
  426. if since_token:
  427. args["since"] = [since_token]
  428. try:
  429. response = await self.client.get_json(
  430. destination=remote_server, path=path, args=args, ignore_backoff=True
  431. )
  432. except HttpResponseException as e:
  433. if e.code == 403:
  434. raise SynapseError(
  435. 403,
  436. "You are not allowed to view the public rooms list of %s"
  437. % (remote_server,),
  438. errcode=Codes.FORBIDDEN,
  439. )
  440. raise
  441. return response
  442. async def exchange_third_party_invite(
  443. self, destination: str, room_id: str, event_dict: JsonDict
  444. ) -> JsonDict:
  445. path = _create_v1_path("/exchange_third_party_invite/%s", room_id)
  446. return await self.client.put_json(
  447. destination=destination, path=path, data=event_dict
  448. )
  449. async def get_event_auth(
  450. self, destination: str, room_id: str, event_id: str
  451. ) -> JsonDict:
  452. path = _create_v1_path("/event_auth/%s/%s", room_id, event_id)
  453. return await self.client.get_json(destination=destination, path=path)
  454. async def query_client_keys(
  455. self, destination: str, query_content: JsonDict, timeout: int
  456. ) -> JsonDict:
  457. """Query the device keys for a list of user ids hosted on a remote
  458. server.
  459. Request:
  460. {
  461. "device_keys": {
  462. "<user_id>": ["<device_id>"]
  463. }
  464. }
  465. Response:
  466. {
  467. "device_keys": {
  468. "<user_id>": {
  469. "<device_id>": {...}
  470. }
  471. },
  472. "master_key": {
  473. "<user_id>": {...}
  474. }
  475. },
  476. "self_signing_key": {
  477. "<user_id>": {...}
  478. }
  479. }
  480. Args:
  481. destination: The server to query.
  482. query_content: The user ids to query.
  483. Returns:
  484. A dict containing device and cross-signing keys.
  485. """
  486. path = _create_v1_path("/user/keys/query")
  487. return await self.client.post_json(
  488. destination=destination, path=path, data=query_content, timeout=timeout
  489. )
  490. async def query_user_devices(
  491. self, destination: str, user_id: str, timeout: int
  492. ) -> JsonDict:
  493. """Query the devices for a user id hosted on a remote server.
  494. Response:
  495. {
  496. "stream_id": "...",
  497. "devices": [ { ... } ],
  498. "master_key": {
  499. "user_id": "<user_id>",
  500. "usage": [...],
  501. "keys": {...},
  502. "signatures": {
  503. "<user_id>": {...}
  504. }
  505. },
  506. "self_signing_key": {
  507. "user_id": "<user_id>",
  508. "usage": [...],
  509. "keys": {...},
  510. "signatures": {
  511. "<user_id>": {...}
  512. }
  513. }
  514. }
  515. Args:
  516. destination: The server to query.
  517. query_content: The user ids to query.
  518. Returns:
  519. A dict containing device and cross-signing keys.
  520. """
  521. path = _create_v1_path("/user/devices/%s", user_id)
  522. return await self.client.get_json(
  523. destination=destination, path=path, timeout=timeout
  524. )
  525. async def claim_client_keys(
  526. self, destination: str, query_content: JsonDict, timeout: Optional[int]
  527. ) -> JsonDict:
  528. """Claim one-time keys for a list of devices hosted on a remote server.
  529. Request:
  530. {
  531. "one_time_keys": {
  532. "<user_id>": {
  533. "<device_id>": "<algorithm>"
  534. }
  535. }
  536. }
  537. Response:
  538. {
  539. "device_keys": {
  540. "<user_id>": {
  541. "<device_id>": {
  542. "<algorithm>:<key_id>": "<key_base64>"
  543. }
  544. }
  545. }
  546. }
  547. Args:
  548. destination: The server to query.
  549. query_content: The user ids to query.
  550. Returns:
  551. A dict containing the one-time keys.
  552. """
  553. path = _create_v1_path("/user/keys/claim")
  554. return await self.client.post_json(
  555. destination=destination, path=path, data=query_content, timeout=timeout
  556. )
  557. async def get_missing_events(
  558. self,
  559. destination: str,
  560. room_id: str,
  561. earliest_events: Iterable[str],
  562. latest_events: Iterable[str],
  563. limit: int,
  564. min_depth: int,
  565. timeout: int,
  566. ) -> JsonDict:
  567. path = _create_v1_path("/get_missing_events/%s", room_id)
  568. return await self.client.post_json(
  569. destination=destination,
  570. path=path,
  571. data={
  572. "limit": int(limit),
  573. "min_depth": int(min_depth),
  574. "earliest_events": earliest_events,
  575. "latest_events": latest_events,
  576. },
  577. timeout=timeout,
  578. )
  579. async def get_room_complexity(self, destination: str, room_id: str) -> JsonDict:
  580. """
  581. Args:
  582. destination: The remote server
  583. room_id: The room ID to ask about.
  584. """
  585. path = _create_path(FEDERATION_UNSTABLE_PREFIX, "/rooms/%s/complexity", room_id)
  586. return await self.client.get_json(destination=destination, path=path)
  587. async def get_room_hierarchy(
  588. self, destination: str, room_id: str, suggested_only: bool
  589. ) -> JsonDict:
  590. """
  591. Args:
  592. destination: The remote server
  593. room_id: The room ID to ask about.
  594. suggested_only: if True, only suggested rooms will be returned
  595. """
  596. path = _create_v1_path("/hierarchy/%s", room_id)
  597. return await self.client.get_json(
  598. destination=destination,
  599. path=path,
  600. args={"suggested_only": "true" if suggested_only else "false"},
  601. )
  602. async def get_room_hierarchy_unstable(
  603. self, destination: str, room_id: str, suggested_only: bool
  604. ) -> JsonDict:
  605. """
  606. Args:
  607. destination: The remote server
  608. room_id: The room ID to ask about.
  609. suggested_only: if True, only suggested rooms will be returned
  610. """
  611. path = _create_path(
  612. FEDERATION_UNSTABLE_PREFIX, "/org.matrix.msc2946/hierarchy/%s", room_id
  613. )
  614. return await self.client.get_json(
  615. destination=destination,
  616. path=path,
  617. args={"suggested_only": "true" if suggested_only else "false"},
  618. )
  619. async def get_account_status(
  620. self, destination: str, user_ids: List[str]
  621. ) -> JsonDict:
  622. """
  623. Args:
  624. destination: The remote server.
  625. user_ids: The user ID(s) for which to request account status(es).
  626. """
  627. path = _create_path(
  628. FEDERATION_UNSTABLE_PREFIX, "/org.matrix.msc3720/account_status"
  629. )
  630. return await self.client.post_json(
  631. destination=destination, path=path, data={"user_ids": user_ids}
  632. )
  633. def _create_path(federation_prefix: str, path: str, *args: str) -> str:
  634. """
  635. Ensures that all args are url encoded.
  636. """
  637. return federation_prefix + path % tuple(urllib.parse.quote(arg, "") for arg in args)
  638. def _create_v1_path(path: str, *args: str) -> str:
  639. """Creates a path against V1 federation API from the path template and
  640. args. Ensures that all args are url encoded.
  641. Example:
  642. _create_v1_path("/event/%s", event_id)
  643. Args:
  644. path: String template for the path
  645. args: Args to insert into path. Each arg will be url encoded
  646. """
  647. return _create_path(FEDERATION_V1_PREFIX, path, *args)
  648. def _create_v2_path(path: str, *args: str) -> str:
  649. """Creates a path against V2 federation API from the path template and
  650. args. Ensures that all args are url encoded.
  651. Example:
  652. _create_v2_path("/event/%s", event_id)
  653. Args:
  654. path: String template for the path
  655. args: Args to insert into path. Each arg will be url encoded
  656. """
  657. return _create_path(FEDERATION_V2_PREFIX, path, *args)
  658. @attr.s(slots=True, auto_attribs=True)
  659. class SendJoinResponse:
  660. """The parsed response of a `/send_join` request."""
  661. # The list of auth events from the /send_join response.
  662. auth_events: List[EventBase]
  663. # The list of state from the /send_join response.
  664. state: List[EventBase]
  665. # The raw join event from the /send_join response.
  666. event_dict: JsonDict
  667. # The parsed join event from the /send_join response. This will be None if
  668. # "event" is not included in the response.
  669. event: Optional[EventBase] = None
  670. # The room state is incomplete
  671. partial_state: bool = False
  672. # List of servers in the room
  673. servers_in_room: Optional[List[str]] = None
  674. @attr.s(slots=True, auto_attribs=True)
  675. class StateRequestResponse:
  676. """The parsed response of a `/state` request."""
  677. auth_events: List[EventBase]
  678. state: List[EventBase]
  679. @ijson.coroutine
  680. def _event_parser(event_dict: JsonDict) -> Generator[None, Tuple[str, Any], None]:
  681. """Helper function for use with `ijson.kvitems_coro` to parse key-value pairs
  682. to add them to a given dictionary.
  683. """
  684. while True:
  685. key, value = yield
  686. event_dict[key] = value
  687. @ijson.coroutine
  688. def _event_list_parser(
  689. room_version: RoomVersion, events: List[EventBase]
  690. ) -> Generator[None, JsonDict, None]:
  691. """Helper function for use with `ijson.items_coro` to parse an array of
  692. events and add them to the given list.
  693. """
  694. while True:
  695. obj = yield
  696. event = make_event_from_dict(obj, room_version)
  697. events.append(event)
  698. @ijson.coroutine
  699. def _partial_state_parser(response: SendJoinResponse) -> Generator[None, Any, None]:
  700. """Helper function for use with `ijson.items_coro`
  701. Parses the partial_state field in send_join responses
  702. """
  703. while True:
  704. val = yield
  705. if not isinstance(val, bool):
  706. raise TypeError("partial_state must be a boolean")
  707. response.partial_state = val
  708. @ijson.coroutine
  709. def _servers_in_room_parser(response: SendJoinResponse) -> Generator[None, Any, None]:
  710. """Helper function for use with `ijson.items_coro`
  711. Parses the servers_in_room field in send_join responses
  712. """
  713. while True:
  714. val = yield
  715. if not isinstance(val, list) or any(not isinstance(x, str) for x in val):
  716. raise TypeError("servers_in_room must be a list of strings")
  717. response.servers_in_room = val
  718. class SendJoinParser(ByteParser[SendJoinResponse]):
  719. """A parser for the response to `/send_join` requests.
  720. Args:
  721. room_version: The version of the room.
  722. v1_api: Whether the response is in the v1 format.
  723. """
  724. CONTENT_TYPE = "application/json"
  725. # /send_join responses can be huge, so we override the size limit here. The response
  726. # is parsed in a streaming manner, which helps alleviate the issue of memory
  727. # usage a bit.
  728. MAX_RESPONSE_SIZE = 500 * 1024 * 1024
  729. def __init__(self, room_version: RoomVersion, v1_api: bool):
  730. self._response = SendJoinResponse([], [], event_dict={})
  731. self._room_version = room_version
  732. self._coros: List[Generator[None, bytes, None]] = []
  733. # The V1 API has the shape of `[200, {...}]`, which we handle by
  734. # prefixing with `item.*`.
  735. prefix = "item." if v1_api else ""
  736. self._coros = [
  737. ijson.items_coro(
  738. _event_list_parser(room_version, self._response.state),
  739. prefix + "state.item",
  740. use_float=True,
  741. ),
  742. ijson.items_coro(
  743. _event_list_parser(room_version, self._response.auth_events),
  744. prefix + "auth_chain.item",
  745. use_float=True,
  746. ),
  747. ijson.kvitems_coro(
  748. _event_parser(self._response.event_dict),
  749. prefix + "event",
  750. use_float=True,
  751. ),
  752. ]
  753. if not v1_api:
  754. self._coros.append(
  755. ijson.items_coro(
  756. _partial_state_parser(self._response),
  757. "org.matrix.msc3706.partial_state",
  758. use_float="True",
  759. )
  760. )
  761. self._coros.append(
  762. ijson.items_coro(
  763. _servers_in_room_parser(self._response),
  764. "org.matrix.msc3706.servers_in_room",
  765. use_float="True",
  766. )
  767. )
  768. def write(self, data: bytes) -> int:
  769. for c in self._coros:
  770. c.send(data)
  771. return len(data)
  772. def finish(self) -> SendJoinResponse:
  773. _close_coros(self._coros)
  774. if self._response.event_dict:
  775. self._response.event = make_event_from_dict(
  776. self._response.event_dict, self._room_version
  777. )
  778. return self._response
  779. class _StateParser(ByteParser[StateRequestResponse]):
  780. """A parser for the response to `/state` requests.
  781. Args:
  782. room_version: The version of the room.
  783. """
  784. CONTENT_TYPE = "application/json"
  785. # As with /send_join, /state responses can be huge.
  786. MAX_RESPONSE_SIZE = 500 * 1024 * 1024
  787. def __init__(self, room_version: RoomVersion):
  788. self._response = StateRequestResponse([], [])
  789. self._room_version = room_version
  790. self._coros: List[Generator[None, bytes, None]] = [
  791. ijson.items_coro(
  792. _event_list_parser(room_version, self._response.state),
  793. "pdus.item",
  794. use_float=True,
  795. ),
  796. ijson.items_coro(
  797. _event_list_parser(room_version, self._response.auth_events),
  798. "auth_chain.item",
  799. use_float=True,
  800. ),
  801. ]
  802. def write(self, data: bytes) -> int:
  803. for c in self._coros:
  804. c.send(data)
  805. return len(data)
  806. def finish(self) -> StateRequestResponse:
  807. _close_coros(self._coros)
  808. return self._response
  809. def _close_coros(coros: Iterable[Generator[None, bytes, None]]) -> None:
  810. """Close each of the given coroutines.
  811. Always calls .close() on each coroutine, even if doing so raises an exception.
  812. Any exceptions raised are aggregated into an ExceptionBundle.
  813. :raises ExceptionBundle: if at least one coroutine fails to close.
  814. """
  815. exceptions = []
  816. for c in coros:
  817. try:
  818. c.close()
  819. except Exception as e:
  820. exceptions.append(e)
  821. if exceptions:
  822. # raise from the first exception so that the traceback has slightly more context
  823. raise ExceptionBundle(
  824. f"There were {len(exceptions)} errors closing coroutines", exceptions
  825. ) from exceptions[0]