#!/usr/bin/env python
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import collections
import errno
import glob
import os
import os.path
import signal
import subprocess
import sys
import time
from typing import Optional

import yaml

from synapse.config import find_config_files

# Base command line used to launch the main synapse process. Treat this as
# read-only: copy it before appending per-invocation arguments.
SYNAPSE = [sys.executable, "-m", "synapse.app.homeserver"]

# ANSI escape sequences used by `write` when the output stream is a TTY.
GREEN = "\x1b[1;32m"
YELLOW = "\x1b[1;33m"
RED = "\x1b[1;31m"
NORMAL = "\x1b[m"


def pid_running(pid: int) -> bool:
    """Checks whether a process with the given pid exists.

    Args:
        pid: the process id to probe

    Returns:
        True if the process exists (including when we are not permitted to
        signal it), False otherwise.
    """
    try:
        # Signal 0 performs the existence/permission checks without actually
        # delivering a signal.
        os.kill(pid, 0)
        return True
    except OSError as err:
        # EPERM means the process exists but belongs to someone else.
        if err.errno == errno.EPERM:
            return True
        return False


def _read_pid(pidfile: str) -> Optional[int]:
    """Reads the pid stored in a pidfile.

    Args:
        pidfile: path to a file containing a single integer pid

    Returns:
        The pid, or None if the pidfile does not exist (e.g. because the
        process has removed it between our check and the read).

    Raises:
        ValueError: if the file exists but does not contain an integer.
    """
    try:
        with open(pidfile) as pid_stream:
            return int(pid_stream.read())
    except FileNotFoundError:
        return None


def write(message, colour=NORMAL, stream=sys.stdout):
    """Writes a line to `stream`, colouring it if the stream is a TTY.

    Args:
        message: the text to write; a newline is appended
        colour: ANSI escape sequence to prefix the message with on a TTY
        stream: file-like object to write to
    """
    # Let's check if we're writing to a TTY before colouring
    should_colour = False
    try:
        should_colour = stream.isatty()
    except AttributeError:
        # Just in case `isatty` isn't defined on everything. The python
        # docs are incredibly vague.
        pass

    if not should_colour:
        stream.write(message + "\n")
    else:
        stream.write(colour + message + NORMAL + "\n")


def abort(message, colour=RED, stream=sys.stderr):
    """Writes `message` to `stream` and exits the process with status 1."""
    write(message, colour, stream)
    sys.exit(1)


def start(configfile: str, daemonize: bool = True) -> bool:
    """Attempts to start synapse.

    Args:
        configfile: path to a yaml synapse config file
        daemonize: whether to daemonize synapse or keep it attached to the
            current session

    Returns:
        True if the process started successfully
        False if there was an error starting the process

        If daemonize is False it will only return once synapse exits.
    """

    write("Starting ...")
    # Build a fresh argument list: extending SYNAPSE in place would leak the
    # extra arguments into every later use of the module-level constant.
    args = list(SYNAPSE)

    if daemonize:
        args.extend(["--daemonize", "-c", configfile])
    else:
        args.extend(["-c", configfile])

    try:
        subprocess.check_call(args)
        write("started synapse.app.homeserver(%r)" % (configfile,), colour=GREEN)
        return True
    except subprocess.CalledProcessError as e:
        write(
            "error starting (exit code: %d); see above for logs" % e.returncode,
            colour=RED,
        )
        return False


def start_worker(app: str, configfile: str, worker_configfile: str) -> bool:
    """Attempts to start a synapse worker.

    Args:
        app: python module name of the worker app (it is run with `-m`)
        configfile: path to a yaml synapse config file
        worker_configfile: path to worker specific yaml synapse file

    Returns:
        True if the process started successfully
        False if there was an error starting the process
    """

    args = [
        sys.executable,
        "-m",
        app,
        "-c",
        configfile,
        "-c",
        worker_configfile,
        "--daemonize",
    ]

    try:
        subprocess.check_call(args)
        write("started %s(%r)" % (app, worker_configfile), colour=GREEN)
        return True
    except subprocess.CalledProcessError as e:
        write(
            "error starting %s(%r) (exit code: %d); see above for logs"
            % (app, worker_configfile, e.returncode),
            colour=RED,
        )
        return False


def stop(pidfile: str, app: str) -> bool:
    """Attempts to kill a synapse worker from the pidfile.

    Args:
        pidfile: path to file containing worker's pid
        app: name of the worker app; only used in the messages printed

    Returns:
        True if the process stopped successfully
        False if process was already stopped or an error occurred

        If the process exists but cannot be signalled, this does not return:
        the script exits with status 1 via `abort`.
    """

    pid = _read_pid(pidfile)
    if pid is None:
        write(
            "No running worker of %s found (from %s)\nThe process might be managed by another controller (e.g. systemd)"
            % (app, pidfile),
            colour=YELLOW,
        )
        return False

    try:
        os.kill(pid, signal.SIGTERM)
        write("stopped %s" % (app,), colour=GREEN)
        return True
    except OSError as err:
        if err.errno == errno.ESRCH:
            # Stale pidfile: no process with that pid.
            write("%s not running" % (app,), colour=YELLOW)
        elif err.errno == errno.EPERM:
            abort("Cannot stop %s: Operation not permitted" % (app,))
        else:
            abort("Cannot stop %s: Unknown error" % (app,))
        return False


# Everything `main` needs to know to start or stop one worker process.
Worker = collections.namedtuple(
    "Worker", ["app", "configfile", "pidfile", "cache_factor", "cache_factors"]
)


def main():
    """Command line entry point: start, stop or restart synapse processes.

    Parses the command line, loads the homeserver config (and any worker
    configs selected with `--worker` / `--all-processes`), then performs the
    requested action. Exits with status 1 on any failure.
    """

    parser = argparse.ArgumentParser()

    parser.add_argument(
        "action",
        choices=["start", "stop", "restart"],
        help="whether to start, stop or restart the synapse",
    )
    parser.add_argument(
        "configfile",
        nargs="?",
        default="homeserver.yaml",
        help="the homeserver config file. Defaults to homeserver.yaml. May also be"
        " a directory with *.yaml files",
    )
    parser.add_argument(
        "-w", "--worker", metavar="WORKERCONFIG", help="start or stop a single worker"
    )
    parser.add_argument(
        "-a",
        "--all-processes",
        metavar="WORKERCONFIGDIR",
        help="start or stop all the workers in the given directory"
        " and the main synapse process",
    )
    parser.add_argument(
        "--no-daemonize",
        action="store_false",
        dest="daemonize",
        help="Run synapse in the foreground for debugging. "
        "Will work only if the daemonize option is not set in the config.",
    )

    options = parser.parse_args()

    if options.worker and options.all_processes:
        write('Cannot use "--worker" with "--all-processes"', stream=sys.stderr)
        sys.exit(1)
    if not options.daemonize and options.all_processes:
        write('Cannot use "--no-daemonize" with "--all-processes"', stream=sys.stderr)
        sys.exit(1)

    configfile = options.configfile

    if not os.path.exists(configfile):
        write(
            "No config file found\n"
            "To generate a config file, run '%s -c %s --generate-config"
            " --server-name= --report-stats='\n"
            % (" ".join(SYNAPSE), options.configfile),
            stream=sys.stderr,
        )
        sys.exit(1)

    config_files = find_config_files([configfile])
    config = {}
    # Later files override keys from earlier ones.
    for config_file in config_files:
        with open(config_file) as file_stream:
            yaml_config = yaml.safe_load(file_stream)
        if yaml_config is not None:
            config.update(yaml_config)

    pidfile = config["pid_file"]
    cache_factor = config.get("synctl_cache_factor")
    start_stop_synapse = True

    if cache_factor:
        os.environ["SYNAPSE_CACHE_FACTOR"] = str(cache_factor)

    # `or {}` also covers a key that is present but set to null in the yaml.
    cache_factors = config.get("synctl_cache_factors") or {}
    for cache_name, factor in cache_factors.items():
        os.environ["SYNAPSE_CACHE_FACTOR_" + cache_name.upper()] = str(factor)

    worker_configfiles = []
    if options.worker:
        start_stop_synapse = False
        worker_configfile = options.worker
        if not os.path.exists(worker_configfile):
            write(
                "No worker config found at %r" % (worker_configfile,), stream=sys.stderr
            )
            sys.exit(1)
        worker_configfiles.append(worker_configfile)

    if options.all_processes:
        # To start the main synapse with -a you need to add a worker file
        # with worker_app == "synapse.app.homeserver"
        start_stop_synapse = False
        worker_configdir = options.all_processes
        if not os.path.isdir(worker_configdir):
            write(
                "No worker config directory found at %r" % (worker_configdir,),
                stream=sys.stderr,
            )
            sys.exit(1)
        worker_configfiles.extend(
            sorted(glob.glob(os.path.join(worker_configdir, "*.yaml")))
        )

    workers = []
    for worker_configfile in worker_configfiles:
        with open(worker_configfile) as stream:
            worker_config = yaml.safe_load(stream)
        worker_app = worker_config["worker_app"]
        if worker_app == "synapse.app.homeserver":
            # We need to special case all of this to pick up options that may
            # be set in the main config file or in this worker config file.
            worker_pidfile = worker_config.get("pid_file") or pidfile
            worker_cache_factor = (
                worker_config.get("synctl_cache_factor") or cache_factor
            )
            worker_cache_factors = (
                worker_config.get("synctl_cache_factors") or cache_factors
            )
            # The master process doesn't support using worker_* config.
            for key in worker_config:
                if key == "worker_app":  # But we allow worker_app
                    continue
                assert not key.startswith(
                    "worker_"
                ), "Main process cannot use worker_* config"
        else:
            worker_pidfile = worker_config["worker_pid_file"]
            worker_cache_factor = worker_config.get("synctl_cache_factor")
            worker_cache_factors = worker_config.get("synctl_cache_factors") or {}
        workers.append(
            Worker(
                worker_app,
                worker_configfile,
                worker_pidfile,
                worker_cache_factor,
                worker_cache_factors,
            )
        )

    action = options.action

    if action in ("stop", "restart"):
        has_stopped = True
        for worker in workers:
            if not stop(worker.pidfile, worker.app):
                # A worker could not be stopped.
                has_stopped = False

        if start_stop_synapse:
            if not stop(pidfile, "synapse.app.homeserver"):
                has_stopped = False
        if not has_stopped and action == "stop":
            sys.exit(1)

    # Wait for synapse to actually shutdown before starting it again
    if action == "restart":
        pidfiles_to_wait_on = []
        if start_stop_synapse:
            pidfiles_to_wait_on.append(pidfile)
        pidfiles_to_wait_on.extend(worker.pidfile for worker in workers)

        # A pidfile may vanish at any moment while its process shuts down;
        # `_read_pid` returns None in that case and there is nothing to wait on.
        running_pids = []
        for path in pidfiles_to_wait_on:
            pid = _read_pid(path)
            if pid is not None:
                running_pids.append(pid)

        if running_pids:
            write("Waiting for process to exit before restarting...")
            for running_pid in running_pids:
                while pid_running(running_pid):
                    time.sleep(0.2)
            write("All processes exited; now restarting...")

    if action in ("start", "restart"):
        error = False
        if start_stop_synapse:
            # Check if synapse is already running
            main_pid = _read_pid(pidfile)
            if main_pid is not None and pid_running(main_pid):
                abort("synapse.app.homeserver already running")

            if not start(configfile, bool(options.daemonize)):
                error = True

        for worker in workers:
            # Skip starting a worker if its already running
            worker_pid = _read_pid(worker.pidfile)
            if worker_pid is not None and pid_running(worker_pid):
                write(worker.app + " already running")
                continue

            # The worker inherits our environment, so apply its cache factor
            # overrides to os.environ for the duration of the launch only.
            env = os.environ.copy()
            try:
                if worker.cache_factor:
                    os.environ["SYNAPSE_CACHE_FACTOR"] = str(worker.cache_factor)

                for cache_name, factor in worker.cache_factors.items():
                    os.environ["SYNAPSE_CACHE_FACTOR_" + cache_name.upper()] = str(
                        factor
                    )

                if not start_worker(worker.app, configfile, worker.configfile):
                    error = True
            finally:
                # Reset env back to the original, even if the launch raised.
                os.environ.clear()
                os.environ.update(env)

        if error:
            sys.exit(1)


if __name__ == "__main__":
    main()