1
0

configure_workers_and_start.py 20 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558
  1. #!/usr/bin/env python
  2. # Copyright 2021 The Matrix.org Foundation C.I.C.
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. # This script reads environment variables and generates a shared Synapse worker,
  16. # nginx and supervisord configs depending on the workers requested.
  17. #
  18. # The environment variables it reads are:
  19. # * SYNAPSE_SERVER_NAME: The desired server_name of the homeserver.
  20. # * SYNAPSE_REPORT_STATS: Whether to report stats.
  21. # * SYNAPSE_WORKER_TYPES: A comma separated list of worker names as specified in WORKER_CONFIG
  22. # below. Leave empty for no workers, or set to '*' for all possible workers.
  23. #
  24. # NOTE: According to Complement's ENTRYPOINT expectations for a homeserver image (as defined
  25. # in the project's README), this script may be run multiple times, and functionality should
  26. # continue to work if so.
  27. import os
  28. import subprocess
  29. import sys
  30. import jinja2
  31. import yaml
  32. MAIN_PROCESS_HTTP_LISTENER_PORT = 8080
  33. WORKERS_CONFIG = {
  34. "pusher": {
  35. "app": "synapse.app.pusher",
  36. "listener_resources": [],
  37. "endpoint_patterns": [],
  38. "shared_extra_conf": {"start_pushers": False},
  39. "worker_extra_conf": "",
  40. },
  41. "user_dir": {
  42. "app": "synapse.app.user_dir",
  43. "listener_resources": ["client"],
  44. "endpoint_patterns": [
  45. "^/_matrix/client/(api/v1|r0|unstable)/user_directory/search$"
  46. ],
  47. "shared_extra_conf": {"update_user_directory": False},
  48. "worker_extra_conf": "",
  49. },
  50. "media_repository": {
  51. "app": "synapse.app.media_repository",
  52. "listener_resources": ["media"],
  53. "endpoint_patterns": [
  54. "^/_matrix/media/",
  55. "^/_synapse/admin/v1/purge_media_cache$",
  56. "^/_synapse/admin/v1/room/.*/media.*$",
  57. "^/_synapse/admin/v1/user/.*/media.*$",
  58. "^/_synapse/admin/v1/media/.*$",
  59. "^/_synapse/admin/v1/quarantine_media/.*$",
  60. ],
  61. "shared_extra_conf": {"enable_media_repo": False},
  62. "worker_extra_conf": "enable_media_repo: true",
  63. },
  64. "appservice": {
  65. "app": "synapse.app.appservice",
  66. "listener_resources": [],
  67. "endpoint_patterns": [],
  68. "shared_extra_conf": {"notify_appservices": False},
  69. "worker_extra_conf": "",
  70. },
  71. "federation_sender": {
  72. "app": "synapse.app.federation_sender",
  73. "listener_resources": [],
  74. "endpoint_patterns": [],
  75. "shared_extra_conf": {"send_federation": False},
  76. "worker_extra_conf": "",
  77. },
  78. "synchrotron": {
  79. "app": "synapse.app.generic_worker",
  80. "listener_resources": ["client"],
  81. "endpoint_patterns": [
  82. "^/_matrix/client/(v2_alpha|r0)/sync$",
  83. "^/_matrix/client/(api/v1|v2_alpha|r0)/events$",
  84. "^/_matrix/client/(api/v1|r0)/initialSync$",
  85. "^/_matrix/client/(api/v1|r0)/rooms/[^/]+/initialSync$",
  86. ],
  87. "shared_extra_conf": {},
  88. "worker_extra_conf": "",
  89. },
  90. "federation_reader": {
  91. "app": "synapse.app.generic_worker",
  92. "listener_resources": ["federation"],
  93. "endpoint_patterns": [
  94. "^/_matrix/federation/(v1|v2)/event/",
  95. "^/_matrix/federation/(v1|v2)/state/",
  96. "^/_matrix/federation/(v1|v2)/state_ids/",
  97. "^/_matrix/federation/(v1|v2)/backfill/",
  98. "^/_matrix/federation/(v1|v2)/get_missing_events/",
  99. "^/_matrix/federation/(v1|v2)/publicRooms",
  100. "^/_matrix/federation/(v1|v2)/query/",
  101. "^/_matrix/federation/(v1|v2)/make_join/",
  102. "^/_matrix/federation/(v1|v2)/make_leave/",
  103. "^/_matrix/federation/(v1|v2)/send_join/",
  104. "^/_matrix/federation/(v1|v2)/send_leave/",
  105. "^/_matrix/federation/(v1|v2)/invite/",
  106. "^/_matrix/federation/(v1|v2)/query_auth/",
  107. "^/_matrix/federation/(v1|v2)/event_auth/",
  108. "^/_matrix/federation/(v1|v2)/exchange_third_party_invite/",
  109. "^/_matrix/federation/(v1|v2)/user/devices/",
  110. "^/_matrix/federation/(v1|v2)/get_groups_publicised$",
  111. "^/_matrix/key/v2/query",
  112. ],
  113. "shared_extra_conf": {},
  114. "worker_extra_conf": "",
  115. },
  116. "federation_inbound": {
  117. "app": "synapse.app.generic_worker",
  118. "listener_resources": ["federation"],
  119. "endpoint_patterns": ["/_matrix/federation/(v1|v2)/send/"],
  120. "shared_extra_conf": {},
  121. "worker_extra_conf": "",
  122. },
  123. "event_persister": {
  124. "app": "synapse.app.generic_worker",
  125. "listener_resources": ["replication"],
  126. "endpoint_patterns": [],
  127. "shared_extra_conf": {},
  128. "worker_extra_conf": "",
  129. },
  130. "background_worker": {
  131. "app": "synapse.app.generic_worker",
  132. "listener_resources": [],
  133. "endpoint_patterns": [],
  134. # This worker cannot be sharded. Therefore there should only ever be one background
  135. # worker, and it should be named background_worker1
  136. "shared_extra_conf": {"run_background_tasks_on": "background_worker1"},
  137. "worker_extra_conf": "",
  138. },
  139. "event_creator": {
  140. "app": "synapse.app.generic_worker",
  141. "listener_resources": ["client"],
  142. "endpoint_patterns": [
  143. "^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/redact",
  144. "^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/send",
  145. "^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/(join|invite|leave|ban|unban|kick)$",
  146. "^/_matrix/client/(api/v1|r0|unstable)/join/",
  147. "^/_matrix/client/(api/v1|r0|unstable)/profile/",
  148. ],
  149. "shared_extra_conf": {},
  150. "worker_extra_conf": "",
  151. },
  152. "frontend_proxy": {
  153. "app": "synapse.app.frontend_proxy",
  154. "listener_resources": ["client", "replication"],
  155. "endpoint_patterns": ["^/_matrix/client/(api/v1|r0|unstable)/keys/upload"],
  156. "shared_extra_conf": {},
  157. "worker_extra_conf": (
  158. "worker_main_http_uri: http://127.0.0.1:%d"
  159. % (MAIN_PROCESS_HTTP_LISTENER_PORT,)
  160. ),
  161. },
  162. }
  163. # Templates for sections that may be inserted multiple times in config files
  164. SUPERVISORD_PROCESS_CONFIG_BLOCK = """
  165. [program:synapse_{name}]
  166. command=/usr/local/bin/python -m {app} \
  167. --config-path="{config_path}" \
  168. --config-path=/conf/workers/shared.yaml \
  169. --config-path=/conf/workers/{name}.yaml
  170. autorestart=unexpected
  171. priority=500
  172. exitcodes=0
  173. stdout_logfile=/dev/stdout
  174. stdout_logfile_maxbytes=0
  175. stderr_logfile=/dev/stderr
  176. stderr_logfile_maxbytes=0
  177. """
  178. NGINX_LOCATION_CONFIG_BLOCK = """
  179. location ~* {endpoint} {{
  180. proxy_pass {upstream};
  181. proxy_set_header X-Forwarded-For $remote_addr;
  182. proxy_set_header X-Forwarded-Proto $scheme;
  183. proxy_set_header Host $host;
  184. }}
  185. """
  186. NGINX_UPSTREAM_CONFIG_BLOCK = """
  187. upstream {upstream_worker_type} {{
  188. {body}
  189. }}
  190. """
  191. # Utility functions
  192. def log(txt: str):
  193. """Log something to the stdout.
  194. Args:
  195. txt: The text to log.
  196. """
  197. print(txt)
  198. def error(txt: str):
  199. """Log something and exit with an error code.
  200. Args:
  201. txt: The text to log in error.
  202. """
  203. log(txt)
  204. sys.exit(2)
  205. def convert(src: str, dst: str, **template_vars):
  206. """Generate a file from a template
  207. Args:
  208. src: Path to the input file.
  209. dst: Path to write to.
  210. template_vars: The arguments to replace placeholder variables in the template with.
  211. """
  212. # Read the template file
  213. with open(src) as infile:
  214. template = infile.read()
  215. # Generate a string from the template. We disable autoescape to prevent template
  216. # variables from being escaped.
  217. rendered = jinja2.Template(template, autoescape=False).render(**template_vars)
  218. # Write the generated contents to a file
  219. #
  220. # We use append mode in case the files have already been written to by something else
  221. # (for instance, as part of the instructions in a dockerfile).
  222. with open(dst, "a") as outfile:
  223. # In case the existing file doesn't end with a newline
  224. outfile.write("\n")
  225. outfile.write(rendered)
  226. def add_sharding_to_shared_config(
  227. shared_config: dict,
  228. worker_type: str,
  229. worker_name: str,
  230. worker_port: int,
  231. ) -> None:
  232. """Given a dictionary representing a config file shared across all workers,
  233. append sharded worker information to it for the current worker_type instance.
  234. Args:
  235. shared_config: The config dict that all worker instances share (after being converted to YAML)
  236. worker_type: The type of worker (one of those defined in WORKERS_CONFIG).
  237. worker_name: The name of the worker instance.
  238. worker_port: The HTTP replication port that the worker instance is listening on.
  239. """
  240. # The instance_map config field marks the workers that write to various replication streams
  241. instance_map = shared_config.setdefault("instance_map", {})
  242. # Worker-type specific sharding config
  243. if worker_type == "pusher":
  244. shared_config.setdefault("pusher_instances", []).append(worker_name)
  245. elif worker_type == "federation_sender":
  246. shared_config.setdefault("federation_sender_instances", []).append(worker_name)
  247. elif worker_type == "event_persister":
  248. # Event persisters write to the events stream, so we need to update
  249. # the list of event stream writers
  250. shared_config.setdefault("stream_writers", {}).setdefault("events", []).append(
  251. worker_name
  252. )
  253. # Map of stream writer instance names to host/ports combos
  254. instance_map[worker_name] = {
  255. "host": "localhost",
  256. "port": worker_port,
  257. }
  258. elif worker_type == "media_repository":
  259. # The first configured media worker will run the media background jobs
  260. shared_config.setdefault("media_instance_running_background_jobs", worker_name)
  261. def generate_base_homeserver_config():
  262. """Starts Synapse and generates a basic homeserver config, which will later be
  263. modified for worker support.
  264. Raises: CalledProcessError if calling start.py returned a non-zero exit code.
  265. """
  266. # start.py already does this for us, so just call that.
  267. # note that this script is copied in in the official, monolith dockerfile
  268. os.environ["SYNAPSE_HTTP_PORT"] = str(MAIN_PROCESS_HTTP_LISTENER_PORT)
  269. subprocess.check_output(["/usr/local/bin/python", "/start.py", "migrate_config"])
  270. def generate_worker_files(environ, config_path: str, data_dir: str):
  271. """Read the desired list of workers from environment variables and generate
  272. shared homeserver, nginx and supervisord configs.
  273. Args:
  274. environ: _Environ[str]
  275. config_path: Where to output the generated Synapse main worker config file.
  276. data_dir: The location of the synapse data directory. Where log and
  277. user-facing config files live.
  278. """
  279. # Note that yaml cares about indentation, so care should be taken to insert lines
  280. # into files at the correct indentation below.
  281. # shared_config is the contents of a Synapse config file that will be shared amongst
  282. # the main Synapse process as well as all workers.
  283. # It is intended mainly for disabling functionality when certain workers are spun up,
  284. # and adding a replication listener.
  285. # First read the original config file and extract the listeners block. Then we'll add
  286. # another listener for replication. Later we'll write out the result.
  287. listeners = [
  288. {
  289. "port": 9093,
  290. "bind_address": "127.0.0.1",
  291. "type": "http",
  292. "resources": [{"names": ["replication"]}],
  293. }
  294. ]
  295. with open(config_path) as file_stream:
  296. original_config = yaml.safe_load(file_stream)
  297. original_listeners = original_config.get("listeners")
  298. if original_listeners:
  299. listeners += original_listeners
  300. # The shared homeserver config. The contents of which will be inserted into the
  301. # base shared worker jinja2 template.
  302. #
  303. # This config file will be passed to all workers, included Synapse's main process.
  304. shared_config = {"listeners": listeners}
  305. # The supervisord config. The contents of which will be inserted into the
  306. # base supervisord jinja2 template.
  307. #
  308. # Supervisord will be in charge of running everything, from redis to nginx to Synapse
  309. # and all of its worker processes. Load the config template, which defines a few
  310. # services that are necessary to run.
  311. supervisord_config = ""
  312. # Upstreams for load-balancing purposes. This dict takes the form of a worker type to the
  313. # ports of each worker. For example:
  314. # {
  315. # worker_type: {1234, 1235, ...}}
  316. # }
  317. # and will be used to construct 'upstream' nginx directives.
  318. nginx_upstreams = {}
  319. # A map of: {"endpoint": "upstream"}, where "upstream" is a str representing what will be
  320. # placed after the proxy_pass directive. The main benefit to representing this data as a
  321. # dict over a str is that we can easily deduplicate endpoints across multiple instances
  322. # of the same worker.
  323. #
  324. # An nginx site config that will be amended to depending on the workers that are
  325. # spun up. To be placed in /etc/nginx/conf.d.
  326. nginx_locations = {}
  327. # Read the desired worker configuration from the environment
  328. worker_types = environ.get("SYNAPSE_WORKER_TYPES")
  329. if worker_types is None:
  330. # No workers, just the main process
  331. worker_types = []
  332. else:
  333. # Split type names by comma
  334. worker_types = worker_types.split(",")
  335. # Create the worker configuration directory if it doesn't already exist
  336. os.makedirs("/conf/workers", exist_ok=True)
  337. # Start worker ports from this arbitrary port
  338. worker_port = 18009
  339. # A counter of worker_type -> int. Used for determining the name for a given
  340. # worker type when generating its config file, as each worker's name is just
  341. # worker_type + instance #
  342. worker_type_counter = {}
  343. # For each worker type specified by the user, create config values
  344. for worker_type in worker_types:
  345. worker_type = worker_type.strip()
  346. worker_config = WORKERS_CONFIG.get(worker_type)
  347. if worker_config:
  348. worker_config = worker_config.copy()
  349. else:
  350. log(worker_type + " is an unknown worker type! It will be ignored")
  351. continue
  352. new_worker_count = worker_type_counter.setdefault(worker_type, 0) + 1
  353. worker_type_counter[worker_type] = new_worker_count
  354. # Name workers by their type concatenated with an incrementing number
  355. # e.g. federation_reader1
  356. worker_name = worker_type + str(new_worker_count)
  357. worker_config.update(
  358. {"name": worker_name, "port": worker_port, "config_path": config_path}
  359. )
  360. # Update the shared config with any worker-type specific options
  361. shared_config.update(worker_config["shared_extra_conf"])
  362. # Check if more than one instance of this worker type has been specified
  363. worker_type_total_count = worker_types.count(worker_type)
  364. if worker_type_total_count > 1:
  365. # Update the shared config with sharding-related options if necessary
  366. add_sharding_to_shared_config(
  367. shared_config, worker_type, worker_name, worker_port
  368. )
  369. # Enable the worker in supervisord
  370. supervisord_config += SUPERVISORD_PROCESS_CONFIG_BLOCK.format_map(worker_config)
  371. # Add nginx location blocks for this worker's endpoints (if any are defined)
  372. for pattern in worker_config["endpoint_patterns"]:
  373. # Determine whether we need to load-balance this worker
  374. if worker_type_total_count > 1:
  375. # Create or add to a load-balanced upstream for this worker
  376. nginx_upstreams.setdefault(worker_type, set()).add(worker_port)
  377. # Upstreams are named after the worker_type
  378. upstream = "http://" + worker_type
  379. else:
  380. upstream = "http://localhost:%d" % (worker_port,)
  381. # Note that this endpoint should proxy to this upstream
  382. nginx_locations[pattern] = upstream
  383. # Write out the worker's logging config file
  384. # Check whether we should write worker logs to disk, in addition to the console
  385. extra_log_template_args = {}
  386. if environ.get("SYNAPSE_WORKERS_WRITE_LOGS_TO_DISK"):
  387. extra_log_template_args["LOG_FILE_PATH"] = "{dir}/logs/{name}.log".format(
  388. dir=data_dir, name=worker_name
  389. )
  390. # Render and write the file
  391. log_config_filepath = "/conf/workers/{name}.log.config".format(name=worker_name)
  392. convert(
  393. "/conf/log.config",
  394. log_config_filepath,
  395. worker_name=worker_name,
  396. **extra_log_template_args,
  397. )
  398. # Then a worker config file
  399. convert(
  400. "/conf/worker.yaml.j2",
  401. "/conf/workers/{name}.yaml".format(name=worker_name),
  402. **worker_config,
  403. worker_log_config_filepath=log_config_filepath,
  404. )
  405. worker_port += 1
  406. # Build the nginx location config blocks
  407. nginx_location_config = ""
  408. for endpoint, upstream in nginx_locations.items():
  409. nginx_location_config += NGINX_LOCATION_CONFIG_BLOCK.format(
  410. endpoint=endpoint,
  411. upstream=upstream,
  412. )
  413. # Determine the load-balancing upstreams to configure
  414. nginx_upstream_config = ""
  415. for upstream_worker_type, upstream_worker_ports in nginx_upstreams.items():
  416. body = ""
  417. for port in upstream_worker_ports:
  418. body += " server localhost:%d;\n" % (port,)
  419. # Add to the list of configured upstreams
  420. nginx_upstream_config += NGINX_UPSTREAM_CONFIG_BLOCK.format(
  421. upstream_worker_type=upstream_worker_type,
  422. body=body,
  423. )
  424. # Finally, we'll write out the config files.
  425. # Shared homeserver config
  426. convert(
  427. "/conf/shared.yaml.j2",
  428. "/conf/workers/shared.yaml",
  429. shared_worker_config=yaml.dump(shared_config),
  430. )
  431. # Nginx config
  432. convert(
  433. "/conf/nginx.conf.j2",
  434. "/etc/nginx/conf.d/matrix-synapse.conf",
  435. worker_locations=nginx_location_config,
  436. upstream_directives=nginx_upstream_config,
  437. )
  438. # Supervisord config
  439. convert(
  440. "/conf/supervisord.conf.j2",
  441. "/etc/supervisor/conf.d/supervisord.conf",
  442. main_config_path=config_path,
  443. worker_config=supervisord_config,
  444. )
  445. # Ensure the logging directory exists
  446. log_dir = data_dir + "/logs"
  447. if not os.path.exists(log_dir):
  448. os.mkdir(log_dir)
  449. def start_supervisord():
  450. """Starts up supervisord which then starts and monitors all other necessary processes
  451. Raises: CalledProcessError if calling start.py return a non-zero exit code.
  452. """
  453. subprocess.run(["/usr/bin/supervisord"], stdin=subprocess.PIPE)
  454. def main(args, environ):
  455. config_dir = environ.get("SYNAPSE_CONFIG_DIR", "/data")
  456. config_path = environ.get("SYNAPSE_CONFIG_PATH", config_dir + "/homeserver.yaml")
  457. data_dir = environ.get("SYNAPSE_DATA_DIR", "/data")
  458. # override SYNAPSE_NO_TLS, we don't support TLS in worker mode,
  459. # this needs to be handled by a frontend proxy
  460. environ["SYNAPSE_NO_TLS"] = "yes"
  461. # Generate the base homeserver config if one does not yet exist
  462. if not os.path.exists(config_path):
  463. log("Generating base homeserver config")
  464. generate_base_homeserver_config()
  465. # This script may be run multiple times (mostly by Complement, see note at top of file).
  466. # Don't re-configure workers in this instance.
  467. mark_filepath = "/conf/workers_have_been_configured"
  468. if not os.path.exists(mark_filepath):
  469. # Always regenerate all other config files
  470. generate_worker_files(environ, config_path, data_dir)
  471. # Mark workers as being configured
  472. with open(mark_filepath, "w") as f:
  473. f.write("")
  474. # Start supervisord, which will start Synapse, all of the configured worker
  475. # processes, redis, nginx etc. according to the config we created above.
  476. start_supervisord()
  477. if __name__ == "__main__":
  478. main(sys.argv, os.environ)