client.py 42 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321
  1. # Copyright 2014-2021 The Matrix.org Foundation C.I.C.
  2. # Copyright 2020 Sorunome
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. import logging
  16. import urllib
  17. from typing import Any, Callable, Dict, Iterable, List, Mapping, Optional, Tuple, Union
  18. import attr
  19. import ijson
  20. from synapse.api.constants import Membership
  21. from synapse.api.errors import Codes, HttpResponseException, SynapseError
  22. from synapse.api.room_versions import RoomVersion
  23. from synapse.api.urls import (
  24. FEDERATION_UNSTABLE_PREFIX,
  25. FEDERATION_V1_PREFIX,
  26. FEDERATION_V2_PREFIX,
  27. )
  28. from synapse.events import EventBase, make_event_from_dict
  29. from synapse.federation.units import Transaction
  30. from synapse.http.matrixfederationclient import ByteParser
  31. from synapse.logging.utils import log_function
  32. from synapse.types import JsonDict
  33. logger = logging.getLogger(__name__)
  34. # Send join responses can be huge, so we set a separate limit here. The response
  35. # is parsed in a streaming manner, which helps alleviate the issue of memory
  36. # usage a bit.
  37. MAX_RESPONSE_SIZE_SEND_JOIN = 500 * 1024 * 1024
  38. class TransportLayerClient:
  39. """Sends federation HTTP requests to other servers"""
  40. def __init__(self, hs):
  41. self.server_name = hs.hostname
  42. self.client = hs.get_federation_http_client()
  43. @log_function
  44. async def get_room_state_ids(
  45. self, destination: str, room_id: str, event_id: str
  46. ) -> JsonDict:
  47. """Requests all state for a given room from the given server at the
  48. given event. Returns the state's event_id's
  49. Args:
  50. destination: The host name of the remote homeserver we want
  51. to get the state from.
  52. context: The name of the context we want the state of
  53. event_id: The event we want the context at.
  54. Returns:
  55. Results in a dict received from the remote homeserver.
  56. """
  57. logger.debug("get_room_state_ids dest=%s, room=%s", destination, room_id)
  58. path = _create_v1_path("/state_ids/%s", room_id)
  59. return await self.client.get_json(
  60. destination,
  61. path=path,
  62. args={"event_id": event_id},
  63. try_trailing_slash_on_400=True,
  64. )
  65. @log_function
  66. async def get_event(
  67. self, destination: str, event_id: str, timeout: Optional[int] = None
  68. ) -> JsonDict:
  69. """Requests the pdu with give id and origin from the given server.
  70. Args:
  71. destination: The host name of the remote homeserver we want
  72. to get the state from.
  73. event_id: The id of the event being requested.
  74. timeout: How long to try (in ms) the destination for before
  75. giving up. None indicates no timeout.
  76. Returns:
  77. Results in a dict received from the remote homeserver.
  78. """
  79. logger.debug("get_pdu dest=%s, event_id=%s", destination, event_id)
  80. path = _create_v1_path("/event/%s", event_id)
  81. return await self.client.get_json(
  82. destination, path=path, timeout=timeout, try_trailing_slash_on_400=True
  83. )
  84. @log_function
  85. async def backfill(
  86. self, destination: str, room_id: str, event_tuples: Iterable[str], limit: int
  87. ) -> Optional[JsonDict]:
  88. """Requests `limit` previous PDUs in a given context before list of
  89. PDUs.
  90. Args:
  91. destination
  92. room_id
  93. event_tuples
  94. limit
  95. Returns:
  96. Results in a dict received from the remote homeserver.
  97. """
  98. logger.debug(
  99. "backfill dest=%s, room_id=%s, event_tuples=%r, limit=%s",
  100. destination,
  101. room_id,
  102. event_tuples,
  103. str(limit),
  104. )
  105. if not event_tuples:
  106. # TODO: raise?
  107. return None
  108. path = _create_v1_path("/backfill/%s", room_id)
  109. args = {"v": event_tuples, "limit": [str(limit)]}
  110. return await self.client.get_json(
  111. destination, path=path, args=args, try_trailing_slash_on_400=True
  112. )
  113. @log_function
  114. async def send_transaction(
  115. self,
  116. transaction: Transaction,
  117. json_data_callback: Optional[Callable[[], JsonDict]] = None,
  118. ) -> JsonDict:
  119. """Sends the given Transaction to its destination
  120. Args:
  121. transaction
  122. Returns:
  123. Succeeds when we get a 2xx HTTP response. The result
  124. will be the decoded JSON body.
  125. Fails with ``HTTPRequestException`` if we get an HTTP response
  126. code >= 300.
  127. Fails with ``NotRetryingDestination`` if we are not yet ready
  128. to retry this server.
  129. Fails with ``FederationDeniedError`` if this destination
  130. is not on our federation whitelist
  131. """
  132. logger.debug(
  133. "send_data dest=%s, txid=%s",
  134. transaction.destination, # type: ignore
  135. transaction.transaction_id, # type: ignore
  136. )
  137. if transaction.destination == self.server_name: # type: ignore
  138. raise RuntimeError("Transport layer cannot send to itself!")
  139. # FIXME: This is only used by the tests. The actual json sent is
  140. # generated by the json_data_callback.
  141. json_data = transaction.get_dict()
  142. path = _create_v1_path("/send/%s", transaction.transaction_id) # type: ignore
  143. return await self.client.put_json(
  144. transaction.destination, # type: ignore
  145. path=path,
  146. data=json_data,
  147. json_data_callback=json_data_callback,
  148. long_retries=True,
  149. backoff_on_404=True, # If we get a 404 the other side has gone
  150. try_trailing_slash_on_400=True,
  151. )
  152. @log_function
  153. async def make_query(
  154. self, destination, query_type, args, retry_on_dns_fail, ignore_backoff=False
  155. ):
  156. path = _create_v1_path("/query/%s", query_type)
  157. content = await self.client.get_json(
  158. destination=destination,
  159. path=path,
  160. args=args,
  161. retry_on_dns_fail=retry_on_dns_fail,
  162. timeout=10000,
  163. ignore_backoff=ignore_backoff,
  164. )
  165. return content
  166. @log_function
  167. async def make_membership_event(
  168. self,
  169. destination: str,
  170. room_id: str,
  171. user_id: str,
  172. membership: str,
  173. params: Optional[Mapping[str, Union[str, Iterable[str]]]],
  174. ) -> JsonDict:
  175. """Asks a remote server to build and sign us a membership event
  176. Note that this does not append any events to any graphs.
  177. Args:
  178. destination (str): address of remote homeserver
  179. room_id (str): room to join/leave
  180. user_id (str): user to be joined/left
  181. membership (str): one of join/leave
  182. params (dict[str, str|Iterable[str]]): Query parameters to include in the
  183. request.
  184. Returns:
  185. Succeeds when we get a 2xx HTTP response. The result
  186. will be the decoded JSON body (ie, the new event).
  187. Fails with ``HTTPRequestException`` if we get an HTTP response
  188. code >= 300.
  189. Fails with ``NotRetryingDestination`` if we are not yet ready
  190. to retry this server.
  191. Fails with ``FederationDeniedError`` if the remote destination
  192. is not in our federation whitelist
  193. """
  194. valid_memberships = {Membership.JOIN, Membership.LEAVE, Membership.KNOCK}
  195. if membership not in valid_memberships:
  196. raise RuntimeError(
  197. "make_membership_event called with membership='%s', must be one of %s"
  198. % (membership, ",".join(valid_memberships))
  199. )
  200. path = _create_v1_path("/make_%s/%s/%s", membership, room_id, user_id)
  201. ignore_backoff = False
  202. retry_on_dns_fail = False
  203. if membership == Membership.LEAVE:
  204. # we particularly want to do our best to send leave events. The
  205. # problem is that if it fails, we won't retry it later, so if the
  206. # remote server was just having a momentary blip, the room will be
  207. # out of sync.
  208. ignore_backoff = True
  209. retry_on_dns_fail = True
  210. return await self.client.get_json(
  211. destination=destination,
  212. path=path,
  213. args=params,
  214. retry_on_dns_fail=retry_on_dns_fail,
  215. timeout=20000,
  216. ignore_backoff=ignore_backoff,
  217. )
  218. @log_function
  219. async def send_join_v1(
  220. self,
  221. room_version: RoomVersion,
  222. destination: str,
  223. room_id: str,
  224. event_id: str,
  225. content: JsonDict,
  226. ) -> "SendJoinResponse":
  227. path = _create_v1_path("/send_join/%s/%s", room_id, event_id)
  228. return await self.client.put_json(
  229. destination=destination,
  230. path=path,
  231. data=content,
  232. parser=SendJoinParser(room_version, v1_api=True),
  233. max_response_size=MAX_RESPONSE_SIZE_SEND_JOIN,
  234. )
  235. @log_function
  236. async def send_join_v2(
  237. self,
  238. room_version: RoomVersion,
  239. destination: str,
  240. room_id: str,
  241. event_id: str,
  242. content: JsonDict,
  243. ) -> "SendJoinResponse":
  244. path = _create_v2_path("/send_join/%s/%s", room_id, event_id)
  245. return await self.client.put_json(
  246. destination=destination,
  247. path=path,
  248. data=content,
  249. parser=SendJoinParser(room_version, v1_api=False),
  250. max_response_size=MAX_RESPONSE_SIZE_SEND_JOIN,
  251. )
  252. @log_function
  253. async def send_leave_v1(
  254. self, destination: str, room_id: str, event_id: str, content: JsonDict
  255. ) -> Tuple[int, JsonDict]:
  256. path = _create_v1_path("/send_leave/%s/%s", room_id, event_id)
  257. return await self.client.put_json(
  258. destination=destination,
  259. path=path,
  260. data=content,
  261. # we want to do our best to send this through. The problem is
  262. # that if it fails, we won't retry it later, so if the remote
  263. # server was just having a momentary blip, the room will be out of
  264. # sync.
  265. ignore_backoff=True,
  266. )
  267. @log_function
  268. async def send_leave_v2(
  269. self, destination: str, room_id: str, event_id: str, content: JsonDict
  270. ) -> JsonDict:
  271. path = _create_v2_path("/send_leave/%s/%s", room_id, event_id)
  272. return await self.client.put_json(
  273. destination=destination,
  274. path=path,
  275. data=content,
  276. # we want to do our best to send this through. The problem is
  277. # that if it fails, we won't retry it later, so if the remote
  278. # server was just having a momentary blip, the room will be out of
  279. # sync.
  280. ignore_backoff=True,
  281. )
  282. @log_function
  283. async def send_knock_v1(
  284. self,
  285. destination: str,
  286. room_id: str,
  287. event_id: str,
  288. content: JsonDict,
  289. ) -> JsonDict:
  290. """
  291. Sends a signed knock membership event to a remote server. This is the second
  292. step for knocking after make_knock.
  293. Args:
  294. destination: The remote homeserver.
  295. room_id: The ID of the room to knock on.
  296. event_id: The ID of the knock membership event that we're sending.
  297. content: The knock membership event that we're sending. Note that this is not the
  298. `content` field of the membership event, but the entire signed membership event
  299. itself represented as a JSON dict.
  300. Returns:
  301. The remote homeserver can optionally return some state from the room. The response
  302. dictionary is in the form:
  303. {"knock_state_events": [<state event dict>, ...]}
  304. The list of state events may be empty.
  305. """
  306. path = _create_v1_path("/send_knock/%s/%s", room_id, event_id)
  307. return await self.client.put_json(
  308. destination=destination, path=path, data=content
  309. )
  310. @log_function
  311. async def send_invite_v1(
  312. self, destination: str, room_id: str, event_id: str, content: JsonDict
  313. ) -> Tuple[int, JsonDict]:
  314. path = _create_v1_path("/invite/%s/%s", room_id, event_id)
  315. return await self.client.put_json(
  316. destination=destination, path=path, data=content, ignore_backoff=True
  317. )
  318. @log_function
  319. async def send_invite_v2(
  320. self, destination: str, room_id: str, event_id: str, content: JsonDict
  321. ) -> JsonDict:
  322. path = _create_v2_path("/invite/%s/%s", room_id, event_id)
  323. return await self.client.put_json(
  324. destination=destination, path=path, data=content, ignore_backoff=True
  325. )
  326. @log_function
  327. async def get_public_rooms(
  328. self,
  329. remote_server: str,
  330. limit: Optional[int] = None,
  331. since_token: Optional[str] = None,
  332. search_filter: Optional[Dict] = None,
  333. include_all_networks: bool = False,
  334. third_party_instance_id: Optional[str] = None,
  335. ) -> JsonDict:
  336. """Get the list of public rooms from a remote homeserver
  337. See synapse.federation.federation_client.FederationClient.get_public_rooms for
  338. more information.
  339. """
  340. if search_filter:
  341. # this uses MSC2197 (Search Filtering over Federation)
  342. path = _create_v1_path("/publicRooms")
  343. data: Dict[str, Any] = {
  344. "include_all_networks": "true" if include_all_networks else "false"
  345. }
  346. if third_party_instance_id:
  347. data["third_party_instance_id"] = third_party_instance_id
  348. if limit:
  349. data["limit"] = str(limit)
  350. if since_token:
  351. data["since"] = since_token
  352. data["filter"] = search_filter
  353. try:
  354. response = await self.client.post_json(
  355. destination=remote_server, path=path, data=data, ignore_backoff=True
  356. )
  357. except HttpResponseException as e:
  358. if e.code == 403:
  359. raise SynapseError(
  360. 403,
  361. "You are not allowed to view the public rooms list of %s"
  362. % (remote_server,),
  363. errcode=Codes.FORBIDDEN,
  364. )
  365. raise
  366. else:
  367. path = _create_v1_path("/publicRooms")
  368. args: Dict[str, Any] = {
  369. "include_all_networks": "true" if include_all_networks else "false"
  370. }
  371. if third_party_instance_id:
  372. args["third_party_instance_id"] = (third_party_instance_id,)
  373. if limit:
  374. args["limit"] = [str(limit)]
  375. if since_token:
  376. args["since"] = [since_token]
  377. try:
  378. response = await self.client.get_json(
  379. destination=remote_server, path=path, args=args, ignore_backoff=True
  380. )
  381. except HttpResponseException as e:
  382. if e.code == 403:
  383. raise SynapseError(
  384. 403,
  385. "You are not allowed to view the public rooms list of %s"
  386. % (remote_server,),
  387. errcode=Codes.FORBIDDEN,
  388. )
  389. raise
  390. return response
  391. @log_function
  392. async def exchange_third_party_invite(
  393. self, destination: str, room_id: str, event_dict: JsonDict
  394. ) -> JsonDict:
  395. path = _create_v1_path("/exchange_third_party_invite/%s", room_id)
  396. return await self.client.put_json(
  397. destination=destination, path=path, data=event_dict
  398. )
  399. @log_function
  400. async def get_event_auth(
  401. self, destination: str, room_id: str, event_id: str
  402. ) -> JsonDict:
  403. path = _create_v1_path("/event_auth/%s/%s", room_id, event_id)
  404. return await self.client.get_json(destination=destination, path=path)
  405. @log_function
  406. async def query_client_keys(
  407. self, destination: str, query_content: JsonDict, timeout: int
  408. ) -> JsonDict:
  409. """Query the device keys for a list of user ids hosted on a remote
  410. server.
  411. Request:
  412. {
  413. "device_keys": {
  414. "<user_id>": ["<device_id>"]
  415. }
  416. }
  417. Response:
  418. {
  419. "device_keys": {
  420. "<user_id>": {
  421. "<device_id>": {...}
  422. }
  423. },
  424. "master_key": {
  425. "<user_id>": {...}
  426. }
  427. },
  428. "self_signing_key": {
  429. "<user_id>": {...}
  430. }
  431. }
  432. Args:
  433. destination: The server to query.
  434. query_content: The user ids to query.
  435. Returns:
  436. A dict containing device and cross-signing keys.
  437. """
  438. path = _create_v1_path("/user/keys/query")
  439. return await self.client.post_json(
  440. destination=destination, path=path, data=query_content, timeout=timeout
  441. )
  442. @log_function
  443. async def query_user_devices(
  444. self, destination: str, user_id: str, timeout: int
  445. ) -> JsonDict:
  446. """Query the devices for a user id hosted on a remote server.
  447. Response:
  448. {
  449. "stream_id": "...",
  450. "devices": [ { ... } ],
  451. "master_key": {
  452. "user_id": "<user_id>",
  453. "usage": [...],
  454. "keys": {...},
  455. "signatures": {
  456. "<user_id>": {...}
  457. }
  458. },
  459. "self_signing_key": {
  460. "user_id": "<user_id>",
  461. "usage": [...],
  462. "keys": {...},
  463. "signatures": {
  464. "<user_id>": {...}
  465. }
  466. }
  467. }
  468. Args:
  469. destination: The server to query.
  470. query_content: The user ids to query.
  471. Returns:
  472. A dict containing device and cross-signing keys.
  473. """
  474. path = _create_v1_path("/user/devices/%s", user_id)
  475. return await self.client.get_json(
  476. destination=destination, path=path, timeout=timeout
  477. )
  478. @log_function
  479. async def claim_client_keys(
  480. self, destination: str, query_content: JsonDict, timeout: int
  481. ) -> JsonDict:
  482. """Claim one-time keys for a list of devices hosted on a remote server.
  483. Request:
  484. {
  485. "one_time_keys": {
  486. "<user_id>": {
  487. "<device_id>": "<algorithm>"
  488. }
  489. }
  490. }
  491. Response:
  492. {
  493. "device_keys": {
  494. "<user_id>": {
  495. "<device_id>": {
  496. "<algorithm>:<key_id>": "<key_base64>"
  497. }
  498. }
  499. }
  500. }
  501. Args:
  502. destination: The server to query.
  503. query_content: The user ids to query.
  504. Returns:
  505. A dict containing the one-time keys.
  506. """
  507. path = _create_v1_path("/user/keys/claim")
  508. return await self.client.post_json(
  509. destination=destination, path=path, data=query_content, timeout=timeout
  510. )
  511. @log_function
  512. async def get_missing_events(
  513. self,
  514. destination: str,
  515. room_id: str,
  516. earliest_events: Iterable[str],
  517. latest_events: Iterable[str],
  518. limit: int,
  519. min_depth: int,
  520. timeout: int,
  521. ) -> JsonDict:
  522. path = _create_v1_path("/get_missing_events/%s", room_id)
  523. return await self.client.post_json(
  524. destination=destination,
  525. path=path,
  526. data={
  527. "limit": int(limit),
  528. "min_depth": int(min_depth),
  529. "earliest_events": earliest_events,
  530. "latest_events": latest_events,
  531. },
  532. timeout=timeout,
  533. )
  534. @log_function
  535. async def get_group_profile(
  536. self, destination: str, group_id: str, requester_user_id: str
  537. ) -> JsonDict:
  538. """Get a group profile"""
  539. path = _create_v1_path("/groups/%s/profile", group_id)
  540. return await self.client.get_json(
  541. destination=destination,
  542. path=path,
  543. args={"requester_user_id": requester_user_id},
  544. ignore_backoff=True,
  545. )
  546. @log_function
  547. async def update_group_profile(
  548. self, destination: str, group_id: str, requester_user_id: str, content: JsonDict
  549. ) -> JsonDict:
  550. """Update a remote group profile
  551. Args:
  552. destination
  553. group_id
  554. requester_user_id
  555. content: The new profile of the group
  556. """
  557. path = _create_v1_path("/groups/%s/profile", group_id)
  558. return self.client.post_json(
  559. destination=destination,
  560. path=path,
  561. args={"requester_user_id": requester_user_id},
  562. data=content,
  563. ignore_backoff=True,
  564. )
  565. @log_function
  566. async def get_group_summary(
  567. self, destination: str, group_id: str, requester_user_id: str
  568. ) -> JsonDict:
  569. """Get a group summary"""
  570. path = _create_v1_path("/groups/%s/summary", group_id)
  571. return await self.client.get_json(
  572. destination=destination,
  573. path=path,
  574. args={"requester_user_id": requester_user_id},
  575. ignore_backoff=True,
  576. )
  577. @log_function
  578. async def get_rooms_in_group(
  579. self, destination: str, group_id: str, requester_user_id: str
  580. ) -> JsonDict:
  581. """Get all rooms in a group"""
  582. path = _create_v1_path("/groups/%s/rooms", group_id)
  583. return await self.client.get_json(
  584. destination=destination,
  585. path=path,
  586. args={"requester_user_id": requester_user_id},
  587. ignore_backoff=True,
  588. )
  589. async def add_room_to_group(
  590. self,
  591. destination: str,
  592. group_id: str,
  593. requester_user_id: str,
  594. room_id: str,
  595. content: JsonDict,
  596. ) -> JsonDict:
  597. """Add a room to a group"""
  598. path = _create_v1_path("/groups/%s/room/%s", group_id, room_id)
  599. return await self.client.post_json(
  600. destination=destination,
  601. path=path,
  602. args={"requester_user_id": requester_user_id},
  603. data=content,
  604. ignore_backoff=True,
  605. )
  606. async def update_room_in_group(
  607. self,
  608. destination: str,
  609. group_id: str,
  610. requester_user_id: str,
  611. room_id: str,
  612. config_key: str,
  613. content: JsonDict,
  614. ) -> JsonDict:
  615. """Update room in group"""
  616. path = _create_v1_path(
  617. "/groups/%s/room/%s/config/%s", group_id, room_id, config_key
  618. )
  619. return await self.client.post_json(
  620. destination=destination,
  621. path=path,
  622. args={"requester_user_id": requester_user_id},
  623. data=content,
  624. ignore_backoff=True,
  625. )
  626. async def remove_room_from_group(
  627. self, destination: str, group_id: str, requester_user_id: str, room_id: str
  628. ) -> JsonDict:
  629. """Remove a room from a group"""
  630. path = _create_v1_path("/groups/%s/room/%s", group_id, room_id)
  631. return await self.client.delete_json(
  632. destination=destination,
  633. path=path,
  634. args={"requester_user_id": requester_user_id},
  635. ignore_backoff=True,
  636. )
  637. @log_function
  638. async def get_users_in_group(
  639. self, destination: str, group_id: str, requester_user_id: str
  640. ) -> JsonDict:
  641. """Get users in a group"""
  642. path = _create_v1_path("/groups/%s/users", group_id)
  643. return await self.client.get_json(
  644. destination=destination,
  645. path=path,
  646. args={"requester_user_id": requester_user_id},
  647. ignore_backoff=True,
  648. )
  649. @log_function
  650. async def get_invited_users_in_group(
  651. self, destination: str, group_id: str, requester_user_id: str
  652. ) -> JsonDict:
  653. """Get users that have been invited to a group"""
  654. path = _create_v1_path("/groups/%s/invited_users", group_id)
  655. return await self.client.get_json(
  656. destination=destination,
  657. path=path,
  658. args={"requester_user_id": requester_user_id},
  659. ignore_backoff=True,
  660. )
  661. @log_function
  662. async def accept_group_invite(
  663. self, destination: str, group_id: str, user_id: str, content: JsonDict
  664. ) -> JsonDict:
  665. """Accept a group invite"""
  666. path = _create_v1_path("/groups/%s/users/%s/accept_invite", group_id, user_id)
  667. return await self.client.post_json(
  668. destination=destination, path=path, data=content, ignore_backoff=True
  669. )
  670. @log_function
  671. def join_group(
  672. self, destination: str, group_id: str, user_id: str, content: JsonDict
  673. ) -> JsonDict:
  674. """Attempts to join a group"""
  675. path = _create_v1_path("/groups/%s/users/%s/join", group_id, user_id)
  676. return self.client.post_json(
  677. destination=destination, path=path, data=content, ignore_backoff=True
  678. )
  679. @log_function
  680. async def invite_to_group(
  681. self,
  682. destination: str,
  683. group_id: str,
  684. user_id: str,
  685. requester_user_id: str,
  686. content: JsonDict,
  687. ) -> JsonDict:
  688. """Invite a user to a group"""
  689. path = _create_v1_path("/groups/%s/users/%s/invite", group_id, user_id)
  690. return await self.client.post_json(
  691. destination=destination,
  692. path=path,
  693. args={"requester_user_id": requester_user_id},
  694. data=content,
  695. ignore_backoff=True,
  696. )
  697. @log_function
  698. async def invite_to_group_notification(
  699. self, destination: str, group_id: str, user_id: str, content: JsonDict
  700. ) -> JsonDict:
  701. """Sent by group server to inform a user's server that they have been
  702. invited.
  703. """
  704. path = _create_v1_path("/groups/local/%s/users/%s/invite", group_id, user_id)
  705. return await self.client.post_json(
  706. destination=destination, path=path, data=content, ignore_backoff=True
  707. )
  708. @log_function
  709. async def remove_user_from_group(
  710. self,
  711. destination: str,
  712. group_id: str,
  713. requester_user_id: str,
  714. user_id: str,
  715. content: JsonDict,
  716. ) -> JsonDict:
  717. """Remove a user from a group"""
  718. path = _create_v1_path("/groups/%s/users/%s/remove", group_id, user_id)
  719. return await self.client.post_json(
  720. destination=destination,
  721. path=path,
  722. args={"requester_user_id": requester_user_id},
  723. data=content,
  724. ignore_backoff=True,
  725. )
  726. @log_function
  727. async def remove_user_from_group_notification(
  728. self, destination: str, group_id: str, user_id: str, content: JsonDict
  729. ) -> JsonDict:
  730. """Sent by group server to inform a user's server that they have been
  731. kicked from the group.
  732. """
  733. path = _create_v1_path("/groups/local/%s/users/%s/remove", group_id, user_id)
  734. return await self.client.post_json(
  735. destination=destination, path=path, data=content, ignore_backoff=True
  736. )
  737. @log_function
  738. async def renew_group_attestation(
  739. self, destination: str, group_id: str, user_id: str, content: JsonDict
  740. ) -> JsonDict:
  741. """Sent by either a group server or a user's server to periodically update
  742. the attestations
  743. """
  744. path = _create_v1_path("/groups/%s/renew_attestation/%s", group_id, user_id)
  745. return await self.client.post_json(
  746. destination=destination, path=path, data=content, ignore_backoff=True
  747. )
  748. @log_function
  749. async def update_group_summary_room(
  750. self,
  751. destination: str,
  752. group_id: str,
  753. user_id: str,
  754. room_id: str,
  755. category_id: str,
  756. content: JsonDict,
  757. ) -> JsonDict:
  758. """Update a room entry in a group summary"""
  759. if category_id:
  760. path = _create_v1_path(
  761. "/groups/%s/summary/categories/%s/rooms/%s",
  762. group_id,
  763. category_id,
  764. room_id,
  765. )
  766. else:
  767. path = _create_v1_path("/groups/%s/summary/rooms/%s", group_id, room_id)
  768. return await self.client.post_json(
  769. destination=destination,
  770. path=path,
  771. args={"requester_user_id": user_id},
  772. data=content,
  773. ignore_backoff=True,
  774. )
  775. @log_function
  776. async def delete_group_summary_room(
  777. self,
  778. destination: str,
  779. group_id: str,
  780. user_id: str,
  781. room_id: str,
  782. category_id: str,
  783. ) -> JsonDict:
  784. """Delete a room entry in a group summary"""
  785. if category_id:
  786. path = _create_v1_path(
  787. "/groups/%s/summary/categories/%s/rooms/%s",
  788. group_id,
  789. category_id,
  790. room_id,
  791. )
  792. else:
  793. path = _create_v1_path("/groups/%s/summary/rooms/%s", group_id, room_id)
  794. return await self.client.delete_json(
  795. destination=destination,
  796. path=path,
  797. args={"requester_user_id": user_id},
  798. ignore_backoff=True,
  799. )
  800. @log_function
  801. async def get_group_categories(
  802. self, destination: str, group_id: str, requester_user_id: str
  803. ) -> JsonDict:
  804. """Get all categories in a group"""
  805. path = _create_v1_path("/groups/%s/categories", group_id)
  806. return await self.client.get_json(
  807. destination=destination,
  808. path=path,
  809. args={"requester_user_id": requester_user_id},
  810. ignore_backoff=True,
  811. )
  812. @log_function
  813. async def get_group_category(
  814. self, destination: str, group_id: str, requester_user_id: str, category_id: str
  815. ) -> JsonDict:
  816. """Get category info in a group"""
  817. path = _create_v1_path("/groups/%s/categories/%s", group_id, category_id)
  818. return await self.client.get_json(
  819. destination=destination,
  820. path=path,
  821. args={"requester_user_id": requester_user_id},
  822. ignore_backoff=True,
  823. )
  824. @log_function
  825. async def update_group_category(
  826. self,
  827. destination: str,
  828. group_id: str,
  829. requester_user_id: str,
  830. category_id: str,
  831. content: JsonDict,
  832. ) -> JsonDict:
  833. """Update a category in a group"""
  834. path = _create_v1_path("/groups/%s/categories/%s", group_id, category_id)
  835. return await self.client.post_json(
  836. destination=destination,
  837. path=path,
  838. args={"requester_user_id": requester_user_id},
  839. data=content,
  840. ignore_backoff=True,
  841. )
  842. @log_function
  843. async def delete_group_category(
  844. self, destination: str, group_id: str, requester_user_id: str, category_id: str
  845. ) -> JsonDict:
  846. """Delete a category in a group"""
  847. path = _create_v1_path("/groups/%s/categories/%s", group_id, category_id)
  848. return await self.client.delete_json(
  849. destination=destination,
  850. path=path,
  851. args={"requester_user_id": requester_user_id},
  852. ignore_backoff=True,
  853. )
  854. @log_function
  855. async def get_group_roles(
  856. self, destination: str, group_id: str, requester_user_id: str
  857. ) -> JsonDict:
  858. """Get all roles in a group"""
  859. path = _create_v1_path("/groups/%s/roles", group_id)
  860. return await self.client.get_json(
  861. destination=destination,
  862. path=path,
  863. args={"requester_user_id": requester_user_id},
  864. ignore_backoff=True,
  865. )
  866. @log_function
  867. async def get_group_role(
  868. self, destination: str, group_id: str, requester_user_id: str, role_id: str
  869. ) -> JsonDict:
  870. """Get a roles info"""
  871. path = _create_v1_path("/groups/%s/roles/%s", group_id, role_id)
  872. return await self.client.get_json(
  873. destination=destination,
  874. path=path,
  875. args={"requester_user_id": requester_user_id},
  876. ignore_backoff=True,
  877. )
  878. @log_function
  879. async def update_group_role(
  880. self,
  881. destination: str,
  882. group_id: str,
  883. requester_user_id: str,
  884. role_id: str,
  885. content: JsonDict,
  886. ) -> JsonDict:
  887. """Update a role in a group"""
  888. path = _create_v1_path("/groups/%s/roles/%s", group_id, role_id)
  889. return await self.client.post_json(
  890. destination=destination,
  891. path=path,
  892. args={"requester_user_id": requester_user_id},
  893. data=content,
  894. ignore_backoff=True,
  895. )
  896. @log_function
  897. async def delete_group_role(
  898. self, destination: str, group_id: str, requester_user_id: str, role_id: str
  899. ) -> JsonDict:
  900. """Delete a role in a group"""
  901. path = _create_v1_path("/groups/%s/roles/%s", group_id, role_id)
  902. return await self.client.delete_json(
  903. destination=destination,
  904. path=path,
  905. args={"requester_user_id": requester_user_id},
  906. ignore_backoff=True,
  907. )
  908. @log_function
  909. async def update_group_summary_user(
  910. self,
  911. destination: str,
  912. group_id: str,
  913. requester_user_id: str,
  914. user_id: str,
  915. role_id: str,
  916. content: JsonDict,
  917. ) -> JsonDict:
  918. """Update a users entry in a group"""
  919. if role_id:
  920. path = _create_v1_path(
  921. "/groups/%s/summary/roles/%s/users/%s", group_id, role_id, user_id
  922. )
  923. else:
  924. path = _create_v1_path("/groups/%s/summary/users/%s", group_id, user_id)
  925. return await self.client.post_json(
  926. destination=destination,
  927. path=path,
  928. args={"requester_user_id": requester_user_id},
  929. data=content,
  930. ignore_backoff=True,
  931. )
  932. @log_function
  933. async def set_group_join_policy(
  934. self, destination: str, group_id: str, requester_user_id: str, content: JsonDict
  935. ) -> JsonDict:
  936. """Sets the join policy for a group"""
  937. path = _create_v1_path("/groups/%s/settings/m.join_policy", group_id)
  938. return await self.client.put_json(
  939. destination=destination,
  940. path=path,
  941. args={"requester_user_id": requester_user_id},
  942. data=content,
  943. ignore_backoff=True,
  944. )
  945. @log_function
  946. async def delete_group_summary_user(
  947. self,
  948. destination: str,
  949. group_id: str,
  950. requester_user_id: str,
  951. user_id: str,
  952. role_id: str,
  953. ) -> JsonDict:
  954. """Delete a users entry in a group"""
  955. if role_id:
  956. path = _create_v1_path(
  957. "/groups/%s/summary/roles/%s/users/%s", group_id, role_id, user_id
  958. )
  959. else:
  960. path = _create_v1_path("/groups/%s/summary/users/%s", group_id, user_id)
  961. return await self.client.delete_json(
  962. destination=destination,
  963. path=path,
  964. args={"requester_user_id": requester_user_id},
  965. ignore_backoff=True,
  966. )
  967. async def bulk_get_publicised_groups(
  968. self, destination: str, user_ids: Iterable[str]
  969. ) -> JsonDict:
  970. """Get the groups a list of users are publicising"""
  971. path = _create_v1_path("/get_groups_publicised")
  972. content = {"user_ids": user_ids}
  973. return await self.client.post_json(
  974. destination=destination, path=path, data=content, ignore_backoff=True
  975. )
  976. async def get_room_complexity(self, destination: str, room_id: str) -> JsonDict:
  977. """
  978. Args:
  979. destination: The remote server
  980. room_id: The room ID to ask about.
  981. """
  982. path = _create_path(FEDERATION_UNSTABLE_PREFIX, "/rooms/%s/complexity", room_id)
  983. return await self.client.get_json(destination=destination, path=path)
  984. async def get_space_summary(
  985. self,
  986. destination: str,
  987. room_id: str,
  988. suggested_only: bool,
  989. max_rooms_per_space: Optional[int],
  990. exclude_rooms: List[str],
  991. ) -> JsonDict:
  992. """
  993. Args:
  994. destination: The remote server
  995. room_id: The room ID to ask about.
  996. suggested_only: if True, only suggested rooms will be returned
  997. max_rooms_per_space: an optional limit to the number of children to be
  998. returned per space
  999. exclude_rooms: a list of any rooms we can skip
  1000. """
  1001. # TODO When switching to the stable endpoint, use GET instead of POST.
  1002. path = _create_path(
  1003. FEDERATION_UNSTABLE_PREFIX, "/org.matrix.msc2946/spaces/%s", room_id
  1004. )
  1005. params = {
  1006. "suggested_only": suggested_only,
  1007. "exclude_rooms": exclude_rooms,
  1008. }
  1009. if max_rooms_per_space is not None:
  1010. params["max_rooms_per_space"] = max_rooms_per_space
  1011. return await self.client.post_json(
  1012. destination=destination, path=path, data=params
  1013. )
  1014. async def get_room_hierarchy(
  1015. self,
  1016. destination: str,
  1017. room_id: str,
  1018. suggested_only: bool,
  1019. ) -> JsonDict:
  1020. """
  1021. Args:
  1022. destination: The remote server
  1023. room_id: The room ID to ask about.
  1024. suggested_only: if True, only suggested rooms will be returned
  1025. """
  1026. path = _create_path(
  1027. FEDERATION_UNSTABLE_PREFIX, "/org.matrix.msc2946/hierarchy/%s", room_id
  1028. )
  1029. return await self.client.get_json(
  1030. destination=destination,
  1031. path=path,
  1032. args={"suggested_only": "true" if suggested_only else "false"},
  1033. )
  1034. def _create_path(federation_prefix: str, path: str, *args: str) -> str:
  1035. """
  1036. Ensures that all args are url encoded.
  1037. """
  1038. return federation_prefix + path % tuple(urllib.parse.quote(arg, "") for arg in args)
  1039. def _create_v1_path(path: str, *args: str) -> str:
  1040. """Creates a path against V1 federation API from the path template and
  1041. args. Ensures that all args are url encoded.
  1042. Example:
  1043. _create_v1_path("/event/%s", event_id)
  1044. Args:
  1045. path: String template for the path
  1046. args: Args to insert into path. Each arg will be url encoded
  1047. """
  1048. return _create_path(FEDERATION_V1_PREFIX, path, *args)
  1049. def _create_v2_path(path: str, *args: str) -> str:
  1050. """Creates a path against V2 federation API from the path template and
  1051. args. Ensures that all args are url encoded.
  1052. Example:
  1053. _create_v2_path("/event/%s", event_id)
  1054. Args:
  1055. path: String template for the path
  1056. args: Args to insert into path. Each arg will be url encoded
  1057. """
  1058. return _create_path(FEDERATION_V2_PREFIX, path, *args)
  1059. @attr.s(slots=True, auto_attribs=True)
  1060. class SendJoinResponse:
  1061. """The parsed response of a `/send_join` request."""
  1062. # The list of auth events from the /send_join response.
  1063. auth_events: List[EventBase]
  1064. # The list of state from the /send_join response.
  1065. state: List[EventBase]
  1066. # The raw join event from the /send_join response.
  1067. event_dict: JsonDict
  1068. # The parsed join event from the /send_join response. This will be None if
  1069. # "event" is not included in the response.
  1070. event: Optional[EventBase] = None
  1071. @ijson.coroutine
  1072. def _event_parser(event_dict: JsonDict):
  1073. """Helper function for use with `ijson.kvitems_coro` to parse key-value pairs
  1074. to add them to a given dictionary.
  1075. """
  1076. while True:
  1077. key, value = yield
  1078. event_dict[key] = value
  1079. @ijson.coroutine
  1080. def _event_list_parser(room_version: RoomVersion, events: List[EventBase]):
  1081. """Helper function for use with `ijson.items_coro` to parse an array of
  1082. events and add them to the given list.
  1083. """
  1084. while True:
  1085. obj = yield
  1086. event = make_event_from_dict(obj, room_version)
  1087. events.append(event)
  1088. class SendJoinParser(ByteParser[SendJoinResponse]):
  1089. """A parser for the response to `/send_join` requests.
  1090. Args:
  1091. room_version: The version of the room.
  1092. v1_api: Whether the response is in the v1 format.
  1093. """
  1094. CONTENT_TYPE = "application/json"
  1095. def __init__(self, room_version: RoomVersion, v1_api: bool):
  1096. self._response = SendJoinResponse([], [], {})
  1097. self._room_version = room_version
  1098. # The V1 API has the shape of `[200, {...}]`, which we handle by
  1099. # prefixing with `item.*`.
  1100. prefix = "item." if v1_api else ""
  1101. self._coro_state = ijson.items_coro(
  1102. _event_list_parser(room_version, self._response.state),
  1103. prefix + "state.item",
  1104. )
  1105. self._coro_auth = ijson.items_coro(
  1106. _event_list_parser(room_version, self._response.auth_events),
  1107. prefix + "auth_chain.item",
  1108. )
  1109. self._coro_event = ijson.kvitems_coro(
  1110. _event_parser(self._response.event_dict),
  1111. prefix + "org.matrix.msc3083.v2.event",
  1112. )
  1113. def write(self, data: bytes) -> int:
  1114. self._coro_state.send(data)
  1115. self._coro_auth.send(data)
  1116. self._coro_event.send(data)
  1117. return len(data)
  1118. def finish(self) -> SendJoinResponse:
  1119. if self._response.event_dict:
  1120. self._response.event = make_event_from_dict(
  1121. self._response.event_dict, self._room_version
  1122. )
  1123. return self._response