admin.py 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307
  1. # Copyright 2014-2016 OpenMarket Ltd
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import abc
  15. import logging
  16. from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set
  17. from synapse.api.constants import Membership
  18. from synapse.events import EventBase
  19. from synapse.types import JsonDict, RoomStreamToken, StateMap, UserID
  20. from synapse.visibility import filter_events_for_client
  21. if TYPE_CHECKING:
  22. from synapse.server import HomeServer
  23. logger = logging.getLogger(__name__)
  24. class AdminHandler:
  25. def __init__(self, hs: "HomeServer"):
  26. self.store = hs.get_datastores().main
  27. self._storage_controllers = hs.get_storage_controllers()
  28. self._state_storage_controller = self._storage_controllers.state
  29. self._msc3866_enabled = hs.config.experimental.msc3866.enabled
  30. async def get_whois(self, user: UserID) -> JsonDict:
  31. connections = []
  32. sessions = await self.store.get_user_ip_and_agents(user)
  33. for session in sessions:
  34. connections.append(
  35. {
  36. "ip": session["ip"],
  37. "last_seen": session["last_seen"],
  38. "user_agent": session["user_agent"],
  39. }
  40. )
  41. ret = {
  42. "user_id": user.to_string(),
  43. "devices": {"": {"sessions": [{"connections": connections}]}},
  44. }
  45. return ret
  46. async def get_user(self, user: UserID) -> Optional[JsonDict]:
  47. """Function to get user details"""
  48. user_info_dict = await self.store.get_user_by_id(user.to_string())
  49. if user_info_dict is None:
  50. return None
  51. # Restrict returned information to a known set of fields. This prevents additional
  52. # fields added to get_user_by_id from modifying Synapse's external API surface.
  53. user_info_to_return = {
  54. "name",
  55. "admin",
  56. "deactivated",
  57. "shadow_banned",
  58. "creation_ts",
  59. "appservice_id",
  60. "consent_server_notice_sent",
  61. "consent_version",
  62. "consent_ts",
  63. "user_type",
  64. "is_guest",
  65. }
  66. if self._msc3866_enabled:
  67. # Only include the approved flag if support for MSC3866 is enabled.
  68. user_info_to_return.add("approved")
  69. # Restrict returned keys to a known set.
  70. user_info_dict = {
  71. key: value
  72. for key, value in user_info_dict.items()
  73. if key in user_info_to_return
  74. }
  75. # Add additional user metadata
  76. profile = await self.store.get_profileinfo(user.localpart)
  77. threepids = await self.store.user_get_threepids(user.to_string())
  78. external_ids = [
  79. ({"auth_provider": auth_provider, "external_id": external_id})
  80. for auth_provider, external_id in await self.store.get_external_ids_by_user(
  81. user.to_string()
  82. )
  83. ]
  84. user_info_dict["displayname"] = profile.display_name
  85. user_info_dict["avatar_url"] = profile.avatar_url
  86. user_info_dict["threepids"] = threepids
  87. user_info_dict["external_ids"] = external_ids
  88. user_info_dict["erased"] = await self.store.is_user_erased(user.to_string())
  89. return user_info_dict
  90. async def export_user_data(self, user_id: str, writer: "ExfiltrationWriter") -> Any:
  91. """Write all data we have on the user to the given writer.
  92. Args:
  93. user_id: The user ID to fetch data of.
  94. writer: The writer to write to.
  95. Returns:
  96. Resolves when all data for a user has been written.
  97. The returned value is that returned by `writer.finished()`.
  98. """
  99. # Get all rooms the user is in or has been in
  100. rooms = await self.store.get_rooms_for_local_user_where_membership_is(
  101. user_id,
  102. membership_list=(
  103. Membership.JOIN,
  104. Membership.LEAVE,
  105. Membership.BAN,
  106. Membership.INVITE,
  107. Membership.KNOCK,
  108. ),
  109. )
  110. # We only try and fetch events for rooms the user has been in. If
  111. # they've been e.g. invited to a room without joining then we handle
  112. # those separately.
  113. rooms_user_has_been_in = await self.store.get_rooms_user_has_been_in(user_id)
  114. for index, room in enumerate(rooms):
  115. room_id = room.room_id
  116. logger.info(
  117. "[%s] Handling room %s, %d/%d", user_id, room_id, index + 1, len(rooms)
  118. )
  119. forgotten = await self.store.did_forget(user_id, room_id)
  120. if forgotten:
  121. logger.info("[%s] User forgot room %d, ignoring", user_id, room_id)
  122. continue
  123. if room_id not in rooms_user_has_been_in:
  124. # If we haven't been in the rooms then the filtering code below
  125. # won't return anything, so we need to handle these cases
  126. # explicitly.
  127. if room.membership == Membership.INVITE:
  128. event_id = room.event_id
  129. invite = await self.store.get_event(event_id, allow_none=True)
  130. if invite:
  131. invited_state = invite.unsigned["invite_room_state"]
  132. writer.write_invite(room_id, invite, invited_state)
  133. if room.membership == Membership.KNOCK:
  134. event_id = room.event_id
  135. knock = await self.store.get_event(event_id, allow_none=True)
  136. if knock:
  137. knock_state = knock.unsigned["knock_room_state"]
  138. writer.write_knock(room_id, knock, knock_state)
  139. continue
  140. # We only want to bother fetching events up to the last time they
  141. # were joined. We estimate that point by looking at the
  142. # stream_ordering of the last membership if it wasn't a join.
  143. if room.membership == Membership.JOIN:
  144. stream_ordering = self.store.get_room_max_stream_ordering()
  145. else:
  146. stream_ordering = room.stream_ordering
  147. from_key = RoomStreamToken(0, 0)
  148. to_key = RoomStreamToken(None, stream_ordering)
  149. # Events that we've processed in this room
  150. written_events: Set[str] = set()
  151. # We need to track gaps in the events stream so that we can then
  152. # write out the state at those events. We do this by keeping track
  153. # of events whose prev events we haven't seen.
  154. # Map from event ID to prev events that haven't been processed,
  155. # dict[str, set[str]].
  156. event_to_unseen_prevs = {}
  157. # The reverse mapping to above, i.e. map from unseen event to events
  158. # that have the unseen event in their prev_events, i.e. the unseen
  159. # events "children".
  160. unseen_to_child_events: Dict[str, Set[str]] = {}
  161. # We fetch events in the room the user could see by fetching *all*
  162. # events that we have and then filtering, this isn't the most
  163. # efficient method perhaps but it does guarantee we get everything.
  164. while True:
  165. events, _ = await self.store.paginate_room_events(
  166. room_id, from_key, to_key, limit=100, direction="f"
  167. )
  168. if not events:
  169. break
  170. from_key = events[-1].internal_metadata.after
  171. events = await filter_events_for_client(
  172. self._storage_controllers, user_id, events
  173. )
  174. writer.write_events(room_id, events)
  175. # Update the extremity tracking dicts
  176. for event in events:
  177. # Check if we have any prev events that haven't been
  178. # processed yet, and add those to the appropriate dicts.
  179. unseen_events = set(event.prev_event_ids()) - written_events
  180. if unseen_events:
  181. event_to_unseen_prevs[event.event_id] = unseen_events
  182. for unseen in unseen_events:
  183. unseen_to_child_events.setdefault(unseen, set()).add(
  184. event.event_id
  185. )
  186. # Now check if this event is an unseen prev event, if so
  187. # then we remove this event from the appropriate dicts.
  188. for child_id in unseen_to_child_events.pop(event.event_id, []):
  189. event_to_unseen_prevs[child_id].discard(event.event_id)
  190. written_events.add(event.event_id)
  191. logger.info(
  192. "Written %d events in room %s", len(written_events), room_id
  193. )
  194. # Extremities are the events who have at least one unseen prev event.
  195. extremities = (
  196. event_id
  197. for event_id, unseen_prevs in event_to_unseen_prevs.items()
  198. if unseen_prevs
  199. )
  200. for event_id in extremities:
  201. if not event_to_unseen_prevs[event_id]:
  202. continue
  203. state = await self._state_storage_controller.get_state_for_event(
  204. event_id
  205. )
  206. writer.write_state(room_id, event_id, state)
  207. return writer.finished()
  208. class ExfiltrationWriter(metaclass=abc.ABCMeta):
  209. """Interface used to specify how to write exported data."""
  210. @abc.abstractmethod
  211. def write_events(self, room_id: str, events: List[EventBase]) -> None:
  212. """Write a batch of events for a room."""
  213. raise NotImplementedError()
  214. @abc.abstractmethod
  215. def write_state(
  216. self, room_id: str, event_id: str, state: StateMap[EventBase]
  217. ) -> None:
  218. """Write the state at the given event in the room.
  219. This only gets called for backward extremities rather than for each
  220. event.
  221. """
  222. raise NotImplementedError()
  223. @abc.abstractmethod
  224. def write_invite(
  225. self, room_id: str, event: EventBase, state: StateMap[EventBase]
  226. ) -> None:
  227. """Write an invite for the room, with associated invite state.
  228. Args:
  229. room_id: The room ID the invite is for.
  230. event: The invite event.
  231. state: A subset of the state at the invite, with a subset of the
  232. event keys (type, state_key content and sender).
  233. """
  234. raise NotImplementedError()
  235. @abc.abstractmethod
  236. def write_knock(
  237. self, room_id: str, event: EventBase, state: StateMap[EventBase]
  238. ) -> None:
  239. """Write a knock for the room, with associated knock state.
  240. Args:
  241. room_id: The room ID the knock is for.
  242. event: The knock event.
  243. state: A subset of the state at the knock, with a subset of the
  244. event keys (type, state_key content and sender).
  245. """
  246. raise NotImplementedError()
  247. @abc.abstractmethod
  248. def finished(self) -> Any:
  249. """Called when all data has successfully been exported and written.
  250. This functions return value is passed to the caller of
  251. `export_user_data`.
  252. """
  253. raise NotImplementedError()