BootstrapperDb.py

import time
import re

import gevent

from Config import config
from Db import Db
from util import helper


class BootstrapperDb(Db):
    def __init__(self):
        self.version = 6
        self.hash_ids = {}  # hash -> id cache
        super(BootstrapperDb, self).__init__({"db_name": "Bootstrapper"}, "%s/bootstrapper.db" % config.data_dir)
        self.foreign_keys = True
        self.checkTables()
        self.updateHashCache()
        gevent.spawn(self.cleanup)  # Remove stale peers in the background

    def cleanup(self):
        # Every 4 minutes drop peers that have not announced anything in the last 40 minutes
        while 1:
            time.sleep(4 * 60)
            timeout = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time() - 60 * 40))
            self.execute("DELETE FROM peer WHERE date_announced < ?", [timeout])

    def updateHashCache(self):
        # Preload the hash -> hash_id mapping from the database into memory
        res = self.execute("SELECT * FROM hash")
        self.hash_ids = {str(row["hash"]): row["hash_id"] for row in res}
        self.log.debug("Loaded %s hash_ids" % len(self.hash_ids))

    def checkTables(self):
        # Rebuild the schema if the stored version is older than the code's version, otherwise just compact the db
        version = int(self.execute("PRAGMA user_version").fetchone()[0])
        self.log.debug("Db version: %s, needed: %s" % (version, self.version))
        if version < self.version:
            self.createTables()
        else:
            self.execute("VACUUM")

    def createTables(self):
        # Delete all tables
        self.execute("PRAGMA writable_schema = 1")
        self.execute("DELETE FROM sqlite_master WHERE type IN ('table', 'index', 'trigger')")
        self.execute("PRAGMA writable_schema = 0")
        self.execute("VACUUM")
        self.execute("PRAGMA INTEGRITY_CHECK")
        # Create new tables
        self.execute("""
            CREATE TABLE peer (
                peer_id INTEGER PRIMARY KEY ASC AUTOINCREMENT NOT NULL UNIQUE,
                port INTEGER NOT NULL,
                ip4 TEXT,
                onion TEXT,
                date_added DATETIME DEFAULT (CURRENT_TIMESTAMP),
                date_announced DATETIME DEFAULT (CURRENT_TIMESTAMP)
            );
        """)
        self.execute("""
            CREATE TABLE peer_to_hash (
                peer_to_hash_id INTEGER PRIMARY KEY AUTOINCREMENT UNIQUE NOT NULL,
                peer_id INTEGER REFERENCES peer (peer_id) ON DELETE CASCADE,
                hash_id INTEGER REFERENCES hash (hash_id)
            );
        """)
        self.execute("CREATE INDEX peer_id ON peer_to_hash (peer_id);")
        self.execute("CREATE INDEX hash_id ON peer_to_hash (hash_id);")
        self.execute("""
            CREATE TABLE hash (
                hash_id INTEGER PRIMARY KEY AUTOINCREMENT UNIQUE NOT NULL,
                hash BLOB UNIQUE NOT NULL,
                date_added DATETIME DEFAULT (CURRENT_TIMESTAMP)
            );
        """)
        self.execute("PRAGMA user_version = %s" % self.version)

    def getHashId(self, hash):
        # Return the numeric id for a hash, inserting a new row and caching the id on first use
        if hash not in self.hash_ids:
            self.log.debug("New hash: %s" % repr(hash))
            self.execute("INSERT OR IGNORE INTO hash ?", {"hash": buffer(hash)})
            self.hash_ids[hash] = self.cur.cursor.lastrowid
        return self.hash_ids[hash]

    def peerAnnounce(self, ip4=None, onion=None, port=None, hashes=[], onion_signed=False, delete_missing_hashes=False):
        # Register a peer's announce: refresh its last-seen time and sync its list of announced hashes
        hashes_ids_announced = []
        for hash in hashes:
            hashes_ids_announced.append(self.getHashId(hash))

        if not ip4 and not onion:
            return 0

        # Check user
        if onion:
            res = self.execute("SELECT * FROM peer WHERE ? LIMIT 1", {"onion": onion})
        else:
            res = self.execute("SELECT * FROM peer WHERE ? LIMIT 1", {"ip4": ip4, "port": port})
        user_row = res.fetchone()
        now = time.strftime("%Y-%m-%d %H:%M:%S")
        if user_row:
            peer_id = user_row["peer_id"]
            self.execute("UPDATE peer SET date_announced = ? WHERE peer_id = ?", (now, peer_id))
        else:
            self.log.debug("New peer: %s %s signed: %s" % (ip4, onion, onion_signed))
            if onion and not onion_signed:
                return len(hashes)
            self.execute("INSERT INTO peer ?", {"ip4": ip4, "onion": onion, "port": port, "date_announced": now})
            peer_id = self.cur.cursor.lastrowid

        # Check user's hashes
        res = self.execute("SELECT * FROM peer_to_hash WHERE ?", {"peer_id": peer_id})
        hash_ids_db = [row["hash_id"] for row in res]
        if hash_ids_db != hashes_ids_announced:
            hash_ids_added = set(hashes_ids_announced) - set(hash_ids_db)
            hash_ids_removed = set(hash_ids_db) - set(hashes_ids_announced)
            if not onion or onion_signed:
                for hash_id in hash_ids_added:
                    self.execute("INSERT INTO peer_to_hash ?", {"peer_id": peer_id, "hash_id": hash_id})
                if hash_ids_removed and delete_missing_hashes:
                    self.execute("DELETE FROM peer_to_hash WHERE ?", {"peer_id": peer_id, "hash_id": list(hash_ids_removed)})
            return len(hash_ids_added) + len(hash_ids_removed)
        else:
            return 0

    def peerList(self, hash, ip4=None, onions=[], port=None, limit=30, need_types=["ip4", "onion"]):
        # Collect up to `limit` peers that announced this hash, newest first, excluding the requester's own addresses
        hash_peers = {"ip4": [], "onion": []}
        if limit == 0:
            return hash_peers
        hashid = self.getHashId(hash)

        where = "hash_id = :hashid"
        if onions:
            onions_escaped = ["'%s'" % re.sub("[^a-z0-9,]", "", onion) for onion in onions if type(onion) is str]
            where += " AND (onion NOT IN (%s) OR onion IS NULL)" % ",".join(onions_escaped)
        elif ip4:
            where += " AND (NOT (ip4 = :ip4 AND port = :port) OR ip4 IS NULL)"

        query = """
            SELECT ip4, port, onion
            FROM peer_to_hash
            LEFT JOIN peer USING (peer_id)
            WHERE %s
            ORDER BY date_announced DESC
            LIMIT :limit
        """ % where
        res = self.execute(query, {"hashid": hashid, "ip4": ip4, "onions": onions, "port": port, "limit": limit})
        for row in res:
            if row["ip4"] and "ip4" in need_types:
                hash_peers["ip4"].append(
                    helper.packAddress(row["ip4"], row["port"])
                )
            if row["onion"] and "onion" in need_types:
                hash_peers["onion"].append(
                    helper.packOnionAddress(row["onion"], row["port"])
                )
        return hash_peers
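

# --- Usage sketch (not part of the original module) ---
# A minimal example of how the class above might be exercised, assuming ZeroNet's
# Config, Db and util modules are importable and config.data_dir points to a writable
# directory. The hash value, addresses and ports below are made-up placeholders.
if __name__ == "__main__":
    db = BootstrapperDb()
    site_hash = "\x00" * 20  # placeholder for a hashed site address
    # A peer at 1.2.3.4:15441 announces that it serves the hash...
    db.peerAnnounce(ip4="1.2.3.4", port=15441, hashes=[site_hash])
    # ...then another client asks for peers of that hash, excluding its own address
    print db.peerList(hash=site_hash, ip4="5.6.7.8", port=15441, limit=5)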