BootstrapperDb.py 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158
  1. import time
  2. import re
  3. import gevent
  4. from Config import config
  5. from Db import Db
  6. from util import helper
  7. class BootstrapperDb(Db):
  8. def __init__(self):
  9. self.version = 6
  10. self.hash_ids = {} # hash -> id cache
  11. super(BootstrapperDb, self).__init__({"db_name": "Bootstrapper"}, "%s/bootstrapper.db" % config.data_dir)
  12. self.foreign_keys = True
  13. self.checkTables()
  14. self.updateHashCache()
  15. gevent.spawn(self.cleanup)
  16. def cleanup(self):
  17. while 1:
  18. self.execute("DELETE FROM peer WHERE date_announced < DATETIME('now', '-40 minute')")
  19. time.sleep(4*60)
  20. def updateHashCache(self):
  21. res = self.execute("SELECT * FROM hash")
  22. self.hash_ids = {str(row["hash"]): row["hash_id"] for row in res}
  23. self.log.debug("Loaded %s hash_ids" % len(self.hash_ids))
  24. def checkTables(self):
  25. version = int(self.execute("PRAGMA user_version").fetchone()[0])
  26. self.log.debug("Db version: %s, needed: %s" % (version, self.version))
  27. if version < self.version:
  28. self.createTables()
  29. else:
  30. self.execute("VACUUM")
  31. def createTables(self):
  32. # Delete all tables
  33. self.execute("PRAGMA writable_schema = 1")
  34. self.execute("DELETE FROM sqlite_master WHERE type IN ('table', 'index', 'trigger')")
  35. self.execute("PRAGMA writable_schema = 0")
  36. self.execute("VACUUM")
  37. self.execute("PRAGMA INTEGRITY_CHECK")
  38. # Create new tables
  39. self.execute("""
  40. CREATE TABLE peer (
  41. peer_id INTEGER PRIMARY KEY ASC AUTOINCREMENT NOT NULL UNIQUE,
  42. port INTEGER NOT NULL,
  43. ip4 TEXT,
  44. onion TEXT,
  45. date_added DATETIME DEFAULT (CURRENT_TIMESTAMP),
  46. date_announced DATETIME DEFAULT (CURRENT_TIMESTAMP)
  47. );
  48. """)
  49. self.execute("""
  50. CREATE TABLE peer_to_hash (
  51. peer_to_hash_id INTEGER PRIMARY KEY AUTOINCREMENT UNIQUE NOT NULL,
  52. peer_id INTEGER REFERENCES peer (peer_id) ON DELETE CASCADE,
  53. hash_id INTEGER REFERENCES hash (hash_id)
  54. );
  55. """)
  56. self.execute("CREATE INDEX peer_id ON peer_to_hash (peer_id);")
  57. self.execute("CREATE INDEX hash_id ON peer_to_hash (hash_id);")
  58. self.execute("""
  59. CREATE TABLE hash (
  60. hash_id INTEGER PRIMARY KEY AUTOINCREMENT UNIQUE NOT NULL,
  61. hash BLOB UNIQUE NOT NULL,
  62. date_added DATETIME DEFAULT (CURRENT_TIMESTAMP)
  63. );
  64. """)
  65. self.execute("PRAGMA user_version = %s" % self.version)
  66. def getHashId(self, hash):
  67. if hash not in self.hash_ids:
  68. self.log.debug("New hash: %s" % repr(hash))
  69. self.execute("INSERT OR IGNORE INTO hash ?", {"hash": buffer(hash)})
  70. self.hash_ids[hash] = self.cur.cursor.lastrowid
  71. return self.hash_ids[hash]
  72. def peerAnnounce(self, ip4=None, onion=None, port=None, hashes=[], onion_signed=False, delete_missing_hashes=False):
  73. hashes_ids_announced = []
  74. for hash in hashes:
  75. hashes_ids_announced.append(self.getHashId(hash))
  76. if not ip4 and not onion:
  77. return 0
  78. # Check user
  79. if onion:
  80. res = self.execute("SELECT * FROM peer WHERE ? LIMIT 1", {"onion": onion})
  81. else:
  82. res = self.execute("SELECT * FROM peer WHERE ? LIMIT 1", {"ip4": ip4, "port": port})
  83. user_row = res.fetchone()
  84. if user_row:
  85. peer_id = user_row["peer_id"]
  86. self.execute("UPDATE peer SET date_announced = DATETIME('now') WHERE ?", {"peer_id": peer_id})
  87. else:
  88. self.log.debug("New peer: %s %s signed: %s" % (ip4, onion, onion_signed))
  89. if onion and not onion_signed:
  90. return len(hashes)
  91. self.execute("INSERT INTO peer ?", {"ip4": ip4, "onion": onion, "port": port})
  92. peer_id = self.cur.cursor.lastrowid
  93. # Check user's hashes
  94. res = self.execute("SELECT * FROM peer_to_hash WHERE ?", {"peer_id": peer_id})
  95. hash_ids_db = [row["hash_id"] for row in res]
  96. if hash_ids_db != hashes_ids_announced:
  97. hash_ids_added = set(hashes_ids_announced) - set(hash_ids_db)
  98. hash_ids_removed = set(hash_ids_db) - set(hashes_ids_announced)
  99. if not onion or onion_signed:
  100. for hash_id in hash_ids_added:
  101. self.execute("INSERT INTO peer_to_hash ?", {"peer_id": peer_id, "hash_id": hash_id})
  102. if hash_ids_removed and delete_missing_hashes:
  103. self.execute("DELETE FROM peer_to_hash WHERE ?", {"peer_id": peer_id, "hash_id": list(hash_ids_removed)})
  104. return len(hash_ids_added) + len(hash_ids_removed)
  105. else:
  106. return 0
  107. def peerList(self, hash, ip4=None, onions=[], port=None, limit=30, need_types=["ip4", "onion"]):
  108. hash_peers = {"ip4": [], "onion": []}
  109. if limit == 0:
  110. return hash_peers
  111. hashid = self.getHashId(hash)
  112. where = "hash_id = :hashid"
  113. if onions:
  114. onions_escaped = ["'%s'" % re.sub("[^a-z0-9,]", "", onion) for onion in onions]
  115. where += " AND (onion NOT IN (%s) OR onion IS NULL)" % ",".join(onions_escaped)
  116. elif ip4:
  117. where += " AND (NOT (ip4 = :ip4 AND port = :port) OR ip4 IS NULL)"
  118. query = """
  119. SELECT ip4, port, onion
  120. FROM peer_to_hash
  121. LEFT JOIN peer USING (peer_id)
  122. WHERE %s
  123. ORDER BY date_announced DESC
  124. LIMIT :limit
  125. """ % where
  126. res = self.execute(query, {"hashid": hashid, "ip4": ip4, "onions": onions, "port": port, "limit": limit})
  127. for row in res:
  128. if row["ip4"] and "ip4" in need_types:
  129. hash_peers["ip4"].append(
  130. helper.packAddress(row["ip4"], row["port"])
  131. )
  132. if row["onion"] and "onion" in need_types:
  133. hash_peers["onion"].append(
  134. helper.packOnionAddress(row["onion"], row["port"])
  135. )
  136. return hash_peers