peer.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2014 OpenMarket Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. from sydent.db.threepid_associations import GlobalAssociationStore
  16. from sydent.threepid import threePidAssocFromDict
  17. import signedjson.sign
  18. import signedjson.key
  19. import logging
  20. import json
  21. import nacl
  22. import twisted.internet.reactor
  23. from twisted.internet import defer
  24. from twisted.web.client import readBody
  25. logger = logging.getLogger(__name__)
  26. class Peer(object):
  27. def __init__(self, servername, pubkeys):
  28. self.servername = servername
  29. self.pubkeys = pubkeys
  30. self.shadow = False
  31. class LocalPeer(Peer):
  32. """
  33. The local peer (ourselves: essentially copying from the local associations table to the global one)
  34. """
  35. def __init__(self, sydent):
  36. super(LocalPeer, self).__init__(sydent.server_name, {})
  37. self.sydent = sydent
  38. globalAssocStore = GlobalAssociationStore(self.sydent)
  39. self.lastId = globalAssocStore.lastIdFromServer(self.servername)
  40. if self.lastId is None:
  41. self.lastId = -1
  42. def pushUpdates(self, sgAssocs):
  43. """Push updates from local associations table to the global one."""
  44. globalAssocStore = GlobalAssociationStore(self.sydent)
  45. for localId in sgAssocs:
  46. if localId > self.lastId:
  47. assocObj = threePidAssocFromDict(sgAssocs[localId][0])
  48. if assocObj.mxid is not None:
  49. # We can probably skip verification for the local peer (although it could be good as a sanity check)
  50. globalAssocStore.addAssociation(assocObj, json.dumps(sgAssocs[localId][0]),
  51. self.sydent.server_name, localId)
  52. else:
  53. globalAssocStore.removeAssociation(assocObj.medium, assocObj.address)
  54. # inject the shadow association, if any.
  55. if sgAssocs[localId][1] is not None:
  56. shadowAssocObj = threePidAssocFromDict(sgAssocs[localId][1])
  57. if shadowAssocObj.mxid is not None:
  58. # we deliberately identify this as originating from us rather than the shadow IS
  59. globalAssocStore.addAssociation(shadowAssocObj, json.dumps(sgAssocs[localId][1]),
  60. self.sydent.server_name, localId)
  61. else:
  62. globalAssocStore.removeAssociation(shadowAssocObj.medium, shadowAssocObj.address)
  63. # if this is an association that matches one of our invite_tokens then we should call the onBind callback
  64. # at this point, in order to tell the inviting HS that someone out there has just bound the 3PID.
  65. self.sydent.threepidBinder.notifyPendingInvites(assocObj)
  66. d = defer.succeed(True)
  67. return d
  68. class RemotePeer(Peer):
  69. def __init__(self, sydent, server_name, pubkeys):
  70. super(RemotePeer, self).__init__(server_name, pubkeys)
  71. self.sydent = sydent
  72. self.port = 1001
  73. # Get verify key from signing key
  74. signing_key = signedjson.key.decode_signing_key_base64(alg, "0", self.pubkeys[alg])
  75. self.verify_key = signing_key.verify_key
  76. # Attach metadata
  77. self.verify_key.alg = alg
  78. self.verify_key.version = 0
  79. def verifyMessage(self, jsonMessage):
  80. """Verify a JSON structure has a valid signature from the remote peer."""
  81. if not 'signatures' in jsonMessage:
  82. raise NoSignaturesException()
  83. alg = 'ed25519'
  84. key_ids = signedjson.sign.signature_ids(jsonMessage, self.servername)
  85. if not key_ids or len(key_ids) == 0 or not key_ids[0].startswith(alg + ":"):
  86. e = NoMatchingSignatureException()
  87. e.foundSigs = jsonMessage['signatures'].keys()
  88. e.requiredServername = self.servername
  89. raise e
  90. # Get verify key from signing key
  91. signing_key = signedjson.key.decode_signing_key_base64(alg, "0", self.pubkeys[alg])
  92. verify_key = signing_key.verify_key
  93. # Attach metadata
  94. verify_key.alg = alg
  95. verify_key.version = 0
  96. # Verify the JSON
  97. signedjson.sign.verify_signed_json(jsonMessage, self.servername, self.verify_key)
  98. def pushUpdates(self, data):
  99. """Push updates to a remote peer.
  100. :param data: A dictionary of possible `sg_assocs`, `invite_tokens`
  101. and `ephemeral_public_keys` keys.
  102. :type data: Dict
  103. :returns a deferred.
  104. :rtype: Deferred
  105. """
  106. # sgAssocs is comprised of tuples (sgAssoc, shadowSgAssoc)
  107. if self.shadow:
  108. data["sg_assocs"] = { k: v[1] for k, v in data["sg_assocs"].items() }
  109. else:
  110. data["sg_assocs"] = { k: v[0] for k, v in data["sg_assocs"].items() }
  111. reqDeferred = self.sydent.replicationHttpsClient.postJson(self.servername,
  112. self.port,
  113. '/_matrix/identity/replicate/v1/push',
  114. data)
  115. # XXX: We'll also need to prune the deleted associations out of the
  116. # local associations table once they've been replicated to all peers
  117. # (ie. remove the record we kept in order to propagate the deletion to
  118. # other peers).
  119. updateDeferred = defer.Deferred()
  120. reqDeferred.addCallback(self._pushSuccess, updateDeferred=updateDeferred)
  121. reqDeferred.addErrback(self._pushFailed, updateDeferred=updateDeferred)
  122. return updateDeferred
  123. def _pushSuccess(self, result, updateDeferred):
  124. if result.code >= 200 and result.code < 300:
  125. updateDeferred.callback(result)
  126. else:
  127. d = readBody(result)
  128. d.addCallback(self._failedPushBodyRead, updateDeferred=updateDeferred)
  129. d.addErrback(self._pushFailed, updateDeferred=updateDeferred)
  130. def _failedPushBodyRead(self, body, updateDeferred):
  131. errObj = json.loads(body)
  132. e = RemotePeerError()
  133. e.errorDict = errObj
  134. updateDeferred.errback(e)
  135. def _pushFailed(self, failure, updateDeferred):
  136. updateDeferred.errback(failure)
  137. return None
  138. class NoSignaturesException(Exception):
  139. pass
  140. class NoMatchingSignatureException(Exception):
  141. def __str__(self):
  142. return "Found signatures: %s, required server name: %s" % (self.foundSigs, self.requiredServername)
  143. class RemotePeerError(Exception):
  144. def __str__(self):
  145. return repr(self.errorDict)