workers.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355
  1. # Copyright 2016 OpenMarket Ltd
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License");
  4. # you may not use this file except in compliance with the License.
  5. # You may obtain a copy of the License at
  6. #
  7. # http://www.apache.org/licenses/LICENSE-2.0
  8. #
  9. # Unless required by applicable law or agreed to in writing, software
  10. # distributed under the License is distributed on an "AS IS" BASIS,
  11. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  12. # See the License for the specific language governing permissions and
  13. # limitations under the License.
  14. from typing import List, Union
  15. import attr
  16. from ._base import (
  17. Config,
  18. ConfigError,
  19. RoutableShardedWorkerHandlingConfig,
  20. ShardedWorkerHandlingConfig,
  21. )
  22. from .server import ListenerConfig, parse_listener_def
  23. _FEDERATION_SENDER_WITH_SEND_FEDERATION_ENABLED_ERROR = """
  24. The send_federation config option must be disabled in the main
  25. synapse process before they can be run in a separate worker.
  26. Please add ``send_federation: false`` to the main config
  27. """
  28. _PUSHER_WITH_START_PUSHERS_ENABLED_ERROR = """
  29. The start_pushers config option must be disabled in the main
  30. synapse process before they can be run in a separate worker.
  31. Please add ``start_pushers: false`` to the main config
  32. """
  33. def _instance_to_list_converter(obj: Union[str, List[str]]) -> List[str]:
  34. """Helper for allowing parsing a string or list of strings to a config
  35. option expecting a list of strings.
  36. """
  37. if isinstance(obj, str):
  38. return [obj]
  39. return obj
  40. @attr.s
  41. class InstanceLocationConfig:
  42. """The host and port to talk to an instance via HTTP replication."""
  43. host = attr.ib(type=str)
  44. port = attr.ib(type=int)
  45. @attr.s
  46. class WriterLocations:
  47. """Specifies the instances that write various streams.
  48. Attributes:
  49. events: The instances that write to the event and backfill streams.
  50. typing: The instances that write to the typing stream. Currently
  51. can only be a single instance.
  52. to_device: The instances that write to the to_device stream. Currently
  53. can only be a single instance.
  54. account_data: The instances that write to the account data streams. Currently
  55. can only be a single instance.
  56. receipts: The instances that write to the receipts stream. Currently
  57. can only be a single instance.
  58. presence: The instances that write to the presence stream. Currently
  59. can only be a single instance.
  60. """
  61. events = attr.ib(
  62. default=["master"],
  63. type=List[str],
  64. converter=_instance_to_list_converter,
  65. )
  66. typing = attr.ib(
  67. default=["master"],
  68. type=List[str],
  69. converter=_instance_to_list_converter,
  70. )
  71. to_device = attr.ib(
  72. default=["master"],
  73. type=List[str],
  74. converter=_instance_to_list_converter,
  75. )
  76. account_data = attr.ib(
  77. default=["master"],
  78. type=List[str],
  79. converter=_instance_to_list_converter,
  80. )
  81. receipts = attr.ib(
  82. default=["master"],
  83. type=List[str],
  84. converter=_instance_to_list_converter,
  85. )
  86. presence = attr.ib(
  87. default=["master"],
  88. type=List[str],
  89. converter=_instance_to_list_converter,
  90. )
  91. class WorkerConfig(Config):
  92. """The workers are processes run separately to the main synapse process.
  93. They have their own pid_file and listener configuration. They use the
  94. replication_url to talk to the main synapse process."""
  95. section = "worker"
  96. def read_config(self, config, **kwargs):
  97. self.worker_app = config.get("worker_app")
  98. # Canonicalise worker_app so that master always has None
  99. if self.worker_app == "synapse.app.homeserver":
  100. self.worker_app = None
  101. self.worker_listeners = [
  102. parse_listener_def(x) for x in config.get("worker_listeners", [])
  103. ]
  104. self.worker_daemonize = config.get("worker_daemonize")
  105. self.worker_pid_file = config.get("worker_pid_file")
  106. self.worker_log_config = config.get("worker_log_config")
  107. # The host used to connect to the main synapse
  108. self.worker_replication_host = config.get("worker_replication_host", None)
  109. # The port on the main synapse for TCP replication
  110. self.worker_replication_port = config.get("worker_replication_port", None)
  111. # The port on the main synapse for HTTP replication endpoint
  112. self.worker_replication_http_port = config.get("worker_replication_http_port")
  113. # The shared secret used for authentication when connecting to the main synapse.
  114. self.worker_replication_secret = config.get("worker_replication_secret", None)
  115. self.worker_name = config.get("worker_name", self.worker_app)
  116. self.instance_name = self.worker_name or "master"
  117. self.worker_main_http_uri = config.get("worker_main_http_uri", None)
  118. # This option is really only here to support `--manhole` command line
  119. # argument.
  120. manhole = config.get("worker_manhole")
  121. if manhole:
  122. self.worker_listeners.append(
  123. ListenerConfig(
  124. port=manhole,
  125. bind_addresses=["127.0.0.1"],
  126. type="manhole",
  127. )
  128. )
  129. # Handle federation sender configuration.
  130. #
  131. # There are two ways of configuring which instances handle federation
  132. # sending:
  133. # 1. The old way where "send_federation" is set to false and running a
  134. # `synapse.app.federation_sender` worker app.
  135. # 2. Specifying the workers sending federation in
  136. # `federation_sender_instances`.
  137. #
  138. send_federation = config.get("send_federation", True)
  139. federation_sender_instances = config.get("federation_sender_instances")
  140. if federation_sender_instances is None:
  141. # Default to an empty list, which means "another, unknown, worker is
  142. # responsible for it".
  143. federation_sender_instances = []
  144. # If no federation sender instances are set we check if
  145. # `send_federation` is set, which means use master
  146. if send_federation:
  147. federation_sender_instances = ["master"]
  148. if self.worker_app == "synapse.app.federation_sender":
  149. if send_federation:
  150. # If we're running federation senders, and not using
  151. # `federation_sender_instances`, then we should have
  152. # explicitly set `send_federation` to false.
  153. raise ConfigError(
  154. _FEDERATION_SENDER_WITH_SEND_FEDERATION_ENABLED_ERROR
  155. )
  156. federation_sender_instances = [self.worker_name]
  157. self.send_federation = self.instance_name in federation_sender_instances
  158. self.federation_shard_config = ShardedWorkerHandlingConfig(
  159. federation_sender_instances
  160. )
  161. # A map from instance name to host/port of their HTTP replication endpoint.
  162. instance_map = config.get("instance_map") or {}
  163. self.instance_map = {
  164. name: InstanceLocationConfig(**c) for name, c in instance_map.items()
  165. }
  166. # Map from type of streams to source, c.f. WriterLocations.
  167. writers = config.get("stream_writers") or {}
  168. self.writers = WriterLocations(**writers)
  169. # Check that the configured writers for events and typing also appears in
  170. # `instance_map`.
  171. for stream in (
  172. "events",
  173. "typing",
  174. "to_device",
  175. "account_data",
  176. "receipts",
  177. "presence",
  178. ):
  179. instances = _instance_to_list_converter(getattr(self.writers, stream))
  180. for instance in instances:
  181. if instance != "master" and instance not in self.instance_map:
  182. raise ConfigError(
  183. "Instance %r is configured to write %s but does not appear in `instance_map` config."
  184. % (instance, stream)
  185. )
  186. if len(self.writers.typing) != 1:
  187. raise ConfigError(
  188. "Must only specify one instance to handle `typing` messages."
  189. )
  190. if len(self.writers.to_device) != 1:
  191. raise ConfigError(
  192. "Must only specify one instance to handle `to_device` messages."
  193. )
  194. if len(self.writers.account_data) != 1:
  195. raise ConfigError(
  196. "Must only specify one instance to handle `account_data` messages."
  197. )
  198. if len(self.writers.receipts) != 1:
  199. raise ConfigError(
  200. "Must only specify one instance to handle `receipts` messages."
  201. )
  202. if len(self.writers.events) == 0:
  203. raise ConfigError("Must specify at least one instance to handle `events`.")
  204. if len(self.writers.presence) != 1:
  205. raise ConfigError(
  206. "Must only specify one instance to handle `presence` messages."
  207. )
  208. self.events_shard_config = RoutableShardedWorkerHandlingConfig(
  209. self.writers.events
  210. )
  211. # Handle sharded push
  212. start_pushers = config.get("start_pushers", True)
  213. pusher_instances = config.get("pusher_instances")
  214. if pusher_instances is None:
  215. # Default to an empty list, which means "another, unknown, worker is
  216. # responsible for it".
  217. pusher_instances = []
  218. # If no pushers instances are set we check if `start_pushers` is
  219. # set, which means use master
  220. if start_pushers:
  221. pusher_instances = ["master"]
  222. if self.worker_app == "synapse.app.pusher":
  223. if start_pushers:
  224. # If we're running pushers, and not using
  225. # `pusher_instances`, then we should have explicitly set
  226. # `start_pushers` to false.
  227. raise ConfigError(_PUSHER_WITH_START_PUSHERS_ENABLED_ERROR)
  228. pusher_instances = [self.instance_name]
  229. self.start_pushers = self.instance_name in pusher_instances
  230. self.pusher_shard_config = ShardedWorkerHandlingConfig(pusher_instances)
  231. # Whether this worker should run background tasks or not.
  232. #
  233. # As a note for developers, the background tasks guarded by this should
  234. # be able to run on only a single instance (meaning that they don't
  235. # depend on any in-memory state of a particular worker).
  236. #
  237. # No effort is made to ensure only a single instance of these tasks is
  238. # running.
  239. background_tasks_instance = config.get("run_background_tasks_on") or "master"
  240. self.run_background_tasks = (
  241. self.worker_name is None and background_tasks_instance == "master"
  242. ) or self.worker_name == background_tasks_instance
  243. def generate_config_section(self, config_dir_path, server_name, **kwargs):
  244. return """\
  245. ## Workers ##
  246. # Disables sending of outbound federation transactions on the main process.
  247. # Uncomment if using a federation sender worker.
  248. #
  249. #send_federation: false
  250. # It is possible to run multiple federation sender workers, in which case the
  251. # work is balanced across them.
  252. #
  253. # This configuration must be shared between all federation sender workers, and if
  254. # changed all federation sender workers must be stopped at the same time and then
  255. # started, to ensure that all instances are running with the same config (otherwise
  256. # events may be dropped).
  257. #
  258. #federation_sender_instances:
  259. # - federation_sender1
  260. # When using workers this should be a map from `worker_name` to the
  261. # HTTP replication listener of the worker, if configured.
  262. #
  263. #instance_map:
  264. # worker1:
  265. # host: localhost
  266. # port: 8034
  267. # Experimental: When using workers you can define which workers should
  268. # handle event persistence and typing notifications. Any worker
  269. # specified here must also be in the `instance_map`.
  270. #
  271. #stream_writers:
  272. # events: worker1
  273. # typing: worker1
  274. # The worker that is used to run background tasks (e.g. cleaning up expired
  275. # data). If not provided this defaults to the main process.
  276. #
  277. #run_background_tasks_on: worker1
  278. # A shared secret used by the replication APIs to authenticate HTTP requests
  279. # from workers.
  280. #
  281. # By default this is unused and traffic is not authenticated.
  282. #
  283. #worker_replication_secret: ""
  284. """
  285. def read_arguments(self, args):
  286. # We support a bunch of command line arguments that override options in
  287. # the config. A lot of these options have a worker_* prefix when running
  288. # on workers so we also have to override them when command line options
  289. # are specified.
  290. if args.daemonize is not None:
  291. self.worker_daemonize = args.daemonize
  292. if args.manhole is not None:
  293. self.worker_manhole = args.worker_manhole