admin_cmd.py 8.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. # Copyright 2019 Matrix.org Foundation C.I.C.
  4. #
  5. # Licensed under the Apache License, Version 2.0 (the "License");
  6. # you may not use this file except in compliance with the License.
  7. # You may obtain a copy of the License at
  8. #
  9. # http://www.apache.org/licenses/LICENSE-2.0
  10. #
  11. # Unless required by applicable law or agreed to in writing, software
  12. # distributed under the License is distributed on an "AS IS" BASIS,
  13. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  14. # See the License for the specific language governing permissions and
  15. # limitations under the License.
  16. import argparse
  17. import logging
  18. import os
  19. import sys
  20. import tempfile
  21. from canonicaljson import json
  22. from twisted.internet import defer, task
  23. import synapse
  24. from synapse.app import _base
  25. from synapse.config._base import ConfigError
  26. from synapse.config.homeserver import HomeServerConfig
  27. from synapse.config.logger import setup_logging
  28. from synapse.handlers.admin import ExfiltrationWriter
  29. from synapse.replication.slave.storage._base import BaseSlavedStore
  30. from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
  31. from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
  32. from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
  33. from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
  34. from synapse.replication.slave.storage.devices import SlavedDeviceStore
  35. from synapse.replication.slave.storage.events import SlavedEventStore
  36. from synapse.replication.slave.storage.filtering import SlavedFilteringStore
  37. from synapse.replication.slave.storage.groups import SlavedGroupServerStore
  38. from synapse.replication.slave.storage.presence import SlavedPresenceStore
  39. from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
  40. from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
  41. from synapse.replication.slave.storage.registration import SlavedRegistrationStore
  42. from synapse.replication.slave.storage.room import RoomStore
  43. from synapse.replication.tcp.client import ReplicationClientHandler
  44. from synapse.server import HomeServer
  45. from synapse.storage.engines import create_engine
  46. from synapse.util.logcontext import LoggingContext
  47. from synapse.util.versionstring import get_version_string
  48. logger = logging.getLogger("synapse.app.admin_cmd")
  49. class AdminCmdSlavedStore(
  50. SlavedReceiptsStore,
  51. SlavedAccountDataStore,
  52. SlavedApplicationServiceStore,
  53. SlavedRegistrationStore,
  54. SlavedFilteringStore,
  55. SlavedPresenceStore,
  56. SlavedGroupServerStore,
  57. SlavedDeviceInboxStore,
  58. SlavedDeviceStore,
  59. SlavedPushRuleStore,
  60. SlavedEventStore,
  61. SlavedClientIpStore,
  62. RoomStore,
  63. BaseSlavedStore,
  64. ):
  65. pass
  66. class AdminCmdServer(HomeServer):
  67. DATASTORE_CLASS = AdminCmdSlavedStore
  68. def _listen_http(self, listener_config):
  69. pass
  70. def start_listening(self, listeners):
  71. pass
  72. def build_tcp_replication(self):
  73. return AdminCmdReplicationHandler(self)
  74. class AdminCmdReplicationHandler(ReplicationClientHandler):
  75. @defer.inlineCallbacks
  76. def on_rdata(self, stream_name, token, rows):
  77. pass
  78. def get_streams_to_replicate(self):
  79. return {}
  80. @defer.inlineCallbacks
  81. def export_data_command(hs, args):
  82. """Export data for a user.
  83. Args:
  84. hs (HomeServer)
  85. args (argparse.Namespace)
  86. """
  87. user_id = args.user_id
  88. directory = args.output_directory
  89. res = yield hs.get_handlers().admin_handler.export_user_data(
  90. user_id, FileExfiltrationWriter(user_id, directory=directory)
  91. )
  92. print(res)
  93. class FileExfiltrationWriter(ExfiltrationWriter):
  94. """An ExfiltrationWriter that writes the users data to a directory.
  95. Returns the directory location on completion.
  96. Note: This writes to disk on the main reactor thread.
  97. Args:
  98. user_id (str): The user whose data is being exfiltrated.
  99. directory (str|None): The directory to write the data to, if None then
  100. will write to a temporary directory.
  101. """
  102. def __init__(self, user_id, directory=None):
  103. self.user_id = user_id
  104. if directory:
  105. self.base_directory = directory
  106. else:
  107. self.base_directory = tempfile.mkdtemp(
  108. prefix="synapse-exfiltrate__%s__" % (user_id,)
  109. )
  110. os.makedirs(self.base_directory, exist_ok=True)
  111. if list(os.listdir(self.base_directory)):
  112. raise Exception("Directory must be empty")
  113. def write_events(self, room_id, events):
  114. room_directory = os.path.join(self.base_directory, "rooms", room_id)
  115. os.makedirs(room_directory, exist_ok=True)
  116. events_file = os.path.join(room_directory, "events")
  117. with open(events_file, "a") as f:
  118. for event in events:
  119. print(json.dumps(event.get_pdu_json()), file=f)
  120. def write_state(self, room_id, event_id, state):
  121. room_directory = os.path.join(self.base_directory, "rooms", room_id)
  122. state_directory = os.path.join(room_directory, "state")
  123. os.makedirs(state_directory, exist_ok=True)
  124. event_file = os.path.join(state_directory, event_id)
  125. with open(event_file, "a") as f:
  126. for event in state.values():
  127. print(json.dumps(event.get_pdu_json()), file=f)
  128. def write_invite(self, room_id, event, state):
  129. self.write_events(room_id, [event])
  130. # We write the invite state somewhere else as they aren't full events
  131. # and are only a subset of the state at the event.
  132. room_directory = os.path.join(self.base_directory, "rooms", room_id)
  133. os.makedirs(room_directory, exist_ok=True)
  134. invite_state = os.path.join(room_directory, "invite_state")
  135. with open(invite_state, "a") as f:
  136. for event in state.values():
  137. print(json.dumps(event), file=f)
  138. def finished(self):
  139. return self.base_directory
  140. def start(config_options):
  141. parser = argparse.ArgumentParser(description="Synapse Admin Command")
  142. HomeServerConfig.add_arguments_to_parser(parser)
  143. subparser = parser.add_subparsers(
  144. title="Admin Commands",
  145. required=True,
  146. dest="command",
  147. metavar="<admin_command>",
  148. help="The admin command to perform.",
  149. )
  150. export_data_parser = subparser.add_parser(
  151. "export-data", help="Export all data for a user"
  152. )
  153. export_data_parser.add_argument("user_id", help="User to extra data from")
  154. export_data_parser.add_argument(
  155. "--output-directory",
  156. action="store",
  157. metavar="DIRECTORY",
  158. required=False,
  159. help="The directory to store the exported data in. Must be empty. Defaults"
  160. " to creating a temp directory.",
  161. )
  162. export_data_parser.set_defaults(func=export_data_command)
  163. try:
  164. config, args = HomeServerConfig.load_config_with_parser(parser, config_options)
  165. except ConfigError as e:
  166. sys.stderr.write("\n" + str(e) + "\n")
  167. sys.exit(1)
  168. if config.worker_app is not None:
  169. assert config.worker_app == "synapse.app.admin_cmd"
  170. # Update the config with some basic overrides so that don't have to specify
  171. # a full worker config.
  172. config.worker_app = "synapse.app.admin_cmd"
  173. if (
  174. not config.worker_daemonize
  175. and not config.worker_log_file
  176. and not config.worker_log_config
  177. ):
  178. # Since we're meant to be run as a "command" let's not redirect stdio
  179. # unless we've actually set log config.
  180. config.no_redirect_stdio = True
  181. # Explicitly disable background processes
  182. config.update_user_directory = False
  183. config.start_pushers = False
  184. config.send_federation = False
  185. synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts
  186. database_engine = create_engine(config.database_config)
  187. ss = AdminCmdServer(
  188. config.server_name,
  189. db_config=config.database_config,
  190. config=config,
  191. version_string="Synapse/" + get_version_string(synapse),
  192. database_engine=database_engine,
  193. )
  194. setup_logging(ss, config, use_worker_options=True)
  195. ss.setup()
  196. # We use task.react as the basic run command as it correctly handles tearing
  197. # down the reactor when the deferreds resolve and setting the return value.
  198. # We also make sure that `_base.start` gets run before we actually run the
  199. # command.
  200. @defer.inlineCallbacks
  201. def run(_reactor):
  202. with LoggingContext("command"):
  203. yield _base.start(ss, [])
  204. yield args.func(ss, args)
  205. _base.start_worker_reactor(
  206. "synapse-admin-cmd", config, run_command=lambda: task.react(run)
  207. )
  208. if __name__ == "__main__":
  209. with LoggingContext("main"):
  210. start(sys.argv[1:])