admin.py 9.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2014-2016 OpenMarket Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. import logging
  16. from twisted.internet import defer
  17. from synapse.api.constants import Membership
  18. from synapse.types import RoomStreamToken
  19. from synapse.visibility import filter_events_for_client
  20. from ._base import BaseHandler
  21. logger = logging.getLogger(__name__)
  22. class AdminHandler(BaseHandler):
  23. def __init__(self, hs):
  24. super(AdminHandler, self).__init__(hs)
  25. @defer.inlineCallbacks
  26. def get_whois(self, user):
  27. connections = []
  28. sessions = yield self.store.get_user_ip_and_agents(user)
  29. for session in sessions:
  30. connections.append(
  31. {
  32. "ip": session["ip"],
  33. "last_seen": session["last_seen"],
  34. "user_agent": session["user_agent"],
  35. }
  36. )
  37. ret = {
  38. "user_id": user.to_string(),
  39. "devices": {"": {"sessions": [{"connections": connections}]}},
  40. }
  41. return ret
  42. @defer.inlineCallbacks
  43. def get_users(self):
  44. """Function to reterive a list of users in users table.
  45. Args:
  46. Returns:
  47. defer.Deferred: resolves to list[dict[str, Any]]
  48. """
  49. ret = yield self.store.get_users()
  50. return ret
  51. @defer.inlineCallbacks
  52. def get_users_paginate(self, order, start, limit):
  53. """Function to reterive a paginated list of users from
  54. users list. This will return a json object, which contains
  55. list of users and the total number of users in users table.
  56. Args:
  57. order (str): column name to order the select by this column
  58. start (int): start number to begin the query from
  59. limit (int): number of rows to reterive
  60. Returns:
  61. defer.Deferred: resolves to json object {list[dict[str, Any]], count}
  62. """
  63. ret = yield self.store.get_users_paginate(order, start, limit)
  64. return ret
  65. @defer.inlineCallbacks
  66. def search_users(self, term):
  67. """Function to search users list for one or more users with
  68. the matched term.
  69. Args:
  70. term (str): search term
  71. Returns:
  72. defer.Deferred: resolves to list[dict[str, Any]]
  73. """
  74. ret = yield self.store.search_users(term)
  75. return ret
  76. @defer.inlineCallbacks
  77. def export_user_data(self, user_id, writer):
  78. """Write all data we have on the user to the given writer.
  79. Args:
  80. user_id (str)
  81. writer (ExfiltrationWriter)
  82. Returns:
  83. defer.Deferred: Resolves when all data for a user has been written.
  84. The returned value is that returned by `writer.finished()`.
  85. """
  86. # Get all rooms the user is in or has been in
  87. rooms = yield self.store.get_rooms_for_user_where_membership_is(
  88. user_id,
  89. membership_list=(
  90. Membership.JOIN,
  91. Membership.LEAVE,
  92. Membership.BAN,
  93. Membership.INVITE,
  94. ),
  95. )
  96. # We only try and fetch events for rooms the user has been in. If
  97. # they've been e.g. invited to a room without joining then we handle
  98. # those seperately.
  99. rooms_user_has_been_in = yield self.store.get_rooms_user_has_been_in(user_id)
  100. for index, room in enumerate(rooms):
  101. room_id = room.room_id
  102. logger.info(
  103. "[%s] Handling room %s, %d/%d", user_id, room_id, index + 1, len(rooms)
  104. )
  105. forgotten = yield self.store.did_forget(user_id, room_id)
  106. if forgotten:
  107. logger.info("[%s] User forgot room %d, ignoring", user_id, room_id)
  108. continue
  109. if room_id not in rooms_user_has_been_in:
  110. # If we haven't been in the rooms then the filtering code below
  111. # won't return anything, so we need to handle these cases
  112. # explicitly.
  113. if room.membership == Membership.INVITE:
  114. event_id = room.event_id
  115. invite = yield self.store.get_event(event_id, allow_none=True)
  116. if invite:
  117. invited_state = invite.unsigned["invite_room_state"]
  118. writer.write_invite(room_id, invite, invited_state)
  119. continue
  120. # We only want to bother fetching events up to the last time they
  121. # were joined. We estimate that point by looking at the
  122. # stream_ordering of the last membership if it wasn't a join.
  123. if room.membership == Membership.JOIN:
  124. stream_ordering = yield self.store.get_room_max_stream_ordering()
  125. else:
  126. stream_ordering = room.stream_ordering
  127. from_key = str(RoomStreamToken(0, 0))
  128. to_key = str(RoomStreamToken(None, stream_ordering))
  129. written_events = set() # Events that we've processed in this room
  130. # We need to track gaps in the events stream so that we can then
  131. # write out the state at those events. We do this by keeping track
  132. # of events whose prev events we haven't seen.
  133. # Map from event ID to prev events that haven't been processed,
  134. # dict[str, set[str]].
  135. event_to_unseen_prevs = {}
  136. # The reverse mapping to above, i.e. map from unseen event to events
  137. # that have the unseen event in their prev_events, i.e. the unseen
  138. # events "children". dict[str, set[str]]
  139. unseen_to_child_events = {}
  140. # We fetch events in the room the user could see by fetching *all*
  141. # events that we have and then filtering, this isn't the most
  142. # efficient method perhaps but it does guarantee we get everything.
  143. while True:
  144. events, _ = yield self.store.paginate_room_events(
  145. room_id, from_key, to_key, limit=100, direction="f"
  146. )
  147. if not events:
  148. break
  149. from_key = events[-1].internal_metadata.after
  150. events = yield filter_events_for_client(self.store, user_id, events)
  151. writer.write_events(room_id, events)
  152. # Update the extremity tracking dicts
  153. for event in events:
  154. # Check if we have any prev events that haven't been
  155. # processed yet, and add those to the appropriate dicts.
  156. unseen_events = set(event.prev_event_ids()) - written_events
  157. if unseen_events:
  158. event_to_unseen_prevs[event.event_id] = unseen_events
  159. for unseen in unseen_events:
  160. unseen_to_child_events.setdefault(unseen, set()).add(
  161. event.event_id
  162. )
  163. # Now check if this event is an unseen prev event, if so
  164. # then we remove this event from the appropriate dicts.
  165. for child_id in unseen_to_child_events.pop(event.event_id, []):
  166. event_to_unseen_prevs[child_id].discard(event.event_id)
  167. written_events.add(event.event_id)
  168. logger.info(
  169. "Written %d events in room %s", len(written_events), room_id
  170. )
  171. # Extremities are the events who have at least one unseen prev event.
  172. extremities = (
  173. event_id
  174. for event_id, unseen_prevs in event_to_unseen_prevs.items()
  175. if unseen_prevs
  176. )
  177. for event_id in extremities:
  178. if not event_to_unseen_prevs[event_id]:
  179. continue
  180. state = yield self.store.get_state_for_event(event_id)
  181. writer.write_state(room_id, event_id, state)
  182. return writer.finished()
  183. class ExfiltrationWriter(object):
  184. """Interface used to specify how to write exported data.
  185. """
  186. def write_events(self, room_id, events):
  187. """Write a batch of events for a room.
  188. Args:
  189. room_id (str)
  190. events (list[FrozenEvent])
  191. """
  192. pass
  193. def write_state(self, room_id, event_id, state):
  194. """Write the state at the given event in the room.
  195. This only gets called for backward extremities rather than for each
  196. event.
  197. Args:
  198. room_id (str)
  199. event_id (str)
  200. state (dict[tuple[str, str], FrozenEvent])
  201. """
  202. pass
  203. def write_invite(self, room_id, event, state):
  204. """Write an invite for the room, with associated invite state.
  205. Args:
  206. room_id (str)
  207. event (FrozenEvent)
  208. state (dict[tuple[str, str], dict]): A subset of the state at the
  209. invite, with a subset of the event keys (type, state_key
  210. content and sender)
  211. """
  212. def finished(self):
  213. """Called when all data has succesfully been exported and written.
  214. This functions return value is passed to the caller of
  215. `export_user_data`.
  216. """
  217. pass