Browse Source

Merge branch 'develop' of github.com:matrix-org/synapse into neilj/update_limits_error_codes

Neil Johnson 5 years ago
parent
commit
4d1a8718b5

+ 1 - 0
changelog.d/3653.feature

@@ -0,0 +1 @@
+Support more federation endpoints on workers

+ 1 - 0
changelog.d/3661.bugfix

@@ -0,0 +1 @@
+Fix bug on deleting 3pid when using identity servers that don't support unbind API

+ 1 - 1
changelog.d/3687.feature

@@ -1 +1 @@
-set admin email via config, to be used in error messages where the user should contact the administrator
+set admin uri via config, to be used in error messages where the user should contact the administrator

+ 1 - 0
changelog.d/3690.misc

@@ -0,0 +1 @@
+Rename MAU prometheus metrics

+ 0 - 1
changelog.d/3697.misc

@@ -1 +0,0 @@
-update resource limit error codes

+ 13 - 0
docs/workers.rst

@@ -173,10 +173,23 @@ endpoints matching the following regular expressions::
     ^/_matrix/federation/v1/backfill/
     ^/_matrix/federation/v1/get_missing_events/
     ^/_matrix/federation/v1/publicRooms
+    ^/_matrix/federation/v1/query/
+    ^/_matrix/federation/v1/make_join/
+    ^/_matrix/federation/v1/make_leave/
+    ^/_matrix/federation/v1/send_join/
+    ^/_matrix/federation/v1/send_leave/
+    ^/_matrix/federation/v1/invite/
+    ^/_matrix/federation/v1/query_auth/
+    ^/_matrix/federation/v1/event_auth/
+    ^/_matrix/federation/v1/exchange_third_party_invite/
+    ^/_matrix/federation/v1/send/
 
 The above endpoints should all be routed to the federation_reader worker by the
 reverse-proxy configuration.
 
+The `^/_matrix/federation/v1/send/` endpoint must only be handled by a single
+instance.
+
 ``synapse.app.federation_sender``
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

+ 3 - 7
synapse/api/errors.py

@@ -224,13 +224,9 @@ class NotFoundError(SynapseError):
 
 class AuthError(SynapseError):
     """An error raised when there was a problem authorising an event."""
-    def __init__(self, *args, **kwargs):
-        if "errcode" not in kwargs:
-            kwargs["errcode"] = Codes.FORBIDDEN
-        self.admin_uri = kwargs.get('admin_uri')
-        self.msg = kwargs.get('msg')
-        self.errcode = kwargs.get('errcode')
-        super(AuthError, self).__init__(*args, errcode=kwargs["errcode"])
+    def __init__(self, code, msg, errcode=Codes.FORBIDDEN, admin_uri=None):
+        self.admin_uri = admin_uri
+        super(AuthError, self).__init__(code, msg, errcode=errcode)
 
     def error_dict(self):
         return cs_error(

+ 2 - 2
synapse/app/client_reader.py

@@ -39,7 +39,7 @@ from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.replication.slave.storage.keys import SlavedKeyStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
 from synapse.replication.slave.storage.room import RoomStore
-from synapse.replication.slave.storage.transactions import TransactionStore
+from synapse.replication.slave.storage.transactions import SlavedTransactionStore
 from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.rest.client.v1.room import (
     JoinedRoomMemberListRestServlet,
@@ -66,7 +66,7 @@ class ClientReaderSlavedStore(
     DirectoryStore,
     SlavedApplicationServiceStore,
     SlavedRegistrationStore,
-    TransactionStore,
+    SlavedTransactionStore,
     SlavedClientIpStore,
     BaseSlavedStore,
 ):

+ 2 - 2
synapse/app/event_creator.py

@@ -43,7 +43,7 @@ from synapse.replication.slave.storage.pushers import SlavedPusherStore
 from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
 from synapse.replication.slave.storage.room import RoomStore
-from synapse.replication.slave.storage.transactions import TransactionStore
+from synapse.replication.slave.storage.transactions import SlavedTransactionStore
 from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.rest.client.v1.room import (
     JoinRoomAliasServlet,
@@ -63,7 +63,7 @@ logger = logging.getLogger("synapse.app.event_creator")
 
 class EventCreatorSlavedStore(
     DirectoryStore,
-    TransactionStore,
+    SlavedTransactionStore,
     SlavedProfileStore,
     SlavedAccountDataStore,
     SlavedPusherStore,

+ 12 - 2
synapse/app/federation_reader.py

@@ -32,11 +32,16 @@ from synapse.http.site import SynapseSite
 from synapse.metrics import RegistryProxy
 from synapse.metrics.resource import METRICS_PREFIX, MetricsResource
 from synapse.replication.slave.storage._base import BaseSlavedStore
+from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
 from synapse.replication.slave.storage.directory import DirectoryStore
 from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.replication.slave.storage.keys import SlavedKeyStore
+from synapse.replication.slave.storage.profile import SlavedProfileStore
+from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
+from synapse.replication.slave.storage.pushers import SlavedPusherStore
+from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
 from synapse.replication.slave.storage.room import RoomStore
-from synapse.replication.slave.storage.transactions import TransactionStore
+from synapse.replication.slave.storage.transactions import SlavedTransactionStore
 from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
@@ -49,11 +54,16 @@ logger = logging.getLogger("synapse.app.federation_reader")
 
 
 class FederationReaderSlavedStore(
+    SlavedProfileStore,
+    SlavedApplicationServiceStore,
+    SlavedPusherStore,
+    SlavedPushRuleStore,
+    SlavedReceiptsStore,
     SlavedEventStore,
     SlavedKeyStore,
     RoomStore,
     DirectoryStore,
-    TransactionStore,
+    SlavedTransactionStore,
     BaseSlavedStore,
 ):
     pass

+ 2 - 2
synapse/app/federation_sender.py

@@ -36,7 +36,7 @@ from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.replication.slave.storage.presence import SlavedPresenceStore
 from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
-from synapse.replication.slave.storage.transactions import TransactionStore
+from synapse.replication.slave.storage.transactions import SlavedTransactionStore
 from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.server import HomeServer
 from synapse.storage.engines import create_engine
@@ -50,7 +50,7 @@ logger = logging.getLogger("synapse.app.federation_sender")
 
 
 class FederationSenderSlaveStore(
-    SlavedDeviceInboxStore, TransactionStore, SlavedReceiptsStore, SlavedEventStore,
+    SlavedDeviceInboxStore, SlavedTransactionStore, SlavedReceiptsStore, SlavedEventStore,
     SlavedRegistrationStore, SlavedDeviceStore, SlavedPresenceStore,
 ):
     def __init__(self, db_conn, hs):

+ 3 - 3
synapse/app/homeserver.py

@@ -303,8 +303,8 @@ class SynapseHomeServer(HomeServer):
 
 
 # Gauges to expose monthly active user control metrics
-current_mau_gauge = Gauge("synapse_admin_current_mau", "Current MAU")
-max_mau_value_gauge = Gauge("synapse_admin_max_mau_value", "MAU Limit")
+current_mau_gauge = Gauge("synapse_admin_mau:current", "Current MAU")
+max_mau_gauge = Gauge("synapse_admin_mau:max", "MAU Limit")
 
 
 def setup(config_options):
@@ -532,7 +532,7 @@ def run(hs):
         if hs.config.limit_usage_by_mau:
             count = yield hs.get_datastore().get_monthly_active_count()
         current_mau_gauge.set(float(count))
-        max_mau_value_gauge.set(float(hs.config.max_mau_value))
+        max_mau_gauge.set(float(hs.config.max_mau_value))
 
     hs.get_datastore().initialise_reserved_users(
         hs.config.mau_limits_reserved_threepids

+ 2 - 2
synapse/app/media_repository.py

@@ -34,7 +34,7 @@ from synapse.replication.slave.storage._base import BaseSlavedStore
 from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore
 from synapse.replication.slave.storage.client_ips import SlavedClientIpStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
-from synapse.replication.slave.storage.transactions import TransactionStore
+from synapse.replication.slave.storage.transactions import SlavedTransactionStore
 from synapse.replication.tcp.client import ReplicationClientHandler
 from synapse.rest.media.v0.content_repository import ContentRepoResource
 from synapse.server import HomeServer
@@ -52,7 +52,7 @@ class MediaRepositorySlavedStore(
     SlavedApplicationServiceStore,
     SlavedRegistrationStore,
     SlavedClientIpStore,
-    TransactionStore,
+    SlavedTransactionStore,
     BaseSlavedStore,
     MediaRepositoryStore,
 ):

+ 1 - 1
synapse/config/server.py

@@ -82,7 +82,7 @@ class ServerConfig(Config):
         self.hs_disabled = config.get("hs_disabled", False)
         self.hs_disabled_message = config.get("hs_disabled_message", "")
 
-        # Admin email to direct users at should their instance become blocked
+        # Admin uri to direct users at should their instance become blocked
         # due to resource constraints
         self.admin_uri = config.get("admin_uri", None)
 

+ 54 - 0
synapse/federation/federation_server.py

@@ -39,6 +39,10 @@ from synapse.federation.federation_base import FederationBase, event_from_pdu_js
 from synapse.federation.persistence import TransactionActions
 from synapse.federation.units import Edu, Transaction
 from synapse.http.endpoint import parse_server_name
+from synapse.replication.http.federation import (
+    ReplicationFederationSendEduRestServlet,
+    ReplicationGetQueryRestServlet,
+)
 from synapse.types import get_domain_from_id
 from synapse.util.async_helpers import Linearizer, concurrently_execute
 from synapse.util.caches.response_cache import ResponseCache
@@ -760,6 +764,8 @@ class FederationHandlerRegistry(object):
         if edu_type in self.edu_handlers:
             raise KeyError("Already have an EDU handler for %s" % (edu_type,))
 
+        logger.info("Registering federation EDU handler for %r", edu_type)
+
         self.edu_handlers[edu_type] = handler
 
     def register_query_handler(self, query_type, handler):
@@ -778,6 +784,8 @@ class FederationHandlerRegistry(object):
                 "Already have a Query handler for %s" % (query_type,)
             )
 
+        logger.info("Registering federation query handler for %r", query_type)
+
         self.query_handlers[query_type] = handler
 
     @defer.inlineCallbacks
@@ -800,3 +808,49 @@ class FederationHandlerRegistry(object):
             raise NotFoundError("No handler for Query type '%s'" % (query_type,))
 
         return handler(args)
+
+
+class ReplicationFederationHandlerRegistry(FederationHandlerRegistry):
+    """A FederationHandlerRegistry for worker processes.
+
+    When receiving EDU or queries it will check if an appropriate handler has
+    been registered on the worker, if there isn't one then it calls off to the
+    master process.
+    """
+
+    def __init__(self, hs):
+        self.config = hs.config
+        self.http_client = hs.get_simple_http_client()
+        self.clock = hs.get_clock()
+
+        self._get_query_client = ReplicationGetQueryRestServlet.make_client(hs)
+        self._send_edu = ReplicationFederationSendEduRestServlet.make_client(hs)
+
+        super(ReplicationFederationHandlerRegistry, self).__init__()
+
+    def on_edu(self, edu_type, origin, content):
+        """Overrides FederationHandlerRegistry
+        """
+        handler = self.edu_handlers.get(edu_type)
+        if handler:
+            return super(ReplicationFederationHandlerRegistry, self).on_edu(
+                edu_type, origin, content,
+            )
+
+        return self._send_edu(
+                edu_type=edu_type,
+                origin=origin,
+                content=content,
+        )
+
+    def on_query(self, query_type, args):
+        """Overrides FederationHandlerRegistry
+        """
+        handler = self.query_handlers.get(query_type)
+        if handler:
+            return handler(args)
+
+        return self._get_query_client(
+                query_type=query_type,
+                args=args,
+        )

+ 17 - 3
synapse/handlers/auth.py

@@ -828,12 +828,26 @@ class AuthHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def delete_threepid(self, user_id, medium, address):
+        """Attempts to unbind the 3pid on the identity servers and deletes it
+        from the local database.
+
+        Args:
+            user_id (str)
+            medium (str)
+            address (str)
+
+        Returns:
+            Deferred[bool]: Returns True if successfully unbound the 3pid on
+            the identity server, False if identity server doesn't support the
+            unbind API.
+        """
+
         # 'Canonicalise' email addresses as per above
         if medium == 'email':
             address = address.lower()
 
         identity_handler = self.hs.get_handlers().identity_handler
-        yield identity_handler.unbind_threepid(
+        result = yield identity_handler.try_unbind_threepid(
             user_id,
             {
                 'medium': medium,
@@ -841,10 +855,10 @@ class AuthHandler(BaseHandler):
             },
         )
 
-        ret = yield self.store.user_delete_threepid(
+        yield self.store.user_delete_threepid(
             user_id, medium, address,
         )
-        defer.returnValue(ret)
+        defer.returnValue(result)
 
     def _save_session(self, session):
         # TODO: Persistent storage

+ 11 - 2
synapse/handlers/deactivate_account.py

@@ -51,7 +51,8 @@ class DeactivateAccountHandler(BaseHandler):
             erase_data (bool): whether to GDPR-erase the user's data
 
         Returns:
-            Deferred
+            Deferred[bool]: True if identity server supports removing
+            threepids, otherwise False.
         """
         # FIXME: Theoretically there is a race here wherein user resets
         # password using threepid.
@@ -60,16 +61,22 @@ class DeactivateAccountHandler(BaseHandler):
         # leave the user still active so they can try again.
         # Ideally we would prevent password resets and then do this in the
         # background thread.
+
+        # This will be set to false if the identity server doesn't support
+        # unbinding
+        identity_server_supports_unbinding = True
+
         threepids = yield self.store.user_get_threepids(user_id)
         for threepid in threepids:
             try:
-                yield self._identity_handler.unbind_threepid(
+                result = yield self._identity_handler.try_unbind_threepid(
                     user_id,
                     {
                         'medium': threepid['medium'],
                         'address': threepid['address'],
                     },
                 )
+                identity_server_supports_unbinding &= result
             except Exception:
                 # Do we want this to be a fatal error or should we carry on?
                 logger.exception("Failed to remove threepid from ID server")
@@ -103,6 +110,8 @@ class DeactivateAccountHandler(BaseHandler):
         # parts users from rooms (if it isn't already running)
         self._start_user_parting()
 
+        defer.returnValue(identity_server_supports_unbinding)
+
     def _start_user_parting(self):
         """
         Start the process that goes through the table of users

+ 57 - 17
synapse/handlers/federation.py

@@ -49,6 +49,11 @@ from synapse.crypto.event_signing import (
     compute_event_signature,
 )
 from synapse.events.validator import EventValidator
+from synapse.replication.http.federation import (
+    ReplicationCleanRoomRestServlet,
+    ReplicationFederationSendEventsRestServlet,
+)
+from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet
 from synapse.state import resolve_events_with_factory
 from synapse.types import UserID, get_domain_from_id
 from synapse.util import logcontext, unwrapFirstError
@@ -91,6 +96,18 @@ class FederationHandler(BaseHandler):
         self.spam_checker = hs.get_spam_checker()
         self.event_creation_handler = hs.get_event_creation_handler()
         self._server_notices_mxid = hs.config.server_notices_mxid
+        self.config = hs.config
+        self.http_client = hs.get_simple_http_client()
+
+        self._send_events_to_master = (
+            ReplicationFederationSendEventsRestServlet.make_client(hs)
+        )
+        self._notify_user_membership_change = (
+            ReplicationUserJoinedLeftRoomRestServlet.make_client(hs)
+        )
+        self._clean_room_for_join_client = (
+            ReplicationCleanRoomRestServlet.make_client(hs)
+        )
 
         # When joining a room we need to queue any events for that room up
         self.room_queues = {}
@@ -1158,7 +1175,7 @@ class FederationHandler(BaseHandler):
         )
 
         context = yield self.state_handler.compute_event_context(event)
-        yield self._persist_events([(event, context)])
+        yield self.persist_events_and_notify([(event, context)])
 
         defer.returnValue(event)
 
@@ -1189,7 +1206,7 @@ class FederationHandler(BaseHandler):
         )
 
         context = yield self.state_handler.compute_event_context(event)
-        yield self._persist_events([(event, context)])
+        yield self.persist_events_and_notify([(event, context)])
 
         defer.returnValue(event)
 
@@ -1432,7 +1449,7 @@ class FederationHandler(BaseHandler):
                     event, context
                 )
 
-            yield self._persist_events(
+            yield self.persist_events_and_notify(
                 [(event, context)],
                 backfilled=backfilled,
             )
@@ -1470,7 +1487,7 @@ class FederationHandler(BaseHandler):
             ], consumeErrors=True,
         ))
 
-        yield self._persist_events(
+        yield self.persist_events_and_notify(
             [
                 (ev_info["event"], context)
                 for ev_info, context in zip(event_infos, contexts)
@@ -1558,7 +1575,7 @@ class FederationHandler(BaseHandler):
                     raise
                 events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR
 
-        yield self._persist_events(
+        yield self.persist_events_and_notify(
             [
                 (e, events_to_context[e.event_id])
                 for e in itertools.chain(auth_events, state)
@@ -1569,7 +1586,7 @@ class FederationHandler(BaseHandler):
             event, old_state=state
         )
 
-        yield self._persist_events(
+        yield self.persist_events_and_notify(
             [(event, new_event_context)],
         )
 
@@ -2297,7 +2314,7 @@ class FederationHandler(BaseHandler):
                 for revocation.
         """
         try:
-            response = yield self.hs.get_simple_http_client().get_json(
+            response = yield self.http_client.get_json(
                 url,
                 {"public_key": public_key}
             )
@@ -2310,7 +2327,7 @@ class FederationHandler(BaseHandler):
             raise AuthError(403, "Third party certificate was invalid")
 
     @defer.inlineCallbacks
-    def _persist_events(self, event_and_contexts, backfilled=False):
+    def persist_events_and_notify(self, event_and_contexts, backfilled=False):
         """Persists events and tells the notifier/pushers about them, if
         necessary.
 
@@ -2322,14 +2339,21 @@ class FederationHandler(BaseHandler):
         Returns:
             Deferred
         """
-        max_stream_id = yield self.store.persist_events(
-            event_and_contexts,
-            backfilled=backfilled,
-        )
+        if self.config.worker_app:
+            yield self._send_events_to_master(
+                store=self.store,
+                event_and_contexts=event_and_contexts,
+                backfilled=backfilled
+            )
+        else:
+            max_stream_id = yield self.store.persist_events(
+                event_and_contexts,
+                backfilled=backfilled,
+            )
 
-        if not backfilled:  # Never notify for backfilled events
-            for event, _ in event_and_contexts:
-                self._notify_persisted_event(event, max_stream_id)
+            if not backfilled:  # Never notify for backfilled events
+                for event, _ in event_and_contexts:
+                    self._notify_persisted_event(event, max_stream_id)
 
     def _notify_persisted_event(self, event, max_stream_id):
         """Checks to see if notifier/pushers should be notified about the
@@ -2368,9 +2392,25 @@ class FederationHandler(BaseHandler):
         )
 
     def _clean_room_for_join(self, room_id):
-        return self.store.clean_room_for_join(room_id)
+        """Called to clean up any data in DB for a given room, ready for the
+        server to join the room.
+
+        Args:
+            room_id (str)
+        """
+        if self.config.worker_app:
+            return self._clean_room_for_join_client(room_id)
+        else:
+            return self.store.clean_room_for_join(room_id)
 
     def user_joined_room(self, user, room_id):
         """Called when a new user has joined the room
         """
-        return user_joined_room(self.distributor, user, room_id)
+        if self.config.worker_app:
+            return self._notify_user_membership_change(
+                room_id=room_id,
+                user_id=user.to_string(),
+                change="joined",
+            )
+        else:
+            return user_joined_room(self.distributor, user, room_id)

+ 23 - 9
synapse/handlers/identity.py

@@ -137,15 +137,19 @@ class IdentityHandler(BaseHandler):
         defer.returnValue(data)
 
     @defer.inlineCallbacks
-    def unbind_threepid(self, mxid, threepid):
-        """
-        Removes a binding from an identity server
+    def try_unbind_threepid(self, mxid, threepid):
+        """Removes a binding from an identity server
+
         Args:
             mxid (str): Matrix user ID of binding to be removed
             threepid (dict): Dict with medium & address of binding to be removed
 
+        Raises:
+            SynapseError: If we failed to contact the identity server
+
         Returns:
-            Deferred[bool]: True on success, otherwise False
+            Deferred[bool]: True on success, otherwise False if the identity
+            server doesn't support unbinding
         """
         logger.debug("unbinding threepid %r from %s", threepid, mxid)
         if not self.trusted_id_servers:
@@ -175,11 +179,21 @@ class IdentityHandler(BaseHandler):
             content=content,
             destination_is=id_server,
         )
-        yield self.http_client.post_json_get_json(
-            url,
-            content,
-            headers,
-        )
+        try:
+            yield self.http_client.post_json_get_json(
+                url,
+                content,
+                headers,
+            )
+        except HttpResponseException as e:
+            if e.code in (400, 404, 501,):
+                # The remote server probably doesn't support unbinding (yet)
+                logger.warn("Received %d response while unbinding threepid", e.code)
+                defer.returnValue(False)
+            else:
+                logger.error("Failed to unbind threepid on identity server: %s", e)
+                raise SynapseError(502, "Failed to contact identity server")
+
         defer.returnValue(True)
 
     @defer.inlineCallbacks

+ 2 - 1
synapse/replication/http/__init__.py

@@ -14,7 +14,7 @@
 # limitations under the License.
 
 from synapse.http.server import JsonResource
-from synapse.replication.http import membership, send_event
+from synapse.replication.http import federation, membership, send_event
 
 REPLICATION_PREFIX = "/_synapse/replication"
 
@@ -27,3 +27,4 @@ class ReplicationRestResource(JsonResource):
     def register_servlets(self, hs):
         send_event.register_servlets(hs, self)
         membership.register_servlets(hs, self)
+        federation.register_servlets(hs, self)

+ 259 - 0
synapse/replication/http/federation.py

@@ -0,0 +1,259 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from twisted.internet import defer
+
+from synapse.events import FrozenEvent
+from synapse.events.snapshot import EventContext
+from synapse.http.servlet import parse_json_object_from_request
+from synapse.replication.http._base import ReplicationEndpoint
+from synapse.util.metrics import Measure
+
+logger = logging.getLogger(__name__)
+
+
+class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint):
+    """Handles events newly received from federation, including persisting and
+    notifying.
+
+    The API looks like:
+
+        POST /_synapse/replication/fed_send_events/:txn_id
+
+        {
+            "events": [{
+                "event": { .. serialized event .. },
+                "internal_metadata": { .. serialized internal_metadata .. },
+                "rejected_reason": ..,   // The event.rejected_reason field
+                "context": { .. serialized event context .. },
+            }],
+            "backfilled": false
+    """
+
+    NAME = "fed_send_events"
+    PATH_ARGS = ()
+
+    def __init__(self, hs):
+        super(ReplicationFederationSendEventsRestServlet, self).__init__(hs)
+
+        self.store = hs.get_datastore()
+        self.clock = hs.get_clock()
+        self.federation_handler = hs.get_handlers().federation_handler
+
+    @staticmethod
+    @defer.inlineCallbacks
+    def _serialize_payload(store, event_and_contexts, backfilled):
+        """
+        Args:
+            store
+            event_and_contexts (list[tuple[FrozenEvent, EventContext]])
+            backfilled (bool): Whether or not the events are the result of
+                backfilling
+        """
+        event_payloads = []
+        for event, context in event_and_contexts:
+            serialized_context = yield context.serialize(event, store)
+
+            event_payloads.append({
+                "event": event.get_pdu_json(),
+                "internal_metadata": event.internal_metadata.get_dict(),
+                "rejected_reason": event.rejected_reason,
+                "context": serialized_context,
+            })
+
+        payload = {
+            "events": event_payloads,
+            "backfilled": backfilled,
+        }
+
+        defer.returnValue(payload)
+
+    @defer.inlineCallbacks
+    def _handle_request(self, request):
+        with Measure(self.clock, "repl_fed_send_events_parse"):
+            content = parse_json_object_from_request(request)
+
+            backfilled = content["backfilled"]
+
+            event_payloads = content["events"]
+
+            event_and_contexts = []
+            for event_payload in event_payloads:
+                event_dict = event_payload["event"]
+                internal_metadata = event_payload["internal_metadata"]
+                rejected_reason = event_payload["rejected_reason"]
+                event = FrozenEvent(event_dict, internal_metadata, rejected_reason)
+
+                context = yield EventContext.deserialize(
+                    self.store, event_payload["context"],
+                )
+
+                event_and_contexts.append((event, context))
+
+        logger.info(
+            "Got %d events from federation",
+            len(event_and_contexts),
+        )
+
+        yield self.federation_handler.persist_events_and_notify(
+            event_and_contexts, backfilled,
+        )
+
+        defer.returnValue((200, {}))
+
+
+class ReplicationFederationSendEduRestServlet(ReplicationEndpoint):
+    """Handles EDUs newly received from federation, including persisting and
+    notifying.
+
+    Request format:
+
+        POST /_synapse/replication/fed_send_edu/:edu_type/:txn_id
+
+        {
+            "origin": ...,
+            "content: { ... }
+        }
+    """
+
+    NAME = "fed_send_edu"
+    PATH_ARGS = ("edu_type",)
+
+    def __init__(self, hs):
+        super(ReplicationFederationSendEduRestServlet, self).__init__(hs)
+
+        self.store = hs.get_datastore()
+        self.clock = hs.get_clock()
+        self.registry = hs.get_federation_registry()
+
+    @staticmethod
+    def _serialize_payload(edu_type, origin, content):
+        return {
+            "origin": origin,
+            "content": content,
+        }
+
+    @defer.inlineCallbacks
+    def _handle_request(self, request, edu_type):
+        with Measure(self.clock, "repl_fed_send_edu_parse"):
+            content = parse_json_object_from_request(request)
+
+            origin = content["origin"]
+            edu_content = content["content"]
+
+        logger.info(
+            "Got %r edu from $s",
+            edu_type, origin,
+        )
+
+        result = yield self.registry.on_edu(edu_type, origin, edu_content)
+
+        defer.returnValue((200, result))
+
+
+class ReplicationGetQueryRestServlet(ReplicationEndpoint):
+    """Handle responding to queries from federation.
+
+    Request format:
+
+        POST /_synapse/replication/fed_query/:query_type
+
+        {
+            "args": { ... }
+        }
+    """
+
+    NAME = "fed_query"
+    PATH_ARGS = ("query_type",)
+
+    # This is a query, so let's not bother caching
+    CACHE = False
+
+    def __init__(self, hs):
+        super(ReplicationGetQueryRestServlet, self).__init__(hs)
+
+        self.store = hs.get_datastore()
+        self.clock = hs.get_clock()
+        self.registry = hs.get_federation_registry()
+
+    @staticmethod
+    def _serialize_payload(query_type, args):
+        """
+        Args:
+            query_type (str)
+            args (dict): The arguments received for the given query type
+        """
+        return {
+            "args": args,
+        }
+
+    @defer.inlineCallbacks
+    def _handle_request(self, request, query_type):
+        with Measure(self.clock, "repl_fed_query_parse"):
+            content = parse_json_object_from_request(request)
+
+            args = content["args"]
+
+        logger.info(
+            "Got %r query",
+            query_type,
+        )
+
+        result = yield self.registry.on_query(query_type, args)
+
+        defer.returnValue((200, result))
+
+
+class ReplicationCleanRoomRestServlet(ReplicationEndpoint):
+    """Called to clean up any data in DB for a given room, ready for the
+    server to join the room.
+
+    Request format:
+
+        POST /_synapse/replication/fed_query/:fed_cleanup_room/:txn_id
+
+        {}
+    """
+
+    NAME = "fed_cleanup_room"
+    PATH_ARGS = ("room_id",)
+
+    def __init__(self, hs):
+        super(ReplicationCleanRoomRestServlet, self).__init__(hs)
+
+        self.store = hs.get_datastore()
+
+    @staticmethod
+    def _serialize_payload(room_id, args):
+        """
+        Args:
+            room_id (str)
+        """
+        return {}
+
+    @defer.inlineCallbacks
+    def _handle_request(self, request, room_id):
+        yield self.store.clean_room_for_join(room_id)
+
+        defer.returnValue((200, {}))
+
+
+def register_servlets(hs, http_server):
+    ReplicationFederationSendEventsRestServlet(hs).register(http_server)
+    ReplicationFederationSendEduRestServlet(hs).register(http_server)
+    ReplicationGetQueryRestServlet(hs).register(http_server)
+    ReplicationCleanRoomRestServlet(hs).register(http_server)

+ 2 - 11
synapse/replication/slave/storage/transactions.py

@@ -13,19 +13,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from synapse.storage import DataStore
 from synapse.storage.transactions import TransactionStore
 
 from ._base import BaseSlavedStore
 
 
-class TransactionStore(BaseSlavedStore):
-    get_destination_retry_timings = TransactionStore.__dict__[
-        "get_destination_retry_timings"
-    ]
-    _get_destination_retry_timings = DataStore._get_destination_retry_timings.__func__
-    set_destination_retry_timings = DataStore.set_destination_retry_timings.__func__
-    _set_destination_retry_timings = DataStore._set_destination_retry_timings.__func__
-
-    prep_send_transaction = DataStore.prep_send_transaction.__func__
-    delivered_txn = DataStore.delivered_txn.__func__
+class SlavedTransactionStore(TransactionStore, BaseSlavedStore):
+    pass

+ 9 - 2
synapse/rest/client/v1/admin.py

@@ -391,10 +391,17 @@ class DeactivateAccountRestServlet(ClientV1RestServlet):
         if not is_admin:
             raise AuthError(403, "You are not a server admin")
 
-        yield self._deactivate_account_handler.deactivate_account(
+        result = yield self._deactivate_account_handler.deactivate_account(
             target_user_id, erase,
         )
-        defer.returnValue((200, {}))
+        if result:
+            id_server_unbind_result = "success"
+        else:
+            id_server_unbind_result = "no-support"
+
+        defer.returnValue((200, {
+            "id_server_unbind_result": id_server_unbind_result,
+        }))
 
 
 class ShutdownRoomRestServlet(ClientV1RestServlet):

+ 18 - 4
synapse/rest/client/v2_alpha/account.py

@@ -209,10 +209,17 @@ class DeactivateAccountRestServlet(RestServlet):
         yield self.auth_handler.validate_user_via_ui_auth(
             requester, body, self.hs.get_ip_from_request(request),
         )
-        yield self._deactivate_account_handler.deactivate_account(
+        result = yield self._deactivate_account_handler.deactivate_account(
             requester.user.to_string(), erase,
         )
-        defer.returnValue((200, {}))
+        if result:
+            id_server_unbind_result = "success"
+        else:
+            id_server_unbind_result = "no-support"
+
+        defer.returnValue((200, {
+            "id_server_unbind_result": id_server_unbind_result,
+        }))
 
 
 class EmailThreepidRequestTokenRestServlet(RestServlet):
@@ -364,7 +371,7 @@ class ThreepidDeleteRestServlet(RestServlet):
         user_id = requester.user.to_string()
 
         try:
-            yield self.auth_handler.delete_threepid(
+            ret = yield self.auth_handler.delete_threepid(
                 user_id, body['medium'], body['address']
             )
         except Exception:
@@ -374,7 +381,14 @@ class ThreepidDeleteRestServlet(RestServlet):
             logger.exception("Failed to remove threepid")
             raise SynapseError(500, "Failed to remove threepid")
 
-        defer.returnValue((200, {}))
+        if ret:
+            id_server_unbind_result = "success"
+        else:
+            id_server_unbind_result = "no-support"
+
+        defer.returnValue((200, {
+            "id_server_unbind_result": id_server_unbind_result,
+        }))
 
 
 class WhoamiRestServlet(RestServlet):

+ 5 - 1
synapse/server.py

@@ -36,6 +36,7 @@ from synapse.federation.federation_client import FederationClient
 from synapse.federation.federation_server import (
     FederationHandlerRegistry,
     FederationServer,
+    ReplicationFederationHandlerRegistry,
 )
 from synapse.federation.send_queue import FederationRemoteSendQueue
 from synapse.federation.transaction_queue import TransactionQueue
@@ -423,7 +424,10 @@ class HomeServer(object):
         return RoomMemberMasterHandler(self)
 
     def build_federation_registry(self):
-        return FederationHandlerRegistry()
+        if self.config.worker_app:
+            return ReplicationFederationHandlerRegistry(self)
+        else:
+            return FederationHandlerRegistry()
 
     def build_server_notices_manager(self):
         if self.config.worker_app:

+ 0 - 82
synapse/storage/events.py

@@ -1435,88 +1435,6 @@ class EventsStore(EventFederationStore, EventsWorkerStore, BackgroundUpdateStore
             (event.event_id, event.redacts)
         )
 
-    @defer.inlineCallbacks
-    def have_events_in_timeline(self, event_ids):
-        """Given a list of event ids, check if we have already processed and
-        stored them as non outliers.
-        """
-        rows = yield self._simple_select_many_batch(
-            table="events",
-            retcols=("event_id",),
-            column="event_id",
-            iterable=list(event_ids),
-            keyvalues={"outlier": False},
-            desc="have_events_in_timeline",
-        )
-
-        defer.returnValue(set(r["event_id"] for r in rows))
-
-    @defer.inlineCallbacks
-    def have_seen_events(self, event_ids):
-        """Given a list of event ids, check if we have already processed them.
-
-        Args:
-            event_ids (iterable[str]):
-
-        Returns:
-            Deferred[set[str]]: The events we have already seen.
-        """
-        results = set()
-
-        def have_seen_events_txn(txn, chunk):
-            sql = (
-                "SELECT event_id FROM events as e WHERE e.event_id IN (%s)"
-                % (",".join("?" * len(chunk)), )
-            )
-            txn.execute(sql, chunk)
-            for (event_id, ) in txn:
-                results.add(event_id)
-
-        # break the input up into chunks of 100
-        input_iterator = iter(event_ids)
-        for chunk in iter(lambda: list(itertools.islice(input_iterator, 100)),
-                          []):
-            yield self.runInteraction(
-                "have_seen_events",
-                have_seen_events_txn,
-                chunk,
-            )
-        defer.returnValue(results)
-
-    def get_seen_events_with_rejections(self, event_ids):
-        """Given a list of event ids, check if we rejected them.
-
-        Args:
-            event_ids (list[str])
-
-        Returns:
-            Deferred[dict[str, str|None):
-                Has an entry for each event id we already have seen. Maps to
-                the rejected reason string if we rejected the event, else maps
-                to None.
-        """
-        if not event_ids:
-            return defer.succeed({})
-
-        def f(txn):
-            sql = (
-                "SELECT e.event_id, reason FROM events as e "
-                "LEFT JOIN rejections as r ON e.event_id = r.event_id "
-                "WHERE e.event_id = ?"
-            )
-
-            res = {}
-            for event_id in event_ids:
-                txn.execute(sql, (event_id,))
-                row = txn.fetchone()
-                if row:
-                    _, rejected = row
-                    res[event_id] = rejected
-
-            return res
-
-        return self.runInteraction("get_rejection_reasons", f)
-
     @defer.inlineCallbacks
     def count_daily_messages(self):
         """

+ 83 - 0
synapse/storage/events_worker.py

@@ -12,6 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import itertools
 import logging
 from collections import namedtuple
 
@@ -442,3 +443,85 @@ class EventsWorkerStore(SQLBaseStore):
             self._get_event_cache.prefill((original_ev.event_id,), cache_entry)
 
         defer.returnValue(cache_entry)
+
+    @defer.inlineCallbacks
+    def have_events_in_timeline(self, event_ids):
+        """Given a list of event ids, check if we have already processed and
+        stored them as non outliers.
+        """
+        rows = yield self._simple_select_many_batch(
+            table="events",
+            retcols=("event_id",),
+            column="event_id",
+            iterable=list(event_ids),
+            keyvalues={"outlier": False},
+            desc="have_events_in_timeline",
+        )
+
+        defer.returnValue(set(r["event_id"] for r in rows))
+
+    @defer.inlineCallbacks
+    def have_seen_events(self, event_ids):
+        """Given a list of event ids, check if we have already processed them.
+
+        Args:
+            event_ids (iterable[str]):
+
+        Returns:
+            Deferred[set[str]]: The events we have already seen.
+        """
+        results = set()
+
+        def have_seen_events_txn(txn, chunk):
+            sql = (
+                "SELECT event_id FROM events as e WHERE e.event_id IN (%s)"
+                % (",".join("?" * len(chunk)), )
+            )
+            txn.execute(sql, chunk)
+            for (event_id, ) in txn:
+                results.add(event_id)
+
+        # break the input up into chunks of 100
+        input_iterator = iter(event_ids)
+        for chunk in iter(lambda: list(itertools.islice(input_iterator, 100)),
+                          []):
+            yield self.runInteraction(
+                "have_seen_events",
+                have_seen_events_txn,
+                chunk,
+            )
+        defer.returnValue(results)
+
+    def get_seen_events_with_rejections(self, event_ids):
+        """Given a list of event ids, check if we rejected them.
+
+        Args:
+            event_ids (list[str])
+
+        Returns:
+            Deferred[dict[str, str|None):
+                Has an entry for each event id we already have seen. Maps to
+                the rejected reason string if we rejected the event, else maps
+                to None.
+        """
+        if not event_ids:
+            return defer.succeed({})
+
+        def f(txn):
+            sql = (
+                "SELECT e.event_id, reason FROM events as e "
+                "LEFT JOIN rejections as r ON e.event_id = r.event_id "
+                "WHERE e.event_id = ?"
+            )
+
+            res = {}
+            for event_id in event_ids:
+                txn.execute(sql, (event_id,))
+                row = txn.fetchone()
+                if row:
+                    _, rejected = row
+                    res[event_id] = rejected
+
+            return res
+
+        return self.runInteraction("get_rejection_reasons", f)

+ 16 - 16
synapse/storage/room.py

@@ -41,6 +41,22 @@ RatelimitOverride = collections.namedtuple(
 
 
 class RoomWorkerStore(SQLBaseStore):
+    def get_room(self, room_id):
+        """Retrieve a room.
+
+        Args:
+            room_id (str): The ID of the room to retrieve.
+        Returns:
+            A namedtuple containing the room information, or an empty list.
+        """
+        return self._simple_select_one(
+            table="rooms",
+            keyvalues={"room_id": room_id},
+            retcols=("room_id", "is_public", "creator"),
+            desc="get_room",
+            allow_none=True,
+        )
+
     def get_public_room_ids(self):
         return self._simple_select_onecol(
             table="rooms",
@@ -215,22 +231,6 @@ class RoomStore(RoomWorkerStore, SearchStore):
             logger.error("store_room with room_id=%s failed: %s", room_id, e)
             raise StoreError(500, "Problem creating room.")
 
-    def get_room(self, room_id):
-        """Retrieve a room.
-
-        Args:
-            room_id (str): The ID of the room to retrieve.
-        Returns:
-            A namedtuple containing the room information, or an empty list.
-        """
-        return self._simple_select_one(
-            table="rooms",
-            keyvalues={"room_id": room_id},
-            retcols=("room_id", "is_public", "creator"),
-            desc="get_room",
-            allow_none=True,
-        )
-
     @defer.inlineCallbacks
     def set_room_is_public(self, room_id, is_public):
         def set_room_is_public_txn(txn, next_id):