peer.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2014 OpenMarket Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. from sydent.db.threepid_associations import GlobalAssociationStore
  16. from sydent.threepid import threePidAssocFromDict
  17. import signedjson.sign
  18. import logging
  19. import json
  20. import twisted.internet.reactor
  21. from twisted.internet import defer
  22. from twisted.web.client import readBody
  23. logger = logging.getLogger(__name__)
  24. class Peer(object):
  25. def __init__(self, servername, pubkeys):
  26. self.servername = servername
  27. self.pubkeys = pubkeys
  28. def pushUpdates(self, sgAssocs):
  29. """
  30. :param sgAssocs: Sequence of (originId, sgAssoc) tuples where originId is the id on the creating server and
  31. sgAssoc is the json object of the signed association
  32. :return a deferred
  33. """
  34. pass
  35. class LocalPeer(Peer):
  36. """
  37. The local peer (ourselves: essentially copying from the local associations table to the global one)
  38. """
  39. def __init__(self, sydent):
  40. super(LocalPeer, self).__init__(sydent.server_name, {})
  41. self.sydent = sydent
  42. globalAssocStore = GlobalAssociationStore(self.sydent)
  43. self.lastId = globalAssocStore.lastIdFromServer(self.servername)
  44. if self.lastId is None:
  45. self.lastId = -1
  46. def pushUpdates(self, sgAssocs):
  47. globalAssocStore = GlobalAssociationStore(self.sydent)
  48. for localId in sgAssocs:
  49. if localId > self.lastId:
  50. assocObj = threePidAssocFromDict(sgAssocs[localId])
  51. if assocObj.mxid is not None:
  52. # We can probably skip verification for the local peer (although it could be good as a sanity check)
  53. globalAssocStore.addAssociation(assocObj, json.dumps(sgAssocs[localId]),
  54. self.sydent.server_name, localId)
  55. else:
  56. globalAssocStore.removeAssociation(assocObj.medium, assocObj.address)
  57. d = twisted.internet.defer.succeed(True)
  58. return d
  59. class RemotePeer(Peer):
  60. def __init__(self, sydent, server_name, pubkeys):
  61. super(RemotePeer, self).__init__(server_name, pubkeys)
  62. self.sydent = sydent
  63. self.port = 1001
  64. def verifyMessage(self, jsonMessage):
  65. if not 'signatures' in jsonMessage:
  66. raise NoSignaturesException()
  67. alg = 'ed25519'
  68. key_ids = signedjson.sign.signature_ids(jsonMessage, self.servername)
  69. if not key_ids or len(key_ids) == 0 or not key_ids[0].startswith(alg + ":"):
  70. e = NoMatchingSignatureException()
  71. e.foundSigs = jsonMessage['signatures'].keys()
  72. e.requiredServername = self.servername
  73. raise e
  74. # Get verify key from signing key
  75. signing_key = signedjson.key.decode_signing_key_base64(alg, "0", self.pubkeys[alg])
  76. verify_key = signing_key.verify_key
  77. # Attach metadata
  78. verify_key.alg = alg
  79. verify_key.version = 0
  80. # Verify the JSON
  81. signedjson.sign.verify_signed_json(jsonMessage, self.servername, verify_key)
  82. def pushUpdates(self, sgAssocs):
  83. body = {'sgAssocs': sgAssocs}
  84. reqDeferred = self.sydent.replicationHttpsClient.postJson(self.servername,
  85. self.port,
  86. '/_matrix/identity/replicate/v1/push',
  87. body)
  88. # XXX: We'll also need to prune the deleted associations out of the
  89. # local associations table once they've been replicated to all peers
  90. # (ie. remove the record we kept in order to propagate the deletion to
  91. # other peers).
  92. updateDeferred = twisted.internet.defer.Deferred()
  93. reqDeferred.addCallback(self._pushSuccess, updateDeferred=updateDeferred)
  94. reqDeferred.addErrback(self._pushFailed, updateDeferred=updateDeferred)
  95. return updateDeferred
  96. def _pushSuccess(self, result, updateDeferred):
  97. if result.code >= 200 and result.code < 300:
  98. updateDeferred.callback(result)
  99. else:
  100. d = readBody(result)
  101. d.addCallback(self._failedPushBodyRead, updateDeferred=updateDeferred)
  102. d.addErrback(self._pushFailed, updateDeferred=updateDeferred)
  103. def _failedPushBodyRead(self, body, updateDeferred):
  104. errObj = json.loads(body)
  105. e = RemotePeerError()
  106. e.errorDict = errObj
  107. updateDeferred.errback(e)
  108. def _pushFailed(self, failure, updateDeferred):
  109. updateDeferred.errback(failure)
  110. return None
  111. class NoSignaturesException(Exception):
  112. pass
  113. class NoMatchingSignatureException(Exception):
  114. def __str__(self):
  115. return "Found signatures: %s, required server name: %s" % (self.foundSigs, self.requiredServername)
  116. class RemotePeerError(Exception):
  117. def __str__(self):
  118. return repr(self.errorDict)