# ContentDbPlugin.py
  1. import time
  2. import collections
  3. import itertools
  4. import re
  5. import gevent
  6. from util import helper
  7. from Plugin import PluginManager
  8. from Config import config
  9. from Debug import Debug
  10. if "content_db" not in locals().keys(): # To keep between module reloads
  11. content_db = None
  12. @PluginManager.registerTo("ContentDb")
  13. class ContentDbPlugin(object):
  14. def __init__(self, *args, **kwargs):
  15. global content_db
  16. content_db = self
  17. self.filled = {} # Site addresses that already filled from content.json
  18. self.need_filling = False # file_optional table just created, fill data from content.json files
  19. self.time_peer_numbers_updated = 0
  20. self.my_optional_files = {} # Last 50 site_address/inner_path called by fileWrite (auto-pinning these files)
  21. self.optional_files = collections.defaultdict(dict)
  22. self.optional_files_loading = False
  23. helper.timer(60 * 5, self.checkOptionalLimit)
  24. super(ContentDbPlugin, self).__init__(*args, **kwargs)
  25. def getSchema(self):
  26. schema = super(ContentDbPlugin, self).getSchema()
  27. # Need file_optional table
  28. schema["tables"]["file_optional"] = {
  29. "cols": [
  30. ["file_id", "INTEGER PRIMARY KEY UNIQUE NOT NULL"],
  31. ["site_id", "INTEGER REFERENCES site (site_id) ON DELETE CASCADE"],
  32. ["inner_path", "TEXT"],
  33. ["hash_id", "INTEGER"],
  34. ["size", "INTEGER"],
  35. ["peer", "INTEGER DEFAULT 0"],
  36. ["uploaded", "INTEGER DEFAULT 0"],
  37. ["is_downloaded", "INTEGER DEFAULT 0"],
  38. ["is_pinned", "INTEGER DEFAULT 0"],
  39. ["time_added", "INTEGER DEFAULT 0"],
  40. ["time_downloaded", "INTEGER DEFAULT 0"],
  41. ["time_accessed", "INTEGER DEFAULT 0"]
  42. ],
  43. "indexes": [
  44. "CREATE UNIQUE INDEX file_optional_key ON file_optional (site_id, inner_path)",
  45. "CREATE INDEX is_downloaded ON file_optional (is_downloaded)"
  46. ],
  47. "schema_changed": 11
  48. }
  49. return schema
  50. def initSite(self, site):
  51. super(ContentDbPlugin, self).initSite(site)
  52. if self.need_filling:
  53. self.fillTableFileOptional(site)
  54. if not self.optional_files_loading:
  55. gevent.spawn_later(1, self.loadFilesOptional)
  56. self.optional_files_loading = True
  57. def checkTables(self):
  58. changed_tables = super(ContentDbPlugin, self).checkTables()
  59. if "file_optional" in changed_tables:
  60. self.need_filling = True
  61. return changed_tables
  62. # Load optional files ending
  63. def loadFilesOptional(self):
  64. s = time.time()
  65. num = 0
  66. total = 0
  67. total_downloaded = 0
  68. res = content_db.execute("SELECT site_id, inner_path, size, is_downloaded FROM file_optional")
  69. site_sizes = collections.defaultdict(lambda: collections.defaultdict(int))
  70. for row in res:
  71. self.optional_files[row["site_id"]][row["inner_path"][-8:]] = 1
  72. num += 1
  73. # Update site size stats
  74. site_sizes[row["site_id"]]["size_optional"] += row["size"]
  75. if row["is_downloaded"]:
  76. site_sizes[row["site_id"]]["optional_downloaded"] += row["size"]
  77. # Site site size stats to sites.json settings
  78. site_ids_reverse = {val: key for key, val in self.site_ids.iteritems()}
  79. for site_id, stats in site_sizes.iteritems():
  80. site_address = site_ids_reverse.get(site_id)
  81. if not site_address:
  82. self.log.error("Not found site_id: %s" % site_id)
  83. continue
  84. site = self.sites[site_address]
  85. site.settings["size_optional"] = stats["size_optional"]
  86. site.settings["optional_downloaded"] = stats["optional_downloaded"]
  87. total += stats["size_optional"]
  88. total_downloaded += stats["optional_downloaded"]
  89. self.log.debug(
  90. "Loaded %s optional files: %.2fMB, downloaded: %.2fMB in %.3fs" %
  91. (num, float(total) / 1024 / 1024, float(total_downloaded) / 1024 / 1024, time.time() - s)
  92. )
  93. if self.need_filling and self.getOptionalLimitBytes() >= 0 and self.getOptionalLimitBytes() < total_downloaded:
  94. limit_bytes = self.getOptionalLimitBytes()
  95. limit_new = round((float(total_downloaded) / 1024 / 1024 / 1024) * 1.1, 2) # Current limit + 10%
  96. self.log.debug(
  97. "First startup after update and limit is smaller than downloaded files size (%.2fGB), increasing it from %.2fGB to %.2fGB" %
  98. (float(total_downloaded) / 1024 / 1024 / 1024, float(limit_bytes) / 1024 / 1024 / 1024, limit_new)
  99. )
  100. config.saveValue("optional_limit", limit_new)
  101. config.optional_limit = str(limit_new)
  102. # Predicts if the file is optional
  103. def isOptionalFile(self, site_id, inner_path):
  104. return self.optional_files[site_id].get(inner_path[-8:])
  105. # Fill file_optional table with optional files found in sites
  106. def fillTableFileOptional(self, site):
  107. s = time.time()
  108. site_id = self.site_ids.get(site.address)
  109. if not site_id:
  110. return False
  111. cur = self.getCursor()
  112. cur.execute("BEGIN")
  113. res = cur.execute("SELECT * FROM content WHERE size_files_optional > 0 AND site_id = %s" % site_id)
  114. num = 0
  115. for row in res.fetchall():
  116. content = site.content_manager.contents[row["inner_path"]]
  117. try:
  118. num += self.setContentFilesOptional(site, row["inner_path"], content, cur=cur)
  119. except Exception as err:
  120. self.log.error("Error loading %s into file_optional: %s" % (row["inner_path"], err))
  121. cur.execute("COMMIT")
  122. cur.close()
  123. # Set my files to pinned
  124. from User import UserManager
  125. user = UserManager.user_manager.get()
  126. if not user:
  127. user = UserManager.user_manager.create()
  128. auth_address = user.getAuthAddress(site.address)
  129. self.execute(
  130. "UPDATE file_optional SET is_pinned = 1 WHERE site_id = :site_id AND inner_path LIKE :inner_path",
  131. {"site_id": site_id, "inner_path": "%%/%s/%%" % auth_address}
  132. )
  133. self.log.debug(
  134. "Filled file_optional table for %s in %.3fs (loaded: %s, is_pinned: %s)" %
  135. (site.address, time.time() - s, num, self.cur.cursor.rowcount)
  136. )
  137. self.filled[site.address] = True
  138. def setContentFilesOptional(self, site, content_inner_path, content, cur=None):
  139. if not cur:
  140. cur = self
  141. try:
  142. cur.execute("BEGIN")
  143. except Exception as err:
  144. self.log.warning("Transaction begin error %s %s: %s" % (site, content_inner_path, Debug.formatException(err)))
  145. num = 0
  146. site_id = self.site_ids[site.address]
  147. content_inner_dir = helper.getDirname(content_inner_path)
  148. for relative_inner_path, file in content.get("files_optional", {}).iteritems():
  149. file_inner_path = content_inner_dir + relative_inner_path
  150. hash_id = int(file["sha512"][0:4], 16)
  151. if hash_id in site.content_manager.hashfield:
  152. is_downloaded = 1
  153. else:
  154. is_downloaded = 0
  155. if site.address + "/" + content_inner_dir in self.my_optional_files:
  156. is_pinned = 1
  157. else:
  158. is_pinned = 0
  159. cur.insertOrUpdate("file_optional", {
  160. "hash_id": hash_id,
  161. "size": int(file["size"])
  162. }, {
  163. "site_id": site_id,
  164. "inner_path": file_inner_path
  165. }, oninsert={
  166. "time_added": int(time.time()),
  167. "time_downloaded": int(time.time()) if is_downloaded else 0,
  168. "is_downloaded": is_downloaded,
  169. "peer": is_downloaded,
  170. "is_pinned": is_pinned
  171. })
  172. self.optional_files[site_id][file_inner_path[-8:]] = 1
  173. num += 1
  174. if cur == self:
  175. try:
  176. cur.execute("END")
  177. except Exception as err:
  178. self.log.warning("Transaction end error %s %s: %s" % (site, content_inner_path, Debug.formatException(err)))
  179. return num
  180. def setContent(self, site, inner_path, content, size=0):
  181. super(ContentDbPlugin, self).setContent(site, inner_path, content, size=size)
  182. old_content = site.content_manager.contents.get(inner_path, {})
  183. if (not self.need_filling or self.filled.get(site.address)) and ("files_optional" in content or "files_optional" in old_content):
  184. self.setContentFilesOptional(site, inner_path, content)
  185. # Check deleted files
  186. if old_content:
  187. old_files = old_content.get("files_optional", {}).keys()
  188. new_files = content.get("files_optional", {}).keys()
  189. content_inner_dir = helper.getDirname(inner_path)
  190. deleted = [content_inner_dir + key for key in old_files if key not in new_files]
  191. if deleted:
  192. site_id = self.site_ids[site.address]
  193. self.execute("DELETE FROM file_optional WHERE ?", {"site_id": site_id, "inner_path": deleted})
  194. def deleteContent(self, site, inner_path):
  195. content = site.content_manager.contents.get(inner_path)
  196. if content and "files_optional" in content:
  197. site_id = self.site_ids[site.address]
  198. content_inner_dir = helper.getDirname(inner_path)
  199. optional_inner_paths = [
  200. content_inner_dir + relative_inner_path
  201. for relative_inner_path in content.get("files_optional", {}).keys()
  202. ]
  203. self.execute("DELETE FROM file_optional WHERE ?", {"site_id": site_id, "inner_path": optional_inner_paths})
  204. super(ContentDbPlugin, self).deleteContent(site, inner_path)
  205. def updatePeerNumbers(self):
  206. s = time.time()
  207. num_file = 0
  208. num_updated = 0
  209. num_site = 0
  210. for site in self.sites.values():
  211. if not site.content_manager.has_optional_files:
  212. continue
  213. if not site.settings["serving"]:
  214. continue
  215. has_updated_hashfield = next((
  216. peer
  217. for peer in site.peers.itervalues()
  218. if peer.has_hashfield and peer.hashfield.time_changed > self.time_peer_numbers_updated
  219. ), None)
  220. if not has_updated_hashfield and site.content_manager.hashfield.time_changed < self.time_peer_numbers_updated:
  221. continue
  222. hashfield_peers = itertools.chain.from_iterable(
  223. peer.hashfield.storage
  224. for peer in site.peers.itervalues()
  225. if peer.has_hashfield
  226. )
  227. peer_nums = collections.Counter(
  228. itertools.chain(
  229. hashfield_peers,
  230. site.content_manager.hashfield
  231. )
  232. )
  233. site_id = self.site_ids[site.address]
  234. if not site_id:
  235. continue
  236. res = self.execute("SELECT file_id, hash_id, peer FROM file_optional WHERE ?", {"site_id": site_id})
  237. updates = {}
  238. for row in res:
  239. peer_num = peer_nums.get(row["hash_id"], 0)
  240. if peer_num != row["peer"]:
  241. updates[row["file_id"]] = peer_num
  242. self.execute("BEGIN")
  243. for file_id, peer_num in updates.iteritems():
  244. self.execute("UPDATE file_optional SET peer = ? WHERE file_id = ?", (peer_num, file_id))
  245. self.execute("END")
  246. num_updated += len(updates)
  247. num_file += len(peer_nums)
  248. num_site += 1
  249. self.time_peer_numbers_updated = time.time()
  250. self.log.debug("%s/%s peer number for %s site updated in %.3fs" % (num_updated, num_file, num_site, time.time() - s))
  251. def queryDeletableFiles(self):
  252. # First return the files with atleast 10 seeder and not accessed in last week
  253. query = """
  254. SELECT * FROM file_optional
  255. WHERE peer > 10 AND %s
  256. ORDER BY time_accessed < %s DESC, uploaded / size
  257. """ % (self.getOptionalUsedWhere(), int(time.time() - 60 * 60 * 7))
  258. limit_start = 0
  259. while 1:
  260. num = 0
  261. res = self.execute("%s LIMIT %s, 50" % (query, limit_start))
  262. for row in res:
  263. yield row
  264. num += 1
  265. if num < 50:
  266. break
  267. limit_start += 50
  268. self.log.debug("queryDeletableFiles returning less-seeded files")
  269. # Then return files less seeder but still not accessed in last week
  270. query = """
  271. SELECT * FROM file_optional
  272. WHERE peer <= 10 AND %s
  273. ORDER BY peer DESC, time_accessed < %s DESC, uploaded / size
  274. """ % (self.getOptionalUsedWhere(), int(time.time() - 60 * 60 * 7))
  275. limit_start = 0
  276. while 1:
  277. num = 0
  278. res = self.execute("%s LIMIT %s, 50" % (query, limit_start))
  279. for row in res:
  280. yield row
  281. num += 1
  282. if num < 50:
  283. break
  284. limit_start += 50
  285. self.log.debug("queryDeletableFiles returning everyting")
  286. # At the end return all files
  287. query = """
  288. SELECT * FROM file_optional
  289. WHERE peer <= 10 AND %s
  290. ORDER BY peer DESC, time_accessed, uploaded / size
  291. """ % self.getOptionalUsedWhere()
  292. limit_start = 0
  293. while 1:
  294. num = 0
  295. res = self.execute("%s LIMIT %s, 50" % (query, limit_start))
  296. for row in res:
  297. yield row
  298. num += 1
  299. if num < 50:
  300. break
  301. limit_start += 50
  302. def getOptionalLimitBytes(self):
  303. if config.optional_limit.endswith("%"):
  304. limit_percent = float(re.sub("[^0-9.]", "", config.optional_limit))
  305. limit_bytes = helper.getFreeSpace() * (limit_percent / 100)
  306. else:
  307. limit_bytes = float(re.sub("[^0-9.]", "", config.optional_limit)) * 1024 * 1024 * 1024
  308. return limit_bytes
  309. def getOptionalUsedWhere(self):
  310. maxsize = config.optional_limit_exclude_minsize * 1024 * 1024
  311. query = "is_downloaded = 1 AND is_pinned = 0 AND size < %s" % maxsize
  312. # Don't delete optional files from owned sites
  313. my_site_ids = []
  314. for address, site in self.sites.items():
  315. if site.settings["own"]:
  316. my_site_ids.append(str(self.site_ids[address]))
  317. if my_site_ids:
  318. query += " AND site_id NOT IN (%s)" % ", ".join(my_site_ids)
  319. return query
  320. def getOptionalUsedBytes(self):
  321. size = self.execute("SELECT SUM(size) FROM file_optional WHERE %s" % self.getOptionalUsedWhere()).fetchone()[0]
  322. if not size:
  323. size = 0
  324. return size
  325. def getOptionalNeedDelete(self, size):
  326. if config.optional_limit.endswith("%"):
  327. limit_percent = float(re.sub("[^0-9.]", "", config.optional_limit))
  328. need_delete = size - ((helper.getFreeSpace() + size) * (limit_percent / 100))
  329. else:
  330. need_delete = size - self.getOptionalLimitBytes()
  331. return need_delete
  332. def checkOptionalLimit(self, limit=None):
  333. if not limit:
  334. limit = self.getOptionalLimitBytes()
  335. if limit < 0:
  336. self.log.debug("Invalid limit for optional files: %s" % limit)
  337. return False
  338. size = self.getOptionalUsedBytes()
  339. need_delete = self.getOptionalNeedDelete(size)
  340. self.log.debug(
  341. "Optional size: %.1fMB/%.1fMB, Need delete: %.1fMB" %
  342. (float(size) / 1024 / 1024, float(limit) / 1024 / 1024, float(need_delete) / 1024 / 1024)
  343. )
  344. if need_delete <= 0:
  345. return False
  346. self.updatePeerNumbers()
  347. site_ids_reverse = {val: key for key, val in self.site_ids.iteritems()}
  348. deleted_file_ids = []
  349. for row in self.queryDeletableFiles():
  350. site_address = site_ids_reverse.get(row["site_id"])
  351. site = self.sites.get(site_address)
  352. if not site:
  353. self.log.error("No site found for id: %s" % row["site_id"])
  354. continue
  355. site.log.debug("Deleting %s %.3f MB left" % (row["inner_path"], float(need_delete) / 1024 / 1024))
  356. deleted_file_ids.append(row["file_id"])
  357. try:
  358. site.content_manager.optionalRemoved(row["inner_path"], row["hash_id"], row["size"])
  359. site.storage.delete(row["inner_path"])
  360. need_delete -= row["size"]
  361. except Exception as err:
  362. site.log.error("Error deleting %s: %s" % (row["inner_path"], err))
  363. if need_delete <= 0:
  364. break
  365. cur = self.getCursor()
  366. cur.execute("BEGIN")
  367. for file_id in deleted_file_ids:
  368. cur.execute("UPDATE file_optional SET is_downloaded = 0, is_pinned = 0, peer = peer - 1 WHERE ?", {"file_id": file_id})
  369. cur.execute("COMMIT")
  370. cur.close()