peer.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2014 OpenMarket Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. from sydent.db.threepid_associations import GlobalAssociationStore
  16. from sydent.threepid import threePidAssocFromDict
  17. import signedjson.sign
  18. import signedjson.key
  19. import logging
  20. import json
  21. import nacl
  22. import twisted.internet.reactor
  23. from twisted.internet import defer
  24. from twisted.web.client import readBody
  25. logger = logging.getLogger(__name__)
  26. class Peer(object):
  27. def __init__(self, servername, pubkeys):
  28. self.servername = servername
  29. self.pubkeys = pubkeys
  30. self.shadow = False
  31. class LocalPeer(Peer):
  32. """
  33. The local peer (ourselves: essentially copying from the local associations table to the global one)
  34. """
  35. def __init__(self, sydent):
  36. super(LocalPeer, self).__init__(sydent.server_name, {})
  37. self.sydent = sydent
  38. globalAssocStore = GlobalAssociationStore(self.sydent)
  39. self.lastId = globalAssocStore.lastIdFromServer(self.servername)
  40. if self.lastId is None:
  41. self.lastId = -1
  42. def pushUpdates(self, sgAssocs):
  43. """Push updates from local associations table to the global one."""
  44. globalAssocStore = GlobalAssociationStore(self.sydent)
  45. for localId in sgAssocs:
  46. if localId > self.lastId:
  47. assocObj = threePidAssocFromDict(sgAssocs[localId][0])
  48. if assocObj.mxid is not None:
  49. # We can probably skip verification for the local peer (although it could be good as a sanity check)
  50. globalAssocStore.addAssociation(assocObj, json.dumps(sgAssocs[localId][0]),
  51. self.sydent.server_name, localId)
  52. else:
  53. globalAssocStore.removeAssociation(assocObj.medium, assocObj.address)
  54. # inject the shadow association, if any.
  55. if sgAssocs[localId][1] is not None:
  56. shadowAssocObj = threePidAssocFromDict(sgAssocs[localId][1])
  57. if shadowAssocObj.mxid is not None:
  58. # we deliberately identify this as originating from us rather than the shadow IS
  59. globalAssocStore.addAssociation(shadowAssocObj, json.dumps(sgAssocs[localId][1]),
  60. self.sydent.server_name, localId)
  61. else:
  62. globalAssocStore.removeAssociation(shadowAssocObj.medium, shadowAssocObj.address)
  63. # if this is an association that matches one of our invite_tokens then we should call the onBind callback
  64. # at this point, in order to tell the inviting HS that someone out there has just bound the 3PID.
  65. self.sydent.threepidBinder.notifyPendingInvites(assocObj)
  66. d = defer.succeed(True)
  67. return d
  68. class RemotePeer(Peer):
  69. def __init__(self, sydent, server_name, pubkeys):
  70. super(RemotePeer, self).__init__(server_name, pubkeys)
  71. self.sydent = sydent
  72. self.port = 1001
  73. # Get verify key from signing key
  74. signing_key = signedjson.key.decode_signing_key_base64(alg, "0", self.pubkeys[alg])
  75. self.verify_key = signing_key.verify_key
  76. # Attach metadata
  77. self.verify_key.alg = alg
  78. self.verify_key.version = 0
  79. def verifyMessage(self, jsonMessage):
  80. if not 'signatures' in jsonMessage:
  81. raise NoSignaturesException()
  82. alg = 'ed25519'
  83. key_ids = signedjson.sign.signature_ids(jsonMessage, self.servername)
  84. if not key_ids or len(key_ids) == 0 or not key_ids[0].startswith(alg + ":"):
  85. e = NoMatchingSignatureException()
  86. e.foundSigs = jsonMessage['signatures'].keys()
  87. e.requiredServername = self.servername
  88. raise e
  89. # Get verify key from signing key
  90. signing_key = signedjson.key.decode_signing_key_base64(alg, "0", self.pubkeys[alg])
  91. verify_key = signing_key.verify_key
  92. # Attach metadata
  93. verify_key.alg = alg
  94. verify_key.version = 0
  95. # Verify the JSON
  96. signedjson.sign.verify_signed_json(jsonMessage, self.servername, self.verify_key)
  97. def pushUpdates(self, data):
  98. """Push updates to a remote peer.
  99. :param data: A dictionary of possible `sg_assocs`, `invite_tokens`
  100. and `ephemeral_public_keys` keys.
  101. :type data: Dict
  102. :returns a deferred.
  103. :rtype: Deferred
  104. """
  105. # sgAssocs is comprised of tuples (sgAssoc, shadowSgAssoc)
  106. if self.shadow:
  107. data["sg_assocs"] = { k: v[1] for k, v in data["sg_assocs"].items() }
  108. else:
  109. data["sg_assocs"] = { k: v[0] for k, v in data["sg_assocs"].items() }
  110. reqDeferred = self.sydent.replicationHttpsClient.postJson(self.servername,
  111. self.port,
  112. '/_matrix/identity/replicate/v1/push',
  113. data)
  114. # XXX: We'll also need to prune the deleted associations out of the
  115. # local associations table once they've been replicated to all peers
  116. # (ie. remove the record we kept in order to propagate the deletion to
  117. # other peers).
  118. updateDeferred = defer.Deferred()
  119. reqDeferred.addCallback(self._pushSuccess, updateDeferred=updateDeferred)
  120. reqDeferred.addErrback(self._pushFailed, updateDeferred=updateDeferred)
  121. return updateDeferred
  122. def _pushSuccess(self, result, updateDeferred):
  123. if result.code >= 200 and result.code < 300:
  124. updateDeferred.callback(result)
  125. else:
  126. d = readBody(result)
  127. d.addCallback(self._failedPushBodyRead, updateDeferred=updateDeferred)
  128. d.addErrback(self._pushFailed, updateDeferred=updateDeferred)
  129. def _failedPushBodyRead(self, body, updateDeferred):
  130. errObj = json.loads(body)
  131. e = RemotePeerError()
  132. e.errorDict = errObj
  133. updateDeferred.errback(e)
  134. def _pushFailed(self, failure, updateDeferred):
  135. updateDeferred.errback(failure)
  136. return None
  137. class NoSignaturesException(Exception):
  138. pass
  139. class NoMatchingSignatureException(Exception):
  140. def __str__(self):
  141. return "Found signatures: %s, required server name: %s" % (self.foundSigs, self.requiredServername)
  142. class RemotePeerError(Exception):
  143. def __str__(self):
  144. return repr(self.errorDict)