workers.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351
  1. # Copyright 2016 OpenMarket Ltd
  2. # Copyright 2021 The Matrix.org Foundation C.I.C.
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. import argparse
  16. from typing import List, Union
  17. import attr
  18. from ._base import (
  19. Config,
  20. ConfigError,
  21. RoutableShardedWorkerHandlingConfig,
  22. ShardedWorkerHandlingConfig,
  23. )
  24. from .server import ListenerConfig, parse_listener_def
  # Raised when a `synapse.app.federation_sender` worker app is configured
  # while `send_federation` is still enabled.
  _FEDERATION_SENDER_WITH_SEND_FEDERATION_ENABLED_ERROR = (
      "\n"
      "The send_federation config option must be disabled in the main\n"
      "synapse process before they can be run in a separate worker.\n"
      "Please add ``send_federation: false`` to the main config\n"
  )

  # Raised when a `synapse.app.pusher` worker app is configured while
  # `start_pushers` is still enabled.
  _PUSHER_WITH_START_PUSHERS_ENABLED_ERROR = (
      "\n"
      "The start_pushers config option must be disabled in the main\n"
      "synapse process before they can be run in a separate worker.\n"
      "Please add ``start_pushers: false`` to the main config\n"
  )
  def _instance_to_list_converter(obj: Union[str, List[str]]) -> List[str]:
      """Normalise a config value that may be one string or a list of strings.

      A bare string is wrapped in a one-element list. Any other value is
      returned unchanged (the very same object, not a copy).
      """
      return [obj] if isinstance(obj, str) else obj
  @attr.s(auto_attribs=True)
  class InstanceLocationConfig:
      """The host and port to talk to an instance via HTTP replication.

      Attributes:
          host: Host of the instance's HTTP replication endpoint.
          port: Port of the instance's HTTP replication endpoint.
      """

      host: str
      port: int
  @attr.s
  class WriterLocations:
      """Specifies the instances that write various streams.

      Every attribute accepts either a single instance name or a list of
      instance names; a bare string is converted to a one-element list.

      Attributes:
          events: The instances that write to the event and backfill streams.
          typing: The instances that write to the typing stream. Currently
              can only be a single instance.
          to_device: The instances that write to the to_device stream. Currently
              can only be a single instance.
          account_data: The instances that write to the account data streams.
              Currently can only be a single instance.
          receipts: The instances that write to the receipts stream. Currently
              can only be a single instance.
          presence: The instances that write to the presence stream. Currently
              can only be a single instance.
      """

      # Defaults are produced by `attr.Factory` so that every instance gets its
      # own fresh list. A plain `default=["master"]` would be a single list
      # object shared by all instances (the converter returns lists uncopied),
      # so mutating one instance's field would silently change the default.
      events: List[str] = attr.ib(
          default=attr.Factory(lambda: ["master"]),
          converter=_instance_to_list_converter,
      )
      typing: List[str] = attr.ib(
          default=attr.Factory(lambda: ["master"]),
          converter=_instance_to_list_converter,
      )
      to_device: List[str] = attr.ib(
          default=attr.Factory(lambda: ["master"]),
          converter=_instance_to_list_converter,
      )
      account_data: List[str] = attr.ib(
          default=attr.Factory(lambda: ["master"]),
          converter=_instance_to_list_converter,
      )
      receipts: List[str] = attr.ib(
          default=attr.Factory(lambda: ["master"]),
          converter=_instance_to_list_converter,
      )
      presence: List[str] = attr.ib(
          default=attr.Factory(lambda: ["master"]),
          converter=_instance_to_list_converter,
      )
  class WorkerConfig(Config):
      """The workers are processes run separately to the main synapse process.
      They have their own pid_file and listener configuration. They use the
      replication_url to talk to the main synapse process."""

      section = "worker"

      def read_config(self, config, **kwargs) -> None:
          """Populate the worker-related settings on ``self`` from ``config``.

          Args:
              config: The parsed configuration mapping; every option is read
                  with ``config.get``.
              **kwargs: Accepted but not used by this method.

          Raises:
              ConfigError: if a `synapse.app.federation_sender` or
                  `synapse.app.pusher` worker app is used while the matching
                  `send_federation` / `start_pushers` option is still enabled,
                  if a configured stream writer is neither "master" nor listed
                  in `instance_map`, or if a stream has an unsupported number
                  of writers.
          """
          self.worker_app = config.get("worker_app")

          # Canonicalise worker_app so that master always has None
          if self.worker_app == "synapse.app.homeserver":
              self.worker_app = None

          self.worker_listeners = [
              parse_listener_def(x) for x in config.get("worker_listeners", [])
          ]
          self.worker_daemonize = config.get("worker_daemonize")
          self.worker_pid_file = config.get("worker_pid_file")
          self.worker_log_config = config.get("worker_log_config")

          # The host used to connect to the main synapse
          self.worker_replication_host = config.get("worker_replication_host", None)

          # The port on the main synapse for TCP replication
          self.worker_replication_port = config.get("worker_replication_port", None)

          # The port on the main synapse for HTTP replication endpoint
          self.worker_replication_http_port = config.get("worker_replication_http_port")

          # The shared secret used for authentication when connecting to the main synapse.
          self.worker_replication_secret = config.get("worker_replication_secret", None)

          # The worker name falls back to the worker app; the main process (no
          # name at all) is known as "master".
          self.worker_name = config.get("worker_name", self.worker_app)
          self.instance_name = self.worker_name or "master"

          self.worker_main_http_uri = config.get("worker_main_http_uri", None)

          # This option is really only here to support `--manhole` command line
          # argument.
          manhole = config.get("worker_manhole")
          if manhole:
              self.worker_listeners.append(
                  ListenerConfig(
                      port=manhole,
                      bind_addresses=["127.0.0.1"],
                      type="manhole",
                  )
              )

          # Handle federation sender configuration.
          #
          # There are two ways of configuring which instances handle federation
          # sending:
          # 1. The old way where "send_federation" is set to false and running a
          # `synapse.app.federation_sender` worker app.
          # 2. Specifying the workers sending federation in
          # `federation_sender_instances`.
          #
          send_federation = config.get("send_federation", True)

          federation_sender_instances = config.get("federation_sender_instances")
          if federation_sender_instances is None:
              # Default to an empty list, which means "another, unknown, worker is
              # responsible for it".
              federation_sender_instances = []

              # If no federation sender instances are set we check if
              # `send_federation` is set, which means use master
              if send_federation:
                  federation_sender_instances = ["master"]

              if self.worker_app == "synapse.app.federation_sender":
                  if send_federation:
                      # If we're running federation senders, and not using
                      # `federation_sender_instances`, then we should have
                      # explicitly set `send_federation` to false.
                      raise ConfigError(
                          _FEDERATION_SENDER_WITH_SEND_FEDERATION_ENABLED_ERROR
                      )

                  federation_sender_instances = [self.worker_name]

          self.send_federation = self.instance_name in federation_sender_instances
          self.federation_shard_config = ShardedWorkerHandlingConfig(
              federation_sender_instances
          )

          # A map from instance name to host/port of their HTTP replication endpoint.
          instance_map = config.get("instance_map") or {}
          self.instance_map = {
              name: InstanceLocationConfig(**c) for name, c in instance_map.items()
          }

          # Map from type of streams to source, c.f. WriterLocations.
          writers = config.get("stream_writers") or {}
          self.writers = WriterLocations(**writers)

          # Check that every configured stream writer other than "master" also
          # appears in `instance_map`.
          for stream in (
              "events",
              "typing",
              "to_device",
              "account_data",
              "receipts",
              "presence",
          ):
              instances = _instance_to_list_converter(getattr(self.writers, stream))
              for instance in instances:
                  if instance != "master" and instance not in self.instance_map:
                      raise ConfigError(
                          "Instance %r is configured to write %s but does not appear in `instance_map` config."
                          % (instance, stream)
                      )

          # Every stream except `events` must have exactly one writer; `events`
          # needs at least one.
          if len(self.writers.typing) != 1:
              raise ConfigError(
                  "Must only specify one instance to handle `typing` messages."
              )

          if len(self.writers.to_device) != 1:
              raise ConfigError(
                  "Must only specify one instance to handle `to_device` messages."
              )

          if len(self.writers.account_data) != 1:
              raise ConfigError(
                  "Must only specify one instance to handle `account_data` messages."
              )

          if len(self.writers.receipts) != 1:
              raise ConfigError(
                  "Must only specify one instance to handle `receipts` messages."
              )

          if len(self.writers.events) == 0:
              raise ConfigError("Must specify at least one instance to handle `events`.")

          if len(self.writers.presence) != 1:
              raise ConfigError(
                  "Must only specify one instance to handle `presence` messages."
              )

          self.events_shard_config = RoutableShardedWorkerHandlingConfig(
              self.writers.events
          )

          # Handle sharded push
          start_pushers = config.get("start_pushers", True)
          pusher_instances = config.get("pusher_instances")
          if pusher_instances is None:
              # Default to an empty list, which means "another, unknown, worker is
              # responsible for it".
              pusher_instances = []

              # If no pushers instances are set we check if `start_pushers` is
              # set, which means use master
              if start_pushers:
                  pusher_instances = ["master"]

              if self.worker_app == "synapse.app.pusher":
                  if start_pushers:
                      # If we're running pushers, and not using
                      # `pusher_instances`, then we should have explicitly set
                      # `start_pushers` to false.
                      raise ConfigError(_PUSHER_WITH_START_PUSHERS_ENABLED_ERROR)

                  pusher_instances = [self.instance_name]

          self.start_pushers = self.instance_name in pusher_instances
          self.pusher_shard_config = ShardedWorkerHandlingConfig(pusher_instances)

          # Whether this worker should run background tasks or not.
          #
          # As a note for developers, the background tasks guarded by this should
          # be able to run on only a single instance (meaning that they don't
          # depend on any in-memory state of a particular worker).
          #
          # No effort is made to ensure only a single instance of these tasks is
          # running.
          background_tasks_instance = config.get("run_background_tasks_on") or "master"
          self.run_background_tasks = (
              self.worker_name is None and background_tasks_instance == "master"
          ) or self.worker_name == background_tasks_instance

      def generate_config_section(self, config_dir_path, server_name, **kwargs) -> str:
          """Return the commented-out sample configuration for the worker options.

          Args:
              config_dir_path: Not used by this method.
              server_name: Not used by this method.
              **kwargs: Accepted but not used by this method.

          Returns:
              The sample configuration text as a single string.
          """
          # NOTE(review): the template lines are indented with the method body;
          # this presumes the caller dedents generated sections - confirm.
          return """\
          ## Workers ##
          # Disables sending of outbound federation transactions on the main process.
          # Uncomment if using a federation sender worker.
          #
          #send_federation: false
          # It is possible to run multiple federation sender workers, in which case the
          # work is balanced across them.
          #
          # This configuration must be shared between all federation sender workers, and if
          # changed all federation sender workers must be stopped at the same time and then
          # started, to ensure that all instances are running with the same config (otherwise
          # events may be dropped).
          #
          #federation_sender_instances:
          # - federation_sender1
          # When using workers this should be a map from `worker_name` to the
          # HTTP replication listener of the worker, if configured.
          #
          #instance_map:
          # worker1:
          # host: localhost
          # port: 8034
          # Experimental: When using workers you can define which workers should
          # handle event persistence and typing notifications. Any worker
          # specified here must also be in the `instance_map`.
          #
          #stream_writers:
          # events: worker1
          # typing: worker1
          # The worker that is used to run background tasks (e.g. cleaning up expired
          # data). If not provided this defaults to the main process.
          #
          #run_background_tasks_on: worker1
          # A shared secret used by the replication APIs to authenticate HTTP requests
          # from workers.
          #
          # By default this is unused and traffic is not authenticated.
          #
          #worker_replication_secret: ""
          """

      def read_arguments(self, args: argparse.Namespace) -> None:
          """Apply command line overrides to the worker settings.

          Args:
              args: The parsed command line arguments. Only `daemonize` and
                  `manhole` are consulted; a value of None means "not given".
          """
          # We support a bunch of command line arguments that override options in
          # the config. A lot of these options have a worker_* prefix when running
          # on workers so we also have to override them when command line options
          # are specified.
          if args.daemonize is not None:
              self.worker_daemonize = args.daemonize

          if args.manhole is not None:
              # Copy the attribute that was just checked. Reading a separate
              # `args.worker_manhole` would raise AttributeError whenever the
              # namespace only defines `manhole`.
              self.worker_manhole = args.manhole
  287. self.worker_daemonize = args.daemonize
  288. if args.manhole is not None:
  289. self.worker_manhole = args.worker_manhole