admin.py 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2014-2016 OpenMarket Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. import logging
  16. from typing import List
  17. from synapse.api.constants import Membership
  18. from synapse.events import FrozenEvent
  19. from synapse.types import RoomStreamToken, StateMap
  20. from synapse.visibility import filter_events_for_client
  21. from ._base import BaseHandler
  22. logger = logging.getLogger(__name__)
  23. class AdminHandler(BaseHandler):
  24. def __init__(self, hs):
  25. super(AdminHandler, self).__init__(hs)
  26. self.storage = hs.get_storage()
  27. self.state_store = self.storage.state
  28. async def get_whois(self, user):
  29. connections = []
  30. sessions = await self.store.get_user_ip_and_agents(user)
  31. for session in sessions:
  32. connections.append(
  33. {
  34. "ip": session["ip"],
  35. "last_seen": session["last_seen"],
  36. "user_agent": session["user_agent"],
  37. }
  38. )
  39. ret = {
  40. "user_id": user.to_string(),
  41. "devices": {"": {"sessions": [{"connections": connections}]}},
  42. }
  43. return ret
  44. async def get_user(self, user):
  45. """Function to get user details"""
  46. ret = await self.store.get_user_by_id(user.to_string())
  47. if ret:
  48. profile = await self.store.get_profileinfo(user.localpart)
  49. threepids = await self.store.user_get_threepids(user.to_string())
  50. ret["displayname"] = profile.display_name
  51. ret["avatar_url"] = profile.avatar_url
  52. ret["threepids"] = threepids
  53. return ret
  54. async def export_user_data(self, user_id, writer):
  55. """Write all data we have on the user to the given writer.
  56. Args:
  57. user_id (str)
  58. writer (ExfiltrationWriter)
  59. Returns:
  60. defer.Deferred: Resolves when all data for a user has been written.
  61. The returned value is that returned by `writer.finished()`.
  62. """
  63. # Get all rooms the user is in or has been in
  64. rooms = await self.store.get_rooms_for_local_user_where_membership_is(
  65. user_id,
  66. membership_list=(
  67. Membership.JOIN,
  68. Membership.LEAVE,
  69. Membership.BAN,
  70. Membership.INVITE,
  71. ),
  72. )
  73. # We only try and fetch events for rooms the user has been in. If
  74. # they've been e.g. invited to a room without joining then we handle
  75. # those seperately.
  76. rooms_user_has_been_in = await self.store.get_rooms_user_has_been_in(user_id)
  77. for index, room in enumerate(rooms):
  78. room_id = room.room_id
  79. logger.info(
  80. "[%s] Handling room %s, %d/%d", user_id, room_id, index + 1, len(rooms)
  81. )
  82. forgotten = await self.store.did_forget(user_id, room_id)
  83. if forgotten:
  84. logger.info("[%s] User forgot room %d, ignoring", user_id, room_id)
  85. continue
  86. if room_id not in rooms_user_has_been_in:
  87. # If we haven't been in the rooms then the filtering code below
  88. # won't return anything, so we need to handle these cases
  89. # explicitly.
  90. if room.membership == Membership.INVITE:
  91. event_id = room.event_id
  92. invite = await self.store.get_event(event_id, allow_none=True)
  93. if invite:
  94. invited_state = invite.unsigned["invite_room_state"]
  95. writer.write_invite(room_id, invite, invited_state)
  96. continue
  97. # We only want to bother fetching events up to the last time they
  98. # were joined. We estimate that point by looking at the
  99. # stream_ordering of the last membership if it wasn't a join.
  100. if room.membership == Membership.JOIN:
  101. stream_ordering = self.store.get_room_max_stream_ordering()
  102. else:
  103. stream_ordering = room.stream_ordering
  104. from_key = str(RoomStreamToken(0, 0))
  105. to_key = str(RoomStreamToken(None, stream_ordering))
  106. written_events = set() # Events that we've processed in this room
  107. # We need to track gaps in the events stream so that we can then
  108. # write out the state at those events. We do this by keeping track
  109. # of events whose prev events we haven't seen.
  110. # Map from event ID to prev events that haven't been processed,
  111. # dict[str, set[str]].
  112. event_to_unseen_prevs = {}
  113. # The reverse mapping to above, i.e. map from unseen event to events
  114. # that have the unseen event in their prev_events, i.e. the unseen
  115. # events "children". dict[str, set[str]]
  116. unseen_to_child_events = {}
  117. # We fetch events in the room the user could see by fetching *all*
  118. # events that we have and then filtering, this isn't the most
  119. # efficient method perhaps but it does guarantee we get everything.
  120. while True:
  121. events, _ = await self.store.paginate_room_events(
  122. room_id, from_key, to_key, limit=100, direction="f"
  123. )
  124. if not events:
  125. break
  126. from_key = events[-1].internal_metadata.after
  127. events = await filter_events_for_client(self.storage, user_id, events)
  128. writer.write_events(room_id, events)
  129. # Update the extremity tracking dicts
  130. for event in events:
  131. # Check if we have any prev events that haven't been
  132. # processed yet, and add those to the appropriate dicts.
  133. unseen_events = set(event.prev_event_ids()) - written_events
  134. if unseen_events:
  135. event_to_unseen_prevs[event.event_id] = unseen_events
  136. for unseen in unseen_events:
  137. unseen_to_child_events.setdefault(unseen, set()).add(
  138. event.event_id
  139. )
  140. # Now check if this event is an unseen prev event, if so
  141. # then we remove this event from the appropriate dicts.
  142. for child_id in unseen_to_child_events.pop(event.event_id, []):
  143. event_to_unseen_prevs[child_id].discard(event.event_id)
  144. written_events.add(event.event_id)
  145. logger.info(
  146. "Written %d events in room %s", len(written_events), room_id
  147. )
  148. # Extremities are the events who have at least one unseen prev event.
  149. extremities = (
  150. event_id
  151. for event_id, unseen_prevs in event_to_unseen_prevs.items()
  152. if unseen_prevs
  153. )
  154. for event_id in extremities:
  155. if not event_to_unseen_prevs[event_id]:
  156. continue
  157. state = await self.state_store.get_state_for_event(event_id)
  158. writer.write_state(room_id, event_id, state)
  159. return writer.finished()
  160. class ExfiltrationWriter(object):
  161. """Interface used to specify how to write exported data.
  162. """
  163. def write_events(self, room_id: str, events: List[FrozenEvent]):
  164. """Write a batch of events for a room.
  165. """
  166. pass
  167. def write_state(self, room_id: str, event_id: str, state: StateMap[FrozenEvent]):
  168. """Write the state at the given event in the room.
  169. This only gets called for backward extremities rather than for each
  170. event.
  171. """
  172. pass
  173. def write_invite(self, room_id: str, event: FrozenEvent, state: StateMap[dict]):
  174. """Write an invite for the room, with associated invite state.
  175. Args:
  176. room_id
  177. event
  178. state: A subset of the state at the
  179. invite, with a subset of the event keys (type, state_key
  180. content and sender)
  181. """
  182. def finished(self):
  183. """Called when all data has succesfully been exported and written.
  184. This functions return value is passed to the caller of
  185. `export_user_data`.
  186. """
  187. pass