import time import os import subprocess import shutil import collections import math import msgpack import gevent import gevent.lock from Plugin import PluginManager from Debug import Debug from Crypt import CryptHash from lib import merkletools from util import helper import util from BigfilePiecefield import BigfilePiecefield, BigfilePiecefieldPacked # We can only import plugin host clases after the plugins are loaded @PluginManager.afterLoad def importPluginnedClasses(): global VerifyError, config from Content.ContentManager import VerifyError from Config import config if "upload_nonces" not in locals(): upload_nonces = {} @PluginManager.registerTo("UiRequest") class UiRequestPlugin(object): def isCorsAllowed(self, path): if path == "/ZeroNet-Internal/BigfileUpload": return True else: return super(UiRequestPlugin, self).isCorsAllowed(path) def actionBigfileUpload(self): nonce = self.get.get("upload_nonce") if nonce not in upload_nonces: return self.error403("Upload nonce error.") upload_info = upload_nonces[nonce] del upload_nonces[nonce] self.sendHeader(200, "text/html", noscript=True, extra_headers={ "Access-Control-Allow-Origin": "null", "Access-Control-Allow-Credentials": "true" }) self.readMultipartHeaders(self.env['wsgi.input']) # Skip http headers site = upload_info["site"] inner_path = upload_info["inner_path"] with, "wb", create_dirs=True) as out_file: merkle_root, piece_size, piecemap_info = site.content_manager.hashBigfile( self.env['wsgi.input'], upload_info["size"], upload_info["piece_size"], out_file ) if len(piecemap_info["sha512_pieces"]) == 1: # Small file, don't split hash = piecemap_info["sha512_pieces"][0].encode("hex") hash_id = site.content_manager.hashfield.getHashId(hash) site.content_manager.optionalDownloaded(inner_path, hash_id, upload_info["size"], own=True) else: # Big file file_name = helper.getFilename(inner_path) msgpack.pack({file_name: piecemap_info},["piecemap"], "wb")) # Find piecemap and file relative path to content.json file_info = site.content_manager.getFileInfo(inner_path, new_file=True) content_inner_path_dir = helper.getDirname(file_info["content_inner_path"]) piecemap_relative_path = upload_info["piecemap"][len(content_inner_path_dir):] file_relative_path = inner_path[len(content_inner_path_dir):] # Add file to content.json if["content_inner_path"]): content =["content_inner_path"]) else: content = {} if "files_optional" not in content: content["files_optional"] = {} content["files_optional"][file_relative_path] = { "sha512": merkle_root, "size": upload_info["size"], "piecemap": piecemap_relative_path, "piece_size": piece_size } merkle_root_hash_id = site.content_manager.hashfield.getHashId(merkle_root) site.content_manager.optionalDownloaded(inner_path, merkle_root_hash_id, upload_info["size"], own=True)["content_inner_path"], content) site.content_manager.contents.loadItem(file_info["content_inner_path"]) # reload cache return { "merkle_root": merkle_root, "piece_num": len(piecemap_info["sha512_pieces"]), "piece_size": piece_size, "inner_path": inner_path } def readMultipartHeaders(self, wsgi_input): for i in range(100): line = wsgi_input.readline() if line == "\r\n": break return i def actionFile(self, file_path, *args, **kwargs): if kwargs.get("file_size", 0) > 1024 * 1024 and kwargs.get("path_parts"): # Only check files larger than 1MB path_parts = kwargs["path_parts"] site = self.server.site_manager.get(path_parts["address"]) big_file =["inner_path"], prebuffer=2 * 1024 * 1024) if big_file: kwargs["file_obj"] = big_file kwargs["file_size"] = big_file.size return super(UiRequestPlugin, self).actionFile(file_path, *args, **kwargs) @PluginManager.registerTo("UiWebsocket") class UiWebsocketPlugin(object): def actionBigfileUploadInit(self, to, inner_path, size): valid_signers = auth_address = self.user.getAuthAddress( if not["own"] and auth_address not in valid_signers: self.log.error("FileWrite forbidden %s not in valid_signers %s" % (auth_address, valid_signers)) return self.response(to, {"error": "Forbidden, you can only modify your own files"}) nonce = CryptHash.random() piece_size = 1024 * 1024 inner_path = file_info =, new_file=True) content_inner_path_dir = helper.getDirname(file_info["content_inner_path"]) file_relative_path = inner_path[len(content_inner_path_dir):] upload_nonces[nonce] = { "added": time.time(), "site":, "inner_path": inner_path, "websocket_client": self, "size": size, "piece_size": piece_size, "piecemap": inner_path + ".piecemap.msgpack" } return { "url": "/ZeroNet-Internal/BigfileUpload?upload_nonce=" + nonce, "piece_size": piece_size, "inner_path": inner_path, "file_relative_path": file_relative_path } def actionSiteSetAutodownloadBigfileLimit(self, to, limit): permissions = self.getPermissions(to) if "ADMIN" not in permissions: return self.response(to, "You don't have permission to run this command")["autodownload_bigfile_size_limit"] = int(limit) self.response(to, "ok") def actionFileDelete(self, to, inner_path): piecemap_inner_path = inner_path + ".piecemap.msgpack" if self.hasFilePermission(inner_path) and # Also delete .piecemap.msgpack file if exists self.log.debug("Deleting piecemap: %s" % piecemap_inner_path) file_info = if file_info: content_json =["content_inner_path"]) relative_path = file_info["relative_path"] if relative_path in content_json.get("files_optional", {}): del content_json["files_optional"][relative_path]["content_inner_path"], content_json)["content_inner_path"], add_bad_files=False, force=True) try: except Exception, err: self.log.error("File %s delete error: %s" % (piecemap_inner_path, err)) return super(UiWebsocketPlugin, self).actionFileDelete(to, inner_path) @PluginManager.registerTo("ContentManager") class ContentManagerPlugin(object): def getFileInfo(self, inner_path, *args, **kwargs): if "|" not in inner_path: return super(ContentManagerPlugin, self).getFileInfo(inner_path, *args, **kwargs) inner_path, file_range = inner_path.split("|") pos_from, pos_to = map(int, file_range.split("-")) file_info = super(ContentManagerPlugin, self).getFileInfo(inner_path, *args, **kwargs) return file_info def readFile(self, file_in, size, buff_size=1024 * 64): part_num = 0 recv_left = size while 1: part_num += 1 read_size = min(buff_size, recv_left) part = if not part: break yield part if part_num % 100 == 0: # Avoid blocking ZeroNet execution during upload time.sleep(0.001) recv_left -= read_size if recv_left <= 0: break def hashBigfile(self, file_in, size, piece_size=1024 * 1024, file_out=None):["has_bigfile"] = True recv = 0 try: piece_hash = CryptHash.sha512t() piece_hashes = [] piece_recv = 0 mt = merkletools.MerkleTools() mt.hash_function = CryptHash.sha512t part = "" for part in self.readFile(file_in, size): if file_out: file_out.write(part) recv += len(part) piece_recv += len(part) piece_hash.update(part) if piece_recv >= piece_size: piece_digest = piece_hash.digest() piece_hashes.append(piece_digest) mt.leaves.append(piece_digest) piece_hash = CryptHash.sha512t() piece_recv = 0 if len(piece_hashes) % 100 == 0 or recv == size:"- [HASHING:%.0f%%] Pieces: %s, %.1fMB/%.1fMB" % ( float(recv) / size * 100, len(piece_hashes), recv / 1024 / 1024, size / 1024 / 1024 )) part = "" if len(part) > 0: piece_digest = piece_hash.digest() piece_hashes.append(piece_digest) mt.leaves.append(piece_digest) except Exception as err: raise err finally: if file_out: file_out.close() mt.make_tree() return mt.get_merkle_root(), piece_size, { "sha512_pieces": piece_hashes } def hashFile(self, dir_inner_path, file_relative_path, optional=False): inner_path = dir_inner_path + file_relative_path file_size = # Only care about optional files >1MB if not optional or file_size < 1 * 1024 * 1024: return super(ContentManagerPlugin, self).hashFile(dir_inner_path, file_relative_path, optional) back = {} content = self.contents.get(dir_inner_path + "content.json") hash = None piecemap_relative_path = None piece_size = None # Don't re-hash if it's already in content.json if content and file_relative_path in content.get("files_optional", {}): file_node = content["files_optional"][file_relative_path] if file_node["size"] == file_size:"- [SAME SIZE] %s" % file_relative_path) hash = file_node.get("sha512") piecemap_relative_path = file_node.get("piecemap") piece_size = file_node.get("piece_size") if not hash or not piecemap_relative_path: # Not in content.json yet if file_size < 5 * 1024 * 1024: # Don't create piecemap automatically for files smaller than 5MB return super(ContentManagerPlugin, self).hashFile(dir_inner_path, file_relative_path, optional)"- [HASHING] %s" % file_relative_path) merkle_root, piece_size, piecemap_info = self.hashBigfile(, "rb"), file_size) if not hash: hash = merkle_root if not piecemap_relative_path: file_name = helper.getFilename(file_relative_path) piecemap_relative_path = file_relative_path + ".piecemap.msgpack" piecemap_inner_path = inner_path + ".piecemap.msgpack" msgpack.pack({file_name: piecemap_info},, "wb")) back.update(super(ContentManagerPlugin, self).hashFile(dir_inner_path, piecemap_relative_path, optional=True)) piece_num = int(math.ceil(float(file_size) / piece_size)) # Add the merkle root to hashfield hash_id = self.optionalDownloaded(inner_path, hash_id, file_size, own=True)[hash].fromstring("1" * piece_num) back[file_relative_path] = {"sha512": hash, "size": file_size, "piecemap": piecemap_relative_path, "piece_size": piece_size} return back def getPiecemap(self, inner_path): file_info = piecemap_inner_path = helper.getDirname(file_info["content_inner_path"]) + file_info["piecemap"], priority=20) piecemap = msgpack.unpack([helper.getFilename(inner_path)] piecemap["piece_size"] = file_info["piece_size"] return piecemap def verifyPiece(self, inner_path, pos, piece): piecemap = self.getPiecemap(inner_path) piece_i = pos / piecemap["piece_size"] if CryptHash.sha512sum(piece, format="digest") != piecemap["sha512_pieces"][piece_i]: raise VerifyError("Invalid hash") return True def verifyFile(self, inner_path, file, ignore_same=True): if "|" not in inner_path: return super(ContentManagerPlugin, self).verifyFile(inner_path, file, ignore_same) inner_path, file_range = inner_path.split("|") pos_from, pos_to = map(int, file_range.split("-")) return self.verifyPiece(inner_path, pos_from, file) def optionalDownloaded(self, inner_path, hash_id, size=None, own=False): if "|" in inner_path: inner_path, file_range = inner_path.split("|") pos_from, pos_to = map(int, file_range.split("-")) file_info = self.getFileInfo(inner_path) # Mark piece downloaded piece_i = pos_from / file_info["piece_size"][file_info["sha512"]][piece_i] = True # Only add to site size on first request if hash_id in self.hashfield: size = 0 elif size > 1024 * 1024: file_info = self.getFileInfo(inner_path) if file_info and "sha512" in file_info: # We already have the file, but not in piecefield sha512 = file_info["sha512"] if sha512 not in return super(ContentManagerPlugin, self).optionalDownloaded(inner_path, hash_id, size, own) def optionalRemoved(self, inner_path, hash_id, size=None): if size and size > 1024 * 1024: file_info = self.getFileInfo(inner_path) sha512 = file_info["sha512"] if sha512 in del[sha512] # Also remove other pieces of the file from download queue for key in if key.startswith(inner_path + "|"): del[key] return super(ContentManagerPlugin, self).optionalRemoved(inner_path, hash_id, size) @PluginManager.registerTo("SiteStorage") class SiteStoragePlugin(object): def __init__(self, *args, **kwargs): super(SiteStoragePlugin, self).__init__(*args, **kwargs) self.piecefields = collections.defaultdict(BigfilePiecefield) if "piecefields" in"cache", {}): for sha512, piecefield_packed in["cache"].get("piecefields").iteritems(): if piecefield_packed: self.piecefields[sha512].unpack(piecefield_packed.decode("base64"))["cache"]["piecefields"] = {} def createSparseFile(self, inner_path, size, sha512=None): file_path = self.getPath(inner_path) file_dir = os.path.dirname(file_path) if not os.path.isdir(file_dir): os.makedirs(file_dir) f = open(file_path, 'wb') f.truncate(min(1024 * 1024 * 5, size)) # Only pre-allocate up to 5MB f.close() if == "nt": startupinfo = subprocess.STARTUPINFO() startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW["fsutil", "sparse", "setflag", file_path], close_fds=True, startupinfo=startupinfo) if sha512 and sha512 in self.piecefields: self.log.debug("%s: File not exists, but has piecefield. Deleting piecefield." % inner_path) del self.piecefields[sha512] def write(self, inner_path, content): if "|" not in inner_path: return super(SiteStoragePlugin, self).write(inner_path, content) # Write to specific position by passing |{pos} after the filename inner_path, file_range = inner_path.split("|") pos_from, pos_to = map(int, file_range.split("-")) file_path = self.getPath(inner_path) # Create dir if not exist file_dir = os.path.dirname(file_path) if not os.path.isdir(file_dir): os.makedirs(file_dir) if not os.path.isfile(file_path): file_info = self.createSparseFile(inner_path, file_info["size"]) # Write file with open(file_path, "rb+") as file: if hasattr(content, 'read'): # File-like object shutil.copyfileobj(content, file) # Write buff to disk else: # Simple string file.write(content) del content self.onUpdated(inner_path) def checkBigfile(self, inner_path): file_info = if not file_info or (file_info and "piecemap" not in file_info): # It's not a big file return False["has_bigfile"] = True file_path = self.getPath(inner_path) sha512 = file_info["sha512"] piece_num = int(math.ceil(float(file_info["size"]) / file_info["piece_size"])) if os.path.isfile(file_path): if sha512 not in self.piecefields: if open(file_path).read(128) == "\0" * 128: piece_data = "0" else: piece_data = "1" self.log.debug("%s: File exists, but not in piecefield. Filling piecefiled with %s * %s." % (inner_path, piece_num, piece_data)) self.piecefields[sha512].fromstring(piece_data * piece_num) else: self.log.debug("Creating bigfile: %s" % inner_path) self.createSparseFile(inner_path, file_info["size"], sha512) self.piecefields[sha512].fromstring("0" * piece_num) return True def openBigfile(self, inner_path, prebuffer=0): if not self.checkBigfile(inner_path): return False, blocking=False) # Download piecemap return BigFile(, inner_path, prebuffer=prebuffer) class BigFile(object): def __init__(self, site, inner_path, prebuffer=0): = site self.inner_path = inner_path file_path = file_info = self.piece_size = file_info["piece_size"] self.sha512 = file_info["sha512"] self.size = file_info["size"] self.prebuffer = prebuffer self.read_bytes = 0 self.piecefield =[self.sha512] self.f = open(file_path, "rb+") self.read_lock = gevent.lock.Semaphore() def read(self, buff=64 * 1024): with self.read_lock: pos = self.f.tell() read_until = min(self.size, pos + buff) requests = [] # Request all required blocks while 1: piece_i = pos / self.piece_size if piece_i * self.piece_size >= read_until: break pos_from = piece_i * self.piece_size pos_to = pos_from + self.piece_size if not self.piecefield[piece_i]: requests.append("%s|%s-%s" % (self.inner_path, pos_from, pos_to), blocking=False, update=True, priority=10)) pos += self.piece_size if not all(requests): return None # Request prebuffer if self.prebuffer: prebuffer_until = min(self.size, read_until + self.prebuffer) priority = 3 while 1: piece_i = pos / self.piece_size if piece_i * self.piece_size >= prebuffer_until: break pos_from = piece_i * self.piece_size pos_to = pos_from + self.piece_size if not self.piecefield[piece_i]:"%s|%s-%s" % (self.inner_path, pos_from, pos_to), blocking=False, update=True, priority=max(0, priority)) priority -= 1 pos += self.piece_size gevent.joinall(requests) self.read_bytes += buff # Increase buffer for long reads if self.read_bytes > 7 * 1024 * 1024 and self.prebuffer < 5 * 1024 * 1024:"%s: Increasing bigfile buffer size to 5MB..." % self.inner_path) self.prebuffer = 5 * 1024 * 1024 return def seek(self, pos, whence=0): with self.read_lock: if whence == 2: # Relative from file end pos = self.size + pos # Use the real size instead of size on the disk whence = 0 return, whence) def tell(self): return self.f.tell() def close(self): self.f.close() def __enter__(self): return self def __exit__(self, exc_type, exc_val, exc_tb): self.close() @PluginManager.registerTo("WorkerManager") class WorkerManagerPlugin(object): def addTask(self, inner_path, *args, **kwargs): file_info = kwargs.get("file_info") if file_info and "piecemap" in file_info: # Bigfile["has_bigfile"] = True piecemap_inner_path = helper.getDirname(file_info["content_inner_path"]) + file_info["piecemap"] piecemap_task = None if not # Start download piecemap piecemap_task = super(WorkerManagerPlugin, self).addTask(piecemap_inner_path, priority=30) autodownload_bigfile_size_limit ="autodownload_bigfile_size_limit", config.autodownload_bigfile_size_limit) if "|" not in inner_path and and file_info["size"] / 1024 / 1024 <= autodownload_bigfile_size_limit: gevent.spawn_later(0.1,, inner_path + "|all") # Download all pieces if "|" in inner_path: # Start download piece task = super(WorkerManagerPlugin, self).addTask(inner_path, *args, **kwargs) inner_path, file_range = inner_path.split("|") pos_from, pos_to = map(int, file_range.split("-")) task["piece_i"] = pos_from / file_info["piece_size"] task["sha512"] = file_info["sha512"] else: if inner_path in del[inner_path] if piecemap_task: task = piecemap_task else: fake_evt = gevent.event.AsyncResult() # Don't download anything if no range specified fake_evt.set(True) task = {"evt": fake_evt} if not, file_info["size"], file_info["sha512"]) piece_num = int(math.ceil(float(file_info["size"]) / file_info["piece_size"]))[file_info["sha512"]].fromstring("0" * piece_num) else: task = super(WorkerManagerPlugin, self).addTask(inner_path, *args, **kwargs) return task def taskAddPeer(self, task, peer): if "piece_i" in task: if not peer.piecefields[task["sha512"]][task["piece_i"]]: if task["sha512"] not in peer.piecefields: gevent.spawn(peer.updatePiecefields, force=True) elif not task["peers"]: gevent.spawn(peer.updatePiecefields) return False # Deny to add peers to task if file not in piecefield return super(WorkerManagerPlugin, self).taskAddPeer(task, peer) @PluginManager.registerTo("FileRequest") class FileRequestPlugin(object): def isReadable(self, site, inner_path, file, pos): # Peek into file if == "\0" * 10: # Looks empty, but makes sures we don't have that piece file_info = site.content_manager.getFileInfo(inner_path) piece_i = pos / file_info["piece_size"] if not[file_info["sha512"]][piece_i]: return False # Seek back to position we want to read return super(FileRequestPlugin, self).isReadable(site, inner_path, file, pos) def actionGetPiecefields(self, params): site = self.sites.get(params["site"]) if not site or not site.settings["serving"]: # Site unknown or not serving self.response({"error": "Unknown site"}) return False # Add peer to site if not added before peer = site.addPeer(self.connection.ip, self.connection.port, return_peer=True) if not peer.connection: # Just added peer.connect(self.connection) # Assign current connection to peer piecefields_packed = {sha512: piecefield.pack() for sha512, piecefield in} self.response({"piecefields_packed": piecefields_packed}) def actionSetPiecefields(self, params): site = self.sites.get(params["site"]) if not site or not site.settings["serving"]: # Site unknown or not serving self.response({"error": "Unknown site"}) self.connection.badAction(5) return False # Add or get peer peer = site.addPeer(self.connection.ip, self.connection.port, return_peer=True, connection=self.connection) if not peer.connection: peer.connect(self.connection) peer.piecefields = collections.defaultdict(BigfilePiecefieldPacked) for sha512, piecefield_packed in params["piecefields_packed"].iteritems(): peer.piecefields[sha512].unpack(piecefield_packed) site.settings["has_bigfile"] = True self.response({"ok": "Updated"}) @PluginManager.registerTo("Peer") class PeerPlugin(object): def __getattr__(self, key): if key == "piecefields": self.piecefields = collections.defaultdict(BigfilePiecefieldPacked) return self.piecefields elif key == "time_piecefields_updated": self.time_piecefields_updated = None return self.time_piecefields_updated else: return super(PeerPlugin, self).__getattr__(key) @util.Noparallel(ignore_args=True) def updatePiecefields(self, force=False): if self.connection and self.connection.handshake.get("rev", 0) < 2190: return False # Not supported # Don't update piecefield again in 1 min if self.time_piecefields_updated and time.time() - self.time_piecefields_updated < 60 and not force: return False self.time_piecefields_updated = time.time() res = self.request("getPiecefields", {"site":}) if not res or "error" in res: return False self.piecefields = collections.defaultdict(BigfilePiecefieldPacked) try: for sha512, piecefield_packed in res["piecefields_packed"].iteritems(): self.piecefields[sha512].unpack(piecefield_packed) except Exception as err: self.log("Invalid updatePiecefields response: %s" % Debug.formatException(err)) return self.piecefields def sendMyHashfield(self, *args, **kwargs): return super(PeerPlugin, self).sendMyHashfield(*args, **kwargs) def updateHashfield(self, *args, **kwargs): if"has_bigfile"): thread = gevent.spawn(self.updatePiecefields, *args, **kwargs) back = super(PeerPlugin, self).updateHashfield(*args, **kwargs) thread.join() return back else: return super(PeerPlugin, self).updateHashfield(*args, **kwargs) def getFile(self, site, inner_path, *args, **kwargs): if "|" in inner_path: inner_path, file_range = inner_path.split("|") pos_from, pos_to = map(int, file_range.split("-")) kwargs["pos_from"] = pos_from kwargs["pos_to"] = pos_to return super(PeerPlugin, self).getFile(site, inner_path, *args, **kwargs) @PluginManager.registerTo("Site") class SitePlugin(object): def isFileDownloadAllowed(self, inner_path, file_info): if "piecemap" in file_info: file_info = file_info.copy() file_info["size"] = file_info["piece_size"] return super(SitePlugin, self).isFileDownloadAllowed(inner_path, file_info) def getSettingsCache(self): back = super(SitePlugin, self).getSettingsCache() if back["piecefields"] = {sha512: piecefield.pack().encode("base64") for sha512, piecefield in} return back def needFile(self, inner_path, *args, **kwargs): if inner_path.endswith("|all"): @util.Pooled(20) def pooledNeedBigfile(*args, **kwargs): return self.needFile(*args, **kwargs) inner_path = inner_path.replace("|all", "") file_info = self.needFileInfo(inner_path) file_size = file_info["size"] piece_size = file_info["piece_size"] piece_num = int(math.ceil(float(file_size) / piece_size)) file_threads = [] piecefield =["sha512"]) for piece_i in range(piece_num): piece_from = piece_i * piece_size piece_to = min(file_size, piece_from + piece_size) if not piecefield or not piecefield[piece_i]: res = pooledNeedBigfile("%s|%s-%s" % (inner_path, piece_from, piece_to), blocking=False) if res is not True and res is not False: file_threads.append(res) gevent.joinall(file_threads) else: return super(SitePlugin, self).needFile(inner_path, *args, **kwargs) @PluginManager.registerTo("ConfigPlugin") class ConfigPlugin(object): def createArguments(self): group = self.parser.add_argument_group("Bigfile plugin") group.add_argument('--autodownload_bigfile_size_limit', help='Also download bigfiles smaller than this limit if help distribute option is checked', default=1, metavar="MB", type=int) return super(ConfigPlugin, self).createArguments()