# workers.py

# Copyright 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import List, Union

import attr

from ._base import (
    Config,
    ConfigError,
    RoutableShardedWorkerHandlingConfig,
    ShardedWorkerHandlingConfig,
)
from .server import ListenerConfig, parse_listener_def

_FEDERATION_SENDER_WITH_SEND_FEDERATION_ENABLED_ERROR = """
The send_federation config option must be disabled in the main
synapse process before federation sending can be run in a separate worker.

Please add ``send_federation: false`` to the main config
"""

_PUSHER_WITH_START_PUSHERS_ENABLED_ERROR = """
The start_pushers config option must be disabled in the main
synapse process before pushers can be run in a separate worker.

Please add ``start_pushers: false`` to the main config
"""


def _instance_to_list_converter(obj: Union[str, List[str]]) -> List[str]:
    """Helper for allowing parsing a string or list of strings to a config
    option expecting a list of strings.
    """
    if isinstance(obj, str):
        return [obj]
    return obj
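
# A minimal illustration (values are hypothetical): _instance_to_list_converter("worker1")
# returns ["worker1"], while _instance_to_list_converter(["worker1", "worker2"]) is
# returned unchanged, so a config option can accept either a single name or a list.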


@attr.s
class InstanceLocationConfig:
    """The host and port to talk to an instance via HTTP replication."""

    host = attr.ib(type=str)
    port = attr.ib(type=int)
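
# Rough sketch of how this is populated (names and values are illustrative): an
# `instance_map` entry such as
#
#   instance_map:
#     worker1:
#       host: localhost
#       port: 8034
#
# is turned into InstanceLocationConfig(host="localhost", port=8034) in
# WorkerConfig.read_config below.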


@attr.s
class WriterLocations:
    """Specifies the instances that write various streams.

    Attributes:
        events: The instances that write to the event and backfill streams.
        typing: The instance that writes to the typing stream.
        to_device: The instances that write to the to_device stream. Currently
            can only be a single instance.
        account_data: The instances that write to the account data streams. Currently
            can only be a single instance.
        receipts: The instances that write to the receipts stream. Currently
            can only be a single instance.
        presence: The instances that write to the presence stream. Currently
            can only be a single instance.
    """

    events = attr.ib(
        default=["master"], type=List[str], converter=_instance_to_list_converter
    )
    typing = attr.ib(default="master", type=str)
    to_device = attr.ib(
        default=["master"],
        type=List[str],
        converter=_instance_to_list_converter,
    )
    account_data = attr.ib(
        default=["master"],
        type=List[str],
        converter=_instance_to_list_converter,
    )
    receipts = attr.ib(
        default=["master"],
        type=List[str],
        converter=_instance_to_list_converter,
    )
    presence = attr.ib(
        default=["master"],
        type=List[str],
        converter=_instance_to_list_converter,
    )
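
# A minimal sketch (worker names are illustrative): a `stream_writers` section such as
#
#   stream_writers:
#     events:
#       - worker1
#       - worker2
#     typing: worker1
#
# is parsed into WriterLocations(events=["worker1", "worker2"], typing="worker1"),
# with every stream that is not listed falling back to its "master" default.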


class WorkerConfig(Config):
    """The workers are processes run separately to the main synapse process.
    They have their own pid_file and listener configuration. They use the
    replication_url to talk to the main synapse process."""

    section = "worker"

    def read_config(self, config, **kwargs):
        self.worker_app = config.get("worker_app")

        # Canonicalise worker_app so that master always has None
        if self.worker_app == "synapse.app.homeserver":
            self.worker_app = None

        self.worker_listeners = [
            parse_listener_def(x) for x in config.get("worker_listeners", [])
        ]
        self.worker_daemonize = config.get("worker_daemonize")
        self.worker_pid_file = config.get("worker_pid_file")
        self.worker_log_config = config.get("worker_log_config")

        # The host used to connect to the main synapse
        self.worker_replication_host = config.get("worker_replication_host", None)

        # The port on the main synapse for TCP replication
        self.worker_replication_port = config.get("worker_replication_port", None)

        # The port on the main synapse for HTTP replication endpoint
        self.worker_replication_http_port = config.get("worker_replication_http_port")

        # The shared secret used for authentication when connecting to the main synapse.
        self.worker_replication_secret = config.get("worker_replication_secret", None)

        self.worker_name = config.get("worker_name", self.worker_app)
        self.instance_name = self.worker_name or "master"

        self.worker_main_http_uri = config.get("worker_main_http_uri", None)

        # This option is really only here to support `--manhole` command line
        # argument.
        manhole = config.get("worker_manhole")
        if manhole:
            self.worker_listeners.append(
                ListenerConfig(
                    port=manhole,
                    bind_addresses=["127.0.0.1"],
                    type="manhole",
                )
            )
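
        # Rough sketch of the per-worker options read above (all values are
        # illustrative, not a complete worker config):
        #
        #   worker_app: synapse.app.federation_sender
        #   worker_name: federation_sender1
        #   worker_replication_host: 127.0.0.1
        #   worker_replication_http_port: 9093
        #   worker_replication_secret: "some_secret"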

        # Handle federation sender configuration.
        #
        # There are two ways of configuring which instances handle federation
        # sending:
        #   1. The old way where "send_federation" is set to false and running a
        #      `synapse.app.federation_sender` worker app.
        #   2. Specifying the workers sending federation in
        #      `federation_sender_instances`.
        #
        send_federation = config.get("send_federation", True)

        federation_sender_instances = config.get("federation_sender_instances")
        if federation_sender_instances is None:
            # Default to an empty list, which means "another, unknown, worker is
            # responsible for it".
            federation_sender_instances = []

            # If no federation sender instances are set we check if
            # `send_federation` is set, which means use master
            if send_federation:
                federation_sender_instances = ["master"]

            if self.worker_app == "synapse.app.federation_sender":
                if send_federation:
                    # If we're running federation senders, and not using
                    # `federation_sender_instances`, then we should have
                    # explicitly set `send_federation` to false.
                    raise ConfigError(
                        _FEDERATION_SENDER_WITH_SEND_FEDERATION_ENABLED_ERROR
                    )

                federation_sender_instances = [self.worker_name]

        self.send_federation = self.instance_name in federation_sender_instances

        self.federation_shard_config = ShardedWorkerHandlingConfig(
            federation_sender_instances
        )
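
        # For example (hypothetical config): with `send_federation: false` on the
        # main process and `federation_sender_instances: [federation_sender1]`,
        # only the worker named federation_sender1 ends up with
        # self.send_federation == True, and the shard config then contains just
        # that worker.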

        # A map from instance name to host/port of their HTTP replication endpoint.
        instance_map = config.get("instance_map") or {}
        self.instance_map = {
            name: InstanceLocationConfig(**c) for name, c in instance_map.items()
        }

        # Map from type of streams to source, c.f. WriterLocations.
        writers = config.get("stream_writers") or {}
        self.writers = WriterLocations(**writers)

        # Check that the configured writers for each stream also appear in
        # `instance_map`.
        for stream in (
            "events",
            "typing",
            "to_device",
            "account_data",
            "receipts",
            "presence",
        ):
            instances = _instance_to_list_converter(getattr(self.writers, stream))
            for instance in instances:
                if instance != "master" and instance not in self.instance_map:
                    raise ConfigError(
                        "Instance %r is configured to write %s but does not appear in `instance_map` config."
                        % (instance, stream)
                    )

        if len(self.writers.to_device) != 1:
            raise ConfigError(
                "Must only specify one instance to handle `to_device` messages."
            )

        if len(self.writers.account_data) != 1:
            raise ConfigError(
                "Must only specify one instance to handle `account_data` messages."
            )

        if len(self.writers.receipts) != 1:
            raise ConfigError(
                "Must only specify one instance to handle `receipts` messages."
            )

        if len(self.writers.events) == 0:
            raise ConfigError("Must specify at least one instance to handle `events`.")

        if len(self.writers.presence) != 1:
            raise ConfigError(
                "Must only specify one instance to handle `presence` messages."
            )

        self.events_shard_config = RoutableShardedWorkerHandlingConfig(
            self.writers.events
        )

        # Handle sharded push
        start_pushers = config.get("start_pushers", True)
        pusher_instances = config.get("pusher_instances")
        if pusher_instances is None:
            # Default to an empty list, which means "another, unknown, worker is
            # responsible for it".
            pusher_instances = []

            # If no pusher instances are set we check if `start_pushers` is
            # set, which means use master
            if start_pushers:
                pusher_instances = ["master"]

            if self.worker_app == "synapse.app.pusher":
                if start_pushers:
                    # If we're running pushers, and not using
                    # `pusher_instances`, then we should have explicitly set
                    # `start_pushers` to false.
                    raise ConfigError(_PUSHER_WITH_START_PUSHERS_ENABLED_ERROR)

                pusher_instances = [self.instance_name]

        self.start_pushers = self.instance_name in pusher_instances
        self.pusher_shard_config = ShardedWorkerHandlingConfig(pusher_instances)

        # Whether this worker should run background tasks or not.
        #
        # As a note for developers, the background tasks guarded by this should
        # be able to run on only a single instance (meaning that they don't
        # depend on any in-memory state of a particular worker).
        #
        # No effort is made to ensure only a single instance of these tasks is
        # running.
        background_tasks_instance = config.get("run_background_tasks_on") or "master"
        self.run_background_tasks = (
            self.worker_name is None and background_tasks_instance == "master"
        ) or self.worker_name == background_tasks_instance
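
        # For example (hypothetical config): with `run_background_tasks_on: worker1`,
        # self.run_background_tasks is True only on the worker named worker1; if the
        # option is omitted it is True only on the main process (worker_name is None).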

    def generate_config_section(self, config_dir_path, server_name, **kwargs):
        return """\
        ## Workers ##

        # Disables sending of outbound federation transactions on the main process.
        # Uncomment if using a federation sender worker.
        #
        #send_federation: false

        # It is possible to run multiple federation sender workers, in which case the
        # work is balanced across them.
        #
        # This configuration must be shared between all federation sender workers, and if
        # changed all federation sender workers must be stopped at the same time and then
        # started, to ensure that all instances are running with the same config (otherwise
        # events may be dropped).
        #
        #federation_sender_instances:
        #  - federation_sender1

        # When using workers this should be a map from `worker_name` to the
        # HTTP replication listener of the worker, if configured.
        #
        #instance_map:
        #  worker1:
        #    host: localhost
        #    port: 8034

        # Experimental: When using workers you can define which workers should
        # handle event persistence and typing notifications. Any worker
        # specified here must also be in the `instance_map`.
        #
        #stream_writers:
        #  events: worker1
        #  typing: worker1

        # The worker that is used to run background tasks (e.g. cleaning up expired
        # data). If not provided this defaults to the main process.
        #
        #run_background_tasks_on: worker1

        # A shared secret used by the replication APIs to authenticate HTTP requests
        # from workers.
        #
        # By default this is unused and traffic is not authenticated.
        #
        #worker_replication_secret: ""
        """

    def read_arguments(self, args):
        # We support a bunch of command line arguments that override options in
        # the config. A lot of these options have a worker_* prefix when running
        # on workers so we also have to override them when command line options
        # are specified.

        if args.daemonize is not None:
            self.worker_daemonize = args.daemonize
        if args.manhole is not None:
            self.worker_manhole = args.worker_manhole