123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773 |
- import os
- import json
- import logging
- import hashlib
- import re
- import time
- import random
- import sys
- import binascii
- import struct
- import socket
- import urllib
- import urllib2
- import gevent
- import util
- from lib import bencode
- from lib.subtl.subtl import UdpTrackerClient
- from Config import config
- from Peer import Peer
- from Worker import WorkerManager
- from Debug import Debug
- from Content import ContentManager
- from SiteStorage import SiteStorage
- from Crypt import CryptHash
- from util import helper
- import SiteManager
- class Site:
- def __init__(self, address, allow_create=True):
- self.address = re.sub("[^A-Za-z0-9]", "", address) # Make sure its correct address
- self.address_short = "%s..%s" % (self.address[:6], self.address[-4:]) # Short address for logging
- self.log = logging.getLogger("Site:%s" % self.address_short)
- self.addEventListeners()
- self.content = None # Load content.json
- self.peers = {} # Key: ip:port, Value: Peer.Peer
- self.peer_blacklist = SiteManager.peer_blacklist # Ignore this peers (eg. myself)
- self.last_announce = 0 # Last announce time to tracker
- self.last_tracker_id = random.randint(0, 10) # Last announced tracker id
- self.worker_manager = WorkerManager(self) # Handle site download from other peers
- self.bad_files = {} # SHA check failed files, need to redownload {"inner.content": 1} (key: file, value: failed accept)
- self.content_updated = None # Content.js update time
- self.notifications = [] # Pending notifications displayed once on page load [error|ok|info, message, timeout]
- self.page_requested = False # Page viewed in browser
- self.storage = SiteStorage(self, allow_create=allow_create) # Save and load site files
- self.loadSettings() # Load settings from sites.json
- self.content_manager = ContentManager(self) # Load contents
- self.connection_server = None
- if "main" in sys.modules and "file_server" in dir(sys.modules["main"]): # Use global file server by default if possible
- self.connection_server = sys.modules["main"].file_server
- else:
- self.connection_server = None
- if not self.settings.get("auth_key"): # To auth user in site (Obsolete, will be removed)
- self.settings["auth_key"] = CryptHash.random()
- self.log.debug("New auth key: %s" % self.settings["auth_key"])
- self.saveSettings()
- if not self.settings.get("wrapper_key"): # To auth websocket permissions
- self.settings["wrapper_key"] = CryptHash.random()
- self.log.debug("New wrapper key: %s" % self.settings["wrapper_key"])
- self.saveSettings()
- self.websockets = [] # Active site websocket connections
- def __str__(self):
- return "Site %s" % self.address_short
- def __repr__(self):
- return "<%s>" % self.__str__()
- # Load site settings from data/sites.json
- def loadSettings(self):
- sites_settings = json.load(open("%s/sites.json" % config.data_dir))
- if self.address in sites_settings:
- self.settings = sites_settings[self.address]
- else:
- if self.address == config.homepage: # Add admin permissions to homepage
- permissions = ["ADMIN"]
- else:
- permissions = []
- self.settings = {"own": False, "serving": True, "permissions": permissions} # Default
- return
- # Save site settings to data/sites.json
- def saveSettings(self):
- sites_settings = json.load(open("%s/sites.json" % config.data_dir))
- sites_settings[self.address] = self.settings
- helper.atomicWrite("%s/sites.json" % config.data_dir, json.dumps(sites_settings, indent=2, sort_keys=True))
- # Max site size in MB
- def getSizeLimit(self):
- return self.settings.get("size_limit", config.size_limit)
- # Next size limit based on current size
- def getNextSizeLimit(self):
- size_limits = [10, 20, 50, 100, 200, 500, 1000, 2000, 5000, 10000, 20000, 50000, 100000]
- size = self.settings.get("size", 0)
- for size_limit in size_limits:
- if size * 1.2 < size_limit * 1024 * 1024:
- return size_limit
- return 999999
- # Download all file from content.json
- def downloadContent(self, inner_path, download_files=True, peer=None, check_modifications=False):
- s = time.time()
- self.log.debug("Downloading %s..." % inner_path)
- found = self.needFile(inner_path, update=self.bad_files.get(inner_path))
- content_inner_dir = helper.getDirname(inner_path)
- if not found:
- self.log.debug("Download %s failed, check_modifications: %s" % (inner_path, check_modifications))
- if check_modifications: # Download failed, but check modifications if its succed later
- self.onFileDone.once(lambda file_name: self.checkModifications(0), "check_modifications")
- return False # Could not download content.json
- self.log.debug("Got %s" % inner_path)
- changed, deleted = self.content_manager.loadContent(inner_path, load_includes=False)
- # Start download files
- file_threads = []
- if download_files:
- for file_relative_path in self.content_manager.contents[inner_path].get("files", {}).keys():
- file_inner_path = content_inner_dir + file_relative_path
- # Start download and dont wait for finish, return the event
- res = self.needFile(file_inner_path, blocking=False, update=self.bad_files.get(file_inner_path), peer=peer)
- if res is not True and res is not False: # Need downloading and file is allowed
- file_threads.append(res) # Append evt
- # Wait for includes download
- include_threads = []
- for file_relative_path in self.content_manager.contents[inner_path].get("includes", {}).keys():
- file_inner_path = content_inner_dir + file_relative_path
- include_thread = gevent.spawn(self.downloadContent, file_inner_path, download_files=download_files, peer=peer)
- include_threads.append(include_thread)
- self.log.debug("%s: Downloading %s includes..." % (inner_path, len(include_threads)))
- gevent.joinall(include_threads)
- self.log.debug("%s: Includes download ended" % inner_path)
- if check_modifications: # Check if every file is up-to-date
- self.checkModifications(0)
- self.log.debug("%s: Downloading %s files, changed: %s..." % (inner_path, len(file_threads), len(changed)))
- gevent.joinall(file_threads)
- self.log.debug("%s: DownloadContent ended in %.2fs" % (inner_path, time.time() - s))
- return True
- # Return bad files with less than 3 retry
- def getReachableBadFiles(self):
- if not self.bad_files:
- return False
- return [bad_file for bad_file, retry in self.bad_files.iteritems() if retry < 3]
- # Retry download bad files
- def retryBadFiles(self):
- for bad_file in self.bad_files.keys():
- self.needFile(bad_file, update=True, blocking=False)
- # Download all files of the site
- @util.Noparallel(blocking=False)
- def download(self, check_size=False, blind_includes=False):
- self.log.debug(
- "Start downloading, bad_files: %s, check_size: %s, blind_includes: %s" %
- (self.bad_files, check_size, blind_includes)
- )
- gevent.spawn(self.announce)
- if check_size: # Check the size first
- valid = self.downloadContent("content.json", download_files=False) # Just download content.json files
- if not valid:
- return False # Cant download content.jsons or size is not fits
- # Download everything
- valid = self.downloadContent("content.json", check_modifications=blind_includes)
- self.retryBadFiles()
- return valid
- # Update worker, try to find client that supports listModifications command
- def updater(self, peers_try, queried, since):
- while 1:
- if not peers_try or len(queried) >= 3: # Stop after 3 successful query
- break
- peer = peers_try.pop(0)
- if not peer.connection and len(queried) < 2:
- peer.connect() # Only open new connection if less than 2 queried already
- if not peer.connection or peer.connection.handshake.get("rev", 0) < 126:
- continue # Not compatible
- res = peer.listModified(since)
- if not res or "modified_files" not in res:
- continue # Failed query
- queried.append(peer)
- for inner_path, modified in res["modified_files"].iteritems(): # Check if the peer has newer files than we
- content = self.content_manager.contents.get(inner_path)
- if (not content or modified > content["modified"]) and inner_path not in self.bad_files:
- self.log.debug("New modified file from %s: %s" % (peer, inner_path))
- # We dont have this file or we have older
- self.bad_files[inner_path] = self.bad_files.get(inner_path, 0) + 1 # Mark as bad file
- gevent.spawn(self.downloadContent, inner_path) # Download the content.json + the changed files
- # Check modified content.json files from peers and add modified files to bad_files
- # Return: Successfully queried peers [Peer, Peer...]
- def checkModifications(self, since=None):
- peers_try = [] # Try these peers
- queried = [] # Successfully queried from these peers
- peers = self.peers.values()
- random.shuffle(peers)
- for peer in peers: # Try to find connected good peers, but we must have at least 5 peers
- if peer.findConnection() and peer.connection.handshake.get("rev", 0) > 125: # Add to the beginning if rev125
- peers_try.insert(0, peer)
- elif len(peers_try) < 5: # Backup peers, add to end of the try list
- peers_try.append(peer)
- if since is None: # No since definied, download from last modification time-1day
- since = self.settings.get("modified", 60 * 60 * 24) - 60 * 60 * 24
- self.log.debug("Try to get listModifications from peers: %s since: %s" % (peers_try, since))
- updaters = []
- for i in range(3):
- updaters.append(gevent.spawn(self.updater, peers_try, queried, since))
- gevent.joinall(updaters, timeout=10) # Wait 10 sec to workers done query modifications
- time.sleep(0.1)
- self.log.debug("Queried listModifications from: %s" % queried)
- return queried
- # Update content.json from peers and download changed files
- # Return: None
- @util.Noparallel()
- def update(self, announce=False):
- self.content_manager.loadContent("content.json") # Reload content.json
- self.content_updated = None # Reset content updated time
- self.updateWebsocket(updating=True)
- if announce:
- self.announce()
- queried = self.checkModifications()
- if not queried: # Not found any client that supports listModifications
- self.log.debug("Fallback to old-style update")
- self.redownloadContents()
- if not self.settings["own"]:
- self.storage.checkFiles(quick_check=True) # Quick check files based on file size
- changed, deleted = self.content_manager.loadContent("content.json")
- if self.bad_files:
- self.download()
- self.settings["size"] = self.content_manager.getTotalSize() # Update site size
- self.updateWebsocket(updated=True)
- # Update site by redownload all content.json
- def redownloadContents(self):
- # Download all content.json again
- content_threads = []
- for inner_path in self.content_manager.contents.keys():
- content_threads.append(self.needFile(inner_path, update=True, blocking=False))
- self.log.debug("Waiting %s content.json to finish..." % len(content_threads))
- gevent.joinall(content_threads)
- # Publish worker
- def publisher(self, inner_path, peers, published, limit, event_done=None):
- file_size = self.storage.getSize(inner_path)
- body = self.storage.read(inner_path)
- while 1:
- if not peers or len(published) >= limit:
- if event_done:
- event_done.set(True)
- break # All peers done, or published engouht
- peer = peers.pop(0)
- if peer.connection and peer.connection.last_ping_delay: # Peer connected
- # Timeout: 5sec + size in kb + last_ping
- timeout = timeout = 5 + int(file_size / 1024) + peer.connection.last_ping_delay
- else: # Peer not connected
- # Timeout: 5sec + size in kb
- timeout = timeout = 5 + int(file_size / 1024)
- result = {"exception": "Timeout"}
- for retry in range(2):
- try:
- with gevent.Timeout(timeout, False):
- result = peer.request("update", {
- "site": self.address,
- "inner_path": inner_path,
- "body": body,
- "peer": (config.ip_external, config.fileserver_port)
- })
- if result:
- break
- except Exception, err:
- result = {"exception": Debug.formatException(err)}
- if result and "ok" in result:
- published.append(peer)
- self.log.info("[OK] %s: %s" % (peer.key, result["ok"]))
- else:
- if result == {"exception": "Timeout"}:
- peer.onConnectionError()
- self.log.info("[FAILED] %s: %s" % (peer.key, result))
- # Update content.json on peers
- @util.Noparallel()
- def publish(self, limit=5, inner_path="content.json"):
- published = [] # Successfully published (Peer)
- publishers = [] # Publisher threads
- connected_peers = self.getConnectedPeers()
- if len(connected_peers) > limit * 2: # Publish to already connected peers if possible
- peers = connected_peers
- else:
- peers = self.peers.values()
- self.log.info("Publishing to %s/%s peers (connected: %s)..." % (
- min(len(self.peers), limit), len(self.peers), len(connected_peers)
- ))
- if not peers:
- return 0 # No peers found
- random.shuffle(peers)
- event_done = gevent.event.AsyncResult()
- for i in range(min(len(self.peers), limit, 5)): # Max 5 thread
- publisher = gevent.spawn(self.publisher, inner_path, peers, published, limit, event_done)
- publishers.append(publisher)
- event_done.get() # Wait for done
- if len(published) < min(len(self.peers), limit):
- time.sleep(0.2) # If less than we need sleep a bit
- if len(published) == 0:
- gevent.joinall(publishers) # No successful publish, wait for all publisher
- # Make sure the connected passive peers got the update
- passive_peers = [
- peer for peer in peers
- if peer.connection and not peer.connection.closed and peer.key.endswith(":0") and peer not in published
- ] # Every connected passive peer that we not published to
- self.log.info(
- "Successfuly published to %s peers, publishing to %s more passive peers" %
- (len(published), len(passive_peers))
- )
- for peer in passive_peers:
- gevent.spawn(self.publisher, inner_path, passive_peers, published, limit=10)
- return len(published)
- # Copy this site
- def clone(self, address, privatekey=None, address_index=None, overwrite=False):
- import shutil
- new_site = SiteManager.site_manager.need(address, all_file=False)
- default_dirs = [] # Dont copy these directories (has -default version)
- for dir_name in os.listdir(self.storage.directory):
- if "-default" in dir_name:
- default_dirs.append(dir_name.replace("-default", ""))
- self.log.debug("Cloning to %s, ignore dirs: %s" % (address, default_dirs))
- # Copy root content.json
- if not new_site.storage.isFile("content.json") and not overwrite:
- # Content.json not exist yet, create a new one from source site
- content_json = self.storage.loadJson("content.json")
- if "domain" in content_json:
- del content_json["domain"]
- content_json["title"] = "my" + content_json["title"]
- content_json["cloned_from"] = self.address
- if address_index:
- content_json["address_index"] = address_index # Site owner's BIP32 index
- new_site.storage.writeJson("content.json", content_json)
- new_site.content_manager.loadContent(
- "content.json", add_bad_files=False, delete_removed_files=False, load_includes=False
- )
- # Copy files
- for content_inner_path, content in self.content_manager.contents.items():
- for file_relative_path in sorted(content["files"].keys()):
- file_inner_path = helper.getDirname(content_inner_path) + file_relative_path # Relative to content.json
- file_inner_path = file_inner_path.strip("/") # Strip leading /
- if file_inner_path.split("/")[0] in default_dirs: # Dont copy directories that has -default postfixed alternative
- self.log.debug("[SKIP] %s (has default alternative)" % file_inner_path)
- continue
- file_path = self.storage.getPath(file_inner_path)
- # Copy the file normally to keep the -default postfixed dir and file to allow cloning later
- file_path_dest = new_site.storage.getPath(file_inner_path)
- self.log.debug("[COPY] %s to %s..." % (file_inner_path, file_path_dest))
- dest_dir = os.path.dirname(file_path_dest)
- if not os.path.isdir(dest_dir):
- os.makedirs(dest_dir)
- shutil.copy(file_path, file_path_dest)
- # If -default in path, create a -default less copy of the file
- if "-default" in file_inner_path:
- file_path_dest = new_site.storage.getPath(file_inner_path.replace("-default", ""))
- if new_site.storage.isFile(file_path_dest) and not overwrite: # Don't overwrite site files with default ones
- self.log.debug("[SKIP] Default file: %s (already exist)" % file_inner_path)
- continue
- self.log.debug("[COPY] Default file: %s to %s..." % (file_inner_path, file_path_dest))
- dest_dir = os.path.dirname(file_path_dest)
- if not os.path.isdir(dest_dir):
- os.makedirs(dest_dir)
- shutil.copy(file_path, file_path_dest)
- # Sign if content json
- if file_path_dest.endswith("/content.json"):
- new_site.storage.onUpdated(file_inner_path.replace("-default", ""))
- new_site.content_manager.loadContent(
- file_inner_path.replace("-default", ""), add_bad_files=False,
- delete_removed_files=False, load_includes=False
- )
- if privatekey:
- new_site.content_manager.sign(file_inner_path.replace("-default", ""), privatekey)
- new_site.content_manager.loadContent(
- file_inner_path, add_bad_files=False, delete_removed_files=False, load_includes=False
- )
- if privatekey:
- new_site.content_manager.sign("content.json", privatekey)
- new_site.content_manager.loadContent(
- "content.json", add_bad_files=False, delete_removed_files=False, load_includes=False
- )
- # Rebuild DB
- if new_site.storage.isFile("dbschema.json"):
- new_site.storage.closeDb()
- new_site.storage.rebuildDb()
- return new_site
- # Check and download if file not exist
- def needFile(self, inner_path, update=False, blocking=True, peer=None, priority=0):
- if self.storage.isFile(inner_path) and not update: # File exist, no need to do anything
- return True
- elif self.settings["serving"] is False: # Site not serving
- return False
- else: # Wait until file downloaded
- self.bad_files[inner_path] = self.bad_files.get(inner_path, 0) + 1 # Mark as bad file
- if not self.content_manager.contents.get("content.json"): # No content.json, download it first!
- self.log.debug("Need content.json first")
- gevent.spawn(self.announce)
- if inner_path != "content.json": # Prevent double download
- task = self.worker_manager.addTask("content.json", peer)
- task.get()
- self.content_manager.loadContent()
- if not self.content_manager.contents.get("content.json"):
- return False # Content.json download failed
- if not inner_path.endswith("content.json") and not self.content_manager.getFileInfo(inner_path):
- # No info for file, download all content.json first
- self.log.debug("No info for %s, waiting for all content.json" % inner_path)
- success = self.downloadContent("content.json", download_files=False)
- if not success:
- return False
- if not self.content_manager.getFileInfo(inner_path):
- return False # Still no info for file
- task = self.worker_manager.addTask(inner_path, peer, priority=priority)
- if blocking:
- return task.get()
- else:
- return task
- # Add or update a peer to site
- # return_peer: Always return the peer even if it was already present
- def addPeer(self, ip, port, return_peer=False):
- if not ip:
- return False
- if (ip, port) in self.peer_blacklist:
- return False # Ignore blacklist (eg. myself)
- key = "%s:%s" % (ip, port)
- if key in self.peers: # Already has this ip
- self.peers[key].found()
- if return_peer: # Always return peer
- return self.peers[key]
- else:
- return False
- else: # New peer
- peer = Peer(ip, port, self)
- self.peers[key] = peer
- return peer
- # Gather peer from connected peers
- @util.Noparallel(blocking=False)
- def announcePex(self, query_num=2, need_num=5):
- peers = [peer for peer in self.peers.values() if peer.connection and peer.connection.connected] # Connected peers
- if len(peers) == 0: # Small number of connected peers for this site, connect to any
- self.log.debug("Small number of peers detected...query all of peers using pex")
- peers = self.peers.values()
- need_num = 10
- random.shuffle(peers)
- done = 0
- added = 0
- for peer in peers:
- if peer.connection: # Has connection
- if "port_opened" in peer.connection.handshake: # This field added recently, so probably has has peer exchange
- res = peer.pex(need_num=need_num)
- else:
- res = False
- else: # No connection
- res = peer.pex(need_num=need_num)
- if type(res) == int: # We have result
- done += 1
- added += res
- if res:
- self.worker_manager.onPeers()
- self.updateWebsocket(peers_added=res)
- if done == query_num:
- break
- self.log.debug("Queried pex from %s peers got %s new peers." % (done, added))
- # Gather peers from tracker
- # Return: Complete time or False on error
- def announceTracker(self, protocol, address, fileserver_port, address_hash, my_peer_id):
- s = time.time()
- if protocol == "udp": # Udp tracker
- if config.disable_udp:
- return False # No udp supported
- ip, port = address.split(":")
- tracker = UdpTrackerClient(ip, int(port))
- tracker.peer_port = fileserver_port
- try:
- tracker.connect()
- tracker.poll_once()
- tracker.announce(info_hash=address_hash, num_want=50)
- back = tracker.poll_once()
- peers = back["response"]["peers"]
- except Exception, err:
- return False
- else: # Http tracker
- params = {
- 'info_hash': binascii.a2b_hex(address_hash),
- 'peer_id': my_peer_id, 'port': fileserver_port,
- 'uploaded': 0, 'downloaded': 0, 'left': 0, 'compact': 1, 'numwant': 30,
- 'event': 'started'
- }
- req = None
- try:
- url = "http://" + address + "?" + urllib.urlencode(params)
- # Load url
- with gevent.Timeout(10, False): # Make sure of timeout
- req = urllib2.urlopen(url, timeout=8)
- response = req.read()
- req.fp._sock.recv = None # Hacky avoidance of memory leak for older python versions
- req.close()
- req = None
- if not response:
- self.log.debug("Http tracker %s response error" % url)
- return False
- # Decode peers
- peer_data = bencode.decode(response)["peers"]
- response = None
- peer_count = len(peer_data) / 6
- peers = []
- for peer_offset in xrange(peer_count):
- off = 6 * peer_offset
- peer = peer_data[off:off + 6]
- addr, port = struct.unpack('!LH', peer)
- peers.append({"addr": socket.inet_ntoa(struct.pack('!L', addr)), "port": port})
- except Exception, err:
- self.log.debug("Http tracker %s error: %s" % (url, err))
- if req:
- req.close()
- req = None
- return False
- # Adding peers
- added = 0
- for peer in peers:
- if not peer["port"]:
- continue # Dont add peers with port 0
- if self.addPeer(peer["addr"], peer["port"]):
- added += 1
- if added:
- self.worker_manager.onPeers()
- self.updateWebsocket(peers_added=added)
- self.log.debug("Found %s peers, new: %s" % (len(peers), added))
- return time.time() - s
- # Add myself and get other peers from tracker
- def announce(self, force=False, num=5, pex=True):
- if time.time() < self.last_announce + 30 and not force:
- return # No reannouncing within 30 secs
- self.last_announce = time.time()
- trackers = config.trackers
- if num == 1: # Only announce on one tracker, increment the queried tracker id
- self.last_tracker_id += 1
- self.last_tracker_id = self.last_tracker_id % len(trackers)
- trackers = [trackers[self.last_tracker_id]] # We only going to use this one
- errors = []
- slow = []
- address_hash = hashlib.sha1(self.address).hexdigest() # Site address hash
- my_peer_id = sys.modules["main"].file_server.peer_id
- if sys.modules["main"].file_server.port_opened:
- fileserver_port = config.fileserver_port
- else: # Port not opened, report port 0
- fileserver_port = 0
- s = time.time()
- announced = 0
- threads = []
- for tracker in trackers: # Start announce threads
- protocol, address = tracker.split("://")
- thread = gevent.spawn(self.announceTracker, protocol, address, fileserver_port, address_hash, my_peer_id)
- threads.append(thread)
- thread.address = address
- thread.protocol = protocol
- if len(threads) > num: # Announce limit
- break
- gevent.joinall(threads) # Wait for announce finish
- for thread in threads:
- if thread.value:
- if thread.value > 1:
- slow.append("%.2fs %s://%s" % (thread.value, thread.protocol, thread.address))
- announced += 1
- else:
- errors.append("%s://%s" % (thread.protocol, thread.address))
- # Save peers num
- self.settings["peers"] = len(self.peers)
- self.saveSettings()
- if len(errors) < min(num, len(trackers)): # Less errors than total tracker nums
- self.log.debug(
- "Announced port %s to %s trackers in %.3fs, errors: %s, slow: %s" %
- (fileserver_port, announced, time.time() - s, errors, slow)
- )
- else:
- self.log.error("Announce to %s trackers in %.3fs, failed" % (announced, time.time() - s))
- if pex:
- if not [peer for peer in self.peers.values() if peer.connection and peer.connection.connected]:
- # If no connected peer yet then wait for connections
- gevent.spawn_later(3, self.announcePex, need_num=10) # Spawn 3 secs later
- else: # Else announce immediately
- self.announcePex()
- # Keep connections to get the updates (required for passive clients)
- def needConnections(self, num=3):
- need = min(len(self.peers), num) # Need 3 peer, but max total peers
- connected = 0
- for peer in self.peers.values(): # Check current connected number
- if peer.connection and peer.connection.connected:
- connected += 1
- self.log.debug("Need connections: %s, Current: %s, Total: %s" % (need, connected, len(self.peers)))
- if connected < need: # Need more than we have
- for peer in self.peers.values():
- if not peer.connection or not peer.connection.connected: # No peer connection or disconnected
- peer.pex() # Initiate peer exchange
- if peer.connection and peer.connection.connected:
- connected += 1 # Successfully connected
- if connected >= need:
- break
- return connected
- # Return: Probably working, connectable Peers
- def getConnectablePeers(self, need_num=5, ignore=[]):
- peers = self.peers.values()
- random.shuffle(peers)
- found = []
- for peer in peers:
- if peer.key.endswith(":0"):
- continue # Not connectable
- if not peer.connection:
- continue # No connection
- if peer.key in ignore:
- continue # The requester has this peer
- if time.time() - peer.connection.last_recv_time > 60 * 60 * 2: # Last message more than 2 hours ago
- peer.connection = None # Cleanup: Dead connection
- continue
- found.append(peer)
- if len(found) >= need_num:
- break # Found requested number of peers
- if (not found and not ignore) or (need_num > 5 and need_num < 100 and len(found) < need_num):
- # Return not that good peers: Not found any peer and the requester dont have any or cant give enough peer
- found = [peer for peer in peers if not peer.key.endswith(":0") and peer.key not in ignore][0:need_num - len(found)]
- return found
- def getConnectedPeers(self):
- return [peer for peer in self.peers.values() if peer.connection and peer.connection.connected]
- # Cleanup probably dead peers
- def cleanupPeers(self):
- peers = self.peers.values()
- if len(peers) < 20:
- return False
- removed = 0
- for peer in peers:
- if peer.connection and peer.connection.connected:
- continue
- if peer.connection and not peer.connection.connected:
- peer.connection = None # Dead connection
- if time.time() - peer.last_found > 60 * 60 * 4: # Not found on tracker or via pex in last 4 hour
- peer.remove()
- removed += 1
- if removed > 5: # Don't remove too much at once
- break
- if removed:
- self.log.debug("Cleanup peers result: Removed %s, left: %s" % (removed, len(self.peers)))
- # - Events -
- # Add event listeners
- def addEventListeners(self):
- self.onFileStart = util.Event() # If WorkerManager added new task
- self.onFileDone = util.Event() # If WorkerManager successfully downloaded a file
- self.onFileFail = util.Event() # If WorkerManager failed to download a file
- self.onComplete = util.Event() # All file finished
- self.onFileStart.append(lambda inner_path: self.fileStarted()) # No parameters to make Noparallel batching working
- self.onFileDone.append(lambda inner_path: self.fileDone(inner_path))
- self.onFileFail.append(lambda inner_path: self.fileFailed(inner_path))
- # Send site status update to websocket clients
- def updateWebsocket(self, **kwargs):
- if kwargs:
- param = {"event": kwargs.items()[0]}
- else:
- param = None
- for ws in self.websockets:
- ws.event("siteChanged", self, param)
- # File download started
- @util.Noparallel(blocking=False)
- def fileStarted(self):
- time.sleep(0.001) # Wait for other files adds
- self.updateWebsocket(file_started=True)
- # File downloaded successful
- def fileDone(self, inner_path):
- # File downloaded, remove it from bad files
- if inner_path in self.bad_files:
- self.log.debug("Bad file solved: %s" % inner_path)
- del(self.bad_files[inner_path])
- # Update content.json last downlad time
- if inner_path == "content.json":
- self.content_updated = time.time()
- self.updateWebsocket(file_done=inner_path)
- # File download failed
- def fileFailed(self, inner_path):
- if inner_path == "content.json":
- self.content_updated = False
- self.log.debug("Can't update content.json")
- if inner_path in self.bad_files:
- self.bad_files[inner_path] = self.bad_files.get(inner_path, 0) + 1
- self.updateWebsocket(file_failed=inner_path)
|