# BigfilePlugin.py

import time
import os
import subprocess
import shutil
import collections
import math
import warnings
import base64
import binascii
import json

import gevent
import gevent.lock

from Plugin import PluginManager
from Debug import Debug
from Crypt import CryptHash
with warnings.catch_warnings():
    warnings.filterwarnings("ignore")  # Ignore missing sha3 warning
    import merkletools

from util import helper
from util import Msgpack
from util.Flag import flag
import util
from .BigfilePiecefield import BigfilePiecefield, BigfilePiecefieldPacked


# We can only import plugin host classes after the plugins are loaded
@PluginManager.afterLoad
def importPluginnedClasses():
    global VerifyError, config
    from Content.ContentManager import VerifyError
    from Config import config

if "upload_nonces" not in locals():
    upload_nonces = {}


@PluginManager.registerTo("UiRequest")
class UiRequestPlugin(object):
    def isCorsAllowed(self, path):
        if path == "/ZeroNet-Internal/BigfileUpload":
            return True
        else:
            return super(UiRequestPlugin, self).isCorsAllowed(path)
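
    # Handle an upload POSTed to /ZeroNet-Internal/BigfileUpload. The request
    # must carry a one-time nonce created by actionBigfileUploadInit below,
    # so only clients that went through the websocket API can stream data
    # into the site's storage. The file is hashed piece by piece while it is
    # being written, then registered in content.json as an optional file.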
    @helper.encodeResponse
    def actionBigfileUpload(self):
        nonce = self.get.get("upload_nonce")
        if nonce not in upload_nonces:
            return self.error403("Upload nonce error.")

        upload_info = upload_nonces[nonce]
        del upload_nonces[nonce]

        self.sendHeader(200, "text/html", noscript=True, extra_headers={
            "Access-Control-Allow-Origin": "null",
            "Access-Control-Allow-Credentials": "true"
        })

        self.readMultipartHeaders(self.env['wsgi.input'])  # Skip http headers

        site = upload_info["site"]
        inner_path = upload_info["inner_path"]

        with site.storage.open(inner_path, "wb", create_dirs=True) as out_file:
            merkle_root, piece_size, piecemap_info = site.content_manager.hashBigfile(
                self.env['wsgi.input'], upload_info["size"], upload_info["piece_size"], out_file
            )

        if len(piecemap_info["sha512_pieces"]) == 1:  # Small file, don't split
            hash = binascii.hexlify(piecemap_info["sha512_pieces"][0])
            hash_id = site.content_manager.hashfield.getHashId(hash)
            site.content_manager.optionalDownloaded(inner_path, hash_id, upload_info["size"], own=True)
        else:  # Big file
            file_name = helper.getFilename(inner_path)
            site.storage.open(upload_info["piecemap"], "wb").write(Msgpack.pack({file_name: piecemap_info}))

            # Find piecemap and file relative path to content.json
            file_info = site.content_manager.getFileInfo(inner_path, new_file=True)
            content_inner_path_dir = helper.getDirname(file_info["content_inner_path"])
            piecemap_relative_path = upload_info["piecemap"][len(content_inner_path_dir):]
            file_relative_path = inner_path[len(content_inner_path_dir):]

            # Add file to content.json
            if site.storage.isFile(file_info["content_inner_path"]):
                content = site.storage.loadJson(file_info["content_inner_path"])
            else:
                content = {}
            if "files_optional" not in content:
                content["files_optional"] = {}

            content["files_optional"][file_relative_path] = {
                "sha512": merkle_root,
                "size": upload_info["size"],
                "piecemap": piecemap_relative_path,
                "piece_size": piece_size
            }

            merkle_root_hash_id = site.content_manager.hashfield.getHashId(merkle_root)
            site.content_manager.optionalDownloaded(inner_path, merkle_root_hash_id, upload_info["size"], own=True)
            site.storage.writeJson(file_info["content_inner_path"], content)

            site.content_manager.contents.loadItem(file_info["content_inner_path"])  # reload cache

        return json.dumps({
            "merkle_root": merkle_root,
            "piece_num": len(piecemap_info["sha512_pieces"]),
            "piece_size": piece_size,
            "inner_path": inner_path
        })

    def readMultipartHeaders(self, wsgi_input):
        found = False
        for i in range(100):
            line = wsgi_input.readline()
            if line == b"\r\n":
                found = True
                break
        if not found:
            raise Exception("No multipart header found")
        return i

    def actionFile(self, file_path, *args, **kwargs):
        if kwargs.get("file_size", 0) > 1024 * 1024 and kwargs.get("path_parts"):  # Only check files larger than 1MB
            path_parts = kwargs["path_parts"]
            site = self.server.site_manager.get(path_parts["address"])
            big_file = site.storage.openBigfile(path_parts["inner_path"], prebuffer=2 * 1024 * 1024)
            if big_file:
                kwargs["file_obj"] = big_file
                kwargs["file_size"] = big_file.size

        return super(UiRequestPlugin, self).actionFile(file_path, *args, **kwargs)
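

# Websocket API: actionBigfileUploadInit registers a one-time upload nonce
# and returns the URL to POST the file to, the piece size, and the file's
# path relative to its content.json.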
@PluginManager.registerTo("UiWebsocket")
class UiWebsocketPlugin(object):
    def actionBigfileUploadInit(self, to, inner_path, size):
        valid_signers = self.site.content_manager.getValidSigners(inner_path)
        auth_address = self.user.getAuthAddress(self.site.address)
        if not self.site.settings["own"] and auth_address not in valid_signers:
            self.log.error("FileWrite forbidden %s not in valid_signers %s" % (auth_address, valid_signers))
            return self.response(to, {"error": "Forbidden, you can only modify your own files"})

        nonce = CryptHash.random()
        piece_size = 1024 * 1024
        inner_path = self.site.content_manager.sanitizePath(inner_path)
        file_info = self.site.content_manager.getFileInfo(inner_path, new_file=True)

        content_inner_path_dir = helper.getDirname(file_info["content_inner_path"])
        file_relative_path = inner_path[len(content_inner_path_dir):]

        upload_nonces[nonce] = {
            "added": time.time(),
            "site": self.site,
            "inner_path": inner_path,
            "websocket_client": self,
            "size": size,
            "piece_size": piece_size,
            "piecemap": inner_path + ".piecemap.msgpack"
        }
        return {
            "url": "/ZeroNet-Internal/BigfileUpload?upload_nonce=" + nonce,
            "piece_size": piece_size,
            "inner_path": inner_path,
            "file_relative_path": file_relative_path
        }

    @flag.no_multiuser
    def actionSiteSetAutodownloadBigfileLimit(self, to, limit):
        permissions = self.getPermissions(to)
        if "ADMIN" not in permissions:
            return self.response(to, "You don't have permission to run this command")

        self.site.settings["autodownload_bigfile_size_limit"] = int(limit)
        self.response(to, "ok")

    def actionFileDelete(self, to, inner_path):
        piecemap_inner_path = inner_path + ".piecemap.msgpack"
        if self.hasFilePermission(inner_path) and self.site.storage.isFile(piecemap_inner_path):
            # Also delete .piecemap.msgpack file if exists
            self.log.debug("Deleting piecemap: %s" % piecemap_inner_path)
            file_info = self.site.content_manager.getFileInfo(piecemap_inner_path)
            if file_info:
                content_json = self.site.storage.loadJson(file_info["content_inner_path"])
                relative_path = file_info["relative_path"]
                if relative_path in content_json.get("files_optional", {}):
                    del content_json["files_optional"][relative_path]
                    self.site.storage.writeJson(file_info["content_inner_path"], content_json)
                    self.site.content_manager.loadContent(file_info["content_inner_path"], add_bad_files=False, force=True)
            try:
                self.site.storage.delete(piecemap_inner_path)
            except Exception as err:
                self.log.error("File %s delete error: %s" % (piecemap_inner_path, err))

        return super(UiWebsocketPlugin, self).actionFileDelete(to, inner_path)
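

# A byte range of a big file is addressed as "<inner_path>|<from>-<to>",
# e.g. "data/video.mp4|1048576-2097152" for the second 1MB piece. The
# overrides below parse this suffix to hash, verify and track single pieces.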
@PluginManager.registerTo("ContentManager")
class ContentManagerPlugin(object):
    def getFileInfo(self, inner_path, *args, **kwargs):
        if "|" not in inner_path:
            return super(ContentManagerPlugin, self).getFileInfo(inner_path, *args, **kwargs)

        inner_path, file_range = inner_path.split("|")
        pos_from, pos_to = map(int, file_range.split("-"))
        file_info = super(ContentManagerPlugin, self).getFileInfo(inner_path, *args, **kwargs)
        return file_info

    def readFile(self, file_in, size, buff_size=1024 * 64):
        part_num = 0
        recv_left = size

        while 1:
            part_num += 1
            read_size = min(buff_size, recv_left)
            part = file_in.read(read_size)

            if not part:
                break
            yield part

            if part_num % 100 == 0:  # Avoid blocking ZeroNet execution during upload
                time.sleep(0.001)

            recv_left -= read_size
            if recv_left <= 0:
                break
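
    # Split the incoming stream into piece_size chunks, sha512t-hash each
    # chunk and build a merkle tree over the piece hashes. Returns the merkle
    # root (stored as the file's "sha512" in content.json), the piece size
    # and the piecemap info ({"sha512_pieces": [...]}).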
    def hashBigfile(self, file_in, size, piece_size=1024 * 1024, file_out=None):
        self.site.settings["has_bigfile"] = True

        recv = 0
        try:
            piece_hash = CryptHash.sha512t()
            piece_hashes = []
            piece_recv = 0

            mt = merkletools.MerkleTools()
            mt.hash_function = CryptHash.sha512t

            part = ""
            for part in self.readFile(file_in, size):
                if file_out:
                    file_out.write(part)

                recv += len(part)
                piece_recv += len(part)
                piece_hash.update(part)
                if piece_recv >= piece_size:
                    piece_digest = piece_hash.digest()
                    piece_hashes.append(piece_digest)
                    mt.leaves.append(piece_digest)
                    piece_hash = CryptHash.sha512t()
                    piece_recv = 0

                if len(piece_hashes) % 100 == 0 or recv == size:
                    self.log.info("- [HASHING:%.0f%%] Pieces: %s, %.1fMB/%.1fMB" % (
                        float(recv) / size * 100, len(piece_hashes), recv / 1024 / 1024, size / 1024 / 1024
                    ))
                    part = ""

            if len(part) > 0:
                piece_digest = piece_hash.digest()
                piece_hashes.append(piece_digest)
                mt.leaves.append(piece_digest)
        except Exception as err:
            raise err
        finally:
            if file_out:
                file_out.close()

        mt.make_tree()
        merkle_root = mt.get_merkle_root()
        if type(merkle_root) is bytes:  # Python <3.5
            merkle_root = merkle_root.decode()
        return merkle_root, piece_size, {
            "sha512_pieces": piece_hashes
        }
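
    # Runs while signing content.json: optional files of 5MB or more get a
    # <name>.piecemap.msgpack generated next to them, unless content.json
    # already lists the file with the same size.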
    def hashFile(self, dir_inner_path, file_relative_path, optional=False):
        inner_path = dir_inner_path + file_relative_path

        file_size = self.site.storage.getSize(inner_path)
        # Only care about optional files >1MB
        if not optional or file_size < 1 * 1024 * 1024:
            return super(ContentManagerPlugin, self).hashFile(dir_inner_path, file_relative_path, optional)

        back = {}
        content = self.contents.get(dir_inner_path + "content.json")

        hash = None
        piecemap_relative_path = None
        piece_size = None

        # Don't re-hash if it's already in content.json
        if content and file_relative_path in content.get("files_optional", {}):
            file_node = content["files_optional"][file_relative_path]
            if file_node["size"] == file_size:
                self.log.info("- [SAME SIZE] %s" % file_relative_path)
                hash = file_node.get("sha512")
                piecemap_relative_path = file_node.get("piecemap")
                piece_size = file_node.get("piece_size")

        if not hash or not piecemap_relative_path:  # Not in content.json yet
            if file_size < 5 * 1024 * 1024:  # Don't create piecemap automatically for files smaller than 5MB
                return super(ContentManagerPlugin, self).hashFile(dir_inner_path, file_relative_path, optional)

            self.log.info("- [HASHING] %s" % file_relative_path)
            merkle_root, piece_size, piecemap_info = self.hashBigfile(self.site.storage.open(inner_path, "rb"), file_size)
            if not hash:
                hash = merkle_root

            if not piecemap_relative_path:
                file_name = helper.getFilename(file_relative_path)
                piecemap_relative_path = file_relative_path + ".piecemap.msgpack"
                piecemap_inner_path = inner_path + ".piecemap.msgpack"

                self.site.storage.open(piecemap_inner_path, "wb").write(Msgpack.pack({file_name: piecemap_info}))

                back.update(super(ContentManagerPlugin, self).hashFile(dir_inner_path, piecemap_relative_path, optional=True))

        piece_num = int(math.ceil(float(file_size) / piece_size))

        # Add the merkle root to hashfield
        hash_id = self.site.content_manager.hashfield.getHashId(hash)
        self.optionalDownloaded(inner_path, hash_id, file_size, own=True)
        self.site.storage.piecefields[hash].frombytes(b"\x01" * piece_num)

        back[file_relative_path] = {"sha512": hash, "size": file_size, "piecemap": piecemap_relative_path, "piece_size": piece_size}
        return back
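
    # Verify a downloaded piece against its sha512 entry in the piecemap.
    # The piecemap itself is an ordinary optional file that is fetched with
    # high priority before the first piece can be checked.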
    def getPiecemap(self, inner_path):
        file_info = self.site.content_manager.getFileInfo(inner_path)
        piecemap_inner_path = helper.getDirname(file_info["content_inner_path"]) + file_info["piecemap"]
        self.site.needFile(piecemap_inner_path, priority=20)
        piecemap = Msgpack.unpack(self.site.storage.open(piecemap_inner_path, "rb").read())[helper.getFilename(inner_path)]
        piecemap["piece_size"] = file_info["piece_size"]
        return piecemap

    def verifyPiece(self, inner_path, pos, piece):
        piecemap = self.getPiecemap(inner_path)
        piece_i = int(pos / piecemap["piece_size"])
        if CryptHash.sha512sum(piece, format="digest") != piecemap["sha512_pieces"][piece_i]:
            raise VerifyError("Invalid hash")
        return True

    def verifyFile(self, inner_path, file, ignore_same=True):
        if "|" not in inner_path:
            return super(ContentManagerPlugin, self).verifyFile(inner_path, file, ignore_same)

        inner_path, file_range = inner_path.split("|")
        pos_from, pos_to = map(int, file_range.split("-"))

        return self.verifyPiece(inner_path, pos_from, file)

    def optionalDownloaded(self, inner_path, hash_id, size=None, own=False):
        if "|" in inner_path:
            inner_path, file_range = inner_path.split("|")
            pos_from, pos_to = map(int, file_range.split("-"))
            file_info = self.getFileInfo(inner_path)

            # Mark piece downloaded
            piece_i = int(pos_from / file_info["piece_size"])
            self.site.storage.piecefields[file_info["sha512"]][piece_i] = b"\x01"

            # Only add to site size on first request
            if hash_id in self.hashfield:
                size = 0
        elif size > 1024 * 1024:
            file_info = self.getFileInfo(inner_path)
            if file_info and "sha512" in file_info:  # We already have the file, but not in piecefield
                sha512 = file_info["sha512"]
                if sha512 not in self.site.storage.piecefields:
                    self.site.storage.checkBigfile(inner_path)

        return super(ContentManagerPlugin, self).optionalDownloaded(inner_path, hash_id, size, own)

    def optionalRemoved(self, inner_path, hash_id, size=None):
        if size and size > 1024 * 1024:
            file_info = self.getFileInfo(inner_path)
            sha512 = file_info["sha512"]
            if sha512 in self.site.storage.piecefields:
                del self.site.storage.piecefields[sha512]

            # Also remove other pieces of the file from download queue
            for key in list(self.site.bad_files.keys()):
                if key.startswith(inner_path + "|"):
                    del self.site.bad_files[key]
            self.site.worker_manager.removeSolvedFileTasks()
        return super(ContentManagerPlugin, self).optionalRemoved(inner_path, hash_id, size)
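

# SiteStorage additions: an in-memory piecefield (one flag per piece) is kept
# for every big file, restored on startup from the base64-packed copy that
# SitePlugin.getSettingsCache saves into the site settings cache.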
@PluginManager.registerTo("SiteStorage")
class SiteStoragePlugin(object):
    def __init__(self, *args, **kwargs):
        super(SiteStoragePlugin, self).__init__(*args, **kwargs)
        self.piecefields = collections.defaultdict(BigfilePiecefield)
        if "piecefields" in self.site.settings.get("cache", {}):
            for sha512, piecefield_packed in self.site.settings["cache"].get("piecefields").items():
                if piecefield_packed:
                    self.piecefields[sha512].unpack(base64.b64decode(piecefield_packed))
            self.site.settings["cache"]["piecefields"] = {}
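
    # Create the file as a sparse file so pieces can be written at arbitrary
    # offsets without storing the gaps; on Windows the sparse flag has to be
    # set explicitly with fsutil.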
    def createSparseFile(self, inner_path, size, sha512=None):
        file_path = self.getPath(inner_path)

        file_dir = os.path.dirname(file_path)
        if not os.path.isdir(file_dir):
            os.makedirs(file_dir)

        f = open(file_path, 'wb')
        f.truncate(min(1024 * 1024 * 5, size))  # Only pre-allocate up to 5MB
        f.close()
        if os.name == "nt":
            startupinfo = subprocess.STARTUPINFO()
            startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
            subprocess.call(["fsutil", "sparse", "setflag", file_path], close_fds=True, startupinfo=startupinfo)

        if sha512 and sha512 in self.piecefields:
            self.log.debug("%s: File does not exist, but has piecefield. Deleting piecefield." % inner_path)
            del self.piecefields[sha512]

    def write(self, inner_path, content):
        if "|" not in inner_path:
            return super(SiteStoragePlugin, self).write(inner_path, content)

        # Write to specific position by passing |{pos} after the filename
        inner_path, file_range = inner_path.split("|")
        pos_from, pos_to = map(int, file_range.split("-"))
        file_path = self.getPath(inner_path)

        # Create dir if not exist
        file_dir = os.path.dirname(file_path)
        if not os.path.isdir(file_dir):
            os.makedirs(file_dir)

        if not os.path.isfile(file_path):
            file_info = self.site.content_manager.getFileInfo(inner_path)
            self.createSparseFile(inner_path, file_info["size"])

        # Write file
        with open(file_path, "rb+") as file:
            file.seek(pos_from)
            if hasattr(content, 'read'):  # File-like object
                shutil.copyfileobj(content, file)  # Write buff to disk
            else:  # Simple string
                file.write(content)
        del content
        self.onUpdated(inner_path)
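
    # Prepare a big file for piece-wise access: create the sparse file if it
    # is missing and rebuild its piecefield after a restart. Reading the
    # first 128 bytes is a cheap heuristic: an all-zero prefix is treated as
    # "no pieces downloaded yet", anything else as "all pieces present".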
    def checkBigfile(self, inner_path):
        file_info = self.site.content_manager.getFileInfo(inner_path)
        if not file_info or (file_info and "piecemap" not in file_info):  # It's not a big file
            return False

        self.site.settings["has_bigfile"] = True
        file_path = self.getPath(inner_path)
        sha512 = file_info["sha512"]
        piece_num = int(math.ceil(float(file_info["size"]) / file_info["piece_size"]))
        if os.path.isfile(file_path):
            if sha512 not in self.piecefields:
                if open(file_path, "rb").read(128) == b"\0" * 128:
                    piece_data = b"\x00"
                else:
                    piece_data = b"\x01"
                self.log.debug("%s: File exists, but not in piecefield. Filling piecefield with %s * %s." % (inner_path, piece_num, piece_data))
                self.piecefields[sha512].frombytes(piece_data * piece_num)
        else:
            self.log.debug("Creating bigfile: %s" % inner_path)
            self.createSparseFile(inner_path, file_info["size"], sha512)
            self.piecefields[sha512].frombytes(b"\x00" * piece_num)
            self.log.debug("Created bigfile: %s" % inner_path)
        return True

    def openBigfile(self, inner_path, prebuffer=0):
        if not self.checkBigfile(inner_path):
            return False
        self.site.needFile(inner_path, blocking=False)  # Download piecemap
        return BigFile(self.site, inner_path, prebuffer=prebuffer)
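

# File-like wrapper that downloads pieces on demand: read() queues download
# tasks for every missing piece in the requested range, waits for them, then
# reads from the local sparse file.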
class BigFile(object):
    def __init__(self, site, inner_path, prebuffer=0):
        self.site = site
        self.inner_path = inner_path
        file_path = site.storage.getPath(inner_path)
        file_info = self.site.content_manager.getFileInfo(inner_path)
        self.piece_size = file_info["piece_size"]
        self.sha512 = file_info["sha512"]
        self.size = file_info["size"]
        self.prebuffer = prebuffer
        self.read_bytes = 0

        self.piecefield = self.site.storage.piecefields[self.sha512]
        self.f = open(file_path, "rb+")
        self.read_lock = gevent.lock.Semaphore()
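
    # Pieces needed by this read are requested with priority 10; prebuffer
    # pieces start at priority 3 and decrease piece by piece, so data needed
    # soonest is fetched first.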
    def read(self, buff=64 * 1024):
        with self.read_lock:
            pos = self.f.tell()
            read_until = min(self.size, pos + buff)
            requests = []
            # Request all required blocks
            while 1:
                piece_i = int(pos / self.piece_size)
                if piece_i * self.piece_size >= read_until:
                    break
                pos_from = piece_i * self.piece_size
                pos_to = pos_from + self.piece_size
                if not self.piecefield[piece_i]:
                    requests.append(self.site.needFile("%s|%s-%s" % (self.inner_path, pos_from, pos_to), blocking=False, update=True, priority=10))
                pos += self.piece_size

            if not all(requests):
                return None

            # Request prebuffer
            if self.prebuffer:
                prebuffer_until = min(self.size, read_until + self.prebuffer)
                priority = 3
                while 1:
                    piece_i = int(pos / self.piece_size)
                    if piece_i * self.piece_size >= prebuffer_until:
                        break
                    pos_from = piece_i * self.piece_size
                    pos_to = pos_from + self.piece_size
                    if not self.piecefield[piece_i]:
                        self.site.needFile("%s|%s-%s" % (self.inner_path, pos_from, pos_to), blocking=False, update=True, priority=max(0, priority))
                    priority -= 1
                    pos += self.piece_size

            gevent.joinall(requests)
            self.read_bytes += buff

            # Increase buffer for long reads
            if self.read_bytes > 7 * 1024 * 1024 and self.prebuffer < 5 * 1024 * 1024:
                self.site.log.debug("%s: Increasing bigfile buffer size to 5MB..." % self.inner_path)
                self.prebuffer = 5 * 1024 * 1024

            return self.f.read(buff)

    def seek(self, pos, whence=0):
        with self.read_lock:
            if whence == 2:  # Relative from file end
                pos = self.size + pos  # Use the real size instead of size on the disk
                whence = 0
            return self.f.seek(pos, whence)

    def tell(self):
        return self.f.tell()

    def close(self):
        self.f.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
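

# WorkerManager additions: the first request for a big file queues its
# piecemap download at high priority, auto-downloads small-enough files in
# full via "|all", and tags piece tasks with their piece index so that
# taskAddPeer only assigns peers whose piecefield contains that piece.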
@PluginManager.registerTo("WorkerManager")
class WorkerManagerPlugin(object):
    def addTask(self, inner_path, *args, **kwargs):
        file_info = kwargs.get("file_info")
        if file_info and "piecemap" in file_info:  # Bigfile
            self.site.settings["has_bigfile"] = True

            piecemap_inner_path = helper.getDirname(file_info["content_inner_path"]) + file_info["piecemap"]
            piecemap_task = None
            if not self.site.storage.isFile(piecemap_inner_path):
                # Start download piecemap
                piecemap_task = super(WorkerManagerPlugin, self).addTask(piecemap_inner_path, priority=30)
                autodownload_bigfile_size_limit = self.site.settings.get("autodownload_bigfile_size_limit", config.autodownload_bigfile_size_limit)
                if "|" not in inner_path and self.site.isDownloadable(inner_path) and file_info["size"] / 1024 / 1024 <= autodownload_bigfile_size_limit:
                    gevent.spawn_later(0.1, self.site.needFile, inner_path + "|all")  # Download all pieces

            if "|" in inner_path:
                # Start download piece
                task = super(WorkerManagerPlugin, self).addTask(inner_path, *args, **kwargs)

                inner_path, file_range = inner_path.split("|")
                pos_from, pos_to = map(int, file_range.split("-"))
                task["piece_i"] = int(pos_from / file_info["piece_size"])
                task["sha512"] = file_info["sha512"]
            else:
                if inner_path in self.site.bad_files:
                    del self.site.bad_files[inner_path]
                if piecemap_task:
                    task = piecemap_task
                else:
                    fake_evt = gevent.event.AsyncResult()  # Don't download anything if no range specified
                    fake_evt.set(True)
                    task = {"evt": fake_evt}

            if not self.site.storage.isFile(inner_path):
                self.site.storage.createSparseFile(inner_path, file_info["size"], file_info["sha512"])
                piece_num = int(math.ceil(float(file_info["size"]) / file_info["piece_size"]))
                self.site.storage.piecefields[file_info["sha512"]].frombytes(b"\x00" * piece_num)
        else:
            task = super(WorkerManagerPlugin, self).addTask(inner_path, *args, **kwargs)
        return task

    def taskAddPeer(self, task, peer):
        if "piece_i" in task:
            if not peer.piecefields[task["sha512"]][task["piece_i"]]:
                if task["sha512"] not in peer.piecefields:
                    gevent.spawn(peer.updatePiecefields, force=True)
                elif not task["peers"]:
                    gevent.spawn(peer.updatePiecefields)

                return False  # Deny to add peers to task if file not in piecefield
        return super(WorkerManagerPlugin, self).taskAddPeer(task, peer)
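

# Protocol additions: getPiecefields/setPiecefields exchange packed
# piecefields with peers, and isReadable refuses to serve a piece that is
# still an unwritten hole in the local sparse file.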
@PluginManager.registerTo("FileRequest")
class FileRequestPlugin(object):
    def isReadable(self, site, inner_path, file, pos):
        # Peek into file
        if file.read(10) == b"\0" * 10:
            # Looks empty, but make sure we don't have that piece
            file_info = site.content_manager.getFileInfo(inner_path)
            if "piece_size" in file_info:
                piece_i = int(pos / file_info["piece_size"])
                if not site.storage.piecefields[file_info["sha512"]][piece_i]:
                    return False
        # Seek back to position we want to read
        file.seek(pos)
        return super(FileRequestPlugin, self).isReadable(site, inner_path, file, pos)

    def actionGetPiecefields(self, params):
        site = self.sites.get(params["site"])
        if not site or not site.isServing():  # Site unknown or not serving
            self.response({"error": "Unknown site"})
            return False

        # Add peer to site if not added before
        peer = site.addPeer(self.connection.ip, self.connection.port, return_peer=True)
        if not peer.connection:  # Just added
            peer.connect(self.connection)  # Assign current connection to peer

        piecefields_packed = {sha512: piecefield.pack() for sha512, piecefield in site.storage.piecefields.items()}
        self.response({"piecefields_packed": piecefields_packed})

    def actionSetPiecefields(self, params):
        site = self.sites.get(params["site"])
        if not site or not site.isServing():  # Site unknown or not serving
            self.response({"error": "Unknown site"})
            self.connection.badAction(5)
            return False

        # Add or get peer
        peer = site.addPeer(self.connection.ip, self.connection.port, return_peer=True, connection=self.connection)
        if not peer.connection:
            peer.connect(self.connection)

        peer.piecefields = collections.defaultdict(BigfilePiecefieldPacked)
        for sha512, piecefield_packed in params["piecefields_packed"].items():
            peer.piecefields[sha512].unpack(piecefield_packed)
        site.settings["has_bigfile"] = True

        self.response({"ok": "Updated"})


@PluginManager.registerTo("Peer")
class PeerPlugin(object):
    def __getattr__(self, key):
        if key == "piecefields":
            self.piecefields = collections.defaultdict(BigfilePiecefieldPacked)
            return self.piecefields
        elif key == "time_piecefields_updated":
            self.time_piecefields_updated = None
            return self.time_piecefields_updated
        else:
            return super(PeerPlugin, self).__getattr__(key)
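
    # Fetch the peer's piecefields for this site. Rate-limited to once per
    # minute unless force=True; peers older than rev 2190 don't support it.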
    @util.Noparallel(ignore_args=True)
    def updatePiecefields(self, force=False):
        if self.connection and self.connection.handshake.get("rev", 0) < 2190:
            return False  # Not supported

        # Don't update piecefield again in 1 min
        if self.time_piecefields_updated and time.time() - self.time_piecefields_updated < 60 and not force:
            return False

        self.time_piecefields_updated = time.time()
        res = self.request("getPiecefields", {"site": self.site.address})
        if not res or "error" in res:
            return False

        self.piecefields = collections.defaultdict(BigfilePiecefieldPacked)
        try:
            for sha512, piecefield_packed in res["piecefields_packed"].items():
                self.piecefields[sha512].unpack(piecefield_packed)
        except Exception as err:
            self.log("Invalid updatePiecefields response: %s" % Debug.formatException(err))

        return self.piecefields

    def sendMyHashfield(self, *args, **kwargs):
        return super(PeerPlugin, self).sendMyHashfield(*args, **kwargs)

    def updateHashfield(self, *args, **kwargs):
        if self.site.settings.get("has_bigfile"):
            thread = gevent.spawn(self.updatePiecefields, *args, **kwargs)
            back = super(PeerPlugin, self).updateHashfield(*args, **kwargs)
            thread.join()
            return back
        else:
            return super(PeerPlugin, self).updateHashfield(*args, **kwargs)

    def getFile(self, site, inner_path, *args, **kwargs):
        if "|" in inner_path:
            inner_path, file_range = inner_path.split("|")
            pos_from, pos_to = map(int, file_range.split("-"))
            kwargs["pos_from"] = pos_from
            kwargs["pos_to"] = pos_to

        return super(PeerPlugin, self).getFile(site, inner_path, *args, **kwargs)
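

# Site-level integration: enforce the bigfile size limit (piece downloads are
# judged by piece_size rather than total file size), persist piecefields to
# the settings cache and expand "<inner_path>|all" into pooled piece
# downloads.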
@PluginManager.registerTo("Site")
class SitePlugin(object):
    def isFileDownloadAllowed(self, inner_path, file_info):
        if "piecemap" in file_info:
            file_size_mb = file_info["size"] / 1024 / 1024
            if config.bigfile_size_limit and file_size_mb > config.bigfile_size_limit:
                self.log.debug(
                    "Bigfile size %s too large: %sMB > %sMB, skipping..." %
                    (inner_path, file_size_mb, config.bigfile_size_limit)
                )
                return False

            file_info = file_info.copy()
            file_info["size"] = file_info["piece_size"]
        return super(SitePlugin, self).isFileDownloadAllowed(inner_path, file_info)

    def getSettingsCache(self):
        back = super(SitePlugin, self).getSettingsCache()
        if self.storage.piecefields:
            back["piecefields"] = {sha512: base64.b64encode(piecefield.pack()).decode("utf8") for sha512, piecefield in self.storage.piecefields.items()}
        return back

    def needFile(self, inner_path, *args, **kwargs):
        if inner_path.endswith("|all"):
            @util.Pooled(20)
            def pooledNeedBigfile(inner_path, *args, **kwargs):
                if inner_path not in self.bad_files:
                    self.log.debug("Cancelled piece, skipping %s" % inner_path)
                    return False
                return self.needFile(inner_path, *args, **kwargs)

            inner_path = inner_path.replace("|all", "")
            file_info = self.needFileInfo(inner_path)

            # Use default function to download non-optional file
            if "piece_size" not in file_info:
                return super(SitePlugin, self).needFile(inner_path, *args, **kwargs)

            file_size = file_info["size"]
            piece_size = file_info["piece_size"]
            piece_num = int(math.ceil(float(file_size) / piece_size))

            file_threads = []

            piecefield = self.storage.piecefields.get(file_info["sha512"])

            for piece_i in range(piece_num):
                piece_from = piece_i * piece_size
                piece_to = min(file_size, piece_from + piece_size)
                if not piecefield or not piecefield[piece_i]:
                    inner_path_piece = "%s|%s-%s" % (inner_path, piece_from, piece_to)
                    self.bad_files[inner_path_piece] = self.bad_files.get(inner_path_piece, 1)
                    res = pooledNeedBigfile(inner_path_piece, blocking=False)
                    if res is not True and res is not False:
                        file_threads.append(res)

            gevent.joinall(file_threads)
        else:
            return super(SitePlugin, self).needFile(inner_path, *args, **kwargs)


@PluginManager.registerTo("ConfigPlugin")
class ConfigPlugin(object):
    def createArguments(self):
        group = self.parser.add_argument_group("Bigfile plugin")
        group.add_argument('--autodownload_bigfile_size_limit', help='Also download bigfiles smaller than this limit if help distribute option is checked', default=10, metavar="MB", type=int)
        group.add_argument('--bigfile_size_limit', help='Maximum size of downloaded big files', default=False, metavar="MB", type=int)

        return super(ConfigPlugin, self).createArguments()