groups_local.py 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503
  1. # Copyright 2017 Vector Creations Ltd
  2. # Copyright 2018 New Vector Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. import logging
  16. from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, Iterable, List, Set
  17. from synapse.api.errors import HttpResponseException, RequestSendFailed, SynapseError
  18. from synapse.types import GroupID, JsonDict, get_domain_from_id
  19. if TYPE_CHECKING:
  20. from synapse.server import HomeServer
  21. logger = logging.getLogger(__name__)
  22. def _create_rerouter(func_name: str) -> Callable[..., Awaitable[JsonDict]]:
  23. """Returns an async function that looks at the group id and calls the function
  24. on federation or the local group server if the group is local
  25. """
  26. async def f(
  27. self: "GroupsLocalWorkerHandler", group_id: str, *args: Any, **kwargs: Any
  28. ) -> JsonDict:
  29. if not GroupID.is_valid(group_id):
  30. raise SynapseError(400, "%s is not a legal group ID" % (group_id,))
  31. if self.is_mine_id(group_id):
  32. return await getattr(self.groups_server_handler, func_name)(
  33. group_id, *args, **kwargs
  34. )
  35. else:
  36. destination = get_domain_from_id(group_id)
  37. try:
  38. return await getattr(self.transport_client, func_name)(
  39. destination, group_id, *args, **kwargs
  40. )
  41. except HttpResponseException as e:
  42. # Capture errors returned by the remote homeserver and
  43. # re-throw specific errors as SynapseErrors. This is so
  44. # when the remote end responds with things like 403 Not
  45. # In Group, we can communicate that to the client instead
  46. # of a 500.
  47. raise e.to_synapse_error()
  48. except RequestSendFailed:
  49. raise SynapseError(502, "Failed to contact group server")
  50. return f
  51. class GroupsLocalWorkerHandler:
  52. def __init__(self, hs: "HomeServer"):
  53. self.hs = hs
  54. self.store = hs.get_datastores().main
  55. self.room_list_handler = hs.get_room_list_handler()
  56. self.groups_server_handler = hs.get_groups_server_handler()
  57. self.transport_client = hs.get_federation_transport_client()
  58. self.auth = hs.get_auth()
  59. self.clock = hs.get_clock()
  60. self.keyring = hs.get_keyring()
  61. self.is_mine_id = hs.is_mine_id
  62. self.signing_key = hs.signing_key
  63. self.server_name = hs.hostname
  64. self.notifier = hs.get_notifier()
  65. self.attestations = hs.get_groups_attestation_signing()
  66. self.profile_handler = hs.get_profile_handler()
  67. # The following functions merely route the query to the local groups server
  68. # or federation depending on if the group is local or remote
  69. get_group_profile = _create_rerouter("get_group_profile")
  70. get_rooms_in_group = _create_rerouter("get_rooms_in_group")
  71. get_invited_users_in_group = _create_rerouter("get_invited_users_in_group")
  72. get_group_category = _create_rerouter("get_group_category")
  73. get_group_categories = _create_rerouter("get_group_categories")
  74. get_group_role = _create_rerouter("get_group_role")
  75. get_group_roles = _create_rerouter("get_group_roles")
  76. async def get_group_summary(
  77. self, group_id: str, requester_user_id: str
  78. ) -> JsonDict:
  79. """Get the group summary for a group.
  80. If the group is remote we check that the users have valid attestations.
  81. """
  82. if self.is_mine_id(group_id):
  83. res = await self.groups_server_handler.get_group_summary(
  84. group_id, requester_user_id
  85. )
  86. else:
  87. try:
  88. res = await self.transport_client.get_group_summary(
  89. get_domain_from_id(group_id), group_id, requester_user_id
  90. )
  91. except HttpResponseException as e:
  92. raise e.to_synapse_error()
  93. except RequestSendFailed:
  94. raise SynapseError(502, "Failed to contact group server")
  95. group_server_name = get_domain_from_id(group_id)
  96. # Loop through the users and validate the attestations.
  97. chunk = res["users_section"]["users"]
  98. valid_users = []
  99. for entry in chunk:
  100. g_user_id = entry["user_id"]
  101. attestation = entry.pop("attestation", {})
  102. try:
  103. if get_domain_from_id(g_user_id) != group_server_name:
  104. await self.attestations.verify_attestation(
  105. attestation,
  106. group_id=group_id,
  107. user_id=g_user_id,
  108. server_name=get_domain_from_id(g_user_id),
  109. )
  110. valid_users.append(entry)
  111. except Exception as e:
  112. logger.info("Failed to verify user is in group: %s", e)
  113. res["users_section"]["users"] = valid_users
  114. res["users_section"]["users"].sort(key=lambda e: e.get("order", 0))
  115. res["rooms_section"]["rooms"].sort(key=lambda e: e.get("order", 0))
  116. # Add `is_publicised` flag to indicate whether the user has publicised their
  117. # membership of the group on their profile
  118. result = await self.store.get_publicised_groups_for_user(requester_user_id)
  119. is_publicised = group_id in result
  120. res.setdefault("user", {})["is_publicised"] = is_publicised
  121. return res
  122. async def get_users_in_group(
  123. self, group_id: str, requester_user_id: str
  124. ) -> JsonDict:
  125. """Get users in a group"""
  126. if self.is_mine_id(group_id):
  127. return await self.groups_server_handler.get_users_in_group(
  128. group_id, requester_user_id
  129. )
  130. group_server_name = get_domain_from_id(group_id)
  131. try:
  132. res = await self.transport_client.get_users_in_group(
  133. get_domain_from_id(group_id), group_id, requester_user_id
  134. )
  135. except HttpResponseException as e:
  136. raise e.to_synapse_error()
  137. except RequestSendFailed:
  138. raise SynapseError(502, "Failed to contact group server")
  139. chunk = res["chunk"]
  140. valid_entries = []
  141. for entry in chunk:
  142. g_user_id = entry["user_id"]
  143. attestation = entry.pop("attestation", {})
  144. try:
  145. if get_domain_from_id(g_user_id) != group_server_name:
  146. await self.attestations.verify_attestation(
  147. attestation,
  148. group_id=group_id,
  149. user_id=g_user_id,
  150. server_name=get_domain_from_id(g_user_id),
  151. )
  152. valid_entries.append(entry)
  153. except Exception as e:
  154. logger.info("Failed to verify user is in group: %s", e)
  155. res["chunk"] = valid_entries
  156. return res
  157. async def get_joined_groups(self, user_id: str) -> JsonDict:
  158. group_ids = await self.store.get_joined_groups(user_id)
  159. return {"groups": group_ids}
  160. async def get_publicised_groups_for_user(self, user_id: str) -> JsonDict:
  161. if self.hs.is_mine_id(user_id):
  162. result = await self.store.get_publicised_groups_for_user(user_id)
  163. # Check AS associated groups for this user - this depends on the
  164. # RegExps in the AS registration file (under `users`)
  165. for app_service in self.store.get_app_services():
  166. result.extend(app_service.get_groups_for_user(user_id))
  167. return {"groups": result}
  168. else:
  169. try:
  170. bulk_result = await self.transport_client.bulk_get_publicised_groups(
  171. get_domain_from_id(user_id), [user_id]
  172. )
  173. except HttpResponseException as e:
  174. raise e.to_synapse_error()
  175. except RequestSendFailed:
  176. raise SynapseError(502, "Failed to contact group server")
  177. result = bulk_result.get("users", {}).get(user_id)
  178. # TODO: Verify attestations
  179. return {"groups": result}
  180. async def bulk_get_publicised_groups(
  181. self, user_ids: Iterable[str], proxy: bool = True
  182. ) -> JsonDict:
  183. destinations: Dict[str, Set[str]] = {}
  184. local_users = set()
  185. for user_id in user_ids:
  186. if self.hs.is_mine_id(user_id):
  187. local_users.add(user_id)
  188. else:
  189. destinations.setdefault(get_domain_from_id(user_id), set()).add(user_id)
  190. if not proxy and destinations:
  191. raise SynapseError(400, "Some user_ids are not local")
  192. results = {}
  193. failed_results: List[str] = []
  194. for destination, dest_user_ids in destinations.items():
  195. try:
  196. r = await self.transport_client.bulk_get_publicised_groups(
  197. destination, list(dest_user_ids)
  198. )
  199. results.update(r["users"])
  200. except Exception:
  201. failed_results.extend(dest_user_ids)
  202. for uid in local_users:
  203. results[uid] = await self.store.get_publicised_groups_for_user(uid)
  204. # Check AS associated groups for this user - this depends on the
  205. # RegExps in the AS registration file (under `users`)
  206. for app_service in self.store.get_app_services():
  207. results[uid].extend(app_service.get_groups_for_user(uid))
  208. return {"users": results}
  209. class GroupsLocalHandler(GroupsLocalWorkerHandler):
  210. def __init__(self, hs: "HomeServer"):
  211. super().__init__(hs)
  212. # Ensure attestations get renewed
  213. hs.get_groups_attestation_renewer()
  214. # The following functions merely route the query to the local groups server
  215. # or federation depending on if the group is local or remote
  216. update_group_profile = _create_rerouter("update_group_profile")
  217. add_room_to_group = _create_rerouter("add_room_to_group")
  218. update_room_in_group = _create_rerouter("update_room_in_group")
  219. remove_room_from_group = _create_rerouter("remove_room_from_group")
  220. update_group_summary_room = _create_rerouter("update_group_summary_room")
  221. delete_group_summary_room = _create_rerouter("delete_group_summary_room")
  222. update_group_category = _create_rerouter("update_group_category")
  223. delete_group_category = _create_rerouter("delete_group_category")
  224. update_group_summary_user = _create_rerouter("update_group_summary_user")
  225. delete_group_summary_user = _create_rerouter("delete_group_summary_user")
  226. update_group_role = _create_rerouter("update_group_role")
  227. delete_group_role = _create_rerouter("delete_group_role")
  228. set_group_join_policy = _create_rerouter("set_group_join_policy")
  229. async def create_group(
  230. self, group_id: str, user_id: str, content: JsonDict
  231. ) -> JsonDict:
  232. """Create a group"""
  233. logger.info("Asking to create group with ID: %r", group_id)
  234. if self.is_mine_id(group_id):
  235. res = await self.groups_server_handler.create_group(
  236. group_id, user_id, content
  237. )
  238. local_attestation = None
  239. remote_attestation = None
  240. else:
  241. raise SynapseError(400, "Unable to create remote groups")
  242. is_publicised = content.get("publicise", False)
  243. token = await self.store.register_user_group_membership(
  244. group_id,
  245. user_id,
  246. membership="join",
  247. is_admin=True,
  248. local_attestation=local_attestation,
  249. remote_attestation=remote_attestation,
  250. is_publicised=is_publicised,
  251. )
  252. self.notifier.on_new_event("groups_key", token, users=[user_id])
  253. return res
  254. async def join_group(
  255. self, group_id: str, user_id: str, content: JsonDict
  256. ) -> JsonDict:
  257. """Request to join a group"""
  258. if self.is_mine_id(group_id):
  259. await self.groups_server_handler.join_group(group_id, user_id, content)
  260. local_attestation = None
  261. remote_attestation = None
  262. else:
  263. local_attestation = self.attestations.create_attestation(group_id, user_id)
  264. content["attestation"] = local_attestation
  265. try:
  266. res = await self.transport_client.join_group(
  267. get_domain_from_id(group_id), group_id, user_id, content
  268. )
  269. except HttpResponseException as e:
  270. raise e.to_synapse_error()
  271. except RequestSendFailed:
  272. raise SynapseError(502, "Failed to contact group server")
  273. remote_attestation = res["attestation"]
  274. await self.attestations.verify_attestation(
  275. remote_attestation,
  276. group_id=group_id,
  277. user_id=user_id,
  278. server_name=get_domain_from_id(group_id),
  279. )
  280. # TODO: Check that the group is public and we're being added publicly
  281. is_publicised = content.get("publicise", False)
  282. token = await self.store.register_user_group_membership(
  283. group_id,
  284. user_id,
  285. membership="join",
  286. is_admin=False,
  287. local_attestation=local_attestation,
  288. remote_attestation=remote_attestation,
  289. is_publicised=is_publicised,
  290. )
  291. self.notifier.on_new_event("groups_key", token, users=[user_id])
  292. return {}
  293. async def accept_invite(
  294. self, group_id: str, user_id: str, content: JsonDict
  295. ) -> JsonDict:
  296. """Accept an invite to a group"""
  297. if self.is_mine_id(group_id):
  298. await self.groups_server_handler.accept_invite(group_id, user_id, content)
  299. local_attestation = None
  300. remote_attestation = None
  301. else:
  302. local_attestation = self.attestations.create_attestation(group_id, user_id)
  303. content["attestation"] = local_attestation
  304. try:
  305. res = await self.transport_client.accept_group_invite(
  306. get_domain_from_id(group_id), group_id, user_id, content
  307. )
  308. except HttpResponseException as e:
  309. raise e.to_synapse_error()
  310. except RequestSendFailed:
  311. raise SynapseError(502, "Failed to contact group server")
  312. remote_attestation = res["attestation"]
  313. await self.attestations.verify_attestation(
  314. remote_attestation,
  315. group_id=group_id,
  316. user_id=user_id,
  317. server_name=get_domain_from_id(group_id),
  318. )
  319. # TODO: Check that the group is public and we're being added publicly
  320. is_publicised = content.get("publicise", False)
  321. token = await self.store.register_user_group_membership(
  322. group_id,
  323. user_id,
  324. membership="join",
  325. is_admin=False,
  326. local_attestation=local_attestation,
  327. remote_attestation=remote_attestation,
  328. is_publicised=is_publicised,
  329. )
  330. self.notifier.on_new_event("groups_key", token, users=[user_id])
  331. return {}
  332. async def invite(
  333. self, group_id: str, user_id: str, requester_user_id: str, config: JsonDict
  334. ) -> JsonDict:
  335. """Invite a user to a group"""
  336. content = {"requester_user_id": requester_user_id, "config": config}
  337. if self.is_mine_id(group_id):
  338. res = await self.groups_server_handler.invite_to_group(
  339. group_id, user_id, requester_user_id, content
  340. )
  341. else:
  342. try:
  343. res = await self.transport_client.invite_to_group(
  344. get_domain_from_id(group_id),
  345. group_id,
  346. user_id,
  347. requester_user_id,
  348. content,
  349. )
  350. except HttpResponseException as e:
  351. raise e.to_synapse_error()
  352. except RequestSendFailed:
  353. raise SynapseError(502, "Failed to contact group server")
  354. return res
  355. async def on_invite(
  356. self, group_id: str, user_id: str, content: JsonDict
  357. ) -> JsonDict:
  358. """One of our users were invited to a group"""
  359. # TODO: Support auto join and rejection
  360. if not self.is_mine_id(user_id):
  361. raise SynapseError(400, "User not on this server")
  362. local_profile = {}
  363. if "profile" in content:
  364. if "name" in content["profile"]:
  365. local_profile["name"] = content["profile"]["name"]
  366. if "avatar_url" in content["profile"]:
  367. local_profile["avatar_url"] = content["profile"]["avatar_url"]
  368. token = await self.store.register_user_group_membership(
  369. group_id,
  370. user_id,
  371. membership="invite",
  372. content={"profile": local_profile, "inviter": content["inviter"]},
  373. )
  374. self.notifier.on_new_event("groups_key", token, users=[user_id])
  375. try:
  376. user_profile = await self.profile_handler.get_profile(user_id)
  377. except Exception as e:
  378. logger.warning("No profile for user %s: %s", user_id, e)
  379. user_profile = {}
  380. return {"state": "invite", "user_profile": user_profile}
  381. async def remove_user_from_group(
  382. self, group_id: str, user_id: str, requester_user_id: str, content: JsonDict
  383. ) -> JsonDict:
  384. """Remove a user from a group"""
  385. if user_id == requester_user_id:
  386. token = await self.store.register_user_group_membership(
  387. group_id, user_id, membership="leave"
  388. )
  389. self.notifier.on_new_event("groups_key", token, users=[user_id])
  390. # TODO: Should probably remember that we tried to leave so that we can
  391. # retry if the group server is currently down.
  392. if self.is_mine_id(group_id):
  393. res = await self.groups_server_handler.remove_user_from_group(
  394. group_id, user_id, requester_user_id, content
  395. )
  396. else:
  397. content["requester_user_id"] = requester_user_id
  398. try:
  399. res = await self.transport_client.remove_user_from_group(
  400. get_domain_from_id(group_id),
  401. group_id,
  402. requester_user_id,
  403. user_id,
  404. content,
  405. )
  406. except HttpResponseException as e:
  407. raise e.to_synapse_error()
  408. except RequestSendFailed:
  409. raise SynapseError(502, "Failed to contact group server")
  410. return res
  411. async def user_removed_from_group(
  412. self, group_id: str, user_id: str, content: JsonDict
  413. ) -> None:
  414. """One of our users was removed/kicked from a group"""
  415. # TODO: Check if user in group
  416. token = await self.store.register_user_group_membership(
  417. group_id, user_id, membership="leave"
  418. )
  419. self.notifier.on_new_event("groups_key", token, users=[user_id])