BootstrapperDb.py

import time
import re

import gevent

from Config import config
from Db import Db
from util import helper


class BootstrapperDb(Db):
    def __init__(self):
        self.version = 7
        self.hash_ids = {}  # hash -> id cache
        super(BootstrapperDb, self).__init__({"db_name": "Bootstrapper"}, "%s/bootstrapper.db" % config.data_dir)
        self.foreign_keys = True
        self.checkTables()
        self.updateHashCache()
        gevent.spawn(self.cleanup)

    def cleanup(self):
        # Every 4 minutes, drop peers that have not re-announced for 40 minutes
        while 1:
            time.sleep(4 * 60)
            timeout = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time() - 60 * 40))
            self.execute("DELETE FROM peer WHERE date_announced < ?", [timeout])

    def updateHashCache(self):
        res = self.execute("SELECT * FROM hash")
        self.hash_ids = {str(row["hash"]): row["hash_id"] for row in res}
        self.log.debug("Loaded %s hash_ids" % len(self.hash_ids))

    def checkTables(self):
        version = int(self.execute("PRAGMA user_version").fetchone()[0])
        self.log.debug("Db version: %s, needed: %s" % (version, self.version))
        if version < self.version:
            self.createTables()
        else:
            self.execute("VACUUM")

    def createTables(self):
        # Delete all tables
        self.execute("PRAGMA writable_schema = 1")
        self.execute("DELETE FROM sqlite_master WHERE type IN ('table', 'index', 'trigger')")
        self.execute("PRAGMA writable_schema = 0")
        self.execute("VACUUM")
        self.execute("PRAGMA INTEGRITY_CHECK")
        # Create new tables
        self.execute("""
            CREATE TABLE peer (
                peer_id        INTEGER PRIMARY KEY ASC AUTOINCREMENT NOT NULL UNIQUE,
                type           TEXT,
                address        TEXT,
                port           INTEGER NOT NULL,
                date_added     DATETIME DEFAULT (CURRENT_TIMESTAMP),
                date_announced DATETIME DEFAULT (CURRENT_TIMESTAMP)
            );
        """)
        self.execute("CREATE UNIQUE INDEX peer_key ON peer (address, port);")

        self.execute("""
            CREATE TABLE peer_to_hash (
                peer_to_hash_id INTEGER PRIMARY KEY AUTOINCREMENT UNIQUE NOT NULL,
                peer_id         INTEGER REFERENCES peer (peer_id) ON DELETE CASCADE,
                hash_id         INTEGER REFERENCES hash (hash_id)
            );
        """)
        self.execute("CREATE INDEX peer_id ON peer_to_hash (peer_id);")
        self.execute("CREATE INDEX hash_id ON peer_to_hash (hash_id);")

        self.execute("""
            CREATE TABLE hash (
                hash_id    INTEGER PRIMARY KEY AUTOINCREMENT UNIQUE NOT NULL,
                hash       BLOB UNIQUE NOT NULL,
                date_added DATETIME DEFAULT (CURRENT_TIMESTAMP)
            );
        """)
        self.execute("PRAGMA user_version = %s" % self.version)
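
    # Schema overview: `hash` stores each announced info-hash once, `peer` one
    # row per address:port, and `peer_to_hash` is the many-to-many join table
    # between them (deleting a peer cascades to its hash links).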

    def getHashId(self, hash):
        if hash not in self.hash_ids:
            self.log.debug("New hash: %s" % repr(hash))
            # ZeroNet's Db layer expands the `?` placeholder and dict params to
            # "INSERT OR IGNORE INTO hash (hash) VALUES (?)"; buffer() stores
            # the raw bytes as a BLOB (Python 2)
            self.execute("INSERT OR IGNORE INTO hash ?", {"hash": buffer(hash)})
            self.hash_ids[hash] = self.cur.cursor.lastrowid
        return self.hash_ids[hash]
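
    # peerAnnounce() upserts a peer row and synchronizes its hash associations.
    # It returns the number of added + removed associations (0 if unchanged),
    # or len(hashes) when an unsigned onion announce is rejected.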
    def peerAnnounce(self, ip_type, address, port=None, hashes=[], onion_signed=False, delete_missing_hashes=False):
        hashes_ids_announced = []
        for hash in hashes:
            hashes_ids_announced.append(self.getHashId(hash))

        # Check user
        res = self.execute("SELECT peer_id FROM peer WHERE ? LIMIT 1", {"address": address, "port": port})
        user_row = res.fetchone()
        now = time.strftime("%Y-%m-%d %H:%M:%S")
        if user_row:
            peer_id = user_row["peer_id"]
            self.execute("UPDATE peer SET date_announced = ? WHERE peer_id = ?", (now, peer_id))
        else:
            self.log.debug("New peer: %s signed: %s" % (address, onion_signed))
            if ip_type == "onion" and not onion_signed:  # Onion peers must sign their announce
                return len(hashes)
            self.execute("INSERT INTO peer ?", {"type": ip_type, "address": address, "port": port, "date_announced": now})
            peer_id = self.cur.cursor.lastrowid

        # Check user's hashes
        res = self.execute("SELECT * FROM peer_to_hash WHERE ?", {"peer_id": peer_id})
        hash_ids_db = [row["hash_id"] for row in res]
        if hash_ids_db != hashes_ids_announced:
            hash_ids_added = set(hashes_ids_announced) - set(hash_ids_db)
            hash_ids_removed = set(hash_ids_db) - set(hashes_ids_announced)
            if ip_type != "onion" or onion_signed:  # Onion only adds signed hashes
                for hash_id in hash_ids_added:
                    self.execute("INSERT INTO peer_to_hash ?", {"peer_id": peer_id, "hash_id": hash_id})
                if hash_ids_removed and delete_missing_hashes:
                    self.execute("DELETE FROM peer_to_hash WHERE ?", {"peer_id": peer_id, "hash_id": list(hash_ids_removed)})

            return len(hash_ids_added) + len(hash_ids_removed)
        else:
            return 0
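
    # peerList() answers a tracker query: up to `limit` recently announced
    # peers for a hash, grouped by address type and packed via util.helper
    # into the compact binary form used in announce responses.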
    def peerList(self, hash, address=None, onions=[], port=None, limit=30, need_types=["ipv4", "onion"], order=True):
        back = {"ipv4": [], "ipv6": [], "onion": []}
        if limit == 0:
            return back
        hashid = self.getHashId(hash)

        if order:
            order_sql = "ORDER BY date_announced DESC"
        else:
            order_sql = ""
        where_sql = "hash_id = :hashid"
        if onions:
            # Exclude the requester's own onion addresses from the result
            onions_escaped = ["'%s'" % re.sub("[^a-z0-9,]", "", onion) for onion in onions if type(onion) is str]
            where_sql += " AND address NOT IN (%s)" % ",".join(onions_escaped)
        elif address:
            # Exclude the requester itself from the result
            where_sql += " AND NOT (address = :address AND port = :port)"

        query = """
            SELECT type, address, port
            FROM peer_to_hash
            LEFT JOIN peer USING (peer_id)
            WHERE %s
            %s
            LIMIT :limit
        """ % (where_sql, order_sql)
        res = self.execute(query, {"hashid": hashid, "address": address, "port": port, "limit": limit})

        for row in res:
            if row["type"] in need_types:
                if row["type"] == "onion":
                    packed = helper.packOnionAddress(row["address"], row["port"])
                else:
                    packed = helper.packAddress(str(row["address"]), row["port"])

                back[row["type"]].append(packed)
        return back
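

# A minimal usage sketch (illustrative, not part of the original module; it
# assumes a ZeroNet environment where Config, Db and util.helper import
# cleanly and config.data_dir is writable):
if __name__ == "__main__":
    import hashlib
    db = BootstrapperDb()
    site_hash = hashlib.sha256("ExampleSiteAddress").digest()  # hypothetical hash
    # Announce an IPv4 peer serving the hash, then query the peers back
    db.peerAnnounce("ipv4", "1.2.3.4", port=15441, hashes=[site_hash])
    print("Peers for hash: %s" % db.peerList(site_hash, limit=5))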