admin_cmd.py 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248
  1. # Copyright 2019 Matrix.org Foundation C.I.C.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. import argparse
  15. import json
  16. import logging
  17. import os
  18. import sys
  19. import tempfile
  20. from twisted.internet import defer, task
  21. import synapse
  22. from synapse.app import _base
  23. from synapse.config._base import ConfigError
  24. from synapse.config.homeserver import HomeServerConfig
  25. from synapse.config.logger import setup_logging
  26. from synapse.handlers.admin import ExfiltrationWriter
  27. from synapse.replication.slave.storage._base import BaseSlavedStore
  28. from synapse.replication.slave.storage.account_data import SlavedAccountDataStore
  29. from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
  30. from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
  31. from synapse.replication.slave.storage.deviceinbox import SlavedDeviceInboxStore
  32. from synapse.replication.slave.storage.devices import SlavedDeviceStore
  33. from synapse.replication.slave.storage.events import SlavedEventStore
  34. from synapse.replication.slave.storage.filtering import SlavedFilteringStore
  35. from synapse.replication.slave.storage.groups import SlavedGroupServerStore
  36. from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
  37. from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
  38. from synapse.replication.slave.storage.registration import SlavedRegistrationStore
  39. from synapse.server import HomeServer
  40. from synapse.storage.databases.main.room import RoomWorkerStore
  41. from synapse.util.logcontext import LoggingContext
  42. from synapse.util.versionstring import get_version_string
# Module-level logger registered under the fixed name "synapse.app.admin_cmd".
# NOTE(review): nothing in the visible part of this file references it.
logger = logging.getLogger("synapse.app.admin_cmd")
  44. class AdminCmdSlavedStore(
  45. SlavedReceiptsStore,
  46. SlavedAccountDataStore,
  47. SlavedApplicationServiceStore,
  48. SlavedRegistrationStore,
  49. SlavedFilteringStore,
  50. SlavedGroupServerStore,
  51. SlavedDeviceInboxStore,
  52. SlavedDeviceStore,
  53. SlavedPushRuleStore,
  54. SlavedEventStore,
  55. SlavedClientIpStore,
  56. BaseSlavedStore,
  57. RoomWorkerStore,
  58. ):
  59. pass
  60. class AdminCmdServer(HomeServer):
  61. DATASTORE_CLASS = AdminCmdSlavedStore
  62. async def export_data_command(hs: HomeServer, args):
  63. """Export data for a user.
  64. Args:
  65. hs
  66. args (argparse.Namespace)
  67. """
  68. user_id = args.user_id
  69. directory = args.output_directory
  70. res = await hs.get_admin_handler().export_user_data(
  71. user_id, FileExfiltrationWriter(user_id, directory=directory)
  72. )
  73. print(res)
  74. class FileExfiltrationWriter(ExfiltrationWriter):
  75. """An ExfiltrationWriter that writes the users data to a directory.
  76. Returns the directory location on completion.
  77. Note: This writes to disk on the main reactor thread.
  78. Args:
  79. user_id (str): The user whose data is being exfiltrated.
  80. directory (str|None): The directory to write the data to, if None then
  81. will write to a temporary directory.
  82. """
  83. def __init__(self, user_id, directory=None):
  84. self.user_id = user_id
  85. if directory:
  86. self.base_directory = directory
  87. else:
  88. self.base_directory = tempfile.mkdtemp(
  89. prefix="synapse-exfiltrate__%s__" % (user_id,)
  90. )
  91. os.makedirs(self.base_directory, exist_ok=True)
  92. if list(os.listdir(self.base_directory)):
  93. raise Exception("Directory must be empty")
  94. def write_events(self, room_id, events):
  95. room_directory = os.path.join(self.base_directory, "rooms", room_id)
  96. os.makedirs(room_directory, exist_ok=True)
  97. events_file = os.path.join(room_directory, "events")
  98. with open(events_file, "a") as f:
  99. for event in events:
  100. print(json.dumps(event.get_pdu_json()), file=f)
  101. def write_state(self, room_id, event_id, state):
  102. room_directory = os.path.join(self.base_directory, "rooms", room_id)
  103. state_directory = os.path.join(room_directory, "state")
  104. os.makedirs(state_directory, exist_ok=True)
  105. event_file = os.path.join(state_directory, event_id)
  106. with open(event_file, "a") as f:
  107. for event in state.values():
  108. print(json.dumps(event.get_pdu_json()), file=f)
  109. def write_invite(self, room_id, event, state):
  110. self.write_events(room_id, [event])
  111. # We write the invite state somewhere else as they aren't full events
  112. # and are only a subset of the state at the event.
  113. room_directory = os.path.join(self.base_directory, "rooms", room_id)
  114. os.makedirs(room_directory, exist_ok=True)
  115. invite_state = os.path.join(room_directory, "invite_state")
  116. with open(invite_state, "a") as f:
  117. for event in state.values():
  118. print(json.dumps(event), file=f)
  119. def write_knock(self, room_id, event, state):
  120. self.write_events(room_id, [event])
  121. # We write the knock state somewhere else as they aren't full events
  122. # and are only a subset of the state at the event.
  123. room_directory = os.path.join(self.base_directory, "rooms", room_id)
  124. os.makedirs(room_directory, exist_ok=True)
  125. knock_state = os.path.join(room_directory, "knock_state")
  126. with open(knock_state, "a") as f:
  127. for event in state.values():
  128. print(json.dumps(event), file=f)
  129. def finished(self):
  130. return self.base_directory
  131. def start(config_options):
  132. parser = argparse.ArgumentParser(description="Synapse Admin Command")
  133. HomeServerConfig.add_arguments_to_parser(parser)
  134. subparser = parser.add_subparsers(
  135. title="Admin Commands",
  136. required=True,
  137. dest="command",
  138. metavar="<admin_command>",
  139. help="The admin command to perform.",
  140. )
  141. export_data_parser = subparser.add_parser(
  142. "export-data", help="Export all data for a user"
  143. )
  144. export_data_parser.add_argument("user_id", help="User to extra data from")
  145. export_data_parser.add_argument(
  146. "--output-directory",
  147. action="store",
  148. metavar="DIRECTORY",
  149. required=False,
  150. help="The directory to store the exported data in. Must be empty. Defaults"
  151. " to creating a temp directory.",
  152. )
  153. export_data_parser.set_defaults(func=export_data_command)
  154. try:
  155. config, args = HomeServerConfig.load_config_with_parser(parser, config_options)
  156. except ConfigError as e:
  157. sys.stderr.write("\n" + str(e) + "\n")
  158. sys.exit(1)
  159. if config.worker.worker_app is not None:
  160. assert config.worker.worker_app == "synapse.app.admin_cmd"
  161. # Update the config with some basic overrides so that don't have to specify
  162. # a full worker config.
  163. config.worker.worker_app = "synapse.app.admin_cmd"
  164. if not config.worker.worker_daemonize and not config.worker.worker_log_config:
  165. # Since we're meant to be run as a "command" let's not redirect stdio
  166. # unless we've actually set log config.
  167. config.logging.no_redirect_stdio = True
  168. # Explicitly disable background processes
  169. config.server.update_user_directory = False
  170. config.worker.run_background_tasks = False
  171. config.worker.start_pushers = False
  172. config.worker.pusher_shard_config.instances = []
  173. config.worker.send_federation = False
  174. config.worker.federation_shard_config.instances = []
  175. synapse.events.USE_FROZEN_DICTS = config.server.use_frozen_dicts
  176. ss = AdminCmdServer(
  177. config.server.server_name,
  178. config=config,
  179. version_string="Synapse/" + get_version_string(synapse),
  180. )
  181. setup_logging(ss, config, use_worker_options=True)
  182. ss.setup()
  183. # We use task.react as the basic run command as it correctly handles tearing
  184. # down the reactor when the deferreds resolve and setting the return value.
  185. # We also make sure that `_base.start` gets run before we actually run the
  186. # command.
  187. async def run():
  188. with LoggingContext("command"):
  189. await _base.start(ss)
  190. await args.func(ss, args)
  191. _base.start_worker_reactor(
  192. "synapse-admin-cmd",
  193. config,
  194. run_command=lambda: task.react(lambda _reactor: defer.ensureDeferred(run())),
  195. )
  196. if __name__ == "__main__":
  197. with LoggingContext("main"):
  198. start(sys.argv[1:])