Преглед на файлове

Rename replicationpushservlet back to replication

Andrew Morgan преди 5 години
променени са 3 файла, в които са добавени 145 реда и са изтрити 246 реда
  1. 144 38
  2. 0 207
  3. 1 1

+ 144 - 38

@@ -16,22 +16,51 @@
 import twisted.python.log
 from twisted.web.resource import Resource
+from twisted.web import server
+from twisted.internet import defer
 from sydent.http.servlets import jsonwrap
 from sydent.threepid import threePidAssocFromDict
 from sydent.db.peers import PeerStore
 from sydent.db.threepid_associations import GlobalAssociationStore
+from sydent.db.invite_tokens import JoinTokenStore
+from sydent.replication.peer import NoMatchingSignatureException, NoSignaturesException, RemotePeerError
+from signedjson.sign import SignatureVerifyException
 import logging
 import json
 logger = logging.getLogger(__name__)
 class ReplicationPushServlet(Resource):
     def __init__(self, sydent):
         self.sydent = sydent
-    @jsonwrap
     def render_POST(self, request):
+        self._async_render_POST(request)
+        return server.NOT_DONE_YET
+    @defer.inlineCallbacks
+    def _async_render_POST(self, request):
+        """Verify and store replicated information from trusted peer identity servers.
+        To prevent data sent from erroneous servers from being stored, we
+        initially verify that the sender's certificate contains a commonName
+        that we trust. This is checked against the peers stored in the local
+        DB. Data is then ingested.
+        Replicated associations must each be individually signed by the
+        signing key of the remote peer, which we verify using the verifykey
+        stored in the local DB.
+        Other data does not need to be signed.
+        :params request: The HTTPS request.
+        """
         peerCert = request.transport.getPeerCertificate()
         peerCertCn = peerCert.get_subject().commonName
@@ -40,9 +69,11 @@ class ReplicationPushServlet(Resource):
         peer = peerStore.getPeerByName(peerCertCn)
         if not peer:
-            logger.warn("Got connection from %s but no peer found by that name", peerCertCn)
+            logger.exception("Got connection from %s but no peer found by that name", peerCertCn)
-            return {'errcode': 'M_UNKNOWN_PEER', 'error': 'This peer is not known to this server'}
+            request.write(json.dumps({'errcode': 'M_UNKNOWN_PEER', 'error': 'This peer is not known to this server'}))
+            request.finish()
+            return
         logger.info("Push connection made from peer %s", peer.servername)
@@ -50,52 +81,127 @@ class ReplicationPushServlet(Resource):
                 request.requestHeaders.getRawHeaders('Content-Type')[0] != 'application/json':
             logger.warn("Peer %s made push connection with non-JSON content (type: %s)",
                         peer.servername, request.requestHeaders.getRawHeaders('Content-Type')[0])
-            return {'errcode': 'M_NOT_JSON', 'error': 'This endpoint expects JSON'}
+            request.setResponseCode(400)
+            request.write(json.dumps({'errcode': 'M_NOT_JSON', 'error': 'This endpoint expects JSON'}))
+            request.finish()
+            return
             inJson = json.load(request.content)
         except ValueError:
             logger.warn("Peer %s made push connection with malformed JSON", peer.servername)
-            return {'errcode': 'M_BAD_JSON', 'error': 'Malformed JSON'}
-        if 'sgAssocs' not in inJson:
-            logger.warn("Peer %s made push connection with no 'sgAssocs' key in JSON", peer.servername)
-            return {'errcode': 'M_BAD_JSON', 'error': 'No "sgAssocs" key in JSON'}
-        failedIds = []
-        globalAssocsStore = GlobalAssociationStore(self.sydent)
-        for originId,sgAssoc in inJson['sgAssocs'].items():
-            try:
-                peer.verifySignedAssociation(sgAssoc)
-                logger.debug("Signed association from %s with origin ID %s verified", peer.servername, originId)
+            request.setResponseCode(400)
+            request.write(json.dumps({'errcode': 'M_BAD_JSON', 'error': 'Malformed JSON'}))
+            request.finish()
+            return
-                # Don't bother adding if one has already failed: we add all of them or none so we're only going to
-                # roll back the transaction anyway (but we continue to try & verify the rest so we can give a
-                # complete list of the ones that don't verify)
-                if len(failedIds) > 0:
-                    continue
+        # Ensure there is data we are able to process
+        if 'sg_assocs' not in inJson and 'invite_tokens' not in inJson and 'ephemeral_public_keys' not in inJson:
+            logger.warn("Peer %s made push connection with no 'sg_assocs', 'invite_tokens' or 'ephemeral_public_keys' keys in JSON", peer.servername)
+            request.setResponseCode(400)
+            request.write(json.dumps({'errcode': 'M_BAD_JSON', 'error': 'No "sg_assocs", "invite_tokens" or "ephemeral_public_keys" key in JSON'}))
+            request.finish()
+            return
+        # Check for any signed associations
+        if 'sg_assocs' in inJson and len(inJson['sg_assocs']) > 0:
+            if len(inJson['sg_assocs']) > MAX_SG_ASSOCS_LIMIT:
+                request.setResponseCode(400)
+                request.write(json.dumps({'errcode': 'M_BAD_JSON', 'error': '"sg_assocs" has more than %d keys' % MAX_SG_ASSOCS_LIMIT}))
+                request.finish()
+                return
+            globalAssocsStore = GlobalAssociationStore(self.sydent)
+            # Check that this message is signed by one of our trusted associated peers
+            for originId, sgAssoc in inJson['sg_assocs'].items():
+                try:
+                    yield peer.verifySignedAssociation(sgAssoc)
+                except (NoSignaturesException, NoMatchingSignatureException, RemotePeerError, SignatureVerifyException):
+                    self.sydent.db.rollback()
+                    logger.warn("Failed to verify JSON from %s", peer.servername)
+                    request.setResponseCode(400)
+                    request.write(json.dumps({'errcode': 'M_VERIFICATION_FAILED', 'error': 'Signature verification failed'}))
+                    request.finish()
+                    return
+                except Exception:
+                    self.sydent.db.rollback()
+                    logger.exception("Failed to verify JSON from %s", peer.servername)
+                    request.setResponseCode(500)
+                    request.write(json.dumps({'errcode': 'M_INTERNAL_SERVER_ERROR', 'error': 'Signature verification failed'}))
+                    request.finish()
+                    return
                 assocObj = threePidAssocFromDict(sgAssoc)
                 if assocObj.mxid is not None:
+                    # Add the association components and the original signed
+                    # object (as assocs must be signed when requested by clients)
                     globalAssocsStore.addAssociation(assocObj, json.dumps(sgAssoc), peer.servername, originId, commit=False)
                     logger.info("Incoming deletion: removing associations for %s / %s", assocObj.medium, assocObj.address)
                     globalAssocsStore.removeAssociation(assocObj.medium, assocObj.address)
-                logger.info("Stored association origin ID %s from %s", originId, peer.servername)
-            except:
-                failedIds.append(originId)
-                logger.warn("Failed to verify signed association from %s with origin ID %s",
-                            peer.servername, originId)
-                twisted.python.log.err()
-        if len(failedIds) > 0:
-            self.sydent.db.rollback()
-            request.setResponseCode(400)
-            return {'errcode': 'M_VERIFICATION_FAILED', 'error': 'Verification failed for one or more associations',
-                    'failed_ids':failedIds}
-        else:
-            self.sydent.db.commit()
-            return {'success':True}
+                logger.info("Stored association with origin ID %s from %s", originId, peer.servername)
+                # if this is an association that matches one of our invite_tokens then we should call the onBind callback
+                # at this point, in order to tell the inviting HS that someone out there has just bound the 3PID.
+                self.sydent.threepidBinder.notifyPendingInvites(assocObj)
+        # Check for any invite tokens
+        if 'invite_tokens' in inJson or 'ephemeral_public_keys' in inJson:
+            tokensStore = JoinTokenStore(self.sydent)
+            if 'invite_tokens' in inJson and len(inJson['invite_tokens']) > 0:
+                if len(inJson['invite_tokens']) > MAX_INVITE_TOKENS_LIMIT:
+                    self.sydent.db.rollback()
+                    request.setResponseCode(400)
+                    request.write(json.dumps({'errcode': 'M_BAD_JSON', 'error': '"invite_tokens" has more than %d keys' % MAX_INVITE_TOKENS_LIMIT}))
+                    request.finish()
+                    return
+                last_processed_id = tokensStore.getLastTokenIdFromServer(peer.servername)
+                for originId, inviteToken in inJson["invite_tokens"].items():
+                    # Make sure we haven't processed this token already
+                    # If so, back out of all incoming tokens and return an error
+                    if int(originId) <= int(last_processed_id):
+                        self.sydent.db.rollback()
+                        request.setResponseCode(200)
+                        request.write(json.dumps({'success': True, 'message': 'Already processed token ID %s' % str(originId)}))
+                        request.finish()
+                        return
+                    tokensStore.storeToken(inviteToken['medium'], inviteToken['address'], inviteToken['room_id'],
+                                        inviteToken['sender'], inviteToken['token'],
+                                        originServer=peer.servername, originId=originId, commit=False)
+                    logger.info("Stored invite token with origin ID %s from %s", originId, peer.servername)
+            # Check for any ephemeral public keys
+            if 'ephemeral_public_keys' in inJson and len(inJson['ephemeral_public_keys']) > 0:
+                if len(inJson['ephemeral_public_keys']) > MAX_EPHEMERAL_PUBLIC_KEYS_LIMIT:
+                    self.sydent.db.rollback()
+                    request.setResponseCode(400)
+                    request.write(json.dumps({'errcode': 'M_BAD_JSON', 'error': '"ephemeral_public_keys" has more than %d keys' % MAX_EPHEMERAL_PUBLIC_KEYS_LIMIT}))
+                    request.finish()
+                    return
+                last_processed_id = tokensStore.getLastEphemeralPublicKeyIdFromServer(peer.servername)
+                for originId, ephemeralKey in inJson["ephemeral_public_keys"].items():
+                    # Make sure we haven't processed this key already
+                    # If so, back out of all incoming keys and return an error
+                    if int(originId) <= int(last_processed_id):
+                        self.sydent.db.rollback()
+                        request.setResponseCode(200)
+                        request.write(json.dumps({'success': True, 'message': 'Already processed key ID %s' % str(originId)}))
+                        request.finish()
+                        return
+                    tokensStore.storeEphemeralPublicKey(
+                        ephemeralKey['public_key'], persistenceTs=ephemeralKey['persistence_ts'],
+                        originServer=peer.servername, originId=originId, commit=False)
+                    logger.info("Stored ephemeral key with origin ID %s from %s", originId, peer.servername)
+        self.sydent.db.commit()
+        request.write(json.dumps({'success':True}))
+        request.finish()
+        return

+ 0 - 207

@@ -1,207 +0,0 @@
-# -*- coding: utf-8 -*-
-# Copyright 2014 OpenMarket Ltd
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#     http://www.apache.org/licenses/LICENSE-2.0
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# See the License for the specific language governing permissions and
-# limitations under the License.
-import twisted.python.log
-from twisted.web.resource import Resource
-from twisted.web import server
-from twisted.internet import defer
-from sydent.http.servlets import jsonwrap
-from sydent.threepid import threePidAssocFromDict
-from sydent.db.peers import PeerStore
-from sydent.db.threepid_associations import GlobalAssociationStore
-from sydent.db.invite_tokens import JoinTokenStore
-from sydent.replication.peer import NoMatchingSignatureException, NoSignaturesException, RemotePeerError
-from signedjson.sign import SignatureVerifyException
-import logging
-import json
-logger = logging.getLogger(__name__)
-class ReplicationPushServlet(Resource):
-    def __init__(self, sydent):
-        self.sydent = sydent
-    def render_POST(self, request):
-        self._async_render_POST(request)
-        return server.NOT_DONE_YET
-    @defer.inlineCallbacks
-    def _async_render_POST(self, request):
-        """Verify and store replicated information from trusted peer identity servers.
-        To prevent data sent from erroneous servers from being stored, we
-        initially verify that the sender's certificate contains a commonName
-        that we trust. This is checked against the peers stored in the local
-        DB. Data is then ingested.
-        Replicated associations must each be individually signed by the
-        signing key of the remote peer, which we verify using the verifykey
-        stored in the local DB.
-        Other data does not need to be signed.
-        :params request: The HTTPS request.
-        """
-        peerCert = request.transport.getPeerCertificate()
-        peerCertCn = peerCert.get_subject().commonName
-        peerStore = PeerStore(self.sydent)
-        peer = peerStore.getPeerByName(peerCertCn)
-        if not peer:
-            logger.exception("Got connection from %s but no peer found by that name", peerCertCn)
-            request.setResponseCode(403)
-            request.write(json.dumps({'errcode': 'M_UNKNOWN_PEER', 'error': 'This peer is not known to this server'}))
-            request.finish()
-            return
-        logger.info("Push connection made from peer %s", peer.servername)
-        if not request.requestHeaders.hasHeader('Content-Type') or \
-                request.requestHeaders.getRawHeaders('Content-Type')[0] != 'application/json':
-            logger.warn("Peer %s made push connection with non-JSON content (type: %s)",
-                        peer.servername, request.requestHeaders.getRawHeaders('Content-Type')[0])
-            request.setResponseCode(400)
-            request.write(json.dumps({'errcode': 'M_NOT_JSON', 'error': 'This endpoint expects JSON'}))
-            request.finish()
-            return
-        try:
-            inJson = json.load(request.content)
-        except ValueError:
-            logger.warn("Peer %s made push connection with malformed JSON", peer.servername)
-            request.setResponseCode(400)
-            request.write(json.dumps({'errcode': 'M_BAD_JSON', 'error': 'Malformed JSON'}))
-            request.finish()
-            return
-        # Ensure there is data we are able to process
-        if 'sg_assocs' not in inJson and 'invite_tokens' not in inJson and 'ephemeral_public_keys' not in inJson:
-            logger.warn("Peer %s made push connection with no 'sg_assocs', 'invite_tokens' or 'ephemeral_public_keys' keys in JSON", peer.servername)
-            request.setResponseCode(400)
-            request.write(json.dumps({'errcode': 'M_BAD_JSON', 'error': 'No "sg_assocs", "invite_tokens" or "ephemeral_public_keys" key in JSON'}))
-            request.finish()
-            return
-        # Check for any signed associations
-        if 'sg_assocs' in inJson and len(inJson['sg_assocs']) > 0:
-            if len(inJson['sg_assocs']) > MAX_SG_ASSOCS_LIMIT:
-                request.setResponseCode(400)
-                request.write(json.dumps({'errcode': 'M_BAD_JSON', 'error': '"sg_assocs" has more than %d keys' % MAX_SG_ASSOCS_LIMIT}))
-                request.finish()
-                return
-            globalAssocsStore = GlobalAssociationStore(self.sydent)
-            # Check that this message is signed by one of our trusted associated peers
-            for originId, sgAssoc in inJson['sg_assocs'].items():
-                try:
-                    yield peer.verifySignedAssociation(sgAssoc)
-                except (NoSignaturesException, NoMatchingSignatureException, RemotePeerError, SignatureVerifyException):
-                    self.sydent.db.rollback()
-                    logger.warn("Failed to verify JSON from %s", peer.servername)
-                    request.setResponseCode(400)
-                    request.write(json.dumps({'errcode': 'M_VERIFICATION_FAILED', 'error': 'Signature verification failed'}))
-                    request.finish()
-                    return
-                except Exception:
-                    self.sydent.db.rollback()
-                    logger.exception("Failed to verify JSON from %s", peer.servername)
-                    request.setResponseCode(500)
-                    request.write(json.dumps({'errcode': 'M_INTERNAL_SERVER_ERROR', 'error': 'Signature verification failed'}))
-                    request.finish()
-                    return
-                assocObj = threePidAssocFromDict(sgAssoc)
-                if assocObj.mxid is not None:
-                    # Add the association components and the original signed
-                    # object (as assocs must be signed when requested by clients)
-                    globalAssocsStore.addAssociation(assocObj, json.dumps(sgAssoc), peer.servername, originId, commit=False)
-                else:
-                    logger.info("Incoming deletion: removing associations for %s / %s", assocObj.medium, assocObj.address)
-                    globalAssocsStore.removeAssociation(assocObj.medium, assocObj.address)
-                logger.info("Stored association with origin ID %s from %s", originId, peer.servername)
-                # if this is an association that matches one of our invite_tokens then we should call the onBind callback
-                # at this point, in order to tell the inviting HS that someone out there has just bound the 3PID.
-                self.sydent.threepidBinder.notifyPendingInvites(assocObj)
-        # Check for any invite tokens
-        if 'invite_tokens' in inJson or 'ephemeral_public_keys' in inJson:
-            tokensStore = JoinTokenStore(self.sydent)
-            if 'invite_tokens' in inJson and len(inJson['invite_tokens']) > 0:
-                if len(inJson['invite_tokens']) > MAX_INVITE_TOKENS_LIMIT:
-                    self.sydent.db.rollback()
-                    request.setResponseCode(400)
-                    request.write(json.dumps({'errcode': 'M_BAD_JSON', 'error': '"invite_tokens" has more than %d keys' % MAX_INVITE_TOKENS_LIMIT}))
-                    request.finish()
-                    return
-                last_processed_id = tokensStore.getLastTokenIdFromServer(peer.servername)
-                for originId, inviteToken in inJson["invite_tokens"].items():
-                    # Make sure we haven't processed this token already
-                    # If so, back out of all incoming tokens and return an error
-                    if int(originId) <= int(last_processed_id):
-                        self.sydent.db.rollback()
-                        request.setResponseCode(200)
-                        request.write(json.dumps({'success': True, 'message': 'Already processed token ID %s' % str(originId)}))
-                        request.finish()
-                        return
-                    tokensStore.storeToken(inviteToken['medium'], inviteToken['address'], inviteToken['room_id'],
-                                        inviteToken['sender'], inviteToken['token'],
-                                        originServer=peer.servername, originId=originId, commit=False)
-                    logger.info("Stored invite token with origin ID %s from %s", originId, peer.servername)
-            # Check for any ephemeral public keys
-            if 'ephemeral_public_keys' in inJson and len(inJson['ephemeral_public_keys']) > 0:
-                if len(inJson['ephemeral_public_keys']) > MAX_EPHEMERAL_PUBLIC_KEYS_LIMIT:
-                    self.sydent.db.rollback()
-                    request.setResponseCode(400)
-                    request.write(json.dumps({'errcode': 'M_BAD_JSON', 'error': '"ephemeral_public_keys" has more than %d keys' % MAX_EPHEMERAL_PUBLIC_KEYS_LIMIT}))
-                    request.finish()
-                    return
-                last_processed_id = tokensStore.getLastEphemeralPublicKeyIdFromServer(peer.servername)
-                for originId, ephemeralKey in inJson["ephemeral_public_keys"].items():
-                    # Make sure we haven't processed this key already
-                    # If so, back out of all incoming keys and return an error
-                    if int(originId) <= int(last_processed_id):
-                        self.sydent.db.rollback()
-                        request.setResponseCode(200)
-                        request.write(json.dumps({'success': True, 'message': 'Already processed key ID %s' % str(originId)}))
-                        request.finish()
-                        return
-                    tokensStore.storeEphemeralPublicKey(
-                        ephemeralKey['public_key'], persistenceTs=ephemeralKey['persistence_ts'],
-                        originServer=peer.servername, originId=originId, commit=False)
-                    logger.info("Stored ephemeral key with origin ID %s from %s", originId, peer.servername)
-        self.sydent.db.commit()
-        request.write(json.dumps({'success':True}))
-        request.finish()
-        return

+ 1 - 1

@@ -48,7 +48,7 @@ from http.servlets.bulklookupservlet import BulkLookupServlet
 from http.servlets.pubkeyservlets import Ed25519Servlet
 from http.servlets.threepidbindservlet import ThreePidBindServlet
 from http.servlets.threepidunbindservlet import ThreePidUnbindServlet
-from http.servlets.replicationpushservlet import ReplicationPushServlet
+from http.servlets.replication import ReplicationPushServlet
 from http.servlets.getvalidated3pidservlet import GetValidated3pidServlet
 from http.servlets.store_invite_servlet import StoreInviteServlet
 from http.servlets.infoservlet import InfoServlet