# BigfilePlugin.py
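#
# Bigfile support for ZeroNet: large optional files are split into fixed-size
# pieces, each piece is hashed with truncated sha512 (CryptHash.sha512t) and
# the piece hashes are combined into a merkle root that is stored as the
# file's "sha512" value in content.json, next to a msgpack "piecemap" file.
# Downloaded pieces are tracked per file in BigfilePiecefield structures,
# which peers exchange so partially downloaded files can be served and resumed.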

import time
import os
import subprocess
import shutil
import collections
import math

import msgpack
import gevent
import gevent.event
import gevent.lock

from Plugin import PluginManager
from Debug import Debug
from Crypt import CryptHash
from lib import merkletools
from util import helper
import util
from BigfilePiecefield import BigfilePiecefield, BigfilePiecefieldPacked


# We can only import plugin host classes after the plugins are loaded
@PluginManager.afterLoad
def importPluginnedClasses():
    global VerifyError, config
    from Content.ContentManager import VerifyError
    from Config import config


if "upload_nonces" not in locals():
    upload_nonces = {}  # One-time tokens for pending uploads: {nonce: upload info}
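

# Upload flow: UiWebsocketPlugin.actionBigfileUploadInit (below) registers a
# one-time nonce in upload_nonces; the browser then POSTs the file body to
# /ZeroNet-Internal/BigfileUpload?upload_nonce=<nonce>, which is handled by
# UiRequestPlugin.actionBigfileUpload.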
@PluginManager.registerTo("UiRequest")
class UiRequestPlugin(object):
    def isCorsAllowed(self, path):
        if path == "/ZeroNet-Internal/BigfileUpload":
            return True
        else:
            return super(UiRequestPlugin, self).isCorsAllowed(path)

    def actionBigfileUpload(self):
        nonce = self.get.get("upload_nonce")
        if nonce not in upload_nonces:
            return self.error403("Upload nonce error.")

        upload_info = upload_nonces[nonce]
        del upload_nonces[nonce]

        self.sendHeader(200, "text/html", noscript=True, extra_headers={
            "Access-Control-Allow-Origin": "null",
            "Access-Control-Allow-Credentials": "true"
        })

        self.readMultipartHeaders(self.env['wsgi.input'])  # Skip http headers

        site = upload_info["site"]
        inner_path = upload_info["inner_path"]

        with site.storage.open(inner_path, "wb", create_dirs=True) as out_file:
            merkle_root, piece_size, piecemap_info = site.content_manager.hashBigfile(
                self.env['wsgi.input'], upload_info["size"], upload_info["piece_size"], out_file
            )

        if len(piecemap_info["sha512_pieces"]) == 1:  # Small file, don't split
            hash = piecemap_info["sha512_pieces"][0].encode("hex")
            hash_id = site.content_manager.hashfield.getHashId(hash)
            site.content_manager.optionalDownloaded(inner_path, hash_id, upload_info["size"], own=True)
        else:  # Big file
            file_name = helper.getFilename(inner_path)
            msgpack.pack({file_name: piecemap_info}, site.storage.open(upload_info["piecemap"], "wb"))

            # Find piecemap and file relative path to content.json
            file_info = site.content_manager.getFileInfo(inner_path, new_file=True)
            content_inner_path_dir = helper.getDirname(file_info["content_inner_path"])
            piecemap_relative_path = upload_info["piecemap"][len(content_inner_path_dir):]
            file_relative_path = inner_path[len(content_inner_path_dir):]

            # Add file to content.json
            if site.storage.isFile(file_info["content_inner_path"]):
                content = site.storage.loadJson(file_info["content_inner_path"])
            else:
                content = {}
            if "files_optional" not in content:
                content["files_optional"] = {}

            content["files_optional"][file_relative_path] = {
                "sha512": merkle_root,
                "size": upload_info["size"],
                "piecemap": piecemap_relative_path,
                "piece_size": piece_size
            }

            merkle_root_hash_id = site.content_manager.hashfield.getHashId(merkle_root)
            site.content_manager.optionalDownloaded(inner_path, merkle_root_hash_id, upload_info["size"], own=True)
            site.storage.writeJson(file_info["content_inner_path"], content)

            site.content_manager.contents.loadItem(file_info["content_inner_path"])  # reload cache

        return {
            "merkle_root": merkle_root,
            "piece_num": len(piecemap_info["sha512_pieces"]),
            "piece_size": piece_size,
            "inner_path": inner_path
        }

    def readMultipartHeaders(self, wsgi_input):
        for i in range(100):
            line = wsgi_input.readline()
            if line == "\r\n":
                break
        return i

    def actionFile(self, file_path, *args, **kwargs):
        if kwargs.get("file_size", 0) > 1024 * 1024 and kwargs.get("path_parts"):  # Only check files larger than 1MB
            path_parts = kwargs["path_parts"]
            site = self.server.site_manager.get(path_parts["address"])
            big_file = site.storage.openBigfile(path_parts["inner_path"], prebuffer=2 * 1024 * 1024)
            if big_file:
                kwargs["file_obj"] = big_file
                kwargs["file_size"] = big_file.size

        return super(UiRequestPlugin, self).actionFile(file_path, *args, **kwargs)


@PluginManager.registerTo("UiWebsocket")
class UiWebsocketPlugin(object):
    def actionBigfileUploadInit(self, to, inner_path, size):
        valid_signers = self.site.content_manager.getValidSigners(inner_path)
        auth_address = self.user.getAuthAddress(self.site.address)
        if not self.site.settings["own"] and auth_address not in valid_signers:
            self.log.error("FileWrite forbidden %s not in valid_signers %s" % (auth_address, valid_signers))
            return self.response(to, {"error": "Forbidden, you can only modify your own files"})

        nonce = CryptHash.random()
        piece_size = 1024 * 1024
        inner_path = self.site.content_manager.sanitizePath(inner_path)
        file_info = self.site.content_manager.getFileInfo(inner_path, new_file=True)

        content_inner_path_dir = helper.getDirname(file_info["content_inner_path"])
        file_relative_path = inner_path[len(content_inner_path_dir):]

        upload_nonces[nonce] = {
            "added": time.time(),
            "site": self.site,
            "inner_path": inner_path,
            "websocket_client": self,
            "size": size,
            "piece_size": piece_size,
            "piecemap": inner_path + ".piecemap.msgpack"
        }
        return {
            "url": "/ZeroNet-Internal/BigfileUpload?upload_nonce=" + nonce,
            "piece_size": piece_size,
            "inner_path": inner_path,
            "file_relative_path": file_relative_path
        }
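
    # Typical client flow (illustrative): call the "bigfileUploadInit"
    # websocket command with the inner path and file size, then POST the file
    # as multipart form data (with credentials enabled, since the upload URL
    # is served with Access-Control-Allow-Credentials) to the returned "url".
    # The response body echoes the merkle root and piece layout computed by
    # actionBigfileUpload above.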

    def actionSiteSetAutodownloadBigfileLimit(self, to, limit):
        permissions = self.getPermissions(to)
        if "ADMIN" not in permissions:
            return self.response(to, "You don't have permission to run this command")

        self.site.settings["autodownload_bigfile_size_limit"] = int(limit)
        self.response(to, "ok")

    def actionFileDelete(self, to, inner_path):
        piecemap_inner_path = inner_path + ".piecemap.msgpack"
        if self.hasFilePermission(inner_path) and self.site.storage.isFile(piecemap_inner_path):
            # Also delete the .piecemap.msgpack file if it exists
            self.log.debug("Deleting piecemap: %s" % piecemap_inner_path)
            file_info = self.site.content_manager.getFileInfo(piecemap_inner_path)
            if file_info:
                content_json = self.site.storage.loadJson(file_info["content_inner_path"])
                relative_path = file_info["relative_path"]
                if relative_path in content_json.get("files_optional", {}):
                    del content_json["files_optional"][relative_path]
                    self.site.storage.writeJson(file_info["content_inner_path"], content_json)
                    self.site.content_manager.loadContent(file_info["content_inner_path"], add_bad_files=False, force=True)
            try:
                self.site.storage.delete(piecemap_inner_path)
            except Exception as err:
                self.log.error("File %s delete error: %s" % (piecemap_inner_path, err))

        return super(UiWebsocketPlugin, self).actionFileDelete(to, inner_path)


@PluginManager.registerTo("ContentManager")
class ContentManagerPlugin(object):
    def getFileInfo(self, inner_path, *args, **kwargs):
        if "|" not in inner_path:
            return super(ContentManagerPlugin, self).getFileInfo(inner_path, *args, **kwargs)

        inner_path, file_range = inner_path.split("|")
        pos_from, pos_to = map(int, file_range.split("-"))

        file_info = super(ContentManagerPlugin, self).getFileInfo(inner_path, *args, **kwargs)
        return file_info

    def readFile(self, file_in, size, buff_size=1024 * 64):
        part_num = 0
        recv_left = size

        while 1:
            part_num += 1
            read_size = min(buff_size, recv_left)
            part = file_in.read(read_size)

            if not part:
                break
            yield part

            if part_num % 100 == 0:  # Avoid blocking ZeroNet execution during upload
                time.sleep(0.001)

            recv_left -= read_size
            if recv_left <= 0:
                break

    def hashBigfile(self, file_in, size, piece_size=1024 * 1024, file_out=None):
        self.site.settings["has_bigfile"] = True

        recv = 0
        try:
            piece_hash = CryptHash.sha512t()
            piece_hashes = []
            piece_recv = 0

            mt = merkletools.MerkleTools()
            mt.hash_function = CryptHash.sha512t

            part = ""
            for part in self.readFile(file_in, size):
                if file_out:
                    file_out.write(part)

                recv += len(part)
                piece_recv += len(part)
                piece_hash.update(part)

                if piece_recv >= piece_size:  # Piece is full: store its hash and start a new one
                    piece_digest = piece_hash.digest()
                    piece_hashes.append(piece_digest)
                    mt.leaves.append(piece_digest)
                    piece_hash = CryptHash.sha512t()
                    piece_recv = 0

                    if len(piece_hashes) % 100 == 0 or recv == size:
                        self.log.info("- [HASHING:%.0f%%] Pieces: %s, %.1fMB/%.1fMB" % (
                            float(recv) / size * 100, len(piece_hashes), recv / 1024 / 1024, size / 1024 / 1024
                        ))
                        part = ""

            if len(part) > 0:  # Hash the last, incomplete piece
                piece_digest = piece_hash.digest()
                piece_hashes.append(piece_digest)
                mt.leaves.append(piece_digest)
        except Exception as err:
            raise err
        finally:
            if file_out:
                file_out.close()

        mt.make_tree()
        return mt.get_merkle_root(), piece_size, {
            "sha512_pieces": piece_hashes
        }
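
    # hashBigfile returns (merkle_root, piece_size, piecemap_info): the merkle
    # root becomes the file's "sha512" value in content.json, while
    # piecemap_info["sha512_pieces"] holds the raw per-piece digests that are
    # written out as the .piecemap.msgpack file.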

    def hashFile(self, dir_inner_path, file_relative_path, optional=False):
        inner_path = dir_inner_path + file_relative_path
        file_size = self.site.storage.getSize(inner_path)
        # Only care about optional files >1MB
        if not optional or file_size < 1 * 1024 * 1024:
            return super(ContentManagerPlugin, self).hashFile(dir_inner_path, file_relative_path, optional)

        back = {}
        content = self.contents.get(dir_inner_path + "content.json")

        hash = None
        piecemap_relative_path = None
        piece_size = None

        # Don't re-hash if it's already in content.json
        if content and file_relative_path in content.get("files_optional", {}):
            file_node = content["files_optional"][file_relative_path]
            if file_node["size"] == file_size:
                self.log.info("- [SAME SIZE] %s" % file_relative_path)
                hash = file_node.get("sha512")
                piecemap_relative_path = file_node.get("piecemap")
                piece_size = file_node.get("piece_size")

        if not hash or not piecemap_relative_path:  # Not in content.json yet
            if file_size < 5 * 1024 * 1024:  # Don't create piecemap automatically for files smaller than 5MB
                return super(ContentManagerPlugin, self).hashFile(dir_inner_path, file_relative_path, optional)

            self.log.info("- [HASHING] %s" % file_relative_path)
            merkle_root, piece_size, piecemap_info = self.hashBigfile(self.site.storage.open(inner_path, "rb"), file_size)
            if not hash:
                hash = merkle_root

            if not piecemap_relative_path:
                file_name = helper.getFilename(file_relative_path)
                piecemap_relative_path = file_relative_path + ".piecemap.msgpack"
                piecemap_inner_path = inner_path + ".piecemap.msgpack"

                msgpack.pack({file_name: piecemap_info}, self.site.storage.open(piecemap_inner_path, "wb"))

                back.update(super(ContentManagerPlugin, self).hashFile(dir_inner_path, piecemap_relative_path, optional=True))

        piece_num = int(math.ceil(float(file_size) / piece_size))

        # Add the merkle root to hashfield
        hash_id = self.site.content_manager.hashfield.getHashId(hash)
        self.optionalDownloaded(inner_path, hash_id, file_size, own=True)
        self.site.storage.piecefields[hash].fromstring("1" * piece_num)

        back[file_relative_path] = {"sha512": hash, "size": file_size, "piecemap": piecemap_relative_path, "piece_size": piece_size}
        return back

    def getPiecemap(self, inner_path):
        file_info = self.site.content_manager.getFileInfo(inner_path)
        piecemap_inner_path = helper.getDirname(file_info["content_inner_path"]) + file_info["piecemap"]
        self.site.needFile(piecemap_inner_path, priority=20)
        piecemap = msgpack.unpack(self.site.storage.open(piecemap_inner_path))[helper.getFilename(inner_path)]
        piecemap["piece_size"] = file_info["piece_size"]
        return piecemap
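
    # The piecemap file is a msgpack dict keyed by file name; each entry lists
    # the digest of every piece, so a single piece can be verified (see
    # verifyPiece below) without reading the rest of the file.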
    def verifyPiece(self, inner_path, pos, piece):
        piecemap = self.getPiecemap(inner_path)
        piece_i = pos / piecemap["piece_size"]
        if CryptHash.sha512sum(piece, format="digest") != piecemap["sha512_pieces"][piece_i]:
            raise VerifyError("Invalid hash")
        return True

    def verifyFile(self, inner_path, file, ignore_same=True):
        if "|" not in inner_path:
            return super(ContentManagerPlugin, self).verifyFile(inner_path, file, ignore_same)

        inner_path, file_range = inner_path.split("|")
        pos_from, pos_to = map(int, file_range.split("-"))

        return self.verifyPiece(inner_path, pos_from, file)
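
    # Pieces are addressed throughout the plugin by appending a byte range to
    # the inner path: "<inner_path>|<pos_from>-<pos_to>". With the default 1MB
    # pieces, "data/video.mp4|1048576-2097152" (path illustrative) refers to
    # the second piece of data/video.mp4.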

    def optionalDownloaded(self, inner_path, hash_id, size=None, own=False):
        if "|" in inner_path:
            inner_path, file_range = inner_path.split("|")
            pos_from, pos_to = map(int, file_range.split("-"))
            file_info = self.getFileInfo(inner_path)

            # Mark piece downloaded
            piece_i = pos_from / file_info["piece_size"]
            self.site.storage.piecefields[file_info["sha512"]][piece_i] = True

            # Only add to site size on first request
            if hash_id in self.hashfield:
                size = 0
        elif size > 1024 * 1024:
            file_info = self.getFileInfo(inner_path)
            if file_info and "sha512" in file_info:  # We already have the file, but not in piecefield
                sha512 = file_info["sha512"]
                if sha512 not in self.site.storage.piecefields:
                    self.site.storage.checkBigfile(inner_path)

        return super(ContentManagerPlugin, self).optionalDownloaded(inner_path, hash_id, size, own)

    def optionalRemoved(self, inner_path, hash_id, size=None):
        if size and size > 1024 * 1024:
            file_info = self.getFileInfo(inner_path)
            sha512 = file_info["sha512"]
            if sha512 in self.site.storage.piecefields:
                del self.site.storage.piecefields[sha512]

            # Also remove the file's other pieces from the download queue
            for key in self.site.bad_files.keys():
                if key.startswith(inner_path + "|"):
                    del self.site.bad_files[key]

        return super(ContentManagerPlugin, self).optionalRemoved(inner_path, hash_id, size)


@PluginManager.registerTo("SiteStorage")
class SiteStoragePlugin(object):
    def __init__(self, *args, **kwargs):
        super(SiteStoragePlugin, self).__init__(*args, **kwargs)
        self.piecefields = collections.defaultdict(BigfilePiecefield)
        if "piecefields" in self.site.settings.get("cache", {}):
            for sha512, piecefield_packed in self.site.settings["cache"].get("piecefields").iteritems():
                if piecefield_packed:
                    self.piecefields[sha512].unpack(piecefield_packed.decode("base64"))
            self.site.settings["cache"]["piecefields"] = {}
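
    # Piecefields survive restarts through the site settings cache:
    # SitePlugin.getSettingsCache (below) packs them to base64 on save and the
    # loop above unpacks them again on startup.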

    def createSparseFile(self, inner_path, size, sha512=None):
        file_path = self.getPath(inner_path)

        file_dir = os.path.dirname(file_path)
        if not os.path.isdir(file_dir):
            os.makedirs(file_dir)

        f = open(file_path, 'wb')
        f.truncate(min(1024 * 1024 * 5, size))  # Only pre-allocate up to 5MB
        f.close()
        if os.name == "nt":
            startupinfo = subprocess.STARTUPINFO()
            startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
            subprocess.call(["fsutil", "sparse", "setflag", file_path], close_fds=True, startupinfo=startupinfo)

        if sha512 and sha512 in self.piecefields:
            self.log.debug("%s: File does not exist, but has a piecefield. Deleting piecefield." % inner_path)
            del self.piecefields[sha512]
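
    # On Windows the file is additionally flagged as sparse with fsutil, so
    # writing a piece at a large offset does not force the gap before it to be
    # physically allocated.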

    def write(self, inner_path, content):
        if "|" not in inner_path:
            return super(SiteStoragePlugin, self).write(inner_path, content)

        # Write to a specific position, addressed as |{pos_from}-{pos_to} after the file path
        inner_path, file_range = inner_path.split("|")
        pos_from, pos_to = map(int, file_range.split("-"))
        file_path = self.getPath(inner_path)

        # Create the directory if it does not exist
        file_dir = os.path.dirname(file_path)
        if not os.path.isdir(file_dir):
            os.makedirs(file_dir)

        if not os.path.isfile(file_path):
            file_info = self.site.content_manager.getFileInfo(inner_path)
            self.createSparseFile(inner_path, file_info["size"])

        # Write file
        with open(file_path, "rb+") as file:
            file.seek(pos_from)
            if hasattr(content, 'read'):  # File-like object
                shutil.copyfileobj(content, file)  # Write buff to disk
            else:  # Simple string
                file.write(content)
        del content
        self.onUpdated(inner_path)

    def checkBigfile(self, inner_path):
        file_info = self.site.content_manager.getFileInfo(inner_path)
        if not file_info or "piecemap" not in file_info:  # It's not a big file
            return False

        self.site.settings["has_bigfile"] = True
        file_path = self.getPath(inner_path)
        sha512 = file_info["sha512"]
        piece_num = int(math.ceil(float(file_info["size"]) / file_info["piece_size"]))
        if os.path.isfile(file_path):
            if sha512 not in self.piecefields:
                if open(file_path).read(128) == "\0" * 128:
                    piece_data = "0"  # Looks like a fresh sparse file: assume no pieces yet
                else:
                    piece_data = "1"  # Has data: assume every piece is present
                self.log.debug("%s: File exists, but not in piecefield. Filling piecefield with %s * %s." % (inner_path, piece_num, piece_data))
                self.piecefields[sha512].fromstring(piece_data * piece_num)
        else:
            self.log.debug("Creating bigfile: %s" % inner_path)
            self.createSparseFile(inner_path, file_info["size"], sha512)
            self.piecefields[sha512].fromstring("0" * piece_num)
        return True

    def openBigfile(self, inner_path, prebuffer=0):
        if not self.checkBigfile(inner_path):
            return False
        self.site.needFile(inner_path, blocking=False)  # Download piecemap
        return BigFile(self.site, inner_path, prebuffer=prebuffer)


class BigFile(object):
    def __init__(self, site, inner_path, prebuffer=0):
        self.site = site
        self.inner_path = inner_path
        file_path = site.storage.getPath(inner_path)
        file_info = self.site.content_manager.getFileInfo(inner_path)
        self.piece_size = file_info["piece_size"]
        self.sha512 = file_info["sha512"]
        self.size = file_info["size"]
        self.prebuffer = prebuffer
        self.read_bytes = 0

        self.piecefield = self.site.storage.piecefields[self.sha512]
        self.f = open(file_path, "rb+")
        self.read_lock = gevent.lock.Semaphore()

    def read(self, buff=64 * 1024):
        with self.read_lock:
            pos = self.f.tell()
            read_until = min(self.size, pos + buff)
            requests = []
            # Request all required blocks
            while 1:
                piece_i = pos / self.piece_size
                if piece_i * self.piece_size >= read_until:
                    break
                pos_from = piece_i * self.piece_size
                pos_to = pos_from + self.piece_size
                if not self.piecefield[piece_i]:
                    requests.append(self.site.needFile("%s|%s-%s" % (self.inner_path, pos_from, pos_to), blocking=False, update=True, priority=10))
                pos += self.piece_size

            if not all(requests):
                return None

            # Request prebuffer
            if self.prebuffer:
                prebuffer_until = min(self.size, read_until + self.prebuffer)
                priority = 3
                while 1:
                    piece_i = pos / self.piece_size
                    if piece_i * self.piece_size >= prebuffer_until:
                        break
                    pos_from = piece_i * self.piece_size
                    pos_to = pos_from + self.piece_size
                    if not self.piecefield[piece_i]:
                        self.site.needFile("%s|%s-%s" % (self.inner_path, pos_from, pos_to), blocking=False, update=True, priority=max(0, priority))
                    priority -= 1
                    pos += self.piece_size

            gevent.joinall(requests)
            self.read_bytes += buff

            # Increase buffer for long reads
            if self.read_bytes > 7 * 1024 * 1024 and self.prebuffer < 5 * 1024 * 1024:
                self.site.log.debug("%s: Increasing bigfile buffer size to 5MB..." % self.inner_path)
                self.prebuffer = 5 * 1024 * 1024

            return self.f.read(buff)

    def seek(self, pos, whence=0):
        with self.read_lock:
            if whence == 2:  # Relative from file end
                pos = self.size + pos  # Use the real size instead of size on the disk
                whence = 0
            return self.f.seek(pos, whence)

    def tell(self):
        return self.f.tell()

    def close(self):
        self.f.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.close()
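

# A minimal read-side sketch (illustrative; `site` stands for an already
# loaded Site instance, the path is made up):
#
#   big_file = site.storage.openBigfile("data/video.mp4", prebuffer=2 * 1024 * 1024)
#   if big_file:
#       with big_file:
#           big_file.seek(-1024 * 1024, 2)   # Seek relative to the full file size
#           data = big_file.read(64 * 1024)  # Missing pieces are fetched on demand
#
# read() blocks (gevent.joinall) until every piece overlapping the requested
# range arrives, while the prebuffer pieces keep downloading in the background.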


@PluginManager.registerTo("WorkerManager")
class WorkerManagerPlugin(object):
    def addTask(self, inner_path, *args, **kwargs):
        file_info = kwargs.get("file_info")
        if file_info and "piecemap" in file_info:  # Bigfile
            self.site.settings["has_bigfile"] = True

            piecemap_inner_path = helper.getDirname(file_info["content_inner_path"]) + file_info["piecemap"]
            piecemap_task = None
            if not self.site.storage.isFile(piecemap_inner_path):
                # Start downloading the piecemap
                piecemap_task = super(WorkerManagerPlugin, self).addTask(piecemap_inner_path, priority=30)
                autodownload_bigfile_size_limit = self.site.settings.get("autodownload_bigfile_size_limit", config.autodownload_bigfile_size_limit)
                if "|" not in inner_path and self.site.isDownloadable(inner_path) and file_info["size"] / 1024 / 1024 <= autodownload_bigfile_size_limit:
                    gevent.spawn_later(0.1, self.site.needFile, inner_path + "|all")  # Download all pieces

            if "|" in inner_path:
                # Start downloading a piece
                task = super(WorkerManagerPlugin, self).addTask(inner_path, *args, **kwargs)

                inner_path, file_range = inner_path.split("|")
                pos_from, pos_to = map(int, file_range.split("-"))
                task["piece_i"] = pos_from / file_info["piece_size"]
                task["sha512"] = file_info["sha512"]
            else:
                if inner_path in self.site.bad_files:
                    del self.site.bad_files[inner_path]
                if piecemap_task:
                    task = piecemap_task
                else:
                    fake_evt = gevent.event.AsyncResult()  # Don't download anything if no range specified
                    fake_evt.set(True)
                    task = {"evt": fake_evt}

            if not self.site.storage.isFile(inner_path):
                self.site.storage.createSparseFile(inner_path, file_info["size"], file_info["sha512"])
                piece_num = int(math.ceil(float(file_info["size"]) / file_info["piece_size"]))
                self.site.storage.piecefields[file_info["sha512"]].fromstring("0" * piece_num)
        else:
            task = super(WorkerManagerPlugin, self).addTask(inner_path, *args, **kwargs)
        return task
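
    # Only assign a peer to a piece task if the peer's piecefield claims that
    # piece; otherwise trigger a piecefield refresh and reject the peer for now.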
    def taskAddPeer(self, task, peer):
        if "piece_i" in task:
            if not peer.piecefields[task["sha512"]][task["piece_i"]]:
                if task["sha512"] not in peer.piecefields:
                    gevent.spawn(peer.updatePiecefields, force=True)
                elif not task["peers"]:
                    gevent.spawn(peer.updatePiecefields)

                return False  # Don't add the peer to the task if the piece is not in its piecefield
        return super(WorkerManagerPlugin, self).taskAddPeer(task, peer)


@PluginManager.registerTo("FileRequest")
class FileRequestPlugin(object):
    def isReadable(self, site, inner_path, file, pos):
        # Peek into the file
        if file.read(10) == "\0" * 10:
            # Looks empty, but make sure we really don't have that piece
            file_info = site.content_manager.getFileInfo(inner_path)
            piece_i = pos / file_info["piece_size"]
            if not site.storage.piecefields[file_info["sha512"]][piece_i]:
                return False
        # Seek back to the position we want to read
        file.seek(pos)
        return super(FileRequestPlugin, self).isReadable(site, inner_path, file, pos)
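
    # Piece availability is exchanged with a pair of requests: "getPiecefields"
    # returns all of a site's packed piecefields, while "setPiecefields" lets a
    # peer push its own, keyed by the files' merkle roots (sha512).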
    def actionGetPiecefields(self, params):
        site = self.sites.get(params["site"])
        if not site or not site.settings["serving"]:  # Site unknown or not serving
            self.response({"error": "Unknown site"})
            return False

        # Add peer to site if not added before
        peer = site.addPeer(self.connection.ip, self.connection.port, return_peer=True)
        if not peer.connection:  # Just added
            peer.connect(self.connection)  # Assign current connection to peer

        piecefields_packed = {sha512: piecefield.pack() for sha512, piecefield in site.storage.piecefields.iteritems()}
        self.response({"piecefields_packed": piecefields_packed})

    def actionSetPiecefields(self, params):
        site = self.sites.get(params["site"])
        if not site or not site.settings["serving"]:  # Site unknown or not serving
            self.response({"error": "Unknown site"})
            self.connection.badAction(5)
            return False

        # Add or get peer
        peer = site.addPeer(self.connection.ip, self.connection.port, return_peer=True, connection=self.connection)
        if not peer.connection:
            peer.connect(self.connection)

        peer.piecefields = collections.defaultdict(BigfilePiecefieldPacked)
        for sha512, piecefield_packed in params["piecefields_packed"].iteritems():
            peer.piecefields[sha512].unpack(piecefield_packed)
        site.settings["has_bigfile"] = True

        self.response({"ok": "Updated"})


@PluginManager.registerTo("Peer")
class PeerPlugin(object):
    def __getattr__(self, key):
        if key == "piecefields":
            self.piecefields = collections.defaultdict(BigfilePiecefieldPacked)
            return self.piecefields
        elif key == "time_piecefields_updated":
            self.time_piecefields_updated = None
            return self.time_piecefields_updated
        else:
            return super(PeerPlugin, self).__getattr__(key)

    @util.Noparallel(ignore_args=True)
    def updatePiecefields(self, force=False):
        if self.connection and self.connection.handshake.get("rev", 0) < 2190:
            return False  # Not supported

        # Don't update the piecefield again within 1 minute
        if self.time_piecefields_updated and time.time() - self.time_piecefields_updated < 60 and not force:
            return False

        self.time_piecefields_updated = time.time()
        res = self.request("getPiecefields", {"site": self.site.address})
        if not res or "error" in res:
            return False

        self.piecefields = collections.defaultdict(BigfilePiecefieldPacked)
        try:
            for sha512, piecefield_packed in res["piecefields_packed"].iteritems():
                self.piecefields[sha512].unpack(piecefield_packed)
        except Exception as err:
            self.log("Invalid updatePiecefields response: %s" % Debug.formatException(err))

        return self.piecefields

    def sendMyHashfield(self, *args, **kwargs):
        return super(PeerPlugin, self).sendMyHashfield(*args, **kwargs)

    def updateHashfield(self, *args, **kwargs):
        if self.site.settings.get("has_bigfile"):
            thread = gevent.spawn(self.updatePiecefields, *args, **kwargs)
            back = super(PeerPlugin, self).updateHashfield(*args, **kwargs)
            thread.join()
            return back
        else:
            return super(PeerPlugin, self).updateHashfield(*args, **kwargs)

    def getFile(self, site, inner_path, *args, **kwargs):
        if "|" in inner_path:
            inner_path, file_range = inner_path.split("|")
            pos_from, pos_to = map(int, file_range.split("-"))
            kwargs["pos_from"] = pos_from
            kwargs["pos_to"] = pos_to
        return super(PeerPlugin, self).getFile(site, inner_path, *args, **kwargs)


@PluginManager.registerTo("Site")
class SitePlugin(object):
    def isFileDownloadAllowed(self, inner_path, file_info):
        if "piecemap" in file_info:
            # For bigfiles, only check the size of a single piece against the limit
            file_info = file_info.copy()
            file_info["size"] = file_info["piece_size"]
        return super(SitePlugin, self).isFileDownloadAllowed(inner_path, file_info)

    def getSettingsCache(self):
        back = super(SitePlugin, self).getSettingsCache()
        if self.storage.piecefields:
            back["piecefields"] = {sha512: piecefield.pack().encode("base64") for sha512, piecefield in self.storage.piecefields.iteritems()}
        return back
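
    # needFile("<inner_path>|all") (below) expands to one needFile call per
    # missing piece, throttled to 20 concurrent downloads by util.Pooled.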
    def needFile(self, inner_path, *args, **kwargs):
        if inner_path.endswith("|all"):
            @util.Pooled(20)
            def pooledNeedBigfile(*args, **kwargs):
                return self.needFile(*args, **kwargs)

            inner_path = inner_path.replace("|all", "")
            file_info = self.needFileInfo(inner_path)
            file_size = file_info["size"]
            piece_size = file_info["piece_size"]
            piece_num = int(math.ceil(float(file_size) / piece_size))

            file_threads = []

            piecefield = self.storage.piecefields.get(file_info["sha512"])

            for piece_i in range(piece_num):
                piece_from = piece_i * piece_size
                piece_to = min(file_size, piece_from + piece_size)
                if not piecefield or not piecefield[piece_i]:
                    res = pooledNeedBigfile("%s|%s-%s" % (inner_path, piece_from, piece_to), blocking=False)
                    if res is not True and res is not False:
                        file_threads.append(res)

            gevent.joinall(file_threads)
        else:
            return super(SitePlugin, self).needFile(inner_path, *args, **kwargs)


@PluginManager.registerTo("ConfigPlugin")
class ConfigPlugin(object):
    def createArguments(self):
        group = self.parser.add_argument_group("Bigfile plugin")
        group.add_argument('--autodownload_bigfile_size_limit', help='Also download bigfiles smaller than this limit if the help distribute option is checked', default=1, metavar="MB", type=int)

        return super(ConfigPlugin, self).createArguments()
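
# The default limit is 1 MB; it can be raised at startup, e.g.
#   python zeronet.py --autodownload_bigfile_size_limit 10
# or adjusted per site at runtime via the "siteSetAutodownloadBigfileLimit"
# websocket command (ADMIN permission required).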