# Copyright 2019 Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
- import argparse
- import json
- import logging
- import os
- import sys
- import tempfile
- from twisted.internet import defer, task
- import synapse
- from synapse.app import _base
- from synapse.config._base import ConfigError
- from synapse.config.homeserver import HomeServerConfig
- from synapse.config.logger import setup_logging
- from synapse.handlers.admin import ExfiltrationWriter
- from synapse.replication.slave.storage._base import BaseSlavedStore
- from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
- from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
- from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
- from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
- from synapse.replication.slave.storage.devices import SlavedDeviceStore
- from synapse.replication.slave.storage.events import SlavedEventStore
- from synapse.replication.slave.storage.filtering import SlavedFilteringStore
- from synapse.replication.slave.storage.groups import SlavedGroupServerStore
- from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
- from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
- from synapse.replication.slave.storage.registration import SlavedRegistrationStore
- from synapse.server import HomeServer
- from synapse.storage.databases.main.room import RoomWorkerStore
- from synapse.util.logcontext import LoggingContext
- from synapse.util.versionstring import get_version_string
# Module-level logger for this admin-command entry point.
logger = logging.getLogger("synapse.app.admin_cmd")
class AdminCmdSlavedStore(
    SlavedReceiptsStore,
    SlavedAccountDataStore,
    SlavedApplicationServiceStore,
    SlavedRegistrationStore,
    SlavedFilteringStore,
    SlavedGroupServerStore,
    SlavedDeviceInboxStore,
    SlavedDeviceStore,
    SlavedPushRuleStore,
    SlavedEventStore,
    SlavedClientIpStore,
    BaseSlavedStore,
    RoomWorkerStore,
):
    """Composite read-only ("slaved") datastore for the admin-cmd worker.

    Mixes in exactly the storage classes the admin commands need (events,
    rooms, devices, receipts, etc.). The base-class order is significant —
    it determines the MRO — so do not reorder it.
    """

    pass
class AdminCmdServer(HomeServer):
    """HomeServer variant whose only specialisation is the slaved datastore."""

    DATASTORE_CLASS = AdminCmdSlavedStore
async def export_data_command(hs: HomeServer, args):
    """Export all stored data for a single user to disk.

    Args:
        hs: The running (admin-cmd) homeserver.
        args (argparse.Namespace): Parsed CLI arguments; reads ``user_id``
            and ``output_directory``.
    """
    writer = FileExfiltrationWriter(args.user_id, directory=args.output_directory)
    result = await hs.get_admin_handler().export_user_data(args.user_id, writer)
    # Report where the data ended up (the writer's finished() value).
    print(result)
class FileExfiltrationWriter(ExfiltrationWriter):
    """An ExfiltrationWriter that writes the user's data to a directory.

    Returns the directory location on completion.

    Note: This writes to disk on the main reactor thread.

    Args:
        user_id (str): The user whose data is being exfiltrated.
        directory (str|None): The directory to write the data to, if None then
            will write to a temporary directory.

    Raises:
        Exception: from the constructor if the target directory is not empty.
    """

    def __init__(self, user_id, directory=None):
        self.user_id = user_id

        if directory:
            self.base_directory = directory
        else:
            # No explicit target: exfiltrate into a fresh temp directory.
            self.base_directory = tempfile.mkdtemp(
                prefix="synapse-exfiltrate__%s__" % (user_id,)
            )

        os.makedirs(self.base_directory, exist_ok=True)
        # Refuse to mix the export with pre-existing files.
        if list(os.listdir(self.base_directory)):
            raise Exception("Directory must be empty")

    def _room_directory(self, room_id):
        """Return the per-room output directory, creating it if needed."""
        room_directory = os.path.join(self.base_directory, "rooms", room_id)
        os.makedirs(room_directory, exist_ok=True)
        return room_directory

    def write_events(self, room_id, events):
        """Append full events (their PDU JSON) to the room's "events" file."""
        events_file = os.path.join(self._room_directory(room_id), "events")
        with open(events_file, "a") as f:
            # One JSON document per line (JSON Lines format).
            for event in events:
                print(json.dumps(event.get_pdu_json()), file=f)

    def write_state(self, room_id, event_id, state):
        """Write the room state at `event_id`, one file named after the event."""
        state_directory = os.path.join(self._room_directory(room_id), "state")
        os.makedirs(state_directory, exist_ok=True)

        event_file = os.path.join(state_directory, event_id)
        with open(event_file, "a") as f:
            for event in state.values():
                print(json.dumps(event.get_pdu_json()), file=f)

    def _write_stripped_state(self, room_id, filename, state):
        """Append stripped-state dicts to `filename` in the room directory.

        Invite/knock state entries aren't full events — they are plain dicts
        carrying only a subset of the state at the event — so they are kept
        apart from the room's "events" file.
        """
        path = os.path.join(self._room_directory(room_id), filename)
        with open(path, "a") as f:
            for event in state.values():
                print(json.dumps(event), file=f)

    def write_invite(self, room_id, event, state):
        """Write an invite event plus its stripped invite state."""
        self.write_events(room_id, [event])
        self._write_stripped_state(room_id, "invite_state", state)

    def write_knock(self, room_id, event, state):
        """Write a knock event plus its stripped knock state."""
        self.write_events(room_id, [event])
        self._write_stripped_state(room_id, "knock_state", state)

    def finished(self):
        """Return the directory the export was written to."""
        return self.base_directory
def start(config_options):
    """Parse config/CLI args, build a minimal worker homeserver, and run the
    requested admin command under the Twisted reactor.

    Fix: the `user_id` positional help text read "User to extra data from";
    corrected to "extract".

    Args:
        config_options (list[str]): Raw command line arguments
            (typically ``sys.argv[1:]``).
    """
    parser = argparse.ArgumentParser(description="Synapse Admin Command")
    HomeServerConfig.add_arguments_to_parser(parser)

    subparser = parser.add_subparsers(
        title="Admin Commands",
        required=True,
        dest="command",
        metavar="<admin_command>",
        help="The admin command to perform.",
    )
    export_data_parser = subparser.add_parser(
        "export-data", help="Export all data for a user"
    )
    export_data_parser.add_argument("user_id", help="User to extract data from")
    export_data_parser.add_argument(
        "--output-directory",
        action="store",
        metavar="DIRECTORY",
        required=False,
        help="The directory to store the exported data in. Must be empty. Defaults"
        " to creating a temp directory.",
    )
    # Each subcommand stashes its coroutine handler in `args.func`.
    export_data_parser.set_defaults(func=export_data_command)

    try:
        config, args = HomeServerConfig.load_config_with_parser(parser, config_options)
    except ConfigError as e:
        sys.stderr.write("\n" + str(e) + "\n")
        sys.exit(1)

    if config.worker.worker_app is not None:
        assert config.worker.worker_app == "synapse.app.admin_cmd"

    # Update the config with some basic overrides so that don't have to specify
    # a full worker config.
    config.worker.worker_app = "synapse.app.admin_cmd"

    if not config.worker.worker_daemonize and not config.worker.worker_log_config:
        # Since we're meant to be run as a "command" let's not redirect stdio
        # unless we've actually set log config.
        config.logging.no_redirect_stdio = True

    # Explicitly disable background processes
    config.server.update_user_directory = False
    config.worker.run_background_tasks = False
    config.worker.start_pushers = False
    config.worker.pusher_shard_config.instances = []
    config.worker.send_federation = False
    config.worker.federation_shard_config.instances = []

    synapse.events.USE_FROZEN_DICTS = config.server.use_frozen_dicts

    ss = AdminCmdServer(
        config.server.server_name,
        config=config,
        version_string="Synapse/" + get_version_string(synapse),
    )

    setup_logging(ss, config, use_worker_options=True)

    ss.setup()

    # We use task.react as the basic run command as it correctly handles tearing
    # down the reactor when the deferreds resolve and setting the return value.
    # We also make sure that `_base.start` gets run before we actually run the
    # command.

    async def run():
        with LoggingContext("command"):
            await _base.start(ss)
            await args.func(ss, args)

    _base.start_worker_reactor(
        "synapse-admin-cmd",
        config,
        run_command=lambda: task.react(lambda _reactor: defer.ensureDeferred(run())),
    )
if __name__ == "__main__":
    # Run the command inside a named logcontext so log lines are attributed.
    with LoggingContext("main"):
        start(sys.argv[1:])
|