replication.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2014 OpenMarket Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. import twisted.python.log
  16. from twisted.web.resource import Resource
  17. from sydent.http.servlets import jsonwrap
  18. from sydent.threepid import threePidAssocFromDict
  19. from sydent.db.peers import PeerStore
  20. from sydent.db.threepid_associations import GlobalAssociationStore
  21. import logging
  22. import json
  23. logger = logging.getLogger(__name__)
  24. class ReplicationPushServlet(Resource):
  25. def __init__(self, sydent):
  26. self.sydent = sydent
  27. @jsonwrap
  28. def render_POST(self, request):
  29. peerCert = request.transport.getPeerCertificate()
  30. peerCertCn = peerCert.get_subject().commonName
  31. peerStore = PeerStore(self.sydent)
  32. peer = peerStore.getPeerByName(peerCertCn)
  33. if not peer:
  34. logger.warn("Got connection from %s but no peer found by that name", peerCertCn)
  35. request.setResponseCode(403)
  36. return {'errcode': 'M_UNKNOWN_PEER', 'error': 'This peer is not known to this server'}
  37. logger.info("Push connection made from peer %s", peer.servername)
  38. if not request.requestHeaders.hasHeader('Content-Type') or \
  39. request.requestHeaders.getRawHeaders('Content-Type')[0] != 'application/json':
  40. logger.warn("Peer %s made push connection with non-JSON content (type: %s)",
  41. peer.servername, request.requestHeaders.getRawHeaders('Content-Type')[0])
  42. return {'errcode': 'M_NOT_JSON', 'error': 'This endpoint expects JSON'}
  43. try:
  44. inJson = json.load(request.content)
  45. except ValueError:
  46. logger.warn("Peer %s made push connection with malformed JSON", peer.servername)
  47. return {'errcode': 'M_BAD_JSON', 'error': 'Malformed JSON'}
  48. if 'sgAssocs' not in inJson:
  49. logger.warn("Peer %s made push connection with no 'sgAssocs' key in JSON", peer.servername)
  50. return {'errcode': 'M_BAD_JSON', 'error': 'No "sgAssocs" key in JSON'}
  51. failedIds = []
  52. globalAssocsStore = GlobalAssociationStore(self.sydent)
  53. for originId,sgAssoc in inJson['sgAssocs'].items():
  54. try:
  55. peer.verifyMessage(sgAssoc)
  56. logger.debug("Signed association from %s with origin ID %s verified", peer.servername, originId)
  57. # Don't bother adding if one has already failed: we add all of them or none so we're only going to
  58. # roll back the transaction anyway (but we continue to try & verify the rest so we can give a
  59. # complete list of the ones that don't verify)
  60. if len(failedIds) > 0:
  61. continue
  62. assocObj = threePidAssocFromDict(sgAssoc)
  63. if assocObj.mxid is not None:
  64. globalAssocsStore.addAssociation(assocObj, json.dumps(sgAssoc), peer.servername, originId, commit=False)
  65. else:
  66. logger.info("Incoming deletion: removing associations for %s / %s", assocObj.medium, assocObj.address)
  67. globalAssocsStore.removeAssociation(assocObj.medium, assocObj.address)
  68. logger.info("Stored association origin ID %s from %s", originId, peer.servername)
  69. except:
  70. failedIds.append(originId)
  71. logger.warn("Failed to verify signed association from %s with origin ID %s",
  72. peer.servername, originId)
  73. twisted.python.log.err()
  74. if len(failedIds) > 0:
  75. self.sydent.db.rollback()
  76. request.setResponseCode(400)
  77. return {'errcode': 'M_VERIFICATION_FAILED', 'error': 'Verification failed for one or more associations',
  78. 'failed_ids':failedIds}
  79. else:
  80. self.sydent.db.commit()
  81. return {'success':True}