# -*- coding: utf-8 -*-

# Copyright 2014 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import logging

import twisted.python.log
from signedjson.sign import SignatureVerifyException
from twisted.internet import defer
from twisted.web import server
from twisted.web.resource import Resource

from sydent.db.invite_tokens import JoinTokenStore
from sydent.db.peers import PeerStore
from sydent.db.threepid_associations import GlobalAssociationStore
from sydent.http.servlets import jsonwrap
from sydent.replication.peer import NoMatchingSignatureException, NoSignaturesException, RemotePeerError
from sydent.threepid import threePidAssocFromDict
  28. logger = logging.getLogger(__name__)
  29. MAX_SG_ASSOCS_LIMIT = 100
  30. MAX_INVITE_TOKENS_LIMIT = 100
  31. MAX_EPHEMERAL_PUBLIC_KEYS_LIMIT = 100
  32. class ReplicationPushServlet(Resource):
  33. def __init__(self, sydent):
  34. self.sydent = sydent
  35. def render_POST(self, request):
  36. self._async_render_POST(request)
  37. return server.NOT_DONE_YET
  38. @defer.inlineCallbacks
  39. def _async_render_POST(self, request):
  40. """Verify and store replicated information from trusted peer identity servers.
  41. To prevent data sent from erroneous servers from being stored, we
  42. initially verify that the sender's certificate contains a commonName
  43. that we trust. This is checked against the peers stored in the local
  44. DB. Data is then ingested.
  45. Replicated associations must each be individually signed by the
  46. signing key of the remote peer, which we verify using the verifykey
  47. stored in the local DB.
  48. Other data does not need to be signed.
  49. :params request: The HTTPS request.
  50. """
  51. peerCert = request.transport.getPeerCertificate()
  52. peerCertCn = peerCert.get_subject().commonName
  53. peerStore = PeerStore(self.sydent)
  54. peer = peerStore.getPeerByName(peerCertCn)
  55. if not peer:
  56. logger.exception("Got connection from %s but no peer found by that name", peerCertCn)
  57. request.setResponseCode(403)
  58. request.write(json.dumps({'errcode': 'M_UNKNOWN_PEER', 'error': 'This peer is not known to this server'}))
  59. request.finish()
  60. return
  61. logger.info("Push connection made from peer %s", peer.servername)
  62. if not request.requestHeaders.hasHeader('Content-Type') or \
  63. request.requestHeaders.getRawHeaders('Content-Type')[0] != 'application/json':
  64. logger.warn("Peer %s made push connection with non-JSON content (type: %s)",
  65. peer.servername, request.requestHeaders.getRawHeaders('Content-Type')[0])
  66. request.setResponseCode(400)
  67. request.write(json.dumps({'errcode': 'M_NOT_JSON', 'error': 'This endpoint expects JSON'}))
  68. request.finish()
  69. return
  70. try:
  71. inJson = json.load(request.content)
  72. except ValueError:
  73. logger.warn("Peer %s made push connection with malformed JSON", peer.servername)
  74. request.setResponseCode(400)
  75. request.write(json.dumps({'errcode': 'M_BAD_JSON', 'error': 'Malformed JSON'}))
  76. request.finish()
  77. return
  78. # Ensure there is data we are able to process
  79. if 'sg_assocs' not in inJson and 'invite_tokens' not in inJson and 'ephemeral_public_keys' not in inJson:
  80. logger.warn("Peer %s made push connection with no 'sg_assocs', 'invite_tokens' or 'ephemeral_public_keys' keys in JSON", peer.servername)
  81. request.setResponseCode(400)
  82. request.write(json.dumps({'errcode': 'M_BAD_JSON', 'error': 'No "sg_assocs", "invite_tokens" or "ephemeral_public_keys" key in JSON'}))
  83. request.finish()
  84. return
  85. # Check for any signed associations
  86. if 'sg_assocs' in inJson and len(inJson['sg_assocs']) > 0:
  87. if len(inJson['sg_assocs']) > MAX_SG_ASSOCS_LIMIT:
  88. request.setResponseCode(400)
  89. request.write(json.dumps({'errcode': 'M_BAD_JSON', 'error': '"sg_assocs" has more than %d keys' % MAX_SG_ASSOCS_LIMIT}))
  90. request.finish()
  91. return
  92. globalAssocsStore = GlobalAssociationStore(self.sydent)
  93. # Check that this message is signed by one of our trusted associated peers
  94. for originId, sgAssoc in inJson['sg_assocs'].items():
  95. try:
  96. yield peer.verifySignedAssociation(sgAssoc)
  97. except (NoSignaturesException, NoMatchingSignatureException, RemotePeerError, SignatureVerifyException):
  98. self.sydent.db.rollback()
  99. logger.warn("Failed to verify JSON from %s", peer.servername)
  100. request.setResponseCode(400)
  101. request.write(json.dumps({'errcode': 'M_VERIFICATION_FAILED', 'error': 'Signature verification failed'}))
  102. request.finish()
  103. return
  104. except Exception:
  105. self.sydent.db.rollback()
  106. logger.exception("Failed to verify JSON from %s", peer.servername)
  107. request.setResponseCode(500)
  108. request.write(json.dumps({'errcode': 'M_INTERNAL_SERVER_ERROR', 'error': 'Signature verification failed'}))
  109. request.finish()
  110. return
  111. assocObj = threePidAssocFromDict(sgAssoc)
  112. if assocObj.mxid is not None:
  113. # Add the association components and the original signed
  114. # object (as assocs must be signed when requested by clients)
  115. globalAssocsStore.addAssociation(assocObj, json.dumps(sgAssoc), peer.servername, originId, commit=False)
  116. else:
  117. logger.info("Incoming deletion: removing associations for %s / %s", assocObj.medium, assocObj.address)
  118. globalAssocsStore.removeAssociation(assocObj.medium, assocObj.address)
  119. logger.info("Stored association with origin ID %s from %s", originId, peer.servername)
  120. # if this is an association that matches one of our invite_tokens then we should call the onBind callback
  121. # at this point, in order to tell the inviting HS that someone out there has just bound the 3PID.
  122. self.sydent.threepidBinder.notifyPendingInvites(assocObj)
  123. # Check for any invite tokens
  124. if 'invite_tokens' in inJson or 'ephemeral_public_keys' in inJson:
  125. tokensStore = JoinTokenStore(self.sydent)
  126. if 'invite_tokens' in inJson and len(inJson['invite_tokens']) > 0:
  127. if len(inJson['invite_tokens']) > MAX_INVITE_TOKENS_LIMIT:
  128. self.sydent.db.rollback()
  129. request.setResponseCode(400)
  130. request.write(json.dumps({'errcode': 'M_BAD_JSON', 'error': '"invite_tokens" has more than %d keys' % MAX_INVITE_TOKENS_LIMIT}))
  131. request.finish()
  132. return
  133. last_processed_id = tokensStore.getLastTokenIdFromServer(peer.servername)
  134. for originId, inviteToken in inJson["invite_tokens"].items():
  135. # Make sure we haven't processed this token already
  136. # If so, back out of all incoming tokens and return an error
  137. if int(originId) <= int(last_processed_id):
  138. self.sydent.db.rollback()
  139. request.setResponseCode(200)
  140. request.write(json.dumps({'success': True, 'message': 'Already processed token ID %s' % str(originId)}))
  141. request.finish()
  142. return
  143. tokensStore.storeToken(inviteToken['medium'], inviteToken['address'], inviteToken['room_id'],
  144. inviteToken['sender'], inviteToken['token'],
  145. originServer=peer.servername, originId=originId, commit=False)
  146. logger.info("Stored invite token with origin ID %s from %s", originId, peer.servername)
  147. # Check for any ephemeral public keys
  148. if 'ephemeral_public_keys' in inJson and len(inJson['ephemeral_public_keys']) > 0:
  149. if len(inJson['ephemeral_public_keys']) > MAX_EPHEMERAL_PUBLIC_KEYS_LIMIT:
  150. self.sydent.db.rollback()
  151. request.setResponseCode(400)
  152. request.write(json.dumps({'errcode': 'M_BAD_JSON', 'error': '"ephemeral_public_keys" has more than %d keys' % MAX_EPHEMERAL_PUBLIC_KEYS_LIMIT}))
  153. request.finish()
  154. return
  155. last_processed_id = tokensStore.getLastEphemeralPublicKeyIdFromServer(peer.servername)
  156. for originId, ephemeralKey in inJson["ephemeral_public_keys"].items():
  157. # Make sure we haven't processed this key already
  158. # If so, back out of all incoming keys and return an error
  159. if int(originId) <= int(last_processed_id):
  160. self.sydent.db.rollback()
  161. request.setResponseCode(200)
  162. request.write(json.dumps({'success': True, 'message': 'Already processed key ID %s' % str(originId)}))
  163. request.finish()
  164. return
  165. tokensStore.storeEphemeralPublicKey(
  166. ephemeralKey['public_key'], persistenceTs=ephemeralKey['persistence_ts'],
  167. originServer=peer.servername, originId=originId, commit=False)
  168. logger.info("Stored ephemeral key with origin ID %s from %s", originId, peer.servername)
  169. self.sydent.db.commit()
  170. request.write(json.dumps({'success':True}))
  171. request.finish()
  172. return