Browse Source

Merge pull request #2854 from matrix-org/erikj/event_create_worker

Create a worker for event creation
Erik Johnston 6 years ago
parent
commit
c0c9327fe0

+ 34 - 5
docs/workers.rst

@@ -30,17 +30,29 @@ requests made to the federation port. The caveats regarding running a
 reverse-proxy on the federation port still apply (see
 https://github.com/matrix-org/synapse/blob/master/README.rst#reverse-proxying-the-federation-port).
 
-To enable workers, you need to add a replication listener to the master synapse, e.g.::
+To enable workers, you need to add two replication listeners to the master
+synapse, e.g.::
 
     listeners:
+      # The TCP replication port
       - port: 9092
         bind_address: '127.0.0.1'
         type: replication
+      # The HTTP replication port
+      - port: 9093
+        bind_address: '127.0.0.1'
+        type: http
+        resources:
+         - names: [replication]
 
-Under **no circumstances** should this replication API listener be exposed to the
-public internet; it currently implements no authentication whatsoever and is
+Under **no circumstances** should these replication API listeners be exposed to
+the public internet; it currently implements no authentication whatsoever and is
 unencrypted.
 
+(Roughly, the TCP port is used for streaming data from the master to the
+workers, and the HTTP port for the workers to send data to the main
+synapse process.)
+
 You then create a set of configs for the various worker processes.  These
 should be worker configuration files, and should be stored in a dedicated
 subdirectory, to allow synctl to manipulate them.
@@ -52,8 +64,13 @@ You should minimise the number of overrides though to maintain a usable config.
 
 You must specify the type of worker application (``worker_app``). The currently
 available worker applications are listed below. You must also specify the
-replication endpoint that it's talking to on the main synapse process
-(``worker_replication_host`` and ``worker_replication_port``).
+replication endpoints that it's talking to on the main synapse process.
+``worker_replication_host`` should specify the host of the main synapse,
+``worker_replication_port`` should point to the TCP replication listener port and
+``worker_replication_http_port`` should point to the HTTP replication port.
+
+Currently, only the ``event_creator`` worker requires specifying
+``worker_replication_http_port``.
 
 For instance::
 
@@ -62,6 +79,7 @@ For instance::
     # The replication listener on the synapse to talk to.
     worker_replication_host: 127.0.0.1
     worker_replication_port: 9092
+    worker_replication_http_port: 9093
 
     worker_listeners:
      - type: http
@@ -207,3 +225,14 @@ the ``worker_main_http_uri`` setting in the frontend_proxy worker configuration
 file. For example::
 
     worker_main_http_uri: http://127.0.0.1:8008
+
+
+``synapse.app.event_creator``
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+Handles non-state event creation. It can handle REST endpoints matching:
+
+    ^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/send
+
+It will create events locally and then send them on to the main synapse
+instance to be persisted and handled.

+ 170 - 0
synapse/app/event_creator.py

@@ -0,0 +1,170 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+import sys
+
+import synapse
+from synapse import events
+from synapse.app import _base
+from synapse.config._base import ConfigError
+from synapse.config.homeserver import HomeServerConfig
+from synapse.config.logger import setup_logging
+from synapse.crypto import context_factory
+from synapse.http.server import JsonResource
+from synapse.http.site import SynapseSite
+from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
+from synapse.replication.slave.storage._base import BaseSlavedStore
+from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
+from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
+from synapse.replication.slave.storage.devices import SlavedDeviceStore
+from synapse.replication.slave.storage.events import SlavedEventStore
+from synapse.replication.slave.storage.registration import SlavedRegistrationStore
+from synapse.replication.slave.storage.room import RoomStore
+from synapse.replication.tcp.client import ReplicationClientHandler
+from synapse.rest.client.v1.room import RoomSendEventRestServlet
+from synapse.server import HomeServer
+from synapse.storage.engines import create_engine
+from synapse.util.httpresourcetree import create_resource_tree
+from synapse.util.logcontext import LoggingContext
+from synapse.util.manhole import manhole
+from synapse.util.versionstring import get_version_string
+from twisted.internet import reactor
+from twisted.web.resource import Resource
+
+logger = logging.getLogger("synapse.app.event_creator")
+
+
+class EventCreatorSlavedStore(
+    SlavedDeviceStore,
+    SlavedClientIpStore,
+    SlavedApplicationServiceStore,
+    SlavedEventStore,
+    SlavedRegistrationStore,
+    RoomStore,
+    BaseSlavedStore,
+):
+    pass
+
+
+class EventCreatorServer(HomeServer):
+    def setup(self):
+        logger.info("Setting up.")
+        self.datastore = EventCreatorSlavedStore(self.get_db_conn(), self)
+        logger.info("Finished setting up.")
+
+    def _listen_http(self, listener_config):
+        port = listener_config["port"]
+        bind_addresses = listener_config["bind_addresses"]
+        site_tag = listener_config.get("tag", port)
+        resources = {}
+        for res in listener_config["resources"]:
+            for name in res["names"]:
+                if name == "metrics":
+                    resources[METRICS_PREFIX] = MetricsResource(self)
+                elif name == "client":
+                    resource = JsonResource(self, canonical_json=False)
+                    RoomSendEventRestServlet(self).register(resource)
+                    resources.update({
+                        "/_matrix/client/r0": resource,
+                        "/_matrix/client/unstable": resource,
+                        "/_matrix/client/v2_alpha": resource,
+                        "/_matrix/client/api/v1": resource,
+                    })
+
+        root_resource = create_resource_tree(resources, Resource())
+
+        _base.listen_tcp(
+            bind_addresses,
+            port,
+            SynapseSite(
+                "synapse.access.http.%s" % (site_tag,),
+                site_tag,
+                listener_config,
+                root_resource,
+            )
+        )
+
+        logger.info("Synapse event creator now listening on port %d", port)
+
+    def start_listening(self, listeners):
+        for listener in listeners:
+            if listener["type"] == "http":
+                self._listen_http(listener)
+            elif listener["type"] == "manhole":
+                _base.listen_tcp(
+                    listener["bind_addresses"],
+                    listener["port"],
+                    manhole(
+                        username="matrix",
+                        password="rabbithole",
+                        globals={"hs": self},
+                    )
+                )
+            else:
+                logger.warn("Unrecognized listener type: %s", listener["type"])
+
+        self.get_tcp_replication().start_replication(self)
+
+    def build_tcp_replication(self):
+        return ReplicationClientHandler(self.get_datastore())
+
+
+def start(config_options):
+    try:
+        config = HomeServerConfig.load_config(
+            "Synapse event creator", config_options
+        )
+    except ConfigError as e:
+        sys.stderr.write("\n" + e.message + "\n")
+        sys.exit(1)
+
+    assert config.worker_app == "synapse.app.event_creator"
+
+    assert config.worker_replication_http_port is not None
+
+    setup_logging(config, use_worker_options=True)
+
+    events.USE_FROZEN_DICTS = config.use_frozen_dicts
+
+    database_engine = create_engine(config.database_config)
+
+    tls_server_context_factory = context_factory.ServerContextFactory(config)
+
+    ss = EventCreatorServer(
+        config.server_name,
+        db_config=config.database_config,
+        tls_server_context_factory=tls_server_context_factory,
+        config=config,
+        version_string="Synapse/" + get_version_string(synapse),
+        database_engine=database_engine,
+    )
+
+    ss.setup()
+    ss.get_handlers()
+    ss.start_listening(config.worker_listeners)
+
+    def start():
+        ss.get_state_handler().start_caching()
+        ss.get_datastore().start_profiling()
+
+    reactor.callWhenRunning(start)
+
+    _base.start_worker_reactor("synapse-event-creator", config)
+
+
+if __name__ == '__main__':
+    with LoggingContext("main"):
+        start(sys.argv[1:])

+ 4 - 0
synapse/app/homeserver.py

@@ -38,6 +38,7 @@ from synapse.metrics import register_memory_metrics
 from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
 from synapse.python_dependencies import CONDITIONAL_REQUIREMENTS, \
     check_requirements
+from synapse.replication.http import ReplicationRestResource, REPLICATION_PREFIX
 from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
 from synapse.rest import ClientRestResource
 from synapse.rest.key.v1.server_key_resource import LocalKey
@@ -219,6 +220,9 @@ class SynapseHomeServer(HomeServer):
         if name == "metrics" and self.get_config().enable_metrics:
             resources[METRICS_PREFIX] = MetricsResource(self)
 
+        if name == "replication":
+            resources[REPLICATION_PREFIX] = ReplicationRestResource(self)
+
         return resources
 
     def start_listening(self):

+ 8 - 0
synapse/config/workers.py

@@ -33,8 +33,16 @@ class WorkerConfig(Config):
         self.worker_pid_file = config.get("worker_pid_file")
         self.worker_log_file = config.get("worker_log_file")
         self.worker_log_config = config.get("worker_log_config")
+
+        # The host used to connect to the main synapse
         self.worker_replication_host = config.get("worker_replication_host", None)
+
+        # The port on the main synapse for TCP replication
         self.worker_replication_port = config.get("worker_replication_port", None)
+
+        # The port on the main synapse for HTTP replication endpoint
+        self.worker_replication_http_port = config.get("worker_replication_http_port")
+
         self.worker_name = config.get("worker_name", self.worker_app)
 
         self.worker_main_http_uri = config.get("worker_main_http_uri", None)

+ 72 - 0
synapse/events/snapshot.py

@@ -14,6 +14,9 @@
 # limitations under the License.
 
 
+from frozendict import frozendict
+
+
 class EventContext(object):
     """
     Attributes:
@@ -73,3 +76,72 @@ class EventContext(object):
         self.prev_state_events = None
 
         self.app_service = None
+
+    def serialize(self):
+        """Converts self to a type that can be serialized as JSON, and then
+        deserialized by `deserialize`
+
+        Returns:
+            dict
+        """
+        return {
+            "current_state_ids": _encode_state_dict(self.current_state_ids),
+            "prev_state_ids": _encode_state_dict(self.prev_state_ids),
+            "state_group": self.state_group,
+            "rejected": self.rejected,
+            "push_actions": self.push_actions,
+            "prev_group": self.prev_group,
+            "delta_ids": _encode_state_dict(self.delta_ids),
+            "prev_state_events": self.prev_state_events,
+            "app_service_id": self.app_service.id if self.app_service else None
+        }
+
+    @staticmethod
+    def deserialize(store, input):
+        """Converts a dict that was produced by `serialize` back into a
+        EventContext.
+
+        Args:
+            store (DataStore): Used to convert AS ID to AS object
+            input (dict): A dict produced by `serialize`
+
+        Returns:
+            EventContext
+        """
+        context = EventContext()
+        context.current_state_ids = _decode_state_dict(input["current_state_ids"])
+        context.prev_state_ids = _decode_state_dict(input["prev_state_ids"])
+        context.state_group = input["state_group"]
+        context.rejected = input["rejected"]
+        context.push_actions = input["push_actions"]
+        context.prev_group = input["prev_group"]
+        context.delta_ids = _decode_state_dict(input["delta_ids"])
+        context.prev_state_events = input["prev_state_events"]
+
+        app_service_id = input["app_service_id"]
+        if app_service_id:
+            context.app_service = store.get_app_service_by_id(app_service_id)
+
+        return context
+
+
+def _encode_state_dict(state_dict):
+    """Since dicts of (type, state_key) -> event_id cannot be serialized in
+    JSON we need to convert them to a form that can.
+    """
+    if state_dict is None:
+        return None
+
+    return [
+        (etype, state_key, v)
+        for (etype, state_key), v in state_dict.iteritems()
+    ]
+
+
+def _decode_state_dict(input):
+    """Decodes a state dict encoded using `_encode_state_dict` above
+    """
+    if input is None:
+        return None
+
+    return frozendict({(etype, state_key,): v for etype, state_key, v in input})

+ 22 - 6
synapse/handlers/message.py

@@ -28,6 +28,7 @@ from synapse.util.logcontext import preserve_fn
 from synapse.util.metrics import measure_func
 from synapse.util.frozenutils import unfreeze
 from synapse.visibility import filter_events_for_client
+from synapse.replication.http.send_event import send_event_to_master
 
 from ._base import BaseHandler
 
@@ -312,6 +313,9 @@ class EventCreationHandler(object):
         self.server_name = hs.hostname
         self.ratelimiter = hs.get_ratelimiter()
         self.notifier = hs.get_notifier()
+        self.config = hs.config
+
+        self.http_client = hs.get_simple_http_client()
 
         # This is only used to get at ratelimit function, and maybe_kick_guest_users
         self.base_handler = BaseHandler(hs)
@@ -419,12 +423,6 @@ class EventCreationHandler(object):
             ratelimit=ratelimit,
         )
 
-        if event.type == EventTypes.Message:
-            presence = self.hs.get_presence_handler()
-            # We don't want to block sending messages on any presence code. This
-            # matters as sometimes presence code can take a while.
-            preserve_fn(presence.bump_presence_active_time)(user)
-
     @defer.inlineCallbacks
     def deduplicate_state_event(self, event, context):
         """
@@ -559,6 +557,18 @@ class EventCreationHandler(object):
     ):
         # We now need to go and hit out to wherever we need to hit out to.
 
+        # If we're a worker we need to hit out to the master.
+        if self.config.worker_app:
+            yield send_event_to_master(
+                self.http_client,
+                host=self.config.worker_replication_host,
+                port=self.config.worker_replication_http_port,
+                requester=requester,
+                event=event,
+                context=context,
+            )
+            return
+
         if ratelimit:
             yield self.base_handler.ratelimit(requester)
 
@@ -692,3 +702,9 @@ class EventCreationHandler(object):
             )
 
         preserve_fn(_notify)()
+
+        if event.type == EventTypes.Message:
+            presence = self.hs.get_presence_handler()
+            # We don't want to block sending messages on any presence code. This
+            # matters as sometimes presence code can take a while.
+            preserve_fn(presence.bump_presence_active_time)(requester.user)

+ 31 - 0
synapse/replication/http/__init__.py

@@ -0,0 +1,31 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import send_event
+
+from synapse.http.server import JsonResource
+
+
+REPLICATION_PREFIX = "/_synapse/replication"
+
+
+class ReplicationRestResource(JsonResource):
+    def __init__(self, hs):
+        JsonResource.__init__(self, hs, canonical_json=False)
+        self.register_servlets(hs)
+
+    def register_servlets(self, hs):
+        send_event.register_servlets(hs, self)

+ 108 - 0
synapse/replication/http/send_event.py

@@ -0,0 +1,108 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+
+from synapse.events import FrozenEvent
+from synapse.events.snapshot import EventContext
+from synapse.http.servlet import RestServlet, parse_json_object_from_request
+from synapse.util.metrics import Measure
+from synapse.types import Requester
+
+import logging
+import re
+
+logger = logging.getLogger(__name__)
+
+
+def send_event_to_master(client, host, port, requester, event, context):
+    """Send event to be handled on the master
+
+    Args:
+        client (SimpleHttpClient)
+        host (str): host of master
+        port (int): port on master listening for HTTP replication
+        requester (Requester)
+        event (FrozenEvent)
+        context (EventContext)
+    """
+    uri = "http://%s:%s/_synapse/replication/send_event" % (host, port,)
+
+    payload = {
+        "event": event.get_pdu_json(),
+        "internal_metadata": event.internal_metadata.get_dict(),
+        "rejected_reason": event.rejected_reason,
+        "context": context.serialize(),
+        "requester": requester.serialize(),
+    }
+
+    return client.post_json_get_json(uri, payload)
+
+
+class ReplicationSendEventRestServlet(RestServlet):
+    """Handles events newly created on workers, including persisting and
+    notifying.
+
+    The API looks like:
+
+        POST /_synapse/replication/send_event
+
+        {
+            "event": { .. serialized event .. },
+            "internal_metadata": { .. serialized internal_metadata .. },
+            "rejected_reason": ..,   // The event.rejected_reason field
+            "context": { .. serialized event context .. },
+            "requester": { .. serialized requester .. },
+        }
+    """
+    PATTERNS = [re.compile("^/_synapse/replication/send_event$")]
+
+    def __init__(self, hs):
+        super(ReplicationSendEventRestServlet, self).__init__()
+
+        self.event_creation_handler = hs.get_event_creation_handler()
+        self.store = hs.get_datastore()
+        self.clock = hs.get_clock()
+
+    @defer.inlineCallbacks
+    def on_POST(self, request):
+        with Measure(self.clock, "repl_send_event_parse"):
+            content = parse_json_object_from_request(request)
+
+            event_dict = content["event"]
+            internal_metadata = content["internal_metadata"]
+            rejected_reason = content["rejected_reason"]
+            event = FrozenEvent(event_dict, internal_metadata, rejected_reason)
+
+            requester = Requester.deserialize(self.store, content["requester"])
+            context = EventContext.deserialize(self.store, content["context"])
+
+        if requester.user:
+            request.authenticated_entity = requester.user.to_string()
+
+        logger.info(
+            "Got event to send with ID: %s into room: %s",
+            event.event_id, event.room_id,
+        )
+
+        yield self.event_creation_handler.handle_new_client_event(
+            requester, event, context,
+        )
+
+        defer.returnValue((200, {}))
+
+
+def register_servlets(hs, http_server):
+    ReplicationSendEventRestServlet(hs).register(http_server)

+ 20 - 0
synapse/replication/slave/storage/events.py

@@ -21,6 +21,7 @@ from synapse.storage.event_push_actions import EventPushActionsStore
 from synapse.storage.roommember import RoomMemberStore
 from synapse.storage.state import StateGroupWorkerStore
 from synapse.storage.stream import StreamStore
+from synapse.storage.signatures import SignatureStore
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 from ._base import BaseSlavedStore
 from ._slaved_id_tracker import SlavedIdTracker
@@ -170,6 +171,25 @@ class SlavedEventStore(StateGroupWorkerStore, BaseSlavedStore):
     get_federation_out_pos = DataStore.get_federation_out_pos.__func__
     update_federation_out_pos = DataStore.update_federation_out_pos.__func__
 
+    get_latest_event_ids_and_hashes_in_room = (
+        DataStore.get_latest_event_ids_and_hashes_in_room.__func__
+    )
+    _get_latest_event_ids_and_hashes_in_room = (
+        DataStore._get_latest_event_ids_and_hashes_in_room.__func__
+    )
+    _get_event_reference_hashes_txn = (
+        DataStore._get_event_reference_hashes_txn.__func__
+    )
+    add_event_hashes = (
+        DataStore.add_event_hashes.__func__
+    )
+    get_event_reference_hashes = (
+        SignatureStore.__dict__["get_event_reference_hashes"]
+    )
+    get_event_reference_hash = (
+        SignatureStore.__dict__["get_event_reference_hash"]
+    )
+
     def stream_positions(self):
         result = super(SlavedEventStore, self).stream_positions()
         result["events"] = self._stream_id_gen.get_current_token()

+ 0 - 1
synapse/rest/client/v1/room.py

@@ -186,7 +186,6 @@ class RoomSendEventRestServlet(ClientV1RestServlet):
 
     def __init__(self, hs):
         super(RoomSendEventRestServlet, self).__init__(hs)
-        self.handlers = hs.get_handlers()
         self.event_creation_hander = hs.get_event_creation_handler()
 
     def register(self, http_server):

+ 13 - 0
synapse/storage/appservice.py

@@ -99,6 +99,19 @@ class ApplicationServiceStore(SQLBaseStore):
                 return service
         return None
 
+    def get_app_service_by_id(self, as_id):
+        """Get the application service with the given appservice ID.
+
+        Args:
+            as_id (str): The application service ID.
+        Returns:
+            synapse.appservice.ApplicationService or None.
+        """
+        for service in self.services_cache:
+            if service.id == as_id:
+                return service
+        return None
+
     def get_app_service_rooms(self, service):
         """Get a list of RoomsForUser for this application service.
 

+ 52 - 13
synapse/types.py

@@ -19,20 +19,59 @@ from synapse.api.errors import SynapseError
 from collections import namedtuple
 
 
-Requester = namedtuple("Requester", [
+class Requester(namedtuple("Requester", [
     "user", "access_token_id", "is_guest", "device_id", "app_service",
-])
-"""
-Represents the user making a request
-
-Attributes:
-    user (UserID):  id of the user making the request
-    access_token_id (int|None):  *ID* of the access token used for this
-        request, or None if it came via the appservice API or similar
-    is_guest (bool):  True if the user making this request is a guest user
-    device_id (str|None):  device_id which was set at authentication time
-    app_service (ApplicationService|None):  the AS requesting on behalf of the user
-"""
+])):
+    """
+    Represents the user making a request
+
+    Attributes:
+        user (UserID):  id of the user making the request
+        access_token_id (int|None):  *ID* of the access token used for this
+            request, or None if it came via the appservice API or similar
+        is_guest (bool):  True if the user making this request is a guest user
+        device_id (str|None):  device_id which was set at authentication time
+        app_service (ApplicationService|None):  the AS requesting on behalf of the user
+    """
+
+    def serialize(self):
+        """Converts self to a type that can be serialized as JSON, and then
+        deserialized by `deserialize`
+
+        Returns:
+            dict
+        """
+        return {
+            "user_id": self.user.to_string(),
+            "access_token_id": self.access_token_id,
+            "is_guest": self.is_guest,
+            "device_id": self.device_id,
+            "app_server_id": self.app_service.id if self.app_service else None,
+        }
+
+    @staticmethod
+    def deserialize(store, input):
+        """Converts a dict that was produced by `serialize` back into a
+        Requester.
+
+        Args:
+            store (DataStore): Used to convert AS ID to AS object
+            input (dict): A dict produced by `serialize`
+
+        Returns:
+            Requester
+        """
+        appservice = None
+        if input["app_server_id"]:
+            appservice = store.get_app_service_by_id(input["app_server_id"])
+
+        return Requester(
+            user=UserID.from_string(input["user_id"]),
+            access_token_id=input["access_token_id"],
+            is_guest=input["is_guest"],
+            device_id=input["device_id"],
+            app_service=appservice,
+        )
 
 
 def create_requester(user_id, access_token_id=None, is_guest=False,