ContentDbPlugin.py 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401
  1. import time
  2. import collections
  3. import itertools
  4. import re
  5. import gevent
  6. from util import helper
  7. from Plugin import PluginManager
  8. from Config import config
  9. if "content_db" not in locals().keys(): # To keep between module reloads
  10. content_db = None
  11. @PluginManager.registerTo("ContentDb")
  12. class ContentDbPlugin(object):
  13. def __init__(self, *args, **kwargs):
  14. global content_db
  15. content_db = self
  16. self.filled = {} # Site addresses that already filled from content.json
  17. self.need_filling = False # file_optional table just created, fill data from content.json files
  18. self.time_peer_numbers_updated = 0
  19. self.my_optional_files = {} # Last 50 site_address/inner_path called by fileWrite (auto-pinning these files)
  20. self.optional_files = collections.defaultdict(dict)
  21. self.optional_files_loading = False
  22. helper.timer(60 * 5, self.checkOptionalLimit)
  23. super(ContentDbPlugin, self).__init__(*args, **kwargs)
  24. def getSchema(self):
  25. schema = super(ContentDbPlugin, self).getSchema()
  26. # Need file_optional table
  27. schema["tables"]["file_optional"] = {
  28. "cols": [
  29. ["file_id", "INTEGER PRIMARY KEY UNIQUE NOT NULL"],
  30. ["site_id", "INTEGER REFERENCES site (site_id) ON DELETE CASCADE"],
  31. ["inner_path", "TEXT"],
  32. ["hash_id", "INTEGER"],
  33. ["size", "INTEGER"],
  34. ["peer", "INTEGER DEFAULT 0"],
  35. ["uploaded", "INTEGER DEFAULT 0"],
  36. ["is_downloaded", "INTEGER DEFAULT 0"],
  37. ["is_pinned", "INTEGER DEFAULT 0"],
  38. ["time_added", "INTEGER DEFAULT 0"],
  39. ["time_downloaded", "INTEGER DEFAULT 0"],
  40. ["time_accessed", "INTEGER DEFAULT 0"]
  41. ],
  42. "indexes": [
  43. "CREATE UNIQUE INDEX file_optional_key ON file_optional (site_id, inner_path)",
  44. "CREATE INDEX is_downloaded ON file_optional (is_downloaded)"
  45. ],
  46. "schema_changed": 11
  47. }
  48. return schema
  49. def initSite(self, site):
  50. super(ContentDbPlugin, self).initSite(site)
  51. if self.need_filling:
  52. self.fillTableFileOptional(site)
  53. if not self.optional_files_loading:
  54. gevent.spawn_later(1, self.loadFilesOptional)
  55. self.optional_files_loading = True
  56. def checkTables(self):
  57. changed_tables = super(ContentDbPlugin, self).checkTables()
  58. if "file_optional" in changed_tables:
  59. self.need_filling = True
  60. return changed_tables
  61. # Load optional files ending
  62. def loadFilesOptional(self):
  63. s = time.time()
  64. num = 0
  65. total = 0
  66. total_downloaded = 0
  67. res = content_db.execute("SELECT site_id, inner_path, size, is_downloaded FROM file_optional")
  68. site_sizes = collections.defaultdict(lambda: collections.defaultdict(int))
  69. for row in res:
  70. self.optional_files[row["site_id"]][row["inner_path"][-8:]] = 1
  71. num += 1
  72. # Update site size stats
  73. site_sizes[row["site_id"]]["size_optional"] += row["size"]
  74. if row["is_downloaded"]:
  75. site_sizes[row["site_id"]]["optional_downloaded"] += row["size"]
  76. # Site site size stats to sites.json settings
  77. site_ids_reverse = {val: key for key, val in self.site_ids.iteritems()}
  78. for site_id, stats in site_sizes.iteritems():
  79. site_address = site_ids_reverse.get(site_id)
  80. if not site_address:
  81. self.log.error("Not found site_id: %s" % site_id)
  82. continue
  83. site = self.sites[site_address]
  84. site.settings["size_optional"] = stats["size_optional"]
  85. site.settings["optional_downloaded"] = stats["optional_downloaded"]
  86. total += stats["size_optional"]
  87. total_downloaded += stats["optional_downloaded"]
  88. self.log.debug(
  89. "Loaded %s optional files: %.2fMB, downloaded: %.2fMB in %.3fs" %
  90. (num, float(total) / 1024 / 1024, float(total_downloaded) / 1024 / 1024, time.time() - s)
  91. )
  92. if self.need_filling and self.getOptionalLimitBytes() >= 0 and self.getOptionalLimitBytes() < total_downloaded:
  93. limit_bytes = self.getOptionalLimitBytes()
  94. limit_new = round((float(total_downloaded) / 1024 / 1024 / 1024) * 1.1, 2) # Current limit + 10%
  95. self.log.debug(
  96. "First startup after update and limit is smaller than downloaded files size (%.2fGB), increasing it from %.2fGB to %.2fGB" %
  97. (float(total_downloaded) / 1024 / 1024 / 1024, float(limit_bytes) / 1024 / 1024 / 1024, limit_new)
  98. )
  99. config.saveValue("optional_limit", limit_new)
  100. config.optional_limit = str(limit_new)
  101. # Predicts if the file is optional
  102. def isOptionalFile(self, site_id, inner_path):
  103. return self.optional_files[site_id].get(inner_path[-8:])
  104. # Fill file_optional table with optional files found in sites
  105. def fillTableFileOptional(self, site):
  106. s = time.time()
  107. site_id = self.site_ids.get(site.address)
  108. if not site_id:
  109. return False
  110. cur = self.getCursor()
  111. cur.execute("BEGIN")
  112. res = cur.execute("SELECT * FROM content WHERE size_files_optional > 0 AND site_id = %s" % site_id)
  113. num = 0
  114. for row in res.fetchall():
  115. content = site.content_manager.contents[row["inner_path"]]
  116. try:
  117. num += self.setContentFilesOptional(site, row["inner_path"], content, cur=cur)
  118. except Exception, err:
  119. self.log.error("Error loading %s into file_optional: %s" % (row["inner_path"], err))
  120. cur.execute("COMMIT")
  121. cur.close()
  122. # Set my files to pinned
  123. from User import UserManager
  124. user = UserManager.user_manager.get()
  125. if not user:
  126. user = UserManager.user_manager.create()
  127. auth_address = user.getAuthAddress(site.address)
  128. self.execute(
  129. "UPDATE file_optional SET is_pinned = 1 WHERE site_id = :site_id AND inner_path LIKE :inner_path",
  130. {"site_id": site_id, "inner_path": "%%/%s/%%" % auth_address}
  131. )
  132. self.log.debug(
  133. "Filled file_optional table for %s in %.3fs (loaded: %s, is_pinned: %s)" %
  134. (site.address, time.time() - s, num, self.cur.cursor.rowcount)
  135. )
  136. self.filled[site.address] = True
  137. def setContentFilesOptional(self, site, content_inner_path, content, cur=None):
  138. if not cur:
  139. cur = self
  140. try:
  141. cur.execute("BEGIN")
  142. except Exception, err:
  143. self.log.warning("Transaction begin error %s %s: %s" % (site, content_inner_path, Debug.formatException(err)))
  144. num = 0
  145. site_id = self.site_ids[site.address]
  146. content_inner_dir = helper.getDirname(content_inner_path)
  147. for relative_inner_path, file in content.get("files_optional", {}).iteritems():
  148. file_inner_path = content_inner_dir + relative_inner_path
  149. hash_id = int(file["sha512"][0:4], 16)
  150. if hash_id in site.content_manager.hashfield:
  151. is_downloaded = 1
  152. else:
  153. is_downloaded = 0
  154. if site.address + "/" + file_inner_path in self.my_optional_files:
  155. is_pinned = 1
  156. else:
  157. is_pinned = 0
  158. cur.insertOrUpdate("file_optional", {
  159. "hash_id": hash_id,
  160. "size": int(file["size"]),
  161. "is_pinned": is_pinned
  162. }, {
  163. "site_id": site_id,
  164. "inner_path": file_inner_path
  165. }, oninsert={
  166. "time_added": int(time.time()),
  167. "time_downloaded": int(time.time()) if is_downloaded else 0,
  168. "is_downloaded": is_downloaded,
  169. "peer": is_downloaded
  170. })
  171. self.optional_files[site_id][file_inner_path[-8:]] = 1
  172. num += 1
  173. if cur == self:
  174. try:
  175. cur.execute("END")
  176. except Exception, err:
  177. self.log.warning("Transaction end error %s %s: %s" % (site, content_inner_path, Debug.formatException(err)))
  178. return num
  179. def setContent(self, site, inner_path, content, size=0):
  180. super(ContentDbPlugin, self).setContent(site, inner_path, content, size=size)
  181. old_content = site.content_manager.contents.get(inner_path, {})
  182. if (not self.need_filling or self.filled.get(site.address)) and "files_optional" in content or "files_optional" in old_content:
  183. self.setContentFilesOptional(site, inner_path, content)
  184. # Check deleted files
  185. if old_content:
  186. old_files = old_content.get("files_optional", {}).keys()
  187. new_files = content.get("files_optional", {}).keys()
  188. content_inner_dir = helper.getDirname(inner_path)
  189. deleted = [content_inner_dir + key for key in old_files if key not in new_files]
  190. if deleted:
  191. site_id = self.site_ids[site.address]
  192. self.execute("DELETE FROM file_optional WHERE ?", {"site_id": site_id, "inner_path": deleted})
  193. def deleteContent(self, site, inner_path):
  194. content = site.content_manager.contents.get(inner_path)
  195. if content and "files_optional" in content:
  196. site_id = self.site_ids[site.address]
  197. content_inner_dir = helper.getDirname(inner_path)
  198. optional_inner_paths = [
  199. content_inner_dir + relative_inner_path
  200. for relative_inner_path in content.get("files_optional", {}).keys()
  201. ]
  202. self.execute("DELETE FROM file_optional WHERE ?", {"site_id": site_id, "inner_path": optional_inner_paths})
  203. super(ContentDbPlugin, self).deleteContent(site, inner_path)
  204. def updatePeerNumbers(self):
  205. s = time.time()
  206. num_file = 0
  207. num_updated = 0
  208. num_site = 0
  209. for site in self.sites.values():
  210. if not site.content_manager.has_optional_files:
  211. continue
  212. has_updated_hashfield = next((
  213. peer
  214. for peer in site.peers.itervalues()
  215. if peer.has_hashfield and peer.hashfield.time_changed > self.time_peer_numbers_updated
  216. ), None)
  217. if not has_updated_hashfield and site.content_manager.hashfield.time_changed < self.time_peer_numbers_updated:
  218. continue
  219. hashfield_peers = itertools.chain.from_iterable(
  220. peer.hashfield.storage
  221. for peer in site.peers.itervalues()
  222. if peer.has_hashfield
  223. )
  224. peer_nums = collections.Counter(
  225. itertools.chain(
  226. hashfield_peers,
  227. site.content_manager.hashfield
  228. )
  229. )
  230. site_id = self.site_ids[site.address]
  231. if not site_id:
  232. continue
  233. res = self.execute("SELECT file_id, hash_id, peer FROM file_optional WHERE ?", {"site_id": site_id})
  234. updates = {}
  235. for row in res:
  236. peer_num = peer_nums.get(row["hash_id"], 0)
  237. if peer_num != row["peer"]:
  238. updates[row["file_id"]] = peer_num
  239. self.execute("BEGIN")
  240. for file_id, peer_num in updates.iteritems():
  241. self.execute("UPDATE file_optional SET peer = ? WHERE file_id = ?", (peer_num, file_id))
  242. self.execute("END")
  243. num_updated += len(updates)
  244. num_file += len(peer_nums)
  245. num_site += 1
  246. self.time_peer_numbers_updated = time.time()
  247. self.log.debug("%s/%s peer number for %s site updated in %.3fs" % (num_updated, num_file, num_site, time.time() - s))
  248. def queryDeletableFiles(self):
  249. # First return the files with atleast 10 seeder and not accessed in last weed
  250. query = """
  251. SELECT * FROM file_optional
  252. WHERE peer > 10 AND is_downloaded = 1 AND is_pinned = 0
  253. ORDER BY time_accessed < %s DESC, uploaded / size
  254. """ % int(time.time() - 60 * 60 * 7)
  255. limit_start = 0
  256. while 1:
  257. num = 0
  258. res = self.execute("%s LIMIT %s, 50" % (query, limit_start))
  259. for row in res:
  260. yield row
  261. num += 1
  262. if num < 50:
  263. break
  264. limit_start += 50
  265. self.log.debug("queryDeletableFiles returning less-seeded files")
  266. # Then return files less seeder but still not accessed in last week
  267. query = """
  268. SELECT * FROM file_optional
  269. WHERE is_downloaded = 1 AND peer <= 10 AND is_pinned = 0
  270. ORDER BY peer DESC, time_accessed < %s DESC, uploaded / size
  271. """ % int(time.time() - 60 * 60 * 7)
  272. limit_start = 0
  273. while 1:
  274. num = 0
  275. res = self.execute("%s LIMIT %s, 50" % (query, limit_start))
  276. for row in res:
  277. yield row
  278. num += 1
  279. if num < 50:
  280. break
  281. limit_start += 50
  282. self.log.debug("queryDeletableFiles returning everyting")
  283. # At the end return all files
  284. query = """
  285. SELECT * FROM file_optional
  286. WHERE is_downloaded = 1 AND peer <= 10 AND is_pinned = 0
  287. ORDER BY peer DESC, time_accessed, uploaded / size
  288. """
  289. limit_start = 0
  290. while 1:
  291. num = 0
  292. res = self.execute("%s LIMIT %s, 50" % (query, limit_start))
  293. for row in res:
  294. yield row
  295. num += 1
  296. if num < 50:
  297. break
  298. limit_start += 50
  299. def getOptionalLimitBytes(self):
  300. if config.optional_limit.endswith("%"):
  301. limit_percent = float(re.sub("[^0-9.]", "", config.optional_limit))
  302. limit_bytes = helper.getFreeSpace() * (limit_percent / 100)
  303. else:
  304. limit_bytes = float(re.sub("[^0-9.]", "", config.optional_limit)) * 1024 * 1024 * 1024
  305. return limit_bytes
  306. def getOptionalNeedDelete(self, size):
  307. if config.optional_limit.endswith("%"):
  308. limit_percent = float(re.sub("[^0-9.]", "", config.optional_limit))
  309. need_delete = size - ((helper.getFreeSpace() + size) * (limit_percent / 100))
  310. else:
  311. need_delete = size - self.getOptionalLimitBytes()
  312. return need_delete
  313. def checkOptionalLimit(self, limit=None):
  314. if not limit:
  315. limit = self.getOptionalLimitBytes()
  316. if limit < 0:
  317. self.log.debug("Invalid limit for optional files: %s" % limit)
  318. return False
  319. size = self.execute("SELECT SUM(size) FROM file_optional WHERE is_downloaded = 1 AND is_pinned = 0").fetchone()[0]
  320. if not size:
  321. size = 0
  322. need_delete = self.getOptionalNeedDelete(size)
  323. self.log.debug(
  324. "Optional size: %.1fMB/%.1fMB, Need delete: %.1fMB" %
  325. (float(size) / 1024 / 1024, float(limit) / 1024 / 1024, float(need_delete) / 1024 / 1024)
  326. )
  327. if need_delete <= 0:
  328. return False
  329. self.updatePeerNumbers()
  330. site_ids_reverse = {val: key for key, val in self.site_ids.iteritems()}
  331. deleted_file_ids = []
  332. for row in self.queryDeletableFiles():
  333. site_address = site_ids_reverse.get(row["site_id"])
  334. site = self.sites.get(site_address)
  335. if not site:
  336. self.log.error("No site found for id: %s" % row["site_id"])
  337. continue
  338. site.log.debug("Deleting %s %.3f MB left" % (row["inner_path"], float(need_delete) / 1024 / 1024))
  339. deleted_file_ids.append(row["file_id"])
  340. try:
  341. site.content_manager.optionalRemove(row["inner_path"], row["hash_id"], row["size"])
  342. site.storage.delete(row["inner_path"])
  343. need_delete -= row["size"]
  344. except Exception, err:
  345. site.log.error("Error deleting %s: %s" % (row["inner_path"], err))
  346. if need_delete <= 0:
  347. break
  348. cur = self.getCursor()
  349. cur.execute("BEGIN")
  350. for file_id in deleted_file_ids:
  351. cur.execute("UPDATE file_optional SET is_downloaded = 0, is_pinned = 0, peer = peer - 1 WHERE ?", {"file_id": file_id})
  352. cur.execute("COMMIT")
  353. cur.close()