# -*- coding: utf-8 -*-
# Copyright 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import List, Union

import attr

from ._base import (
    Config,
    ConfigError,
    RoutableShardedWorkerHandlingConfig,
    ShardedWorkerHandlingConfig,
)
from .server import ListenerConfig, parse_listener_def

_FEDERATION_SENDER_WITH_SEND_FEDERATION_ENABLED_ERROR = """
The send_federation config option must be disabled in the main
synapse process before federation senders can be run in a separate worker.

Please add ``send_federation: false`` to the main config
"""

_PUSHER_WITH_START_PUSHERS_ENABLED_ERROR = """
The start_pushers config option must be disabled in the main
synapse process before pushers can be run in a separate worker.

Please add ``start_pushers: false`` to the main config
"""


def _instance_to_list_converter(obj: Union[str, List[str]]) -> List[str]:
    """Helper that allows a config option expecting a list of strings to be
    given either a single string or a list of strings.
    """
    if isinstance(obj, str):
        return [obj]
    return obj
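

# For reference, the converter above accepts either form (names below are
# illustrative):
#
#     _instance_to_list_converter("worker1")     -> ["worker1"]
#     _instance_to_list_converter(["w1", "w2"])  -> ["w1", "w2"]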


@attr.s
class InstanceLocationConfig:
    """The host and port to talk to an instance via HTTP replication."""

    host = attr.ib(type=str)
    port = attr.ib(type=int)
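

# Each entry of the `instance_map` config option is parsed into an
# InstanceLocationConfig; for example (values illustrative, matching the
# sample config below):
#
#     instance_map:
#       worker1:
#         host: localhost
#         port: 8034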


@attr.s
class WriterLocations:
    """Specifies the instances that write various streams.

    Attributes:
        events: The instances that write to the event and backfill streams.
        typing: The instance that writes to the typing stream.
        to_device: The instance that writes to the to_device stream.
        account_data: The instance that writes to the account data streams.
        receipts: The instance that writes to the receipts stream.
    """

    events = attr.ib(
        default=["master"], type=List[str], converter=_instance_to_list_converter
    )
    typing = attr.ib(default="master", type=str)
    to_device = attr.ib(
        default=["master"],
        type=List[str],
        converter=_instance_to_list_converter,
    )
    account_data = attr.ib(
        default=["master"],
        type=List[str],
        converter=_instance_to_list_converter,
    )
    receipts = attr.ib(
        default=["master"],
        type=List[str],
        converter=_instance_to_list_converter,
    )
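

# The `stream_writers` config option is parsed into a WriterLocations; for
# example (worker name illustrative, matching the sample config below):
#
#     stream_writers:
#       events: worker1
#       typing: worker1
#
# Streams not listed fall back to their default writer, "master".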


class WorkerConfig(Config):
    """The workers are processes run separately to the main synapse process.
    They have their own pid_file and listener configuration. They use the
    replication_url to talk to the main synapse process."""

    section = "worker"

    def read_config(self, config, **kwargs):
        self.worker_app = config.get("worker_app")

        # Canonicalise worker_app so that master always has None
        if self.worker_app == "synapse.app.homeserver":
            self.worker_app = None

        self.worker_listeners = [
            parse_listener_def(x) for x in config.get("worker_listeners", [])
        ]
        self.worker_daemonize = config.get("worker_daemonize")
        self.worker_pid_file = config.get("worker_pid_file")
        self.worker_log_config = config.get("worker_log_config")

        # The host used to connect to the main synapse
        self.worker_replication_host = config.get("worker_replication_host", None)

        # The port on the main synapse for TCP replication
        self.worker_replication_port = config.get("worker_replication_port", None)

        # The port on the main synapse for HTTP replication endpoint
        self.worker_replication_http_port = config.get("worker_replication_http_port")

        # The shared secret used for authentication when connecting to the main synapse.
        self.worker_replication_secret = config.get("worker_replication_secret", None)

        self.worker_name = config.get("worker_name", self.worker_app)
        self.instance_name = self.worker_name or "master"

        self.worker_main_http_uri = config.get("worker_main_http_uri", None)

        # This option is really only here to support the `--manhole` command line
        # argument.
        manhole = config.get("worker_manhole")
        if manhole:
            self.worker_listeners.append(
                ListenerConfig(
                    port=manhole,
                    bind_addresses=["127.0.0.1"],
                    type="manhole",
                )
            )
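
        # For example (port number illustrative): `worker_manhole: 9000` in this
        # worker's config adds a manhole listener on 127.0.0.1:9000.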

        # Handle federation sender configuration.
        #
        # There are two ways of configuring which instances handle federation
        # sending:
        #   1. The old way: setting "send_federation" to false and running a
        #      `synapse.app.federation_sender` worker app.
        #   2. Specifying the workers sending federation in
        #      `federation_sender_instances`.
        #

        send_federation = config.get("send_federation", True)

        federation_sender_instances = config.get("federation_sender_instances")
        if federation_sender_instances is None:
            # Default to an empty list, which means "another, unknown, worker is
            # responsible for it".
            federation_sender_instances = []

            # If no federation sender instances are set we check if
            # `send_federation` is set, which means use master
            if send_federation:
                federation_sender_instances = ["master"]

            if self.worker_app == "synapse.app.federation_sender":
                if send_federation:
                    # If we're running federation senders, and not using
                    # `federation_sender_instances`, then we should have
                    # explicitly set `send_federation` to false.
                    raise ConfigError(
                        _FEDERATION_SENDER_WITH_SEND_FEDERATION_ENABLED_ERROR
                    )

                federation_sender_instances = [self.worker_name]

        self.send_federation = self.instance_name in federation_sender_instances
        self.federation_shard_config = ShardedWorkerHandlingConfig(
            federation_sender_instances
        )
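
        # For example (worker name illustrative), a shared config that moves
        # federation sending to a dedicated worker might contain:
        #
        #     send_federation: false
        #     federation_sender_instances:
        #       - federation_sender1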

        # A map from instance name to host/port of their HTTP replication endpoint.
        instance_map = config.get("instance_map") or {}
        self.instance_map = {
            name: InstanceLocationConfig(**c) for name, c in instance_map.items()
        }

        # Map from stream type to its writer instance(s), cf. WriterLocations.
        writers = config.get("stream_writers") or {}
        self.writers = WriterLocations(**writers)

        # Check that the configured writers for each stream also appear in
        # `instance_map`.
        for stream in ("events", "typing", "to_device", "account_data", "receipts"):
            instances = _instance_to_list_converter(getattr(self.writers, stream))
            for instance in instances:
                if instance != "master" and instance not in self.instance_map:
                    raise ConfigError(
                        "Instance %r is configured to write %s but does not appear in `instance_map` config."
                        % (instance, stream)
                    )

        if len(self.writers.to_device) != 1:
            raise ConfigError(
                "Must only specify one instance to handle `to_device` messages."
            )

        if len(self.writers.account_data) != 1:
            raise ConfigError(
                "Must only specify one instance to handle `account_data` messages."
            )

        if len(self.writers.receipts) != 1:
            raise ConfigError(
                "Must only specify one instance to handle `receipts` messages."
            )

        if len(self.writers.events) == 0:
            raise ConfigError("Must specify at least one instance to handle `events`.")

        self.events_shard_config = RoutableShardedWorkerHandlingConfig(
            self.writers.events
        )

        # Handle sharded push
        start_pushers = config.get("start_pushers", True)
        pusher_instances = config.get("pusher_instances")
        if pusher_instances is None:
            # Default to an empty list, which means "another, unknown, worker is
            # responsible for it".
            pusher_instances = []

            # If no pusher instances are set we check if `start_pushers` is
            # set, which means use master
            if start_pushers:
                pusher_instances = ["master"]

            if self.worker_app == "synapse.app.pusher":
                if start_pushers:
                    # If we're running pushers, and not using
                    # `pusher_instances`, then we should have explicitly set
                    # `start_pushers` to false.
                    raise ConfigError(_PUSHER_WITH_START_PUSHERS_ENABLED_ERROR)

                pusher_instances = [self.instance_name]

        self.start_pushers = self.instance_name in pusher_instances
        self.pusher_shard_config = ShardedWorkerHandlingConfig(pusher_instances)
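
        # Analogously (worker name illustrative), push notification sending can
        # be moved off the main process with:
        #
        #     start_pushers: false
        #     pusher_instances:
        #       - pusher1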

        # Whether this worker should run background tasks or not.
        #
        # As a note for developers, the background tasks guarded by this should
        # be able to run on only a single instance (meaning that they don't
        # depend on any in-memory state of a particular worker).
        #
        # No effort is made to ensure only a single instance of these tasks is
        # running.
        background_tasks_instance = config.get("run_background_tasks_on") or "master"
        self.run_background_tasks = (
            self.worker_name is None and background_tasks_instance == "master"
        ) or self.worker_name == background_tasks_instance
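
        # For example (worker name illustrative): with
        # `run_background_tasks_on: worker1` configured, only the worker named
        # "worker1" ends up with run_background_tasks set; if the option is
        # absent, only the main process does.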

    def generate_config_section(self, config_dir_path, server_name, **kwargs):
        return """\
        ## Workers ##

        # Disables sending of outbound federation transactions on the main process.
        # Uncomment if using a federation sender worker.
        #
        #send_federation: false

        # It is possible to run multiple federation sender workers, in which case the
        # work is balanced across them.
        #
        # This configuration must be shared between all federation sender workers, and if
        # changed all federation sender workers must be stopped at the same time and then
        # started, to ensure that all instances are running with the same config (otherwise
        # events may be dropped).
        #
        #federation_sender_instances:
        #  - federation_sender1

        # When using workers this should be a map from `worker_name` to the
        # HTTP replication listener of the worker, if configured.
        #
        #instance_map:
        #  worker1:
        #    host: localhost
        #    port: 8034

        # Experimental: When using workers you can define which workers should
        # handle event persistence and typing notifications. Any worker
        # specified here must also be in the `instance_map`.
        #
        #stream_writers:
        #  events: worker1
        #  typing: worker1

        # The worker that is used to run background tasks (e.g. cleaning up expired
        # data). If not provided this defaults to the main process.
        #
        #run_background_tasks_on: worker1

        # A shared secret used by the replication APIs to authenticate HTTP requests
        # from workers.
        #
        # By default this is unused and traffic is not authenticated.
        #
        #worker_replication_secret: ""
        """

    def read_arguments(self, args):
        # We support a bunch of command line arguments that override options in
        # the config. A lot of these options have a worker_* prefix when running
        # on workers so we also have to override them when command line options
        # are specified.

        if args.daemonize is not None:
            self.worker_daemonize = args.daemonize

        if args.manhole is not None:
            self.worker_manhole = args.worker_manhole