pusher.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
# -*- coding: utf-8 -*-
# Copyright 2014 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
  15. import logging
  16. import twisted.internet.reactor
  17. import twisted.internet.task
  18. from sydent.util import time_msec
  19. from sydent.replication.peer import LocalPeer
  20. from sydent.db.threepid_associations import LocalAssociationStore
  21. from sydent.db.peers import PeerStore
  22. from sydent.threepid.signer import Signer
  23. logger = logging.getLogger(__name__)
  24. class Pusher:
  25. def __init__(self, sydent):
  26. self.sydent = sydent
  27. self.pushing = False
  28. self.peerStore = PeerStore(self.sydent)
  29. def setup(self):
  30. cb = twisted.internet.task.LoopingCall(Pusher.scheduledPush, self)
  31. cb.start(10.0)
  32. def getSignedAssociationsAfterId(self, afterId, limit):
  33. assocs = {}
  34. localAssocStore = LocalAssociationStore(self.sydent)
  35. (localAssocs, maxId) = localAssocStore.getAssociationsAfterId(afterId, limit)
  36. signer = Signer(self.sydent)
  37. for localId in localAssocs:
  38. sgAssoc = signer.signedThreePidAssociation(localAssocs[localId])
  39. signedAssocs[localId] = sgAssoc
  40. return (signedAssocs, maxId)
  41. def doLocalPush(self):
  42. """
  43. Synchronously push local associations to this server (ie. copy them to globals table)
  44. The local server is essentially treated the same as any other peer except we don't do the
  45. network round-trip and this function can be used so the association goes into the global table
  46. before the http call returns (so clients know it will be available on at least the same ID server they used)
  47. """
  48. localPeer = LocalPeer(self.sydent)
  49. signedAssocs = self.getSignedAssociationsAfterId(localPeer.lastId, None)[0]
  50. localPeer.pushUpdates(signedAssocs)
  51. def scheduledPush(self):
  52. if self.pushing:
  53. return
  54. self.pushing = True
  55. updateDeferred = None
  56. try:
  57. peers = self.peerStore.getAllPeers()
  58. for p in peers:
  59. logger.debug("Looking for update after %d to push to %s", p.lastSentVersion, p.servername)
  60. (signedAssocTuples, maxId) = self.getSignedAssociationsAfterId(p.lastSentVersion, 100)
  61. logger.debug("%d updates to push to %s", len(signedAssocTuples), p.servername)
  62. if len(signedAssocTuples) > 0:
  63. logger.info("Pushing %d updates to %s", len(signedAssocTuples), p.servername)
  64. updateDeferred = p.pushUpdates(signedAssocTuples)
  65. updateDeferred.addCallback(self._pushSucceeded, peer=p, maxId=maxId)
  66. updateDeferred.addErrback(self._pushFailed, peer=p)
  67. break
  68. finally:
  69. if not updateDeferred:
  70. self.pushing = False
  71. def _pushSucceeded(self, result, peer, maxId):
  72. logger.info("Pushed updates up to %d to %s with result %d %s",
  73. maxId, peer.servername, result.code, result.phrase)
  74. self.peerStore.setLastSentVersionAndPokeSucceeded(peer.servername, maxId, time_msec())
  75. self.pushing = False
  76. self.scheduledPush()
  77. def _pushFailed(self, failure, peer):
  78. logger.info("Failed to push updates to %s: %s", peer.servername, failure)
  79. self.pushing = False
  80. return None