pusher.py 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. # -*- coding: utf-8 -*-
  2. # Copyright 2014 OpenMarket Ltd
  3. #
  4. # Licensed under the Apache License, Version 2.0 (the "License");
  5. # you may not use this file except in compliance with the License.
  6. # You may obtain a copy of the License at
  7. #
  8. # http://www.apache.org/licenses/LICENSE-2.0
  9. #
  10. # Unless required by applicable law or agreed to in writing, software
  11. # distributed under the License is distributed on an "AS IS" BASIS,
  12. # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13. # See the License for the specific language governing permissions and
  14. # limitations under the License.
  15. import logging
  16. import copy
  17. import twisted.internet.reactor
  18. import twisted.internet.task
  19. from sydent.util import time_msec
  20. from sydent.replication.peer import LocalPeer
  21. from sydent.db.threepid_associations import LocalAssociationStore
  22. from sydent.db.invite_tokens import JoinTokenStore
  23. from sydent.db.peers import PeerStore
  24. from sydent.threepid.signer import Signer
  25. logger = logging.getLogger(__name__)
  26. EPHEMERAL_PUBLIC_KEYS_PUSH_LIMIT = 100
  27. INVITE_TOKENS_PUSH_LIMIT = 100
  28. ASSOCIATIONS_PUSH_LIMIT = 100
  29. class Pusher:
  30. def __init__(self, sydent):
  31. self.sydent = sydent
  32. self.pushing = False
  33. self.peerStore = PeerStore(self.sydent)
  34. def setup(self):
  35. cb = twisted.internet.task.LoopingCall(Pusher.scheduledPush, self)
  36. cb.start(10.0)
  37. def getSignedAssociationsAfterId(self, afterId, limit, shadow=False):
  38. """Return max `limit` associations from the database after a given
  39. DB table id.
  40. :param afterId: A database id to act as an offset. Rows after this id
  41. are returned.
  42. :type afterId: int
  43. :param limit: Max amount of database rows to return.
  44. :type limit: int
  45. :param shadow: Whether these associations are intended for a shadow
  46. server.
  47. :type shadow: bool
  48. :returns a tuple with the first item being a dict of associations,
  49. and the second being the maximum table id of the returned
  50. associations.
  51. :rtype: Tuple[Dict[Dict, Dict], int|None]
  52. """
  53. assocs = {}
  54. localAssocStore = LocalAssociationStore(self.sydent)
  55. (localAssocs, maxId) = localAssocStore.getAssociationsAfterId(afterId, limit)
  56. signer = Signer(self.sydent)
  57. for localId, assoc in localAssocs.items():
  58. if shadow and self.sydent.shadow_hs_master and self.sydent.shadow_hs_slave:
  59. # mxid is null if 3pid has been unbound
  60. if assoc.mxid:
  61. assoc.mxid = assoc.mxid.replace(
  62. ":" + self.sydent.shadow_hs_master,
  63. ":" + self.sydent.shadow_hs_slave
  64. )
  65. assocs[localId] = signer.signedThreePidAssociation(assoc)
  66. return (assocs, maxId)
  67. def doLocalPush(self):
  68. """
  69. Synchronously push local associations to this server (ie. copy them to globals table)
  70. The local server is essentially treated the same as any other peer except we don't do the
  71. network round-trip and this function can be used so the association goes into the global table
  72. before the http call returns (so clients know it will be available on at least the same ID server they used)
  73. """
  74. localPeer = LocalPeer(self.sydent)
  75. (signedAssocs, _) = self.getSignedAssociationsAfterId(localPeer.lastId, None)
  76. localPeer.pushUpdates(signedAssocs)
  77. def scheduledPush(self):
  78. """Push pending updates to a remote peer. To be called regularly."""
  79. if self.pushing:
  80. return
  81. self.pushing = True
  82. updateDeferred = None
  83. join_token_store = JoinTokenStore(self.sydent)
  84. try:
  85. peers = self.peerStore.getAllPeers()
  86. for p in peers:
  87. logger.debug("Looking for updates to push to %s", p.servername)
  88. # Dictionary for holding all data to push
  89. push_data = {}
  90. # Dictionary for holding all the ids of db tables we've successfully replicated up to
  91. ids = {}
  92. total_updates = 0
  93. # Push associations
  94. (push_data["sg_assocs"], ids["sg_assocs"]) = self.getSignedAssociationsAfterId(p.lastSentAssocsId, ASSOCIATIONS_PUSH_LIMIT, p.shadow)
  95. total_updates += len(push_data["sg_assocs"])
  96. # Push invite tokens and ephemeral public keys
  97. (push_data["invite_tokens"], ids["invite_tokens"]) = join_token_store.getInviteTokensAfterId(p.lastSentInviteTokensId, INVITE_TOKENS_PUSH_LIMIT)
  98. (push_data["ephemeral_public_keys"], ids["ephemeral_public_keys"]) = join_token_store.getEphemeralPublicKeysAfterId(p.lastSentEphemeralKeysId, EPHEMERAL_PUBLIC_KEYS_PUSH_LIMIT)
  99. total_updates += len(push_data["invite_tokens"]) + len(push_data["ephemeral_public_keys"])
  100. logger.debug("%d updates to push to %s", total_updates, p.servername)
  101. if total_updates:
  102. logger.info("Pushing %d updates to %s:%d", total_updates, p.servername, p.port)
  103. updateDeferred = p.pushUpdates(push_data)
  104. updateDeferred.addCallback(self._pushSucceeded, peer=p, ids=ids)
  105. updateDeferred.addErrback(self._pushFailed, peer=p)
  106. break
  107. finally:
  108. if not updateDeferred:
  109. self.pushing = False
  110. def _pushSucceeded(self, result, peer, ids):
  111. """To be called after a successful push to a remote peer."""
  112. logger.info("Pushed updates to %s with result %d %s",
  113. peer.servername, result.code, result.phrase)
  114. self.peerStore.setLastSentIdAndPokeSucceeded(peer.servername, ids, time_msec())
  115. self.pushing = False
  116. self.scheduledPush()
  117. def _pushFailed(self, failure, peer):
  118. """To be called after an unsuccessful push to a remote peer."""
  119. logger.info("Failed to push updates to %s:%s: %s", peer.servername, peer.port, failure)
  120. self.pushing = False
  121. return None