ContentDbPlugin.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406
  1. import time
  2. import collections
  3. import itertools
  4. import re
  5. import gevent
  6. from util import helper
  7. from Plugin import PluginManager
  8. from Config import config
  9. from Debug import Debug
  10. if "content_db" not in locals().keys(): # To keep between module reloads
  11. content_db = None
  12. @PluginManager.registerTo("ContentDb")
  13. class ContentDbPlugin(object):
  14. def __init__(self, *args, **kwargs):
  15. global content_db
  16. content_db = self
  17. self.filled = {} # Site addresses that already filled from content.json
  18. self.need_filling = False # file_optional table just created, fill data from content.json files
  19. self.time_peer_numbers_updated = 0
  20. self.my_optional_files = {} # Last 50 site_address/inner_path called by fileWrite (auto-pinning these files)
  21. self.optional_files = collections.defaultdict(dict)
  22. self.optional_files_loading = False
  23. helper.timer(60 * 5, self.checkOptionalLimit)
  24. super(ContentDbPlugin, self).__init__(*args, **kwargs)
  25. def getSchema(self):
  26. schema = super(ContentDbPlugin, self).getSchema()
  27. # Need file_optional table
  28. schema["tables"]["file_optional"] = {
  29. "cols": [
  30. ["file_id", "INTEGER PRIMARY KEY UNIQUE NOT NULL"],
  31. ["site_id", "INTEGER REFERENCES site (site_id) ON DELETE CASCADE"],
  32. ["inner_path", "TEXT"],
  33. ["hash_id", "INTEGER"],
  34. ["size", "INTEGER"],
  35. ["peer", "INTEGER DEFAULT 0"],
  36. ["uploaded", "INTEGER DEFAULT 0"],
  37. ["is_downloaded", "INTEGER DEFAULT 0"],
  38. ["is_pinned", "INTEGER DEFAULT 0"],
  39. ["time_added", "INTEGER DEFAULT 0"],
  40. ["time_downloaded", "INTEGER DEFAULT 0"],
  41. ["time_accessed", "INTEGER DEFAULT 0"]
  42. ],
  43. "indexes": [
  44. "CREATE UNIQUE INDEX file_optional_key ON file_optional (site_id, inner_path)",
  45. "CREATE INDEX is_downloaded ON file_optional (is_downloaded)"
  46. ],
  47. "schema_changed": 11
  48. }
  49. return schema
  50. def initSite(self, site):
  51. super(ContentDbPlugin, self).initSite(site)
  52. if self.need_filling:
  53. self.fillTableFileOptional(site)
  54. if not self.optional_files_loading:
  55. gevent.spawn_later(1, self.loadFilesOptional)
  56. self.optional_files_loading = True
  57. def checkTables(self):
  58. changed_tables = super(ContentDbPlugin, self).checkTables()
  59. if "file_optional" in changed_tables:
  60. self.need_filling = True
  61. return changed_tables
  62. # Load optional files ending
  63. def loadFilesOptional(self):
  64. s = time.time()
  65. num = 0
  66. total = 0
  67. total_downloaded = 0
  68. res = content_db.execute("SELECT site_id, inner_path, size, is_downloaded FROM file_optional")
  69. site_sizes = collections.defaultdict(lambda: collections.defaultdict(int))
  70. for row in res:
  71. self.optional_files[row["site_id"]][row["inner_path"][-8:]] = 1
  72. num += 1
  73. # Update site size stats
  74. site_sizes[row["site_id"]]["size_optional"] += row["size"]
  75. if row["is_downloaded"]:
  76. site_sizes[row["site_id"]]["optional_downloaded"] += row["size"]
  77. # Site site size stats to sites.json settings
  78. site_ids_reverse = {val: key for key, val in self.site_ids.iteritems()}
  79. for site_id, stats in site_sizes.iteritems():
  80. site_address = site_ids_reverse.get(site_id)
  81. if not site_address:
  82. self.log.error("Not found site_id: %s" % site_id)
  83. continue
  84. site = self.sites[site_address]
  85. site.settings["size_optional"] = stats["size_optional"]
  86. site.settings["optional_downloaded"] = stats["optional_downloaded"]
  87. total += stats["size_optional"]
  88. total_downloaded += stats["optional_downloaded"]
  89. self.log.debug(
  90. "Loaded %s optional files: %.2fMB, downloaded: %.2fMB in %.3fs" %
  91. (num, float(total) / 1024 / 1024, float(total_downloaded) / 1024 / 1024, time.time() - s)
  92. )
  93. if self.need_filling and self.getOptionalLimitBytes() >= 0 and self.getOptionalLimitBytes() < total_downloaded:
  94. limit_bytes = self.getOptionalLimitBytes()
  95. limit_new = round((float(total_downloaded) / 1024 / 1024 / 1024) * 1.1, 2) # Current limit + 10%
  96. self.log.debug(
  97. "First startup after update and limit is smaller than downloaded files size (%.2fGB), increasing it from %.2fGB to %.2fGB" %
  98. (float(total_downloaded) / 1024 / 1024 / 1024, float(limit_bytes) / 1024 / 1024 / 1024, limit_new)
  99. )
  100. config.saveValue("optional_limit", limit_new)
  101. config.optional_limit = str(limit_new)
  102. # Predicts if the file is optional
  103. def isOptionalFile(self, site_id, inner_path):
  104. return self.optional_files[site_id].get(inner_path[-8:])
  105. # Fill file_optional table with optional files found in sites
  106. def fillTableFileOptional(self, site):
  107. s = time.time()
  108. site_id = self.site_ids.get(site.address)
  109. if not site_id:
  110. return False
  111. cur = self.getCursor()
  112. cur.execute("BEGIN")
  113. res = cur.execute("SELECT * FROM content WHERE size_files_optional > 0 AND site_id = %s" % site_id)
  114. num = 0
  115. for row in res.fetchall():
  116. content = site.content_manager.contents[row["inner_path"]]
  117. try:
  118. num += self.setContentFilesOptional(site, row["inner_path"], content, cur=cur)
  119. except Exception as err:
  120. self.log.error("Error loading %s into file_optional: %s" % (row["inner_path"], err))
  121. cur.execute("COMMIT")
  122. cur.close()
  123. # Set my files to pinned
  124. from User import UserManager
  125. user = UserManager.user_manager.get()
  126. if not user:
  127. user = UserManager.user_manager.create()
  128. auth_address = user.getAuthAddress(site.address)
  129. self.execute(
  130. "UPDATE file_optional SET is_pinned = 1 WHERE site_id = :site_id AND inner_path LIKE :inner_path",
  131. {"site_id": site_id, "inner_path": "%%/%s/%%" % auth_address}
  132. )
  133. self.log.debug(
  134. "Filled file_optional table for %s in %.3fs (loaded: %s, is_pinned: %s)" %
  135. (site.address, time.time() - s, num, self.cur.cursor.rowcount)
  136. )
  137. self.filled[site.address] = True
  138. def setContentFilesOptional(self, site, content_inner_path, content, cur=None):
  139. if not cur:
  140. cur = self
  141. try:
  142. cur.execute("BEGIN")
  143. except Exception as err:
  144. self.log.warning("Transaction begin error %s %s: %s" % (site, content_inner_path, Debug.formatException(err)))
  145. num = 0
  146. site_id = self.site_ids[site.address]
  147. content_inner_dir = helper.getDirname(content_inner_path)
  148. for relative_inner_path, file in content.get("files_optional", {}).iteritems():
  149. file_inner_path = content_inner_dir + relative_inner_path
  150. hash_id = int(file["sha512"][0:4], 16)
  151. if hash_id in site.content_manager.hashfield:
  152. is_downloaded = 1
  153. else:
  154. is_downloaded = 0
  155. if site.address + "/" + content_inner_dir in self.my_optional_files:
  156. is_pinned = 1
  157. else:
  158. is_pinned = 0
  159. cur.insertOrUpdate("file_optional", {
  160. "hash_id": hash_id,
  161. "size": int(file["size"])
  162. }, {
  163. "site_id": site_id,
  164. "inner_path": file_inner_path
  165. }, oninsert={
  166. "time_added": int(time.time()),
  167. "time_downloaded": int(time.time()) if is_downloaded else 0,
  168. "is_downloaded": is_downloaded,
  169. "peer": is_downloaded,
  170. "is_pinned": is_pinned
  171. })
  172. self.optional_files[site_id][file_inner_path[-8:]] = 1
  173. num += 1
  174. if cur == self:
  175. try:
  176. cur.execute("END")
  177. except Exception as err:
  178. self.log.warning("Transaction end error %s %s: %s" % (site, content_inner_path, Debug.formatException(err)))
  179. return num
  180. def setContent(self, site, inner_path, content, size=0):
  181. super(ContentDbPlugin, self).setContent(site, inner_path, content, size=size)
  182. old_content = site.content_manager.contents.get(inner_path, {})
  183. if (not self.need_filling or self.filled.get(site.address)) and ("files_optional" in content or "files_optional" in old_content):
  184. self.setContentFilesOptional(site, inner_path, content)
  185. # Check deleted files
  186. if old_content:
  187. old_files = old_content.get("files_optional", {}).keys()
  188. new_files = content.get("files_optional", {}).keys()
  189. content_inner_dir = helper.getDirname(inner_path)
  190. deleted = [content_inner_dir + key for key in old_files if key not in new_files]
  191. if deleted:
  192. site_id = self.site_ids[site.address]
  193. self.execute("DELETE FROM file_optional WHERE ?", {"site_id": site_id, "inner_path": deleted})
  194. def deleteContent(self, site, inner_path):
  195. content = site.content_manager.contents.get(inner_path)
  196. if content and "files_optional" in content:
  197. site_id = self.site_ids[site.address]
  198. content_inner_dir = helper.getDirname(inner_path)
  199. optional_inner_paths = [
  200. content_inner_dir + relative_inner_path
  201. for relative_inner_path in content.get("files_optional", {}).keys()
  202. ]
  203. self.execute("DELETE FROM file_optional WHERE ?", {"site_id": site_id, "inner_path": optional_inner_paths})
  204. super(ContentDbPlugin, self).deleteContent(site, inner_path)
  205. def updatePeerNumbers(self):
  206. s = time.time()
  207. num_file = 0
  208. num_updated = 0
  209. num_site = 0
  210. for site in self.sites.values():
  211. if not site.content_manager.has_optional_files:
  212. continue
  213. has_updated_hashfield = next((
  214. peer
  215. for peer in site.peers.itervalues()
  216. if peer.has_hashfield and peer.hashfield.time_changed > self.time_peer_numbers_updated
  217. ), None)
  218. if not has_updated_hashfield and site.content_manager.hashfield.time_changed < self.time_peer_numbers_updated:
  219. continue
  220. hashfield_peers = itertools.chain.from_iterable(
  221. peer.hashfield.storage
  222. for peer in site.peers.itervalues()
  223. if peer.has_hashfield
  224. )
  225. peer_nums = collections.Counter(
  226. itertools.chain(
  227. hashfield_peers,
  228. site.content_manager.hashfield
  229. )
  230. )
  231. site_id = self.site_ids[site.address]
  232. if not site_id:
  233. continue
  234. res = self.execute("SELECT file_id, hash_id, peer FROM file_optional WHERE ?", {"site_id": site_id})
  235. updates = {}
  236. for row in res:
  237. peer_num = peer_nums.get(row["hash_id"], 0)
  238. if peer_num != row["peer"]:
  239. updates[row["file_id"]] = peer_num
  240. self.execute("BEGIN")
  241. for file_id, peer_num in updates.iteritems():
  242. self.execute("UPDATE file_optional SET peer = ? WHERE file_id = ?", (peer_num, file_id))
  243. self.execute("END")
  244. num_updated += len(updates)
  245. num_file += len(peer_nums)
  246. num_site += 1
  247. self.time_peer_numbers_updated = time.time()
  248. self.log.debug("%s/%s peer number for %s site updated in %.3fs" % (num_updated, num_file, num_site, time.time() - s))
  249. def queryDeletableFiles(self):
  250. # First return the files with atleast 10 seeder and not accessed in last weed
  251. query = """
  252. SELECT * FROM file_optional
  253. WHERE peer > 10 AND is_downloaded = 1 AND is_pinned = 0
  254. ORDER BY time_accessed < %s DESC, uploaded / size
  255. """ % int(time.time() - 60 * 60 * 7)
  256. limit_start = 0
  257. while 1:
  258. num = 0
  259. res = self.execute("%s LIMIT %s, 50" % (query, limit_start))
  260. for row in res:
  261. yield row
  262. num += 1
  263. if num < 50:
  264. break
  265. limit_start += 50
  266. self.log.debug("queryDeletableFiles returning less-seeded files")
  267. # Then return files less seeder but still not accessed in last week
  268. query = """
  269. SELECT * FROM file_optional
  270. WHERE is_downloaded = 1 AND peer <= 10 AND is_pinned = 0
  271. ORDER BY peer DESC, time_accessed < %s DESC, uploaded / size
  272. """ % int(time.time() - 60 * 60 * 7)
  273. limit_start = 0
  274. while 1:
  275. num = 0
  276. res = self.execute("%s LIMIT %s, 50" % (query, limit_start))
  277. for row in res:
  278. yield row
  279. num += 1
  280. if num < 50:
  281. break
  282. limit_start += 50
  283. self.log.debug("queryDeletableFiles returning everyting")
  284. # At the end return all files
  285. query = """
  286. SELECT * FROM file_optional
  287. WHERE is_downloaded = 1 AND peer <= 10 AND is_pinned = 0
  288. ORDER BY peer DESC, time_accessed, uploaded / size
  289. """
  290. limit_start = 0
  291. while 1:
  292. num = 0
  293. res = self.execute("%s LIMIT %s, 50" % (query, limit_start))
  294. for row in res:
  295. yield row
  296. num += 1
  297. if num < 50:
  298. break
  299. limit_start += 50
  300. def getOptionalLimitBytes(self):
  301. if config.optional_limit.endswith("%"):
  302. limit_percent = float(re.sub("[^0-9.]", "", config.optional_limit))
  303. limit_bytes = helper.getFreeSpace() * (limit_percent / 100)
  304. else:
  305. limit_bytes = float(re.sub("[^0-9.]", "", config.optional_limit)) * 1024 * 1024 * 1024
  306. return limit_bytes
  307. def getOptionalUsedBytes(self):
  308. size = self.execute("SELECT SUM(size) FROM file_optional WHERE is_downloaded = 1 AND is_pinned = 0").fetchone()[0]
  309. if not size:
  310. size = 0
  311. return size
  312. def getOptionalNeedDelete(self, size):
  313. if config.optional_limit.endswith("%"):
  314. limit_percent = float(re.sub("[^0-9.]", "", config.optional_limit))
  315. need_delete = size - ((helper.getFreeSpace() + size) * (limit_percent / 100))
  316. else:
  317. need_delete = size - self.getOptionalLimitBytes()
  318. return need_delete
  319. def checkOptionalLimit(self, limit=None):
  320. if not limit:
  321. limit = self.getOptionalLimitBytes()
  322. if limit < 0:
  323. self.log.debug("Invalid limit for optional files: %s" % limit)
  324. return False
  325. size = self.getOptionalUsedBytes()
  326. need_delete = self.getOptionalNeedDelete(size)
  327. self.log.debug(
  328. "Optional size: %.1fMB/%.1fMB, Need delete: %.1fMB" %
  329. (float(size) / 1024 / 1024, float(limit) / 1024 / 1024, float(need_delete) / 1024 / 1024)
  330. )
  331. if need_delete <= 0:
  332. return False
  333. self.updatePeerNumbers()
  334. site_ids_reverse = {val: key for key, val in self.site_ids.iteritems()}
  335. deleted_file_ids = []
  336. for row in self.queryDeletableFiles():
  337. site_address = site_ids_reverse.get(row["site_id"])
  338. site = self.sites.get(site_address)
  339. if not site:
  340. self.log.error("No site found for id: %s" % row["site_id"])
  341. continue
  342. site.log.debug("Deleting %s %.3f MB left" % (row["inner_path"], float(need_delete) / 1024 / 1024))
  343. deleted_file_ids.append(row["file_id"])
  344. try:
  345. site.content_manager.optionalRemoved(row["inner_path"], row["hash_id"], row["size"])
  346. site.storage.delete(row["inner_path"])
  347. need_delete -= row["size"]
  348. except Exception as err:
  349. site.log.error("Error deleting %s: %s" % (row["inner_path"], err))
  350. if need_delete <= 0:
  351. break
  352. cur = self.getCursor()
  353. cur.execute("BEGIN")
  354. for file_id in deleted_file_ids:
  355. cur.execute("UPDATE file_optional SET is_downloaded = 0, is_pinned = 0, peer = peer - 1 WHERE ?", {"file_id": file_id})
  356. cur.execute("COMMIT")
  357. cur.close()