peer.py 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2014 OpenMarket Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. from sydent.db.threepid_associations import GlobalAssociationStore
  16. from sydent.threepid import threePidAssocFromDict
  17. from unpaddedbase64 import decode_base64
  18. import signedjson.sign
  19. import signedjson.key
  20. import logging
  21. import json
  22. import nacl
  23. import twisted.internet.reactor
  24. from twisted.internet import defer
  25. from twisted.web.client import readBody
  26. logger = logging.getLogger(__name__)
  27. SIGNING_KEY_ALGORITHM = "ed25519"
  28. class Peer(object):
  29. def __init__(self, servername, pubkeys):
  30. self.servername = servername
  31. self.pubkeys = pubkeys
  32. self.shadow = False
  33. class LocalPeer(Peer):
  34. """
  35. The local peer (ourselves: essentially copying from the local associations table to the global one)
  36. """
  37. def __init__(self, sydent):
  38. super(LocalPeer, self).__init__(sydent.server_name, {})
  39. self.sydent = sydent
  40. globalAssocStore = GlobalAssociationStore(self.sydent)
  41. self.lastId = globalAssocStore.lastIdFromServer(self.servername)
  42. if self.lastId is None:
  43. self.lastId = -1
  44. def pushUpdates(self, sgAssocs):
  45. """Push updates from local associations table to the global one."""
  46. globalAssocStore = GlobalAssociationStore(self.sydent)
  47. for localId in sgAssocs:
  48. if localId > self.lastId:
  49. assocObj = threePidAssocFromDict(sgAssocs[localId][0])
  50. if assocObj.mxid is not None:
  51. # We can probably skip verification for the local peer (although it could be good as a sanity check)
  52. globalAssocStore.addAssociation(assocObj, json.dumps(sgAssocs[localId][0]),
  53. self.sydent.server_name, localId)
  54. else:
  55. globalAssocStore.removeAssociation(assocObj.medium, assocObj.address)
  56. # inject the shadow association, if any.
  57. if sgAssocs[localId][1] is not None:
  58. shadowAssocObj = threePidAssocFromDict(sgAssocs[localId][1])
  59. if shadowAssocObj.mxid is not None:
  60. # we deliberately identify this as originating from us rather than the shadow IS
  61. globalAssocStore.addAssociation(shadowAssocObj, json.dumps(sgAssocs[localId][1]),
  62. self.sydent.server_name, localId)
  63. else:
  64. globalAssocStore.removeAssociation(shadowAssocObj.medium, shadowAssocObj.address)
  65. # if this is an association that matches one of our invite_tokens then we should call the onBind callback
  66. # at this point, in order to tell the inviting HS that someone out there has just bound the 3PID.
  67. self.sydent.threepidBinder.notifyPendingInvites(assocObj)
  68. d = defer.succeed(True)
  69. return d
  70. class RemotePeer(Peer):
  71. def __init__(self, sydent, server_name, pubkeys):
  72. super(RemotePeer, self).__init__(server_name, pubkeys)
  73. self.sydent = sydent
  74. self.port = 1001
  75. # Get verify key for this peer
  76. key_bytes = decode_base64(self.pubkeys[SIGNING_KEY_ALGORITHM])
  77. self.verify_key = signedjson.key.decode_verify_key_bytes(SIGNING_KEY_ALGORITHM + ":", key_bytes)
  78. # Attach metadata
  79. self.verify_key.alg = SIGNING_KEY_ALGORITHM
  80. self.verify_key.version = 0
  81. def verifySignedAssociation(self, assoc):
  82. """Verifies a signature on a signed association.
  83. :param assoc: A signed association.
  84. :type assoc: Dict
  85. """
  86. if not 'signatures' in assoc:
  87. raise NoSignaturesException()
  88. key_ids = signedjson.sign.signature_ids(assoc, self.servername)
  89. if not key_ids or len(key_ids) == 0 or not key_ids[0].startswith(SIGNING_KEY_ALGORITHM + ":"):
  90. e = NoMatchingSignatureException()
  91. e.foundSigs = assoc['signatures'].keys()
  92. e.requiredServername = self.servername
  93. raise e
  94. # Verify the JSON
  95. signedjson.sign.verify_signed_json(assoc, self.servername, self.verify_key)
  96. def pushUpdates(self, data):
  97. """Push updates to a remote peer.
  98. :param data: A dictionary of possible `sg_assocs`, `invite_tokens`
  99. and `ephemeral_public_keys` keys.
  100. :type data: Dict
  101. :returns a deferred.
  102. :rtype: Deferred
  103. """
  104. reqDeferred = self.sydent.replicationHttpsClient.postJson(self.servername,
  105. self.port,
  106. '/_matrix/identity/replicate/v1/push',
  107. data)
  108. # XXX: We'll also need to prune the deleted associations out of the
  109. # local associations table once they've been replicated to all peers
  110. # (ie. remove the record we kept in order to propagate the deletion to
  111. # other peers).
  112. updateDeferred = defer.Deferred()
  113. reqDeferred.addCallback(self._pushSuccess, updateDeferred=updateDeferred)
  114. reqDeferred.addErrback(self._pushFailed, updateDeferred=updateDeferred)
  115. return updateDeferred
  116. def _pushSuccess(self, result, updateDeferred):
  117. if result.code >= 200 and result.code < 300:
  118. updateDeferred.callback(result)
  119. else:
  120. d = readBody(result)
  121. d.addCallback(self._failedPushBodyRead, updateDeferred=updateDeferred)
  122. d.addErrback(self._pushFailed, updateDeferred=updateDeferred)
  123. def _failedPushBodyRead(self, body, updateDeferred):
  124. errObj = json.loads(body)
  125. e = RemotePeerError()
  126. e.errorDict = errObj
  127. updateDeferred.errback(e)
  128. def _pushFailed(self, failure, updateDeferred):
  129. updateDeferred.errback(failure)
  130. return None
  131. class NoSignaturesException(Exception):
  132. pass
  133. class NoMatchingSignatureException(Exception):
  134. def __str__(self):
  135. return "Found signatures: %s, required server name: %s" % (self.foundSigs, self.requiredServername)
  136. class RemotePeerError(Exception):
  137. def __str__(self):
  138. return repr(self.errorDict)