# BigfilePlugin.py
import time
import os
import subprocess
import shutil
import collections
import math

import msgpack
import gevent
import gevent.lock

from Plugin import PluginManager
from Debug import Debug
from Crypt import CryptHash
from lib import merkletools
from util import helper
import util
from BigfilePiecefield import BigfilePiecefield, BigfilePiecefieldPacked


# We can only import plugin host classes after the plugins are loaded
@PluginManager.afterLoad
def importPluginnedClasses():
    global VerifyError, config
    from Content.ContentManager import VerifyError
    from Config import config

# Nonce -> upload info for pending /BigfileUpload requests.
# The locals() guard keeps existing nonces alive if this module is re-executed.
if "upload_nonces" not in locals():
    upload_nonces = {}
@PluginManager.registerTo("UiRequest")
class UiRequestPlugin(object):
    def isCorsAllowed(self, path):
        # The upload endpoint must accept cross-origin requests from site iframes
        if path == "/ZeroNet-Internal/BigfileUpload":
            return True
        else:
            return super(UiRequestPlugin, self).isCorsAllowed(path)

    def actionBigfileUpload(self):
        """
        Receive a bigfile HTTP upload authorized by a one-time nonce
        (created by UiWebsocketPlugin.actionBigfileUploadInit): stream the body
        to disk while hashing it, then register the result in content.json.
        Returns a dict with merkle_root, piece_num, piece_size and inner_path.
        """
        nonce = self.get.get("upload_nonce")
        if nonce not in upload_nonces:
            return self.error403("Upload nonce error.")

        upload_info = upload_nonces[nonce]
        del upload_nonces[nonce]  # Nonce is single-use

        self.sendHeader(200, "text/html", noscript=True, extra_headers={
            "Access-Control-Allow-Origin": "null",
            "Access-Control-Allow-Credentials": "true"
        })

        self.readMultipartHeaders(self.env['wsgi.input'])  # Skip http headers

        site = upload_info["site"]
        inner_path = upload_info["inner_path"]

        # Stream the upload to disk while hashing it piece by piece
        with site.storage.open(inner_path, "wb", create_dirs=True) as out_file:
            merkle_root, piece_size, piecemap_info = site.content_manager.hashBigfile(
                self.env['wsgi.input'], upload_info["size"], upload_info["piece_size"], out_file
            )

        if len(piecemap_info["sha512_pieces"]) == 1:  # Small file, don't split
            hash = piecemap_info["sha512_pieces"][0].encode("hex")  # Python 2: bytes digest -> hex str
            hash_id = site.content_manager.hashfield.getHashId(hash)
            site.content_manager.optionalDownloaded(inner_path, hash_id, upload_info["size"], own=True)

        else:  # Big file
            file_name = helper.getFilename(inner_path)
            msgpack.pack({file_name: piecemap_info}, site.storage.open(upload_info["piecemap"], "wb"))

            # Find piecemap and file relative path to content.json
            file_info = site.content_manager.getFileInfo(inner_path, new_file=True)
            content_inner_path_dir = helper.getDirname(file_info["content_inner_path"])
            piecemap_relative_path = upload_info["piecemap"][len(content_inner_path_dir):]
            file_relative_path = inner_path[len(content_inner_path_dir):]

            # Add file to content.json
            if site.storage.isFile(file_info["content_inner_path"]):
                content = site.storage.loadJson(file_info["content_inner_path"])
            else:
                content = {}
            if "files_optional" not in content:
                content["files_optional"] = {}

            content["files_optional"][file_relative_path] = {
                "sha512": merkle_root,
                "size": upload_info["size"],
                "piecemap": piecemap_relative_path,
                "piece_size": piece_size
            }

            merkle_root_hash_id = site.content_manager.hashfield.getHashId(merkle_root)
            site.content_manager.optionalDownloaded(inner_path, merkle_root_hash_id, upload_info["size"], own=True)
            site.storage.writeJson(file_info["content_inner_path"], content)

            site.content_manager.contents.loadItem(file_info["content_inner_path"])  # reload cache

        return {
            "merkle_root": merkle_root,
            "piece_num": len(piecemap_info["sha512_pieces"]),
            "piece_size": piece_size,
            "inner_path": inner_path
        }

    def readMultipartHeaders(self, wsgi_input):
        # Consume lines up to the blank line that terminates the multipart
        # headers; capped at 100 lines so a malformed body cannot loop forever.
        for i in range(100):
            line = wsgi_input.readline()
            if line == "\r\n":
                break
        return i

    def actionFile(self, file_path, *args, **kwargs):
        # Serve bigfiles through a BigFile wrapper that downloads pieces on demand
        if kwargs.get("file_size", 0) > 1024 * 1024 and kwargs.get("path_parts"):  # Only check files larger than 1MB
            path_parts = kwargs["path_parts"]
            site = self.server.site_manager.get(path_parts["address"])
            big_file = site.storage.openBigfile(path_parts["inner_path"], prebuffer=2 * 1024 * 1024)
            if big_file:
                kwargs["file_obj"] = big_file
                kwargs["file_size"] = big_file.size

        return super(UiRequestPlugin, self).actionFile(file_path, *args, **kwargs)
  99. @PluginManager.registerTo("UiWebsocket")
  100. class UiWebsocketPlugin(object):
  101. def actionBigfileUploadInit(self, to, inner_path, size):
  102. valid_signers = self.site.content_manager.getValidSigners(inner_path)
  103. auth_address = self.user.getAuthAddress(self.site.address)
  104. if not self.site.settings["own"] and auth_address not in valid_signers:
  105. self.log.error("FileWrite forbidden %s not in valid_signers %s" % (auth_address, valid_signers))
  106. return self.response(to, {"error": "Forbidden, you can only modify your own files"})
  107. nonce = CryptHash.random()
  108. piece_size = 1024 * 1024
  109. inner_path = self.site.content_manager.sanitizePath(inner_path)
  110. file_info = self.site.content_manager.getFileInfo(inner_path, new_file=True)
  111. content_inner_path_dir = helper.getDirname(file_info["content_inner_path"])
  112. file_relative_path = inner_path[len(content_inner_path_dir):]
  113. upload_nonces[nonce] = {
  114. "added": time.time(),
  115. "site": self.site,
  116. "inner_path": inner_path,
  117. "websocket_client": self,
  118. "size": size,
  119. "piece_size": piece_size,
  120. "piecemap": inner_path + ".piecemap.msgpack"
  121. }
  122. return {
  123. "url": "/ZeroNet-Internal/BigfileUpload?upload_nonce=" + nonce,
  124. "piece_size": piece_size,
  125. "inner_path": inner_path,
  126. "file_relative_path": file_relative_path
  127. }
  128. def actionSiteSetAutodownloadBigfileLimit(self, to, limit):
  129. permissions = self.getPermissions(to)
  130. if "ADMIN" not in permissions:
  131. return self.response(to, "You don't have permission to run this command")
  132. self.site.settings["autodownload_bigfile_size_limit"] = int(limit)
  133. self.response(to, "ok")
  134. def actionFileDelete(self, to, inner_path):
  135. piecemap_inner_path = inner_path + ".piecemap.msgpack"
  136. if self.hasFilePermission(inner_path) and self.site.storage.isFile(piecemap_inner_path):
  137. # Also delete .piecemap.msgpack file if exists
  138. self.log.debug("Deleting piecemap: %s" % piecemap_inner_path)
  139. file_info = self.site.content_manager.getFileInfo(piecemap_inner_path)
  140. if file_info:
  141. content_json = self.site.storage.loadJson(file_info["content_inner_path"])
  142. relative_path = file_info["relative_path"]
  143. if relative_path in content_json.get("files_optional", {}):
  144. del content_json["files_optional"][relative_path]
  145. self.site.storage.writeJson(file_info["content_inner_path"], content_json)
  146. self.site.content_manager.loadContent(file_info["content_inner_path"], add_bad_files=False, force=True)
  147. try:
  148. self.site.storage.delete(piecemap_inner_path)
  149. except Exception, err:
  150. self.log.error("File %s delete error: %s" % (piecemap_inner_path, err))
  151. return super(UiWebsocketPlugin, self).actionFileDelete(to, inner_path)
@PluginManager.registerTo("ContentManager")
class ContentManagerPlugin(object):
    def getFileInfo(self, inner_path, *args, **kwargs):
        # Strip the "|from-to" piece range suffix before looking up the file info
        if "|" not in inner_path:
            return super(ContentManagerPlugin, self).getFileInfo(inner_path, *args, **kwargs)

        inner_path, file_range = inner_path.split("|")
        pos_from, pos_to = map(int, file_range.split("-"))
        file_info = super(ContentManagerPlugin, self).getFileInfo(inner_path, *args, **kwargs)
        return file_info

    def readFile(self, file_in, size, buff_size=1024 * 64):
        """Yield up to `size` bytes from file_in in buff_size chunks, yielding
        to other greenlets every 100 chunks so uploads don't starve ZeroNet."""
        part_num = 0
        recv_left = size

        while 1:
            part_num += 1
            read_size = min(buff_size, recv_left)
            part = file_in.read(read_size)

            if not part:
                break
            yield part

            if part_num % 100 == 0:  # Avoid blocking ZeroNet execution during upload
                time.sleep(0.001)

            recv_left -= read_size
            if recv_left <= 0:
                break

    def hashBigfile(self, file_in, size, piece_size=1024 * 1024, file_out=None):
        """
        Hash file_in piece-by-piece while (optionally) copying it to file_out.
        Returns (merkle_root, piece_size, {"sha512_pieces": [digest, ...]}).
        """
        self.site.settings["has_bigfile"] = True

        recv = 0
        try:
            piece_hash = CryptHash.sha512t()
            piece_hashes = []
            piece_recv = 0

            mt = merkletools.MerkleTools()
            mt.hash_function = CryptHash.sha512t

            part = ""
            for part in self.readFile(file_in, size):
                if file_out:
                    file_out.write(part)

                recv += len(part)
                piece_recv += len(part)
                piece_hash.update(part)
                if piece_recv >= piece_size:
                    piece_digest = piece_hash.digest()
                    piece_hashes.append(piece_digest)
                    mt.leaves.append(piece_digest)
                    piece_hash = CryptHash.sha512t()
                    piece_recv = 0

                    if len(piece_hashes) % 100 == 0 or recv == size:
                        self.log.info("- [HASHING:%.0f%%] Pieces: %s, %.1fMB/%.1fMB" % (
                            float(recv) / size * 100, len(piece_hashes), recv / 1024 / 1024, size / 1024 / 1024
                        ))
                        # Clear part so the trailing-piece check below is skipped
                        # when the file ended exactly on a piece boundary
                        part = ""
            if len(part) > 0:
                # Final partial piece
                piece_digest = piece_hash.digest()
                piece_hashes.append(piece_digest)
                mt.leaves.append(piece_digest)
        except Exception as err:
            raise err
        finally:
            if file_out:
                file_out.close()

        mt.make_tree()
        return mt.get_merkle_root(), piece_size, {
            "sha512_pieces": piece_hashes
        }

    def hashFile(self, dir_inner_path, file_relative_path, optional=False):
        """Hash an optional bigfile: reuse the hash from content.json if the
        size is unchanged, otherwise hash it and write a .piecemap.msgpack."""
        inner_path = dir_inner_path + file_relative_path

        file_size = self.site.storage.getSize(inner_path)
        # Only care about optional files >1MB
        if not optional or file_size < 1 * 1024 * 1024:
            return super(ContentManagerPlugin, self).hashFile(dir_inner_path, file_relative_path, optional)

        back = {}
        content = self.contents.get(dir_inner_path + "content.json")

        hash = None
        piecemap_relative_path = None
        piece_size = None

        # Don't re-hash if it's already in content.json
        if content and file_relative_path in content.get("files_optional", {}):
            file_node = content["files_optional"][file_relative_path]
            if file_node["size"] == file_size:
                self.log.info("- [SAME SIZE] %s" % file_relative_path)
                hash = file_node.get("sha512")
                piecemap_relative_path = file_node.get("piecemap")
                piece_size = file_node.get("piece_size")

        if not hash or not piecemap_relative_path:  # Not in content.json yet
            if file_size < 5 * 1024 * 1024:  # Don't create piecemap automatically for files smaller than 5MB
                return super(ContentManagerPlugin, self).hashFile(dir_inner_path, file_relative_path, optional)

            self.log.info("- [HASHING] %s" % file_relative_path)
            merkle_root, piece_size, piecemap_info = self.hashBigfile(self.site.storage.open(inner_path, "rb"), file_size)
            if not hash:
                hash = merkle_root

            if not piecemap_relative_path:
                file_name = helper.getFilename(file_relative_path)
                piecemap_relative_path = file_relative_path + ".piecemap.msgpack"
                piecemap_inner_path = inner_path + ".piecemap.msgpack"

                msgpack.pack({file_name: piecemap_info}, self.site.storage.open(piecemap_inner_path, "wb"))

                # Also hash (and register) the newly written piecemap file itself
                back.update(super(ContentManagerPlugin, self).hashFile(dir_inner_path, piecemap_relative_path, optional=True))

        piece_num = int(math.ceil(float(file_size) / piece_size))

        # Add the merkle root to hashfield
        hash_id = self.site.content_manager.hashfield.getHashId(hash)
        self.optionalDownloaded(inner_path, hash_id, file_size, own=True)
        # We have the whole file locally, so mark every piece as present
        self.site.storage.piecefields[hash].fromstring("1" * piece_num)

        back[file_relative_path] = {"sha512": hash, "size": file_size, "piecemap": piecemap_relative_path, "piece_size": piece_size}
        return back

    def getPiecemap(self, inner_path):
        """Download (if needed) and unpack the piecemap for inner_path."""
        file_info = self.site.content_manager.getFileInfo(inner_path)
        piecemap_inner_path = helper.getDirname(file_info["content_inner_path"]) + file_info["piecemap"]
        self.site.needFile(piecemap_inner_path, priority=20)
        piecemap = msgpack.unpack(self.site.storage.open(piecemap_inner_path))[helper.getFilename(inner_path)]
        piecemap["piece_size"] = file_info["piece_size"]
        return piecemap

    def verifyPiece(self, inner_path, pos, piece):
        # Check the piece at byte offset `pos` against its sha512t in the piecemap
        piecemap = self.getPiecemap(inner_path)
        piece_i = pos / piecemap["piece_size"]  # Py2 integer division
        if CryptHash.sha512sum(piece, format="digest") != piecemap["sha512_pieces"][piece_i]:
            raise VerifyError("Invalid hash")
        return True

    def verifyFile(self, inner_path, file, ignore_same=True):
        # "path|from-to" means a single piece: verify just that piece
        if "|" not in inner_path:
            return super(ContentManagerPlugin, self).verifyFile(inner_path, file, ignore_same)

        inner_path, file_range = inner_path.split("|")
        pos_from, pos_to = map(int, file_range.split("-"))

        return self.verifyPiece(inner_path, pos_from, file)

    def optionalDownloaded(self, inner_path, hash_id, size=None, own=False):
        if "|" in inner_path:
            inner_path, file_range = inner_path.split("|")
            pos_from, pos_to = map(int, file_range.split("-"))
            file_info = self.getFileInfo(inner_path)

            # Mark piece downloaded
            piece_i = pos_from / file_info["piece_size"]
            self.site.storage.piecefields[file_info["sha512"]][piece_i] = True

            # Only add to site size on first request
            if hash_id in self.hashfield:
                size = 0
        elif size > 1024 * 1024:
            file_info = self.getFileInfo(inner_path)
            if file_info and "sha512" in file_info:  # We already have the file, but not in piecefield
                sha512 = file_info["sha512"]
                if sha512 not in self.site.storage.piecefields:
                    self.site.storage.checkBigfile(inner_path)

        return super(ContentManagerPlugin, self).optionalDownloaded(inner_path, hash_id, size, own)

    def optionalRemoved(self, inner_path, hash_id, size=None):
        if size and size > 1024 * 1024:
            file_info = self.getFileInfo(inner_path)
            sha512 = file_info["sha512"]
            if sha512 in self.site.storage.piecefields:
                del self.site.storage.piecefields[sha512]

            # Also remove other pieces of the file from download queue
            # (Py2 .keys() returns a list, so deleting while iterating is safe)
            for key in self.site.bad_files.keys():
                if key.startswith(inner_path + "|"):
                    del self.site.bad_files[key]
            self.site.worker_manager.removeSolvedFileTasks()
        return super(ContentManagerPlugin, self).optionalRemoved(inner_path, hash_id, size)
@PluginManager.registerTo("SiteStorage")
class SiteStoragePlugin(object):
    def __init__(self, *args, **kwargs):
        super(SiteStoragePlugin, self).__init__(*args, **kwargs)
        # sha512 merkle root -> BigfilePiecefield of locally available pieces
        self.piecefields = collections.defaultdict(BigfilePiecefield)
        if "piecefields" in self.site.settings.get("cache", {}):
            # Restore piecefields persisted by SitePlugin.getSettingsCache
            for sha512, piecefield_packed in self.site.settings["cache"].get("piecefields").iteritems():
                if piecefield_packed:
                    self.piecefields[sha512].unpack(piecefield_packed.decode("base64"))
            self.site.settings["cache"]["piecefields"] = {}

    def createSparseFile(self, inner_path, size, sha512=None):
        """Create an (up to 5MB pre-allocated) placeholder file for a bigfile
        and drop any stale piecefield that referred to the old content."""
        file_path = self.getPath(inner_path)

        file_dir = os.path.dirname(file_path)
        if not os.path.isdir(file_dir):
            os.makedirs(file_dir)

        f = open(file_path, 'wb')
        f.truncate(min(1024 * 1024 * 5, size))  # Only pre-allocate up to 5MB
        f.close()
        if os.name == "nt":
            # On Windows mark the file sparse via fsutil (hidden console window)
            startupinfo = subprocess.STARTUPINFO()
            startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
            subprocess.call(["fsutil", "sparse", "setflag", file_path], close_fds=True, startupinfo=startupinfo)

        if sha512 and sha512 in self.piecefields:
            self.log.debug("%s: File not exists, but has piecefield. Deleting piecefield." % inner_path)
            del self.piecefields[sha512]

    def write(self, inner_path, content):
        if "|" not in inner_path:
            return super(SiteStoragePlugin, self).write(inner_path, content)

        # Write to specific position by passing |{pos} after the filename
        inner_path, file_range = inner_path.split("|")
        pos_from, pos_to = map(int, file_range.split("-"))
        file_path = self.getPath(inner_path)

        # Create dir if not exist
        file_dir = os.path.dirname(file_path)
        if not os.path.isdir(file_dir):
            os.makedirs(file_dir)

        if not os.path.isfile(file_path):
            file_info = self.site.content_manager.getFileInfo(inner_path)
            self.createSparseFile(inner_path, file_info["size"])

        # Write file
        with open(file_path, "rb+") as file:
            file.seek(pos_from)
            if hasattr(content, 'read'):  # File-like object
                shutil.copyfileobj(content, file)  # Write buff to disk
            else:  # Simple string
                file.write(content)
        del content
        self.onUpdated(inner_path)

    def checkBigfile(self, inner_path):
        """Ensure piecefield/placeholder state exists for a bigfile on disk.
        Returns False if inner_path is not a bigfile (no piecemap)."""
        file_info = self.site.content_manager.getFileInfo(inner_path)
        if not file_info or (file_info and "piecemap" not in file_info):  # It's not a big file
            return False

        self.site.settings["has_bigfile"] = True
        file_path = self.getPath(inner_path)
        sha512 = file_info["sha512"]
        piece_num = int(math.ceil(float(file_info["size"]) / file_info["piece_size"]))
        if os.path.isfile(file_path):
            if sha512 not in self.piecefields:
                # Heuristic: a leading run of NUL bytes means a fresh sparse
                # placeholder (no pieces yet); otherwise assume fully downloaded
                if open(file_path).read(128) == "\0" * 128:
                    piece_data = "0"
                else:
                    piece_data = "1"
                self.log.debug("%s: File exists, but not in piecefield. Filling piecefiled with %s * %s." % (inner_path, piece_num, piece_data))
                self.piecefields[sha512].fromstring(piece_data * piece_num)
        else:
            self.log.debug("Creating bigfile: %s" % inner_path)
            self.createSparseFile(inner_path, file_info["size"], sha512)
            self.piecefields[sha512].fromstring("0" * piece_num)
        return True

    def openBigfile(self, inner_path, prebuffer=0):
        # Returns a BigFile reader, or False if inner_path is not a bigfile
        if not self.checkBigfile(inner_path):
            return False
        self.site.needFile(inner_path, blocking=False)  # Download piecemap
        return BigFile(self.site, inner_path, prebuffer=prebuffer)
class BigFile(object):
    """File-like reader over a partially-downloaded bigfile: read() requests
    any missing pieces from the network before returning data."""

    def __init__(self, site, inner_path, prebuffer=0):
        self.site = site
        self.inner_path = inner_path
        file_path = site.storage.getPath(inner_path)
        file_info = self.site.content_manager.getFileInfo(inner_path)
        self.piece_size = file_info["piece_size"]
        self.sha512 = file_info["sha512"]
        self.size = file_info["size"]
        self.prebuffer = prebuffer  # Extra bytes to request ahead of the read position
        self.read_bytes = 0

        self.piecefield = self.site.storage.piecefields[self.sha512]
        self.f = open(file_path, "rb+")
        self.read_lock = gevent.lock.Semaphore()

    def read(self, buff=64 * 1024):
        with self.read_lock:
            pos = self.f.tell()
            read_until = min(self.size, pos + buff)
            requests = []
            # Request all required blocks
            while 1:
                piece_i = pos / self.piece_size  # Py2 integer division
                if piece_i * self.piece_size >= read_until:
                    break
                pos_from = piece_i * self.piece_size
                pos_to = pos_from + self.piece_size
                if not self.piecefield[piece_i]:
                    requests.append(self.site.needFile("%s|%s-%s" % (self.inner_path, pos_from, pos_to), blocking=False, update=True, priority=10))
                pos += self.piece_size

            if not all(requests):
                # A piece could not be scheduled for download
                return None

            # Request prebuffer (decreasing priority with distance from read pos)
            if self.prebuffer:
                prebuffer_until = min(self.size, read_until + self.prebuffer)
                priority = 3
                while 1:
                    piece_i = pos / self.piece_size
                    if piece_i * self.piece_size >= prebuffer_until:
                        break
                    pos_from = piece_i * self.piece_size
                    pos_to = pos_from + self.piece_size
                    if not self.piecefield[piece_i]:
                        self.site.needFile("%s|%s-%s" % (self.inner_path, pos_from, pos_to), blocking=False, update=True, priority=max(0, priority))
                    priority -= 1
                    pos += self.piece_size

            gevent.joinall(requests)  # Wait until the required pieces arrive
            self.read_bytes += buff

            # Increase buffer for long reads
            if self.read_bytes > 7 * 1024 * 1024 and self.prebuffer < 5 * 1024 * 1024:
                self.site.log.debug("%s: Increasing bigfile buffer size to 5MB..." % self.inner_path)
                self.prebuffer = 5 * 1024 * 1024

            return self.f.read(buff)

    def seek(self, pos, whence=0):
        with self.read_lock:
            if whence == 2:  # Relative from file end
                pos = self.size + pos  # Use the real size instead of size on the disk
                whence = 0
            return self.f.seek(pos, whence)

    def tell(self):
        return self.f.tell()

    def close(self):
        self.f.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
@PluginManager.registerTo("WorkerManager")
class WorkerManagerPlugin(object):
    def addTask(self, inner_path, *args, **kwargs):
        """Intercept bigfile task creation: download the piecemap first, tag
        piece tasks with their piece index, and pre-create the sparse file."""
        file_info = kwargs.get("file_info")
        if file_info and "piecemap" in file_info:  # Bigfile
            self.site.settings["has_bigfile"] = True

            piecemap_inner_path = helper.getDirname(file_info["content_inner_path"]) + file_info["piecemap"]
            piecemap_task = None
            if not self.site.storage.isFile(piecemap_inner_path):
                # Start download piecemap
                piecemap_task = super(WorkerManagerPlugin, self).addTask(piecemap_inner_path, priority=30)
                autodownload_bigfile_size_limit = self.site.settings.get("autodownload_bigfile_size_limit", config.autodownload_bigfile_size_limit)
                if "|" not in inner_path and self.site.isDownloadable(inner_path) and file_info["size"] / 1024 / 1024 <= autodownload_bigfile_size_limit:
                    gevent.spawn_later(0.1, self.site.needFile, inner_path + "|all")  # Download all pieces

            if "|" in inner_path:
                # Start download piece
                task = super(WorkerManagerPlugin, self).addTask(inner_path, *args, **kwargs)

                inner_path, file_range = inner_path.split("|")
                pos_from, pos_to = map(int, file_range.split("-"))
                task["piece_i"] = pos_from / file_info["piece_size"]  # Py2 integer division
                task["sha512"] = file_info["sha512"]
            else:
                if inner_path in self.site.bad_files:
                    del self.site.bad_files[inner_path]
                if piecemap_task:
                    task = piecemap_task
                else:
                    fake_evt = gevent.event.AsyncResult()  # Don't download anything if no range specified
                    fake_evt.set(True)
                    task = {"evt": fake_evt}

            if not self.site.storage.isFile(inner_path):
                self.site.storage.createSparseFile(inner_path, file_info["size"], file_info["sha512"])
                piece_num = int(math.ceil(float(file_info["size"]) / file_info["piece_size"]))
                self.site.storage.piecefields[file_info["sha512"]].fromstring("0" * piece_num)
        else:
            task = super(WorkerManagerPlugin, self).addTask(inner_path, *args, **kwargs)
        return task

    def taskAddPeer(self, task, peer):
        # For piece tasks only accept peers that advertise the piece in their piecefield
        if "piece_i" in task:
            if not peer.piecefields[task["sha512"]][task["piece_i"]]:
                if task["sha512"] not in peer.piecefields:
                    # Unknown piecefield: force a refresh from the peer
                    gevent.spawn(peer.updatePiecefields, force=True)
                elif not task["peers"]:
                    gevent.spawn(peer.updatePiecefields)

                return False  # Deny to add peers to task if file not in piecefield
        return super(WorkerManagerPlugin, self).taskAddPeer(task, peer)
@PluginManager.registerTo("FileRequest")
class FileRequestPlugin(object):
    def isReadable(self, site, inner_path, file, pos):
        # Peek into file
        if file.read(10) == "\0" * 10:
            # Looks empty, but make sure we don't have that piece
            file_info = site.content_manager.getFileInfo(inner_path)
            piece_i = pos / file_info["piece_size"]  # Py2 integer division
            if not site.storage.piecefields[file_info["sha512"]][piece_i]:
                return False
        # Seek back to position we want to read
        file.seek(pos)
        return super(FileRequestPlugin, self).isReadable(site, inner_path, file, pos)

    def actionGetPiecefields(self, params):
        """Network request handler: return our packed piecefields for a site."""
        site = self.sites.get(params["site"])
        if not site or not site.settings["serving"]:  # Site unknown or not serving
            self.response({"error": "Unknown site"})
            return False

        # Add peer to site if not added before
        peer = site.addPeer(self.connection.ip, self.connection.port, return_peer=True)
        if not peer.connection:  # Just added
            peer.connect(self.connection)  # Assign current connection to peer

        piecefields_packed = {sha512: piecefield.pack() for sha512, piecefield in site.storage.piecefields.iteritems()}
        self.response({"piecefields_packed": piecefields_packed})

    def actionSetPiecefields(self, params):
        """Network request handler: store a peer's packed piecefields."""
        site = self.sites.get(params["site"])
        if not site or not site.settings["serving"]:  # Site unknown or not serving
            self.response({"error": "Unknown site"})
            self.connection.badAction(5)
            return False

        # Add or get peer
        peer = site.addPeer(self.connection.ip, self.connection.port, return_peer=True, connection=self.connection)
        if not peer.connection:
            peer.connect(self.connection)

        peer.piecefields = collections.defaultdict(BigfilePiecefieldPacked)
        for sha512, piecefield_packed in params["piecefields_packed"].iteritems():
            peer.piecefields[sha512].unpack(piecefield_packed)
        site.settings["has_bigfile"] = True

        self.response({"ok": "Updated"})
@PluginManager.registerTo("Peer")
class PeerPlugin(object):
    def __getattr__(self, key):
        # Lazily create bigfile-related attributes on first access so normal
        # peers pay no memory cost (note: __getattr__ only fires on a miss)
        if key == "piecefields":
            self.piecefields = collections.defaultdict(BigfilePiecefieldPacked)
            return self.piecefields
        elif key == "time_piecefields_updated":
            self.time_piecefields_updated = None
            return self.time_piecefields_updated
        else:
            return super(PeerPlugin, self).__getattr__(key)

    @util.Noparallel(ignore_args=True)
    def updatePiecefields(self, force=False):
        """Fetch the peer's piecefields (rate-limited to once per minute
        unless force=True). Returns the piecefields dict or False."""
        if self.connection and self.connection.handshake.get("rev", 0) < 2190:
            return False  # Not supported

        # Don't update piecefield again in 1 min
        if self.time_piecefields_updated and time.time() - self.time_piecefields_updated < 60 and not force:
            return False

        self.time_piecefields_updated = time.time()
        res = self.request("getPiecefields", {"site": self.site.address})
        if not res or "error" in res:
            return False

        self.piecefields = collections.defaultdict(BigfilePiecefieldPacked)
        try:
            for sha512, piecefield_packed in res["piecefields_packed"].iteritems():
                self.piecefields[sha512].unpack(piecefield_packed)
        except Exception as err:
            self.log("Invalid updatePiecefields response: %s" % Debug.formatException(err))

        return self.piecefields

    def sendMyHashfield(self, *args, **kwargs):
        return super(PeerPlugin, self).sendMyHashfield(*args, **kwargs)

    def updateHashfield(self, *args, **kwargs):
        # Piggy-back a piecefield refresh on the hashfield update for bigfile sites
        if self.site.settings.get("has_bigfile"):
            thread = gevent.spawn(self.updatePiecefields, *args, **kwargs)
            back = super(PeerPlugin, self).updateHashfield(*args, **kwargs)
            thread.join()
            return back
        else:
            return super(PeerPlugin, self).updateHashfield(*args, **kwargs)

    def getFile(self, site, inner_path, *args, **kwargs):
        # Translate the "|from-to" suffix into pos_from/pos_to request arguments
        if "|" in inner_path:
            inner_path, file_range = inner_path.split("|")
            pos_from, pos_to = map(int, file_range.split("-"))
            kwargs["pos_from"] = pos_from
            kwargs["pos_to"] = pos_to
        return super(PeerPlugin, self).getFile(site, inner_path, *args, **kwargs)
@PluginManager.registerTo("Site")
class SitePlugin(object):
    def isFileDownloadAllowed(self, inner_path, file_info):
        # Judge bigfiles by piece size, not total size (pieces arrive one by one)
        if "piecemap" in file_info:
            file_info = file_info.copy()
            file_info["size"] = file_info["piece_size"]
        return super(SitePlugin, self).isFileDownloadAllowed(inner_path, file_info)

    def getSettingsCache(self):
        # Persist packed piecefields so they survive restarts
        # (restored by SiteStoragePlugin.__init__)
        back = super(SitePlugin, self).getSettingsCache()
        if self.storage.piecefields:
            back["piecefields"] = {sha512: piecefield.pack().encode("base64") for sha512, piecefield in self.storage.piecefields.iteritems()}
        return back

    def needFile(self, inner_path, *args, **kwargs):
        """Handle "path|all": queue every missing piece of a bigfile for
        download (max 20 in parallel) and wait for them to finish."""
        if inner_path.endswith("|all"):
            @util.Pooled(20)
            def pooledNeedBigfile(inner_path, *args, **kwargs):
                if inner_path not in self.bad_files:
                    # Piece was removed from the queue in the meantime
                    self.log.debug("Cancelled piece, skipping %s" % inner_path)
                    return False
                return self.needFile(inner_path, *args, **kwargs)

            inner_path = inner_path.replace("|all", "")
            file_info = self.needFileInfo(inner_path)
            file_size = file_info["size"]
            piece_size = file_info["piece_size"]
            piece_num = int(math.ceil(float(file_size) / piece_size))

            file_threads = []

            piecefield = self.storage.piecefields.get(file_info["sha512"])

            for piece_i in range(piece_num):
                piece_from = piece_i * piece_size
                piece_to = min(file_size, piece_from + piece_size)
                if not piecefield or not piecefield[piece_i]:
                    inner_path_piece = "%s|%s-%s" % (inner_path, piece_from, piece_to)
                    self.bad_files[inner_path_piece] = self.bad_files.get(inner_path_piece, 1)
                    res = pooledNeedBigfile(inner_path_piece, blocking=False)
                    if res is not True and res is not False:  # Need downloading
                        file_threads.append(res)
            gevent.joinall(file_threads)
        else:
            return super(SitePlugin, self).needFile(inner_path, *args, **kwargs)
  614. @PluginManager.registerTo("ConfigPlugin")
  615. class ConfigPlugin(object):
  616. def createArguments(self):
  617. group = self.parser.add_argument_group("Bigfile plugin")
  618. group.add_argument('--autodownload_bigfile_size_limit', help='Also download bigfiles smaller than this limit if help distribute option is checked', default=1, metavar="MB", type=int)
  619. return super(ConfigPlugin, self).createArguments()