|
@@ -17,9 +17,28 @@ from typing import List, Union
|
|
|
|
|
|
import attr
|
|
|
|
|
|
-from ._base import Config, ConfigError, ShardedWorkerHandlingConfig
|
|
|
+from ._base import (
|
|
|
+ Config,
|
|
|
+ ConfigError,
|
|
|
+ RoutableShardedWorkerHandlingConfig,
|
|
|
+ ShardedWorkerHandlingConfig,
|
|
|
+)
|
|
|
from .server import ListenerConfig, parse_listener_def
|
|
|
|
|
|
+_FEDERATION_SENDER_WITH_SEND_FEDERATION_ENABLED_ERROR = """
|
|
|
+The send_federation config option must be disabled in the main
|
|
|
+synapse process before they can be run in a separate worker.
|
|
|
+
|
|
|
+Please add ``send_federation: false`` to the main config
|
|
|
+"""
|
|
|
+
|
|
|
+_PUSHER_WITH_START_PUSHERS_ENABLED_ERROR = """
|
|
|
+The start_pushers config option must be disabled in the main
|
|
|
+synapse process before they can be run in a separate worker.
|
|
|
+
|
|
|
+Please add ``start_pushers: false`` to the main config
|
|
|
+"""
|
|
|
+
|
|
|
|
|
|
def _instance_to_list_converter(obj: Union[str, List[str]]) -> List[str]:
|
|
|
"""Helper for allowing parsing a string or list of strings to a config
|
|
@@ -103,6 +122,7 @@ class WorkerConfig(Config):
|
|
|
self.worker_replication_secret = config.get("worker_replication_secret", None)
|
|
|
|
|
|
self.worker_name = config.get("worker_name", self.worker_app)
|
|
|
+ self.instance_name = self.worker_name or "master"
|
|
|
|
|
|
self.worker_main_http_uri = config.get("worker_main_http_uri", None)
|
|
|
|
|
@@ -118,12 +138,41 @@ class WorkerConfig(Config):
|
|
|
)
|
|
|
)
|
|
|
|
|
|
- # Whether to send federation traffic out in this process. This only
|
|
|
- # applies to some federation traffic, and so shouldn't be used to
|
|
|
- # "disable" federation
|
|
|
- self.send_federation = config.get("send_federation", True)
|
|
|
+ # Handle federation sender configuration.
|
|
|
+ #
|
|
|
+ # There are two ways of configuring which instances handle federation
|
|
|
+ # sending:
|
|
|
+ # 1. The old way where "send_federation" is set to false and running a
|
|
|
+ # `synapse.app.federation_sender` worker app.
|
|
|
+ # 2. Specifying the workers sending federation in
|
|
|
+ # `federation_sender_instances`.
|
|
|
+ #
|
|
|
+
|
|
|
+ send_federation = config.get("send_federation", True)
|
|
|
+
|
|
|
+ federation_sender_instances = config.get("federation_sender_instances")
|
|
|
+ if federation_sender_instances is None:
|
|
|
+ # Default to an empty list, which means "another, unknown, worker is
|
|
|
+ # responsible for it".
|
|
|
+ federation_sender_instances = []
|
|
|
|
|
|
- federation_sender_instances = config.get("federation_sender_instances") or []
|
|
|
+ # If no federation sender instances are set we check if
|
|
|
+ # `send_federation` is set, which means use master
|
|
|
+ if send_federation:
|
|
|
+ federation_sender_instances = ["master"]
|
|
|
+
|
|
|
+ if self.worker_app == "synapse.app.federation_sender":
|
|
|
+ if send_federation:
|
|
|
+ # If we're running federation senders, and not using
|
|
|
+ # `federation_sender_instances`, then we should have
|
|
|
+ # explicitly set `send_federation` to false.
|
|
|
+ raise ConfigError(
|
|
|
+ _FEDERATION_SENDER_WITH_SEND_FEDERATION_ENABLED_ERROR
|
|
|
+ )
|
|
|
+
|
|
|
+ federation_sender_instances = [self.worker_name]
|
|
|
+
|
|
|
+ self.send_federation = self.instance_name in federation_sender_instances
|
|
|
self.federation_shard_config = ShardedWorkerHandlingConfig(
|
|
|
federation_sender_instances
|
|
|
)
|
|
@@ -164,7 +213,37 @@ class WorkerConfig(Config):
|
|
|
"Must only specify one instance to handle `receipts` messages."
|
|
|
)
|
|
|
|
|
|
- self.events_shard_config = ShardedWorkerHandlingConfig(self.writers.events)
|
|
|
+ if len(self.writers.events) == 0:
|
|
|
+ raise ConfigError("Must specify at least one instance to handle `events`.")
|
|
|
+
|
|
|
+ self.events_shard_config = RoutableShardedWorkerHandlingConfig(
|
|
|
+ self.writers.events
|
|
|
+ )
|
|
|
+
|
|
|
+ # Handle sharded push
|
|
|
+ start_pushers = config.get("start_pushers", True)
|
|
|
+ pusher_instances = config.get("pusher_instances")
|
|
|
+ if pusher_instances is None:
|
|
|
+ # Default to an empty list, which means "another, unknown, worker is
|
|
|
+ # responsible for it".
|
|
|
+ pusher_instances = []
|
|
|
+
|
|
|
+ # If no pushers instances are set we check if `start_pushers` is
|
|
|
+ # set, which means use master
|
|
|
+ if start_pushers:
|
|
|
+ pusher_instances = ["master"]
|
|
|
+
|
|
|
+ if self.worker_app == "synapse.app.pusher":
|
|
|
+ if start_pushers:
|
|
|
+ # If we're running pushers, and not using
|
|
|
+ # `pusher_instances`, then we should have explicitly set
|
|
|
+ # `start_pushers` to false.
|
|
|
+ raise ConfigError(_PUSHER_WITH_START_PUSHERS_ENABLED_ERROR)
|
|
|
+
|
|
|
+ pusher_instances = [self.instance_name]
|
|
|
+
|
|
|
+ self.start_pushers = self.instance_name in pusher_instances
|
|
|
+ self.pusher_shard_config = ShardedWorkerHandlingConfig(pusher_instances)
|
|
|
|
|
|
# Whether this worker should run background tasks or not.
|
|
|
#
|