admin_cmd.py 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241
  1. #!/usr/bin/env python
  2. # Copyright 2019 Matrix.org Foundation C.I.C.
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. import argparse
  16. import json
  17. import logging
  18. import os
  19. import sys
  20. import tempfile
  21. from twisted.internet import defer, task
  22. import synapse
  23. from synapse.app import _base
  24. from synapse.config._base import ConfigError
  25. from synapse.config.homeserver import HomeServerConfig
  26. from synapse.config.logger import setup_logging
  27. from synapse.handlers.admin import ExfiltrationWriter
  28. from synapse.replication.slave.storage._base import BaseSlavedStore
  29. from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
  30. from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
  31. from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
  32. from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
  33. from synapse.replication.slave.storage.devices import SlavedDeviceStore
  34. from synapse.replication.slave.storage.events import SlavedEventStore
  35. from synapse.replication.slave.storage.filtering import SlavedFilteringStore
  36. from synapse.replication.slave.storage.groups import SlavedGroupServerStore
  37. from synapse.replication.slave.storage.presence import SlavedPresenceStore
  38. from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
  39. from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
  40. from synapse.replication.slave.storage.registration import SlavedRegistrationStore
  41. from synapse.replication.slave.storage.room import RoomStore
  42. from synapse.server import HomeServer
  43. from synapse.util.logcontext import LoggingContext
  44. from synapse.util.versionstring import get_version_string
# Module-level logger, named explicitly after this app's module path rather
# than via __name__.
logger = logging.getLogger("synapse.app.admin_cmd")
  46. class AdminCmdSlavedStore(
  47. SlavedReceiptsStore,
  48. SlavedAccountDataStore,
  49. SlavedApplicationServiceStore,
  50. SlavedRegistrationStore,
  51. SlavedFilteringStore,
  52. SlavedPresenceStore,
  53. SlavedGroupServerStore,
  54. SlavedDeviceInboxStore,
  55. SlavedDeviceStore,
  56. SlavedPushRuleStore,
  57. SlavedEventStore,
  58. SlavedClientIpStore,
  59. RoomStore,
  60. BaseSlavedStore,
  61. ):
  62. pass
  63. class AdminCmdServer(HomeServer):
  64. DATASTORE_CLASS = AdminCmdSlavedStore
  65. async def export_data_command(hs, args):
  66. """Export data for a user.
  67. Args:
  68. hs (HomeServer)
  69. args (argparse.Namespace)
  70. """
  71. user_id = args.user_id
  72. directory = args.output_directory
  73. res = await hs.get_admin_handler().export_user_data(
  74. user_id, FileExfiltrationWriter(user_id, directory=directory)
  75. )
  76. print(res)
  77. class FileExfiltrationWriter(ExfiltrationWriter):
  78. """An ExfiltrationWriter that writes the users data to a directory.
  79. Returns the directory location on completion.
  80. Note: This writes to disk on the main reactor thread.
  81. Args:
  82. user_id (str): The user whose data is being exfiltrated.
  83. directory (str|None): The directory to write the data to, if None then
  84. will write to a temporary directory.
  85. """
  86. def __init__(self, user_id, directory=None):
  87. self.user_id = user_id
  88. if directory:
  89. self.base_directory = directory
  90. else:
  91. self.base_directory = tempfile.mkdtemp(
  92. prefix="synapse-exfiltrate__%s__" % (user_id,)
  93. )
  94. os.makedirs(self.base_directory, exist_ok=True)
  95. if list(os.listdir(self.base_directory)):
  96. raise Exception("Directory must be empty")
  97. def write_events(self, room_id, events):
  98. room_directory = os.path.join(self.base_directory, "rooms", room_id)
  99. os.makedirs(room_directory, exist_ok=True)
  100. events_file = os.path.join(room_directory, "events")
  101. with open(events_file, "a") as f:
  102. for event in events:
  103. print(json.dumps(event.get_pdu_json()), file=f)
  104. def write_state(self, room_id, event_id, state):
  105. room_directory = os.path.join(self.base_directory, "rooms", room_id)
  106. state_directory = os.path.join(room_directory, "state")
  107. os.makedirs(state_directory, exist_ok=True)
  108. event_file = os.path.join(state_directory, event_id)
  109. with open(event_file, "a") as f:
  110. for event in state.values():
  111. print(json.dumps(event.get_pdu_json()), file=f)
  112. def write_invite(self, room_id, event, state):
  113. self.write_events(room_id, [event])
  114. # We write the invite state somewhere else as they aren't full events
  115. # and are only a subset of the state at the event.
  116. room_directory = os.path.join(self.base_directory, "rooms", room_id)
  117. os.makedirs(room_directory, exist_ok=True)
  118. invite_state = os.path.join(room_directory, "invite_state")
  119. with open(invite_state, "a") as f:
  120. for event in state.values():
  121. print(json.dumps(event), file=f)
  122. def finished(self):
  123. return self.base_directory
  124. def start(config_options):
  125. parser = argparse.ArgumentParser(description="Synapse Admin Command")
  126. HomeServerConfig.add_arguments_to_parser(parser)
  127. subparser = parser.add_subparsers(
  128. title="Admin Commands",
  129. required=True,
  130. dest="command",
  131. metavar="<admin_command>",
  132. help="The admin command to perform.",
  133. )
  134. export_data_parser = subparser.add_parser(
  135. "export-data", help="Export all data for a user"
  136. )
  137. export_data_parser.add_argument("user_id", help="User to extra data from")
  138. export_data_parser.add_argument(
  139. "--output-directory",
  140. action="store",
  141. metavar="DIRECTORY",
  142. required=False,
  143. help="The directory to store the exported data in. Must be empty. Defaults"
  144. " to creating a temp directory.",
  145. )
  146. export_data_parser.set_defaults(func=export_data_command)
  147. try:
  148. config, args = HomeServerConfig.load_config_with_parser(parser, config_options)
  149. except ConfigError as e:
  150. sys.stderr.write("\n" + str(e) + "\n")
  151. sys.exit(1)
  152. if config.worker_app is not None:
  153. assert config.worker_app == "synapse.app.admin_cmd"
  154. # Update the config with some basic overrides so that don't have to specify
  155. # a full worker config.
  156. config.worker_app = "synapse.app.admin_cmd"
  157. if (
  158. not config.worker_daemonize
  159. and not config.worker_log_file
  160. and not config.worker_log_config
  161. ):
  162. # Since we're meant to be run as a "command" let's not redirect stdio
  163. # unless we've actually set log config.
  164. config.no_redirect_stdio = True
  165. # Explicitly disable background processes
  166. config.update_user_directory = False
  167. config.run_background_tasks = False
  168. config.start_pushers = False
  169. config.pusher_shard_config.instances = []
  170. config.send_federation = False
  171. config.federation_shard_config.instances = []
  172. synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts
  173. ss = AdminCmdServer(
  174. config.server_name,
  175. config=config,
  176. version_string="Synapse/" + get_version_string(synapse),
  177. )
  178. setup_logging(ss, config, use_worker_options=True)
  179. ss.setup()
  180. # We use task.react as the basic run command as it correctly handles tearing
  181. # down the reactor when the deferreds resolve and setting the return value.
  182. # We also make sure that `_base.start` gets run before we actually run the
  183. # command.
  184. async def run():
  185. with LoggingContext("command"):
  186. _base.start(ss)
  187. await args.func(ss, args)
  188. _base.start_worker_reactor(
  189. "synapse-admin-cmd",
  190. config,
  191. run_command=lambda: task.react(lambda _reactor: defer.ensureDeferred(run())),
  192. )
  193. if __name__ == "__main__":
  194. with LoggingContext("main"):
  195. start(sys.argv[1:])