1
0

pusher.py 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200
# -*- coding: utf-8 -*-
# Copyright 2014 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
  15. import logging
  16. import copy
  17. import twisted.internet.reactor
  18. import twisted.internet.task
  19. from sydent.util import time_msec
  20. from sydent.replication.peer import LocalPeer
  21. from sydent.db.threepid_associations import LocalAssociationStore
  22. from sydent.db.invite_tokens import JoinTokenStore
  23. from sydent.db.peers import PeerStore
  24. from sydent.threepid.signer import Signer
  25. logger = logging.getLogger(__name__)
  26. EPHEMERAL_PUBLIC_KEYS_PUSH_LIMIT = 100
  27. INVITE_TOKENS_PUSH_LIMIT = 100
  28. ASSOCIATIONS_PUSH_LIMIT = 100
  29. class Pusher:
  30. def __init__(self, sydent):
  31. self.sydent = sydent
  32. self.pushing = False
  33. self.peerStore = PeerStore(self.sydent)
  34. def setup(self):
  35. cb = twisted.internet.task.LoopingCall(Pusher.scheduledPush, self)
  36. cb.start(10.0)
  37. def getAssociationsAfterId(self, afterId, limit, signed=False):
  38. """Return max `limit` associations from the database after a given
  39. DB table id.
  40. :param afterId: A database id to act as an offset. Rows after this id
  41. are returned.
  42. :type afterId: int
  43. :param limit: Max amount of database rows to return.
  44. :type limit: int
  45. :returns a tuple with the first item being a list of associations,
  46. and the second being the maximum table id of the returned
  47. associations.
  48. :rtype: Tuple[list[Tuple[Dict, Dict]], int|None]
  49. """
  50. assocs = {}
  51. localAssocStore = LocalAssociationStore(self.sydent)
  52. (localAssocs, maxId) = localAssocStore.getAssociationsAfterId(afterId, limit)
  53. signer = Signer(self.sydent)
  54. for localId in localAssocs:
  55. assoc = localAssocs[localId]
  56. if signed:
  57. assoc = signer.signedThreePidAssociation(localAssocs[localId])
  58. shadowAssoc = None
  59. if self.sydent.shadow_hs_master and self.sydent.shadow_hs_slave:
  60. shadowAssoc = copy.deepcopy(localAssocs[localId])
  61. # mxid is null if 3pid has been unbound
  62. if shadowAssoc.mxid:
  63. shadowAssoc.mxid = shadowAssoc.mxid.replace(
  64. ":" + self.sydent.shadow_hs_master,
  65. ":" + self.sydent.shadow_hs_slave
  66. )
  67. if signed:
  68. shadowAssoc = signer.signedThreePidAssociation(shadowAssoc)
  69. assocs[localId] = (assoc, shadowAssoc)
  70. return (assocs, maxId)
  71. def getInviteTokensAfterId(self, afterId, limit):
  72. """Return max `limit` invite tokens from the database after a given
  73. DB table id.
  74. :param afterId: A database id to act as an offset. Rows after this id
  75. are returned.
  76. :type afterId: int
  77. :param limit: Max amount of database rows to return.
  78. :type limit: int
  79. :returns a tuple with the first item being a list of tokens, and the
  80. second being the maximum table id of the returned tokens.
  81. :rtype: Tuple[Dict[int, Dict], int|None]
  82. """
  83. # TODO: Do something for shadow servers?
  84. join_token_store = JoinTokenStore(self.sydent)
  85. return join_token_store.getInviteTokensAfterId(afterId, limit)
  86. def getEphemeralPublicKeysAfterId(self, afterId, limit):
  87. """Return max `limit` ephemeral keys from the database after a given
  88. table id.
  89. :param afterId: A database id to act as an offset. Rows after this id
  90. are returned.
  91. :type afterId: int
  92. :param limit: Max amount of database rows to return.
  93. :type limit: int
  94. :returns a tuple with the first item being a list of keys, and the
  95. second being the maximum table id of the returned keys.
  96. :rtype: Tuple[Dict[int, Dict], int|None]
  97. """
  98. # TODO: Do something for shadow servers?
  99. join_token_store = JoinTokenStore(self.sydent)
  100. return join_token_store.getEphemeralPublicKeysAfterId(afterId, limit)
  101. def doLocalPush(self):
  102. """
  103. Synchronously push local associations to this server (ie. copy them to globals table)
  104. The local server is essentially treated the same as any other peer except we don't do the
  105. network round-trip and this function can be used so the association goes into the global table
  106. before the http call returns (so clients know it will be available on at least the same ID server they used)
  107. """
  108. localPeer = LocalPeer(self.sydent)
  109. (signedAssocs, _) = self.getSignedAssociationsAfterId(localPeer.lastId, None)
  110. localPeer.pushUpdates(signedAssocs)
  111. def scheduledPush(self):
  112. """Push pending updates to a remote peer. To be called regularly."""
  113. if self.pushing:
  114. return
  115. self.pushing = True
  116. updateDeferred = None
  117. try:
  118. peers = self.peerStore.getAllPeers()
  119. for p in peers:
  120. logger.debug("Looking for updates to push to %s", p.servername)
  121. # Dictionary for holding all data to push
  122. push_data = {}
  123. ids = {}
  124. total_updates = 0
  125. # Push associations (called sg_assocs for legacy reasons)
  126. (push_data["sg_assocs"], ids["sg_assocs"]) = self.getAssociationsAfterId(p.lastSentAssocsId, ASSOCIATIONS_PUSH_LIMIT)
  127. total_updates += len(push_data["sg_assocs"])
  128. # Push invite tokens and ephemeral public keys
  129. (push_data["invite_tokens"], ids["invite_tokens"]) = self.getInviteTokensAfterId(p.lastSentInviteTokensId, INVITE_TOKENS_PUSH_LIMIT)
  130. (push_data["ephemeral_public_keys"], ids["ephemeral_public_keys"]) = self.getEphemeralPublicKeysAfterId(p.lastSentEphemeralKeysId, EPHEMERAL_PUBLIC_KEYS_PUSH_LIMIT)
  131. total_updates += len(push_data["invite_tokens"]) + len(push_data["ephemeral_public_keys"])
  132. # Sign push data
  133. signer = Signer(self.sydent)
  134. push_data = signer.signReplicationJSON(push_data)
  135. logger.debug("%d updates to push to %s", total_updates, p.servername)
  136. if total_updates:
  137. logger.info("Pushing %d updates to %s:%d", total_updates, p.servername, p.port)
  138. logger.info("Sending: %s", str(push_data))
  139. updateDeferred = p.pushUpdates(push_data)
  140. updateDeferred.addCallback(self._pushSucceeded, peer=p, ids=ids)
  141. updateDeferred.addErrback(self._pushFailed, peer=p)
  142. break
  143. finally:
  144. if not updateDeferred:
  145. self.pushing = False
  146. def _pushSucceeded(self, result, peer, ids):
  147. """To be called after a successful push to a remote peer."""
  148. logger.info("Pushed updates to %s with result %d %s",
  149. peer.servername, result.code, result.phrase)
  150. self.peerStore.setLastSentIdAndPokeSucceeded(peer.servername, ids, time_msec())
  151. self.pushing = False
  152. self.scheduledPush()
  153. def _pushFailed(self, failure, peer):
  154. """To be called after an unsuccessful push to a remote peer."""
  155. logger.info("Failed to push updates to %s:%s: %s", peer.servername, peer.port, failure)
  156. self.pushing = False
  157. return None