groups_local.py 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2017 Vector Creations Ltd
  3. # Copyright 2018 New Vector Ltd
  4. #
  5. # Licensed under the Apache License, Version 2.0 (the "License");
  6. # you may not use this file except in compliance with the License.
  7. # You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing, software
  12. # distributed under the License is distributed on an "AS IS" BASIS,
  13. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. # See the License for the specific language governing permissions and
  15. # limitations under the License.
  16. import logging
  17. from six import iteritems
  18. from twisted.internet import defer
  19. from synapse.api.errors import SynapseError
  20. from synapse.types import get_domain_from_id
  21. logger = logging.getLogger(__name__)
  22. def _create_rerouter(func_name):
  23. """Returns a function that looks at the group id and calls the function
  24. on federation or the local group server if the group is local
  25. """
  26. def f(self, group_id, *args, **kwargs):
  27. if self.is_mine_id(group_id):
  28. return getattr(self.groups_server_handler, func_name)(
  29. group_id, *args, **kwargs
  30. )
  31. else:
  32. destination = get_domain_from_id(group_id)
  33. return getattr(self.transport_client, func_name)(
  34. destination, group_id, *args, **kwargs
  35. )
  36. return f
  class GroupsLocalHandler(object):
      """Handles group operations requested on this server.

      Each operation is either answered by the local groups server handler
      (when ``is_mine_id(group_id)`` is true) or forwarded through the
      federation transport client to the server named in the group ID. Local
      membership state is recorded in the datastore and announced through the
      notifier under the ``"groups_key"`` stream.
      """

      def __init__(self, hs):
          """
          Args:
              hs: the homeserver object; every collaborator used by this
                  handler is fetched from it.
          """
          self.hs = hs
          self.store = hs.get_datastore()
          self.room_list_handler = hs.get_room_list_handler()
          self.groups_server_handler = hs.get_groups_server_handler()
          self.transport_client = hs.get_federation_transport_client()
          self.auth = hs.get_auth()
          self.clock = hs.get_clock()
          self.keyring = hs.get_keyring()
          self.is_mine_id = hs.is_mine_id
          self.signing_key = hs.config.signing_key[0]
          self.server_name = hs.hostname
          self.notifier = hs.get_notifier()
          self.attestations = hs.get_groups_attestation_signing()
          self.profile_handler = hs.get_profile_handler()

          # Ensure attestations get renewed
          hs.get_groups_attestation_renewer()

      # The following functions merely route the query to the local groups server
      # or federation depending on if the group is local or remote

      get_group_profile = _create_rerouter("get_group_profile")
      update_group_profile = _create_rerouter("update_group_profile")
      get_rooms_in_group = _create_rerouter("get_rooms_in_group")

      get_invited_users_in_group = _create_rerouter("get_invited_users_in_group")

      add_room_to_group = _create_rerouter("add_room_to_group")
      update_room_in_group = _create_rerouter("update_room_in_group")
      remove_room_from_group = _create_rerouter("remove_room_from_group")

      update_group_summary_room = _create_rerouter("update_group_summary_room")
      delete_group_summary_room = _create_rerouter("delete_group_summary_room")

      update_group_category = _create_rerouter("update_group_category")
      delete_group_category = _create_rerouter("delete_group_category")
      get_group_category = _create_rerouter("get_group_category")
      get_group_categories = _create_rerouter("get_group_categories")

      update_group_summary_user = _create_rerouter("update_group_summary_user")
      delete_group_summary_user = _create_rerouter("delete_group_summary_user")

      update_group_role = _create_rerouter("update_group_role")
      delete_group_role = _create_rerouter("delete_group_role")
      get_group_role = _create_rerouter("get_group_role")
      get_group_roles = _create_rerouter("get_group_roles")

      set_group_join_policy = _create_rerouter("set_group_join_policy")

      @defer.inlineCallbacks
      def _filter_attested_users(self, group_id, entries):
          """Keep only the user entries whose group membership can be trusted.

          An entry for a user on the group's own server is kept as is. For any
          other user the entry's ``attestation`` must verify against that
          user's server; if verification (or working out the user's server)
          raises, the entry is dropped and the failure is logged.

          The ``attestation`` key is removed from every entry, kept or not.

          Args:
              group_id (str): the group the users are claimed to be in.
              entries (iterable[dict]): user entries; each must have a
                  ``user_id`` key and may have an ``attestation`` key.

          Returns:
              Deferred[list[dict]]: the entries that passed, in input order.
          """
          group_server_name = get_domain_from_id(group_id)

          verified = []
          for entry in entries:
              g_user_id = entry["user_id"]
              attestation = entry.pop("attestation", {})
              try:
                  user_server_name = get_domain_from_id(g_user_id)
                  if user_server_name != group_server_name:
                      yield self.attestations.verify_attestation(
                          attestation,
                          group_id=group_id,
                          user_id=g_user_id,
                          server_name=user_server_name,
                      )
                  verified.append(entry)
              except Exception as e:
                  # Best effort: one bad entry must not fail the whole listing.
                  logger.info("Failed to verify user is in group: %s", e)

          defer.returnValue(verified)

      @defer.inlineCallbacks
      def get_group_summary(self, group_id, requester_user_id):
          """Get the group summary for a group.

          Users listed in the summary who are not on the group's own server are
          only kept if their attestation verifies. The users and rooms sections
          are sorted by their ``order`` field (missing treated as 0), and
          ``res["user"]["is_publicised"]`` is set to whether the requester
          publicises this group.

          Args:
              group_id (str)
              requester_user_id (str)

          Returns:
              Deferred[dict]: the summary from the group server, post-processed
              as described above.
          """
          if self.is_mine_id(group_id):
              res = yield self.groups_server_handler.get_group_summary(
                  group_id, requester_user_id
              )
          else:
              res = yield self.transport_client.get_group_summary(
                  get_domain_from_id(group_id), group_id, requester_user_id,
              )

          # Loop through the users and validate the attestations.
          valid_users = yield self._filter_attested_users(
              group_id, res["users_section"]["users"]
          )
          valid_users.sort(key=lambda e: e.get("order", 0))
          res["users_section"]["users"] = valid_users

          res["rooms_section"]["rooms"].sort(key=lambda e: e.get("order", 0))

          # Add `is_publicised` flag to indicate whether the user has publicised their
          # membership of the group on their profile
          result = yield self.store.get_publicised_groups_for_user(requester_user_id)
          is_publicised = group_id in result

          res.setdefault("user", {})["is_publicised"] = is_publicised

          defer.returnValue(res)

      @defer.inlineCallbacks
      def create_group(self, group_id, user_id, content):
          """Create a group and record the creator as a joined admin.

          For a remote group a local attestation and the creator's profile are
          added to ``content`` (the caller's dict is modified) before it is
          sent, and the attestation returned by the remote server is verified
          before membership is stored.

          Args:
              group_id (str)
              user_id (str): the creating user.
              content (dict): creation request; ``publicise`` (default False)
                  controls whether the membership is stored as publicised.

          Returns:
              Deferred[dict]: the response from whichever server created the
              group.
          """
          logger.info("Asking to create group with ID: %r", group_id)

          if self.is_mine_id(group_id):
              res = yield self.groups_server_handler.create_group(
                  group_id, user_id, content
              )
              local_attestation = None
              remote_attestation = None
          else:
              local_attestation = self.attestations.create_attestation(group_id, user_id)
              content["attestation"] = local_attestation

              content["user_profile"] = yield self.profile_handler.get_profile(user_id)

              res = yield self.transport_client.create_group(
                  get_domain_from_id(group_id), group_id, user_id, content,
              )

              remote_attestation = res["attestation"]
              yield self.attestations.verify_attestation(
                  remote_attestation,
                  group_id=group_id,
                  user_id=user_id,
                  server_name=get_domain_from_id(group_id),
              )

          is_publicised = content.get("publicise", False)
          token = yield self.store.register_user_group_membership(
              group_id, user_id,
              membership="join",
              is_admin=True,
              local_attestation=local_attestation,
              remote_attestation=remote_attestation,
              is_publicised=is_publicised,
          )
          self.notifier.on_new_event(
              "groups_key", token, users=[user_id],
          )

          defer.returnValue(res)

      @defer.inlineCallbacks
      def get_users_in_group(self, group_id, requester_user_id):
          """Get users in a group.

          A local group's listing is returned unchanged. For a remote group the
          ``chunk`` is filtered down to users whose membership can be verified.

          Args:
              group_id (str)
              requester_user_id (str)

          Returns:
              Deferred[dict]: the listing from the group server.
          """
          if self.is_mine_id(group_id):
              res = yield self.groups_server_handler.get_users_in_group(
                  group_id, requester_user_id
              )
              # returnValue raises to unwind the generator, so nothing below
              # runs for local groups.
              defer.returnValue(res)

          res = yield self.transport_client.get_users_in_group(
              get_domain_from_id(group_id), group_id, requester_user_id,
          )

          res["chunk"] = yield self._filter_attested_users(group_id, res["chunk"])

          defer.returnValue(res)

      @defer.inlineCallbacks
      def join_group(self, group_id, user_id, content):
          """Request to join a group.

          For a remote group a local attestation is added to ``content`` (the
          caller's dict is modified) and the attestation returned by the remote
          server is verified before membership is stored.

          Args:
              group_id (str)
              user_id (str): the joining user.
              content (dict): join request; ``publicise`` (default False)
                  controls whether the membership is stored as publicised.

          Returns:
              Deferred[dict]: always an empty dict.
          """
          if self.is_mine_id(group_id):
              yield self.groups_server_handler.join_group(
                  group_id, user_id, content
              )
              local_attestation = None
              remote_attestation = None
          else:
              local_attestation = self.attestations.create_attestation(group_id, user_id)
              content["attestation"] = local_attestation

              res = yield self.transport_client.join_group(
                  get_domain_from_id(group_id), group_id, user_id, content,
              )

              remote_attestation = res["attestation"]

              yield self.attestations.verify_attestation(
                  remote_attestation,
                  group_id=group_id,
                  user_id=user_id,
                  server_name=get_domain_from_id(group_id),
              )

          # TODO: Check that the group is public and we're being added publically
          is_publicised = content.get("publicise", False)

          token = yield self.store.register_user_group_membership(
              group_id, user_id,
              membership="join",
              is_admin=False,
              local_attestation=local_attestation,
              remote_attestation=remote_attestation,
              is_publicised=is_publicised,
          )
          self.notifier.on_new_event(
              "groups_key", token, users=[user_id],
          )

          defer.returnValue({})

      @defer.inlineCallbacks
      def accept_invite(self, group_id, user_id, content):
          """Accept an invite to a group.

          For a remote group a local attestation is added to ``content`` (the
          caller's dict is modified) and the attestation returned by the remote
          server is verified before membership is stored.

          Args:
              group_id (str)
              user_id (str): the invited user who is accepting.
              content (dict): accept request; ``publicise`` (default False)
                  controls whether the membership is stored as publicised.

          Returns:
              Deferred[dict]: always an empty dict.
          """
          if self.is_mine_id(group_id):
              yield self.groups_server_handler.accept_invite(
                  group_id, user_id, content
              )
              local_attestation = None
              remote_attestation = None
          else:
              local_attestation = self.attestations.create_attestation(group_id, user_id)
              content["attestation"] = local_attestation

              res = yield self.transport_client.accept_group_invite(
                  get_domain_from_id(group_id), group_id, user_id, content,
              )

              remote_attestation = res["attestation"]

              yield self.attestations.verify_attestation(
                  remote_attestation,
                  group_id=group_id,
                  user_id=user_id,
                  server_name=get_domain_from_id(group_id),
              )

          # TODO: Check that the group is public and we're being added publically
          is_publicised = content.get("publicise", False)

          token = yield self.store.register_user_group_membership(
              group_id, user_id,
              membership="join",
              is_admin=False,
              local_attestation=local_attestation,
              remote_attestation=remote_attestation,
              is_publicised=is_publicised,
          )
          self.notifier.on_new_event(
              "groups_key", token, users=[user_id],
          )

          defer.returnValue({})

      @defer.inlineCallbacks
      def invite(self, group_id, user_id, requester_user_id, config):
          """Invite a user to a group.

          Args:
              group_id (str)
              user_id (str): the user being invited.
              requester_user_id (str): the user sending the invite.
              config: passed through to the group server under the ``config``
                  key of the request content.

          Returns:
              Deferred: the response from the group server.
          """
          content = {
              "requester_user_id": requester_user_id,
              "config": config,
          }
          if self.is_mine_id(group_id):
              res = yield self.groups_server_handler.invite_to_group(
                  group_id, user_id, requester_user_id, content,
              )
          else:
              res = yield self.transport_client.invite_to_group(
                  get_domain_from_id(group_id), group_id, user_id, requester_user_id,
                  content,
              )

          defer.returnValue(res)

      @defer.inlineCallbacks
      def on_invite(self, group_id, user_id, content):
          """One of our users was invited to a group.

          Stores an ``invite`` membership carrying the group's ``name`` and
          ``avatar_url`` (when present in ``content["profile"]``) and the
          inviter, then notifies the user.

          Args:
              group_id (str)
              user_id (str): the invited user; must belong to this server.
              content (dict): must contain ``inviter``; may contain ``profile``.

          Returns:
              Deferred[dict]: ``{"state": "invite", "user_profile": ...}``
              where the profile is empty if it could not be fetched.

          Raises:
              SynapseError: 400 if ``user_id`` is not on this server.
          """
          # TODO: Support auto join and rejection

          if not self.is_mine_id(user_id):
              raise SynapseError(400, "User not on this server")

          local_profile = {}
          if "profile" in content:
              if "name" in content["profile"]:
                  local_profile["name"] = content["profile"]["name"]
              if "avatar_url" in content["profile"]:
                  local_profile["avatar_url"] = content["profile"]["avatar_url"]

          token = yield self.store.register_user_group_membership(
              group_id, user_id,
              membership="invite",
              content={"profile": local_profile, "inviter": content["inviter"]},
          )
          self.notifier.on_new_event(
              "groups_key", token, users=[user_id],
          )

          try:
              user_profile = yield self.profile_handler.get_profile(user_id)
          except Exception as e:
              # A missing profile must not fail the invite itself.
              logger.warning("No profile for user %s: %s", user_id, e)
              user_profile = {}

          defer.returnValue({"state": "invite", "user_profile": user_profile})

      @defer.inlineCallbacks
      def remove_user_from_group(self, group_id, user_id, requester_user_id, content):
          """Remove a user from a group.

          When a user removes themselves, the ``leave`` is recorded locally and
          notified before the group server is contacted. For a remote group
          ``requester_user_id`` is added to ``content`` (the caller's dict is
          modified) before it is sent.

          Args:
              group_id (str)
              user_id (str): the user being removed.
              requester_user_id (str): the user performing the removal.
              content (dict)

          Returns:
              Deferred: the response from the group server.
          """
          if user_id == requester_user_id:
              token = yield self.store.register_user_group_membership(
                  group_id, user_id,
                  membership="leave",
              )
              self.notifier.on_new_event(
                  "groups_key", token, users=[user_id],
              )

              # TODO: Should probably remember that we tried to leave so that we can
              # retry if the group server is currently down.

          if self.is_mine_id(group_id):
              res = yield self.groups_server_handler.remove_user_from_group(
                  group_id, user_id, requester_user_id, content,
              )
          else:
              content["requester_user_id"] = requester_user_id
              res = yield self.transport_client.remove_user_from_group(
                  get_domain_from_id(group_id), group_id, requester_user_id,
                  user_id, content,
              )

          defer.returnValue(res)

      @defer.inlineCallbacks
      def user_removed_from_group(self, group_id, user_id, content):
          """One of our users was removed/kicked from a group.

          Records a ``leave`` membership and notifies the user. ``content`` is
          accepted but not read.
          """
          # TODO: Check if user in group
          token = yield self.store.register_user_group_membership(
              group_id, user_id,
              membership="leave",
          )
          self.notifier.on_new_event(
              "groups_key", token, users=[user_id],
          )

      @defer.inlineCallbacks
      def get_joined_groups(self, user_id):
          """Get the groups the datastore records ``user_id`` as having joined.

          Returns:
              Deferred[dict]: ``{"groups": group_ids}``.
          """
          group_ids = yield self.store.get_joined_groups(user_id)
          defer.returnValue({"groups": group_ids})

      @defer.inlineCallbacks
      def get_publicised_groups_for_user(self, user_id):
          """Get the groups a user publicises on their profile.

          For a local user this is the stored list plus any groups that
          application services associate with the user. For a remote user the
          user's server is asked over federation.

          Returns:
              Deferred[dict]: ``{"groups": [...]}``; the list is empty if the
              remote response does not mention the user.
          """
          if self.is_mine_id(user_id):
              stored = yield self.store.get_publicised_groups_for_user(user_id)
              # Copy so we never extend a list the datastore may still hold.
              result = list(stored)

              # Check AS associated groups for this user - this depends on the
              # RegExps in the AS registration file (under `users`)
              for app_service in self.store.get_app_services():
                  result.extend(app_service.get_groups_for_user(user_id))

              defer.returnValue({"groups": result})
          else:
              bulk_result = yield self.transport_client.bulk_get_publicised_groups(
                  get_domain_from_id(user_id), [user_id],
              )

              # Default to an empty list so callers always receive a list
              # rather than None when the remote omits the user.
              result = bulk_result.get("users", {}).get(user_id, [])
              # TODO: Verify attestations
              defer.returnValue({"groups": result})

      @defer.inlineCallbacks
      def bulk_get_publicised_groups(self, user_ids, proxy=True):
          """Get the publicised groups for several users at once.

          Remote users are grouped by server and fetched with one federation
          request per server; a server whose request fails is logged and its
          users are simply absent from the result. Local users are answered
          from the datastore plus application-service groups.

          Args:
              user_ids (iterable[str])
              proxy (bool): when False, remote user IDs are rejected instead of
                  being fetched over federation.

          Returns:
              Deferred[dict]: ``{"users": {user_id: [group_id, ...]}}``.

          Raises:
              SynapseError: 400 if ``proxy`` is False and any user is remote.
          """
          destinations = {}
          local_users = set()

          for user_id in user_ids:
              if self.is_mine_id(user_id):
                  local_users.add(user_id)
              else:
                  destinations.setdefault(
                      get_domain_from_id(user_id), set()
                  ).add(user_id)

          if not proxy and destinations:
              raise SynapseError(400, "Some user_ids are not local")

          results = {}
          for destination, dest_user_ids in iteritems(destinations):
              try:
                  r = yield self.transport_client.bulk_get_publicised_groups(
                      destination, list(dest_user_ids),
                  )
                  results.update(r["users"])
              except Exception as e:
                  # Best effort: an unreachable server must not fail the
                  # lookup for everyone else, but leave a trace of it.
                  logger.info(
                      "Failed to get publicised groups from %s: %s",
                      destination, e,
                  )

          for uid in local_users:
              stored = yield self.store.get_publicised_groups_for_user(uid)
              # Copy so we never extend a list the datastore may still hold.
              groups = list(stored)

              # Check AS associated groups for this user - this depends on the
              # RegExps in the AS registration file (under `users`)
              for app_service in self.store.get_app_services():
                  groups.extend(app_service.get_groups_for_user(uid))

              results[uid] = groups

          defer.returnValue({"users": results})