123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350 |
- #!/usr/bin/env python
- # Copyright 2014-2016 OpenMarket Ltd
- # Copyright 2018 New Vector Ltd
- #
- # Licensed under the Apache License, Version 2.0 (the "License");
- # you may not use this file except in compliance with the License.
- # You may obtain a copy of the License at
- #
- # http://www.apache.org/licenses/LICENSE-2.0
- #
- # Unless required by applicable law or agreed to in writing, software
- # distributed under the License is distributed on an "AS IS" BASIS,
- # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- # See the License for the specific language governing permissions and
- # limitations under the License.
- import argparse
- import collections
- import errno
- import glob
- import os
- import os.path
- import signal
- import subprocess
- import sys
- import time
- from typing import Iterable
- import yaml
- from synapse.config import find_config_files
# Python module that is run (via `python -m`) for the main synapse process.
MAIN_PROCESS = "synapse.app.homeserver"

# Terminal escape sequences used by `write` to colour its output; NORMAL is
# appended after a coloured message to reset the terminal attributes.
NORMAL = "\x1b[m"
RED = "\x1b[1;31m"
YELLOW = "\x1b[1;33m"
GREEN = "\x1b[1;32m"
def pid_running(pid):
    """Reports whether a process with the given pid currently exists.

    Args:
        pid: the process id to probe

    Returns:
        True if `os.kill(pid, 0)` succeeds or fails with EPERM,
        False for any other OSError.
    """
    try:
        # Signal 0 only performs the existence/permission checks; nothing is
        # actually delivered to the target process.
        os.kill(pid, 0)
    except OSError as exc:
        # EPERM means the process is there but we are not allowed to signal
        # it, which still counts as running.
        return exc.errno == errno.EPERM
    return True
def write(message, colour=NORMAL, stream=sys.stdout):
    """Writes `message` followed by a newline to `stream`.

    The message is wrapped in `colour` ... NORMAL only when the stream
    reports that it is a TTY; otherwise it is written verbatim.

    Args:
        message: the text to write
        colour: escape sequence to prefix the message with on a TTY
        stream: file-like object to write to
    """
    try:
        use_colour = stream.isatty()
    except AttributeError:
        # Not every file-like object provides `isatty`; treat those as
        # non-terminals and skip colouring.
        use_colour = False

    text = colour + message + NORMAL if use_colour else message
    stream.write(text + "\n")
def abort(message, colour=RED, stream=sys.stderr):
    """Writes `message` via `write` and terminates with exit status 1.

    Args:
        message: the text to report
        colour: escape sequence passed through to `write`
        stream: stream passed through to `write`

    Raises:
        SystemExit: always, with code 1
    """
    write(message, colour=colour, stream=stream)
    raise SystemExit(1)
def start(pidfile: str, app: str, config_files: Iterable[str], daemonize: bool) -> bool:
    """Attempts to start a synapse main or worker process.

    Args:
        pidfile: the pidfile we expect the process to create
        app: the python module to run
        config_files: config files to pass to synapse
        daemonize: if True, will include a --daemonize argument to synapse

    Returns:
        True if the process started successfully or was already running
        False if there was an error starting the process
    """
    # `config_files` is walked once to build the command line and again for
    # each status message; materialise it so a one-shot iterator still works.
    config_files = list(config_files)

    if os.path.exists(pidfile):
        # Read the pid through a context manager so the handle is closed
        # straight away rather than whenever it gets garbage collected.
        with open(pidfile) as pid_stream:
            existing_pid = int(pid_stream.read())
        if pid_running(existing_pid):
            print(app + " already running")
            return True

    args = [sys.executable, "-m", app]
    for c in config_files:
        args += ["-c", c]
    if daemonize:
        args.append("--daemonize")

    try:
        subprocess.check_call(args)
    except subprocess.CalledProcessError as e:
        err = "%s(%s) failed to start (exit code: %d). Check the Synapse logfile" % (
            app,
            ",".join(config_files),
            e.returncode,
        )
        if daemonize:
            err += ", or run synctl with --no-daemonize"
        err += "."
        write(err, colour=RED, stream=sys.stderr)
        return False

    write("started %s(%s)" % (app, ",".join(config_files)), colour=GREEN)
    return True
def stop(pidfile: str, app: str) -> bool:
    """Attempts to kill a synapse process using the pid stored in a pidfile.

    Sends SIGTERM to the pid read from `pidfile`.

    Args:
        pidfile: path to file containing the process's pid
        app: name of the process; only used in the status messages

    Returns:
        True if the signal was sent successfully
        False if the pidfile does not exist or no process has that pid

    Note:
        If sending the signal fails for any reason other than the process
        not existing, `abort` is called and the function does not return.
    """
    if not os.path.exists(pidfile):
        write(
            "No running worker of %s found (from %s)\nThe process might be managed by another controller (e.g. systemd)"
            % (app, pidfile),
            colour=YELLOW,
        )
        return False

    # Use a context manager so the pidfile handle is closed deterministically.
    with open(pidfile) as pid_stream:
        pid = int(pid_stream.read())

    try:
        os.kill(pid, signal.SIGTERM)
    except OSError as err:
        if err.errno == errno.ESRCH:
            write("%s not running" % (app,), colour=YELLOW)
        elif err.errno == errno.EPERM:
            abort("Cannot stop %s: Operation not permitted" % (app,))
        else:
            abort("Cannot stop %s: Unknown error" % (app,))
        return False
    else:
        # Reported outside the `try` so that a failure to print is not
        # misreported as a failure to signal the process.
        write("stopped %s" % (app,), colour=GREEN)
        return True
# Everything synctl needs to know about one process it manages: the python
# module to run, the config file describing it, the pidfile it writes, and
# the cache factor settings to export into its environment.
Worker = collections.namedtuple(
    "Worker",
    "app configfile pidfile cache_factor cache_factors",
)
def main():
    """Command-line entry point: start, stop or restart synapse processes.

    Parses the command line, loads the homeserver config (and any worker
    configs selected with --worker / --all-processes), then performs the
    requested action on the main process and/or the workers.

    Exits with status 1 when the options are inconsistent, a config file or
    directory is missing, a "stop" action could not stop every process, or
    any process failed to start.
    """
    parser = argparse.ArgumentParser()

    parser.add_argument(
        "action",
        choices=["start", "stop", "restart"],
        help="whether to start, stop or restart the synapse",
    )
    parser.add_argument(
        "configfile",
        nargs="?",
        default="homeserver.yaml",
        help="the homeserver config file. Defaults to homeserver.yaml. May also be"
        " a directory with *.yaml files",
    )
    parser.add_argument(
        "-w", "--worker", metavar="WORKERCONFIG", help="start or stop a single worker"
    )
    parser.add_argument(
        "-a",
        "--all-processes",
        metavar="WORKERCONFIGDIR",
        help="start or stop all the workers in the given directory"
        " and the main synapse process",
    )
    parser.add_argument(
        "--no-daemonize",
        action="store_false",
        dest="daemonize",
        help="Run synapse in the foreground for debugging. "
        "Will work only if the daemonize option is not set in the config.",
    )

    options = parser.parse_args()

    if options.worker and options.all_processes:
        write('Cannot use "--worker" with "--all-processes"', stream=sys.stderr)
        sys.exit(1)
    if not options.daemonize and options.all_processes:
        write('Cannot use "--no-daemonize" with "--all-processes"', stream=sys.stderr)
        sys.exit(1)

    configfile = options.configfile

    if not os.path.exists(configfile):
        write(
            f"Config file {configfile} does not exist.\n"
            f"To generate a config file, run:\n"
            f" {sys.executable} -m {MAIN_PROCESS}"
            f" -c {configfile} --generate-config"
            f" --server-name=<server name> --report-stats=<yes/no>\n",
            stream=sys.stderr,
        )
        sys.exit(1)

    # Merge every config file found into a single dict; later files override
    # earlier ones key by key. Empty files parse to None and are skipped.
    config_files = find_config_files([configfile])
    config = {}
    for config_file in config_files:
        with open(config_file) as file_stream:
            yaml_config = yaml.safe_load(file_stream)
        if yaml_config is not None:
            config.update(yaml_config)

    pidfile = config["pid_file"]
    cache_factor = config.get("synctl_cache_factor")
    start_stop_synapse = True

    if cache_factor:
        os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor)

    # `or {}` also covers a key that is present but null in the YAML.
    cache_factors = config.get("synctl_cache_factors") or {}
    for cache_name, factor in cache_factors.items():
        os.environ["SYNAPSE_CACHE_FACTOR_" + cache_name.upper()] = str(factor)

    worker_configfiles = []
    if options.worker:
        start_stop_synapse = False
        worker_configfile = options.worker
        if not os.path.exists(worker_configfile):
            write(
                "No worker config found at %r" % (worker_configfile,), stream=sys.stderr
            )
            sys.exit(1)
        worker_configfiles.append(worker_configfile)

    if options.all_processes:
        # To start the main synapse with -a you need to add a worker file
        # with worker_app == "synapse.app.homeserver"
        start_stop_synapse = False
        worker_configdir = options.all_processes
        if not os.path.isdir(worker_configdir):
            write(
                "No worker config directory found at %r" % (worker_configdir,),
                stream=sys.stderr,
            )
            sys.exit(1)
        worker_configfiles.extend(
            sorted(glob.glob(os.path.join(worker_configdir, "*.yaml")))
        )

    workers = []
    for worker_configfile in worker_configfiles:
        with open(worker_configfile) as stream:
            worker_config = yaml.safe_load(stream)
        worker_app = worker_config["worker_app"]
        if worker_app == "synapse.app.homeserver":
            # We need to special case all of this to pick up options that may
            # be set in the main config file or in this worker config file.
            worker_pidfile = worker_config.get("pid_file") or pidfile
            worker_cache_factor = (
                worker_config.get("synctl_cache_factor") or cache_factor
            )
            worker_cache_factors = (
                worker_config.get("synctl_cache_factors") or cache_factors
            )
            # The master process doesn't support using worker_* config.
            for key in worker_config:
                if key == "worker_app":  # But we allow worker_app
                    continue
                # Raised explicitly (rather than via `assert`) so the check
                # still runs when Python is started with -O.
                if key.startswith("worker_"):
                    raise AssertionError("Main process cannot use worker_* config")
        else:
            worker_pidfile = worker_config["worker_pid_file"]
            worker_cache_factor = worker_config.get("synctl_cache_factor")
            worker_cache_factors = worker_config.get("synctl_cache_factors") or {}
        workers.append(
            Worker(
                worker_app,
                worker_configfile,
                worker_pidfile,
                worker_cache_factor,
                worker_cache_factors,
            )
        )

    action = options.action

    if action in ("stop", "restart"):
        has_stopped = True
        for worker in workers:
            if not stop(worker.pidfile, worker.app):
                # A worker could not be stopped.
                has_stopped = False

        if start_stop_synapse:
            if not stop(pidfile, MAIN_PROCESS):
                has_stopped = False
        # A failed stop is only fatal for "stop"; "restart" carries on and
        # tries to start the processes regardless.
        if not has_stopped and action == "stop":
            sys.exit(1)

    # Wait for synapse to actually shutdown before starting it again
    if action == "restart":
        pidfiles_to_wait_on = []
        if start_stop_synapse:
            pidfiles_to_wait_on.append(pidfile)
        pidfiles_to_wait_on.extend(worker.pidfile for worker in workers)

        running_pids = []
        for path in pidfiles_to_wait_on:
            if os.path.exists(path):
                # Context manager so each pidfile handle is closed promptly.
                with open(path) as pid_stream:
                    running_pids.append(int(pid_stream.read()))

        if running_pids:
            write("Waiting for process to exit before restarting...")
            for running_pid in running_pids:
                while pid_running(running_pid):
                    time.sleep(0.2)
            write("All processes exited; now restarting...")

    if action in ("start", "restart"):
        error = False
        if start_stop_synapse:
            if not start(pidfile, MAIN_PROCESS, (configfile,), options.daemonize):
                error = True

        for worker in workers:
            # Per-worker cache factors are passed via the environment the
            # child inherits, so snapshot it and restore it afterwards.
            env = os.environ.copy()
            try:
                if worker.cache_factor:
                    os.environ["SYNAPSE_CACHE_FACTOR"] = str(worker.cache_factor)

                for cache_name, factor in worker.cache_factors.items():
                    os.environ["SYNAPSE_CACHE_FACTOR_" + cache_name.upper()] = str(
                        factor
                    )

                if not start(
                    worker.pidfile,
                    worker.app,
                    (configfile, worker.configfile),
                    options.daemonize,
                ):
                    error = True
            finally:
                # Reset env back to the original
                os.environ.clear()
                os.environ.update(env)

        if error:
            # `sys.exit`, not the `exit` builtin: the latter is injected by
            # the `site` module and is not guaranteed to exist.
            sys.exit(1)


if __name__ == "__main__":
    main()
|