123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163 |
- import time
- import re
- import gevent
- from Config import config
- from Db import Db
- from util import helper
class BootstrapperDb(Db):
    """Database of announced peers and the hashes they announced.

    Three tables are maintained (see ``createTables``):

    * ``peer``         - one row per announced ip4/port or onion address
    * ``hash``         - one row per announced hash
    * ``peer_to_hash`` - link table between the two
    """

    def __init__(self):
        """Open the database, verify the schema, load the hash cache and
        start the periodic cleanup greenlet."""
        self.version = 6
        self.hash_ids = {}  # hash -> id cache
        super(BootstrapperDb, self).__init__({"db_name": "Bootstrapper"}, "%s/bootstrapper.db" % config.data_dir)
        # NOTE(review): presumably read by Db when it connects, so that the
        # ON DELETE CASCADE on peer_to_hash takes effect - confirm in Db.
        self.foreign_keys = True
        self.checkTables()
        self.updateHashCache()
        gevent.spawn(self.cleanup)

    def cleanup(self):
        """Endless loop: every 4 minutes delete the peers whose last
        announce is older than 40 minutes."""
        while 1:
            # gevent.sleep always yields to the other greenlets, even when
            # the time module has not been monkey-patched.
            gevent.sleep(4 * 60)
            # Local time, same format as the value written by peerAnnounce
            timeout = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(time.time() - 60 * 40))
            self.execute("DELETE FROM peer WHERE date_announced < ?", [timeout])

    def updateHashCache(self):
        """Rebuild the in-memory hash -> hash_id cache from the hash table."""
        res = self.execute("SELECT * FROM hash")
        self.hash_ids = {str(row["hash"]): row["hash_id"] for row in res}
        self.log.debug("Loaded %s hash_ids" % len(self.hash_ids))

    def checkTables(self):
        """Recreate the schema if the stored user_version is older than
        ``self.version``, otherwise just VACUUM the database."""
        version = int(self.execute("PRAGMA user_version").fetchone()[0])
        self.log.debug("Db version: %s, needed: %s" % (version, self.version))
        if version < self.version:
            self.createTables()
        else:
            self.execute("VACUUM")

    def createTables(self):
        """Drop every existing table, index and trigger, then create the
        current schema and stamp it with ``self.version``."""
        # Delete all tables
        self.execute("PRAGMA writable_schema = 1")
        self.execute("DELETE FROM sqlite_master WHERE type IN ('table', 'index', 'trigger')")
        self.execute("PRAGMA writable_schema = 0")
        self.execute("VACUUM")
        self.execute("PRAGMA INTEGRITY_CHECK")

        # Create new tables
        self.execute("""
            CREATE TABLE peer (
                peer_id        INTEGER PRIMARY KEY ASC AUTOINCREMENT NOT NULL UNIQUE,
                port           INTEGER NOT NULL,
                ip4            TEXT,
                onion          TEXT UNIQUE,
                date_added     DATETIME DEFAULT (CURRENT_TIMESTAMP),
                date_announced DATETIME DEFAULT (CURRENT_TIMESTAMP)
            );
        """)

        self.execute("""
            CREATE TABLE peer_to_hash (
                peer_to_hash_id INTEGER PRIMARY KEY AUTOINCREMENT UNIQUE NOT NULL,
                peer_id         INTEGER REFERENCES peer (peer_id) ON DELETE CASCADE,
                hash_id         INTEGER REFERENCES hash (hash_id)
            );
        """)
        self.execute("CREATE INDEX peer_id ON peer_to_hash (peer_id);")
        self.execute("CREATE INDEX hash_id ON peer_to_hash (hash_id);")

        self.execute("""
            CREATE TABLE hash (
                hash_id    INTEGER PRIMARY KEY AUTOINCREMENT UNIQUE NOT NULL,
                hash       BLOB UNIQUE NOT NULL,
                date_added DATETIME DEFAULT (CURRENT_TIMESTAMP)
            );
        """)

        self.execute("PRAGMA user_version = %s" % self.version)

    def getHashId(self, hash):
        """Return the hash_id of ``hash``, inserting a new row into the hash
        table (and caching the id) when it is not in the cache yet."""
        if hash not in self.hash_ids:
            self.log.debug("New hash: %s" % repr(hash))
            self.execute("INSERT OR IGNORE INTO hash ?", {"hash": buffer(hash)})
            # Look the id up instead of trusting lastrowid: when the insert is
            # ignored (row already present but missing from the cache) SQLite
            # keeps the rowid of an earlier, unrelated insert.
            res = self.execute("SELECT hash_id FROM hash WHERE ?", {"hash": buffer(hash)})
            self.hash_ids[hash] = res.fetchone()["hash_id"]
        return self.hash_ids[hash]

    def peerAnnounce(self, ip4=None, onion=None, port=None, hashes=(), onion_signed=False, delete_missing_hashes=False):
        """Register or refresh a peer and synchronise its announced hashes.

        Returns:
            0 when neither ``ip4`` nor ``onion`` is given;
            ``len(hashes)`` when an unknown onion peer announces without
            ``onion_signed`` (no peer row is created in that case);
            otherwise the number of hash ids added plus the number missing
            from the announce, or 0 when the stored list equals the
            announced one.
        """
        # Every announced hash gets an id (and a hash row) before the peer is checked
        hashes_ids_announced = [self.getHashId(announced_hash) for announced_hash in hashes]

        if not ip4 and not onion:
            return 0

        # Check user
        if onion:
            res = self.execute("SELECT peer_id FROM peer WHERE ? LIMIT 1", {"onion": onion})
        else:
            res = self.execute("SELECT peer_id FROM peer WHERE ? LIMIT 1", {"ip4": ip4, "port": port})
        user_row = res.fetchone()

        now = time.strftime("%Y-%m-%d %H:%M:%S")
        if user_row:
            peer_id = user_row["peer_id"]
            self.execute("UPDATE peer SET date_announced = ? WHERE peer_id = ?", (now, peer_id))
        else:
            self.log.debug("New peer: %s %s signed: %s" % (ip4, onion, onion_signed))
            if onion and not onion_signed:
                # Unsigned onion address: do not store it, report how many hashes were announced
                return len(hashes)
            self.execute("INSERT INTO peer ?", {"ip4": ip4, "onion": onion, "port": port, "date_announced": now})
            peer_id = self.cur.cursor.lastrowid

        # Check user's hashes
        res = self.execute("SELECT * FROM peer_to_hash WHERE ?", {"peer_id": peer_id})
        hash_ids_db = [row["hash_id"] for row in res]
        if hash_ids_db == hashes_ids_announced:
            return 0

        hash_ids_added = set(hashes_ids_announced) - set(hash_ids_db)
        hash_ids_removed = set(hash_ids_db) - set(hashes_ids_announced)
        # Hashes are only linked for ip4 peers or signed onion announces
        if not onion or onion_signed:
            for hash_id in hash_ids_added:
                self.execute("INSERT INTO peer_to_hash ?", {"peer_id": peer_id, "hash_id": hash_id})
        if hash_ids_removed and delete_missing_hashes:
            self.execute("DELETE FROM peer_to_hash WHERE ?", {"peer_id": peer_id, "hash_id": list(hash_ids_removed)})

        return len(hash_ids_added) + len(hash_ids_removed)

    def peerList(self, hash, ip4=None, onions=(), port=None, limit=30, need_types=("ip4", "onion"), order=True):
        """Return the peers that announced ``hash``.

        The result is ``{"ip4": [...], "onion": [...]}`` where the entries
        are produced by ``helper.packAddress`` / ``helper.packOnionAddress``.
        When ``onions`` is given those onion addresses are left out of the
        result; otherwise, when ``ip4`` is given, the ``ip4``/``port`` pair
        is left out. ``need_types`` selects which address types are
        collected, ``order`` sorts by most recent announce and ``limit``
        caps the number of rows read (0 returns the empty result).
        """
        hash_peers = {"ip4": [], "onion": []}
        if limit == 0:
            return hash_peers
        hashid = self.getHashId(hash)

        order_sql = "ORDER BY date_announced DESC" if order else ""

        where_sql = "hash_id = :hashid"
        if onions:
            # The values are embedded into the SQL text, so everything except
            # a-z, 0-9 and comma is stripped before quoting them.
            onions_escaped = ["'%s'" % re.sub("[^a-z0-9,]", "", onion) for onion in onions if isinstance(onion, str)]
            where_sql += " AND (onion NOT IN (%s) OR onion IS NULL)" % ",".join(onions_escaped)
        elif ip4:
            where_sql += " AND (NOT (ip4 = :ip4 AND port = :port) OR ip4 IS NULL)"

        query = """
            SELECT ip4, port, onion
            FROM peer_to_hash
            LEFT JOIN peer USING (peer_id)
            WHERE %s
            %s
            LIMIT :limit
        """ % (where_sql, order_sql)
        res = self.execute(query, {"hashid": hashid, "ip4": ip4, "onions": onions, "port": port, "limit": limit})

        for row in res:
            if row["ip4"] and "ip4" in need_types:
                hash_peers["ip4"].append(
                    helper.packAddress(row["ip4"], row["port"])
                )
            if row["onion"] and "onion" in need_types:
                hash_peers["onion"].append(
                    helper.packOnionAddress(row["onion"], row["port"])
                )

        return hash_peers
|