ContentDbPlugin.py 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379
  1. import time
  2. import collections
  3. import itertools
  4. import re
  5. import gevent
  6. from util import helper
  7. from Plugin import PluginManager
  8. from Config import config
  # Module-level handle to the active ContentDbPlugin instance.
  # Only initialised when the name is not already bound, so the value
  # survives a reload of this module (To keep between module reloads).
  if "content_db" not in locals():
      content_db = None
  @PluginManager.registerTo("ContentDb")
  class ContentDbPlugin(object):
      """ContentDb plugin that tracks optional files in a ``file_optional`` table.

      The plugin adds the table to the schema, keeps it in sync when
      content.json data is set or deleted, maintains per-file peer counts and
      deletes downloaded, unpinned optional files once their total size
      exceeds the configured ``optional_limit``.

      Attributes such as ``site_ids``, ``sites``, ``log``, ``execute`` and
      ``getCursor`` are not defined here; they are expected to come from the
      class this plugin is registered to.
      """

      def __init__(self, *args, **kwargs):
          """Initialise plugin state and schedule the periodic limit check."""
          global content_db
          content_db = self
          self.filled = {}  # Site addresses that already filled from content.json
          self.need_filling = False  # file_optional table just created, fill data from content.json files
          self.time_peer_numbers_updated = 0
          self.my_optional_files = {}  # Last 50 site_address/inner_path called by fileWrite (auto-pinning these files)
          # site_id -> {last 8 chars of inner_path: 1}; used by isOptionalFile()
          self.optional_files = collections.defaultdict(dict)
          self.optional_files_loading = False
          helper.timer(60 * 5, self.checkOptionalLimit)
          super(ContentDbPlugin, self).__init__(*args, **kwargs)

      def getSchema(self):
          """Return the parent schema extended with the ``file_optional`` table."""
          schema = super(ContentDbPlugin, self).getSchema()

          # Need file_optional table
          schema["tables"]["file_optional"] = {
              "cols": [
                  ["file_id", "INTEGER PRIMARY KEY UNIQUE NOT NULL"],
                  ["site_id", "INTEGER REFERENCES site (site_id) ON DELETE CASCADE"],
                  ["inner_path", "TEXT"],
                  ["hash_id", "INTEGER"],
                  ["size", "INTEGER"],
                  ["peer", "INTEGER DEFAULT 0"],
                  ["uploaded", "INTEGER DEFAULT 0"],
                  ["is_downloaded", "INTEGER DEFAULT 0"],
                  ["is_pinned", "INTEGER DEFAULT 0"],
                  ["time_added", "INTEGER DEFAULT 0"],
                  ["time_downloaded", "INTEGER DEFAULT 0"],
                  ["time_accessed", "INTEGER DEFAULT 0"]
              ],
              "indexes": [
                  "CREATE UNIQUE INDEX file_optional_key ON file_optional (site_id, inner_path)",
                  "CREATE INDEX is_downloaded ON file_optional (is_downloaded)"
              ],
              "schema_changed": 11
          }

          return schema

      def initSite(self, site):
          """Initialise a site; fill its optional files if the table is new.

          The first call also schedules loadFilesOptional() one second later;
          later calls do not schedule it again.
          """
          super(ContentDbPlugin, self).initSite(site)
          if self.need_filling:
              self.fillTableFileOptional(site)
          if not self.optional_files_loading:
              gevent.spawn_later(1, self.loadFilesOptional)
              self.optional_files_loading = True

      def checkTables(self):
          """Run the parent table check and flag a changed ``file_optional`` table.

          Returns whatever the parent checkTables() returned.
          """
          changed_tables = super(ContentDbPlugin, self).checkTables()
          if "file_optional" in changed_tables:
              self.need_filling = True
          return changed_tables

      # Load optional files ending
      def loadFilesOptional(self):
          """Load path endings and per-site size statistics from ``file_optional``.

          Fills ``self.optional_files`` and writes ``size_optional`` and
          ``optional_downloaded`` into each site's settings. When the table was
          just created and the configured limit is below the size already
          downloaded, the limit is raised to the downloaded size plus 10%.
          """
          s = time.time()
          num = 0
          total = 0
          total_downloaded = 0
          res = content_db.execute("SELECT site_id, inner_path, size, is_downloaded FROM file_optional")
          site_sizes = collections.defaultdict(lambda: collections.defaultdict(int))
          for row in res:
              self.optional_files[row["site_id"]][row["inner_path"][-8:]] = 1
              num += 1

              # Update site size stats
              site_sizes[row["site_id"]]["size_optional"] += row["size"]
              if row["is_downloaded"]:
                  site_sizes[row["site_id"]]["optional_downloaded"] += row["size"]

          # Save site size stats to the site settings
          site_ids_reverse = {val: key for key, val in self.site_ids.iteritems()}
          for site_id, stats in site_sizes.iteritems():
              site_address = site_ids_reverse.get(site_id)
              if not site_address:
                  self.log.error("Not found site_id: %s" % site_id)
                  continue
              site = self.sites[site_address]
              site.settings["size_optional"] = stats["size_optional"]
              site.settings["optional_downloaded"] = stats["optional_downloaded"]
              total += stats["size_optional"]
              total_downloaded += stats["optional_downloaded"]

          self.log.debug(
              "Loaded %s optional files: %.2fMB, downloaded: %.2fMB in %.3fs" %
              (num, float(total) / 1024 / 1024, float(total_downloaded) / 1024 / 1024, time.time() - s)
          )

          if self.need_filling:
              # Compute the limit once so the comparison and the log line agree
              limit_bytes = self.getOptionalLimitBytes()
              if limit_bytes < total_downloaded:
                  # Downloaded size + 10%, in GB
                  limit_new = round((float(total_downloaded) / 1024 / 1024 / 1024) * 1.1, 2)
                  self.log.debug(
                      "First startup after update and limit is smaller than downloaded files size (%.2fGB), increasing it from %.2fGB to %.2fGB" %
                      (float(total_downloaded) / 1024 / 1024 / 1024, float(limit_bytes) / 1024 / 1024 / 1024, limit_new)
                  )
                  config.saveValue("optional_limit", limit_new)
                  config.optional_limit = str(limit_new)

      # Predicts if the file is optional
      def isOptionalFile(self, site_id, inner_path):
          """Return 1 if a known optional file of the site shares the last 8 characters of ``inner_path``, else None.

          Only the path ending is compared, so this is a prediction and
          unrelated paths with the same ending also match.
          """
          return self.optional_files[site_id].get(inner_path[-8:])

      # Fill file_optional table with optional files found in sites
      def fillTableFileOptional(self, site):
          """Populate ``file_optional`` for ``site`` from its content.json files.

          Afterwards rows whose inner_path contains the user's auth address as
          a directory are marked pinned, and the site is recorded in
          ``self.filled``.

          Returns False when the site has no known site_id, otherwise None.
          """
          s = time.time()
          site_id = self.site_ids.get(site.address)
          if not site_id:
              return False
          cur = self.getCursor()
          cur.execute("BEGIN")
          res = cur.execute("SELECT * FROM content WHERE size_files_optional > 0 AND site_id = %s" % site_id)
          num = 0
          for row in res.fetchall():
              try:
                  # The lookup is inside the try: one content.json that cannot be
                  # loaded must not abort the fill and leave the transaction open.
                  content = site.content_manager.contents[row["inner_path"]]
                  num += self.setContentFilesOptional(site, row["inner_path"], content, cur=cur)
              except Exception as err:
                  self.log.error("Error loading %s into file_optional: %s" % (row["inner_path"], err))
          cur.execute("COMMIT")
          cur.close()

          # Set my files to pinned
          from User import UserManager
          user = UserManager.user_manager.get()
          if not user:
              user = UserManager.user_manager.create()
          auth_address = user.getAuthAddress(site.address)
          self.execute(
              "UPDATE file_optional SET is_pinned = 1 WHERE site_id = :site_id AND inner_path LIKE :inner_path",
              {"site_id": site_id, "inner_path": "%%/%s/%%" % auth_address}
          )

          self.log.debug(
              "Filled file_optional table for %s in %.3fs (loaded: %s, is_pinned: %s)" %
              (site.address, time.time() - s, num, self.cur.cursor.rowcount)
          )
          self.filled[site.address] = True

      def setContentFilesOptional(self, site, content_inner_path, content, cur=None):
          """Insert or update a ``file_optional`` row for every optional file of ``content``.

          site: the site owning the content.json.
          content_inner_path: inner path of the content.json; its directory is
              prepended to every relative file path.
          content: content.json data; only its "files_optional" mapping is read.
          cur: cursor to write through. When omitted the plugin itself is used
              and the writes are wrapped in BEGIN/END.

          Returns the number of optional files processed.
          """
          if not cur:
              cur = self
              cur.execute("BEGIN")

          num = 0
          site_id = self.site_ids[site.address]
          content_inner_dir = helper.getDirname(content_inner_path)
          for relative_inner_path, file_info in content.get("files_optional", {}).iteritems():
              file_inner_path = content_inner_dir + relative_inner_path
              # hash_id is the first 4 hex digits of the sha512 as an integer
              hash_id = int(file_info["sha512"][0:4], 16)
              if hash_id in site.content_manager.hashfield:
                  is_downloaded = 1
              else:
                  is_downloaded = 0
              if site.address + "/" + file_inner_path in self.my_optional_files:
                  is_pinned = 1
              else:
                  is_pinned = 0
              cur.insertOrUpdate("file_optional", {
                  "hash_id": hash_id,
                  "size": int(file_info["size"]),
                  "is_pinned": is_pinned
              }, {
                  "site_id": site_id,
                  "inner_path": file_inner_path
              }, oninsert={
                  "time_added": int(time.time()),
                  "time_downloaded": int(time.time()) if is_downloaded else 0,
                  "is_downloaded": is_downloaded,
                  "peer": is_downloaded
              })
              self.optional_files[site_id][file_inner_path[-8:]] = 1
              num += 1

          if cur == self:
              cur.execute("END")
          return num

      def setContent(self, site, inner_path, content, size=0):
          """Store content via the parent and keep ``file_optional`` in sync.

          Rows are only touched when the table is not waiting for its initial
          fill (or this site has already been filled) and the new or the
          previous content has optional files. Files present in the previous
          content but missing from the new one are deleted from the table.
          """
          super(ContentDbPlugin, self).setContent(site, inner_path, content, size=size)
          old_content = site.content_manager.contents.get(inner_path, {})
          table_ready = not self.need_filling or self.filled.get(site.address)
          # Explicit grouping: without the parentheses "and" binds tighter than
          # "or", so old optional files alone bypassed the table_ready check.
          if table_ready and ("files_optional" in content or "files_optional" in old_content):
              self.setContentFilesOptional(site, inner_path, content)
              # Check deleted files
              if old_content:
                  old_files = old_content.get("files_optional", {}).keys()
                  new_files = content.get("files_optional", {}).keys()
                  content_inner_dir = helper.getDirname(inner_path)
                  deleted = [content_inner_dir + key for key in old_files if key not in new_files]
                  if deleted:
                      site_id = self.site_ids[site.address]
                      self.execute("DELETE FROM file_optional WHERE ?", {"site_id": site_id, "inner_path": deleted})

      def deleteContent(self, site, inner_path):
          """Delete the optional-file rows of a content.json, then delegate to the parent."""
          content = site.content_manager.contents.get(inner_path)
          if content and "files_optional" in content:
              site_id = self.site_ids[site.address]
              content_inner_dir = helper.getDirname(inner_path)
              optional_inner_paths = [
                  content_inner_dir + relative_inner_path
                  for relative_inner_path in content.get("files_optional", {}).keys()
              ]
              self.execute("DELETE FROM file_optional WHERE ?", {"site_id": site_id, "inner_path": optional_inner_paths})
          super(ContentDbPlugin, self).deleteContent(site, inner_path)

      def updatePeerNumbers(self):
          """Recount the peers of every optional file and store changed counts.

          A site is skipped when it has no optional files, when neither a peer
          hashfield nor its own hashfield changed since the previous run, or
          when it has no site_id.
          """
          s = time.time()
          num_file = 0
          num_updated = 0
          num_site = 0
          for site in self.sites.values():
              if not site.content_manager.has_optional_files:
                  continue
              has_updated_hashfield = next((
                  peer
                  for peer in site.peers.itervalues()
                  if peer.has_hashfield and peer.hashfield.time_changed > self.time_peer_numbers_updated
              ), None)

              if not has_updated_hashfield and site.content_manager.hashfield.time_changed < self.time_peer_numbers_updated:
                  continue

              # .get() so a site without an id is skipped instead of raising
              # KeyError; checked before the (more expensive) counting below.
              site_id = self.site_ids.get(site.address)
              if not site_id:
                  continue

              hashfield_peers = itertools.chain.from_iterable(
                  peer.hashfield.storage
                  for peer in site.peers.itervalues()
                  if peer.has_hashfield
              )
              # hash_id -> number of hashfields (peers' plus our own) containing it
              peer_nums = collections.Counter(
                  itertools.chain(
                      hashfield_peers,
                      site.content_manager.hashfield
                  )
              )

              res = self.execute("SELECT file_id, hash_id, peer FROM file_optional WHERE ?", {"site_id": site_id})
              updates = {}
              for row in res:
                  peer_num = peer_nums.get(row["hash_id"], 0)
                  if peer_num != row["peer"]:
                      updates[row["file_id"]] = peer_num

              self.execute("BEGIN")
              for file_id, peer_num in updates.iteritems():
                  self.execute("UPDATE file_optional SET peer = ? WHERE file_id = ?", (peer_num, file_id))
              self.execute("END")

              num_updated += len(updates)
              num_file += len(peer_nums)
              num_site += 1

          self.time_peer_numbers_updated = time.time()
          self.log.debug("%s/%s peer number for %s site updated in %.3fs" % (num_updated, num_file, num_site, time.time() - s))

      def _queryPaged(self, query, page_size=50):
          """Yield every row of ``query``, fetching ``page_size`` rows per statement.

          Paging stops after the first page that returns fewer than
          ``page_size`` rows.
          """
          limit_start = 0
          while 1:
              num = 0
              res = self.execute("%s LIMIT %s, %s" % (query, limit_start, page_size))
              for row in res:
                  yield row
                  num += 1
              if num < page_size:
                  break
              limit_start += page_size

      def queryDeletableFiles(self):
          """Yield downloaded, unpinned optional files in deletion-priority order.

          Three passes are made, so the same row can be yielded more than once:
          1. files with more than 10 peers, those not accessed in the last
             week first;
          2. files with 10 or fewer peers, most peers first, then those not
             accessed in the last week;
          3. files with 10 or fewer peers, most peers first, then least
             recently accessed.
          Within each pass ties are ordered by uploaded / size.
          """
          # One week. The previous 60 * 60 * 7 was only 7 hours and
          # contradicted the "last week" intent stated for these queries.
          week_ago = int(time.time() - 60 * 60 * 24 * 7)

          # First return the files with at least 10 seeders and not accessed in last week
          query = """
              SELECT * FROM file_optional
              WHERE peer > 10 AND is_downloaded = 1 AND is_pinned = 0
              ORDER BY time_accessed < %s DESC, uploaded / size
          """ % week_ago
          for row in self._queryPaged(query):
              yield row

          self.log.debug("queryDeletableFiles returning less-seeded files")

          # Then return files with fewer seeders but still not accessed in last week
          query = """
              SELECT * FROM file_optional
              WHERE is_downloaded = 1 AND peer <= 10 AND is_pinned = 0
              ORDER BY peer DESC, time_accessed < %s DESC, uploaded / size
          """ % week_ago
          for row in self._queryPaged(query):
              yield row

          self.log.debug("queryDeletableFiles returning everyting")

          # At the end return all files
          query = """
              SELECT * FROM file_optional
              WHERE is_downloaded = 1 AND peer <= 10 AND is_pinned = 0
              ORDER BY peer DESC, time_accessed, uploaded / size
          """
          for row in self._queryPaged(query):
              yield row

      def getOptionalLimitBytes(self):
          """Return the optional files limit in bytes as a float.

          A ``config.optional_limit`` ending in "%" is taken as a percentage of
          helper.getFreeSpace(); any other value is taken as a number of GB.
          Characters other than digits and "." are ignored.
          """
          if config.optional_limit.endswith("%"):
              limit_percent = float(re.sub("[^0-9.]", "", config.optional_limit))
              limit_bytes = helper.getFreeSpace() * (limit_percent / 100)
          else:
              limit_bytes = float(re.sub("[^0-9.]", "", config.optional_limit)) * 1024 * 1024 * 1024
          return limit_bytes

      def checkOptionalLimit(self, limit=None):
          """Delete optional files until the downloaded, unpinned size fits the limit.

          limit: limit in bytes; a falsy value (None or 0) means use the
              configured limit.

          Returns False when nothing has to be deleted, otherwise None.
          """
          if not limit:
              limit = self.getOptionalLimitBytes()

          size = self.execute("SELECT SUM(size) FROM file_optional WHERE is_downloaded = 1 AND is_pinned = 0").fetchone()[0]
          if not size:
              size = 0  # SUM() gives NULL when no row matches
          need_delete = size - limit
          self.log.debug("Optional size: %.1fMB/%.1fMB" % (float(size) / 1024 / 1024, float(limit) / 1024 / 1024))
          if need_delete <= 0:
              return False

          self.updatePeerNumbers()

          site_ids_reverse = {val: key for key, val in self.site_ids.iteritems()}
          deleted_file_ids = []
          for row in self.queryDeletableFiles():
              site_address = site_ids_reverse.get(row["site_id"])
              site = self.sites.get(site_address)
              if not site:
                  self.log.error("No site found for id: %s" % row["site_id"])
                  continue
              site.log.debug("Deleting %s %.3f MB left" % (row["inner_path"], float(need_delete) / 1024 / 1024))
              # Recorded before the attempt, so the row is marked as not
              # downloaded even when the removal below fails.
              deleted_file_ids.append(row["file_id"])
              try:
                  site.content_manager.optionalRemove(row["inner_path"], row["hash_id"], row["size"])
                  site.storage.delete(row["inner_path"])
                  need_delete -= row["size"]
              except Exception as err:
                  site.log.error("Error deleting %s: %s" % (row["inner_path"], err))

              if need_delete <= 0:
                  break

          # The rows are only updated after the loop, so the paged queries
          # above keep seeing the same result set while iterating.
          cur = self.getCursor()
          cur.execute("BEGIN")
          for file_id in deleted_file_ids:
              cur.execute("UPDATE file_optional SET is_downloaded = 0, is_pinned = 0, peer = peer - 1 WHERE ?", {"file_id": file_id})
          cur.execute("COMMIT")
          cur.close()