OptionalManagerPlugin.py 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229
  1. import time
  2. import re
  3. import collections
  4. import gevent
  5. from util import helper
  6. from Plugin import PluginManager
  7. import ContentDbPlugin
  8. # We can only import plugin host clases after the plugins are loaded
  9. @PluginManager.afterLoad
  10. def importPluginnedClasses():
  11. global config
  12. from Config import config
  13. def processAccessLog():
  14. if access_log:
  15. content_db = ContentDbPlugin.content_db
  16. now = int(time.time())
  17. num = 0
  18. for site_id in access_log:
  19. content_db.execute(
  20. "UPDATE file_optional SET time_accessed = %s WHERE ?" % now,
  21. {"site_id": site_id, "inner_path": access_log[site_id].keys()}
  22. )
  23. num += len(access_log[site_id])
  24. access_log.clear()
  25. def processRequestLog():
  26. if request_log:
  27. content_db = ContentDbPlugin.content_db
  28. cur = content_db.getCursor()
  29. num = 0
  30. cur.execute("BEGIN")
  31. for site_id in request_log:
  32. for inner_path, uploaded in request_log[site_id].iteritems():
  33. content_db.execute(
  34. "UPDATE file_optional SET uploaded = uploaded + %s WHERE ?" % uploaded,
  35. {"site_id": site_id, "inner_path": inner_path}
  36. )
  37. num += 1
  38. cur.execute("END")
  39. request_log.clear()
  40. if "access_log" not in locals().keys(): # To keep between module reloads
  41. access_log = collections.defaultdict(dict) # {site_id: {inner_path1: 1, inner_path2: 1...}}
  42. request_log = collections.defaultdict(lambda: collections.defaultdict(int)) # {site_id: {inner_path1: 1, inner_path2: 1...}}
  43. helper.timer(61, processAccessLog)
  44. helper.timer(60, processRequestLog)
  45. @PluginManager.registerTo("ContentManager")
  46. class ContentManagerPlugin(object):
  47. def __init__(self, *args, **kwargs):
  48. self.cache_is_pinned = {}
  49. super(ContentManagerPlugin, self).__init__(*args, **kwargs)
  50. def optionalDownloaded(self, inner_path, hash_id, size=None, own=False):
  51. if "|" in inner_path: # Big file piece
  52. file_inner_path, file_range = inner_path.split("|")
  53. else:
  54. file_inner_path = inner_path
  55. self.contents.db.executeDelayed(
  56. "UPDATE file_optional SET time_downloaded = :now, is_downloaded = 1, peer = peer + 1 WHERE site_id = :site_id AND inner_path = :inner_path AND is_downloaded = 0",
  57. {"now": int(time.time()), "site_id": self.contents.db.site_ids[self.site.address], "inner_path": file_inner_path}
  58. )
  59. return super(ContentManagerPlugin, self).optionalDownloaded(inner_path, hash_id, size, own)
  60. def optionalRemoved(self, inner_path, hash_id, size=None):
  61. self.contents.db.execute(
  62. "UPDATE file_optional SET is_downloaded = 0, is_pinned = 0, peer = peer - 1 WHERE site_id = :site_id AND inner_path = :inner_path AND is_downloaded = 1",
  63. {"site_id": self.contents.db.site_ids[self.site.address], "inner_path": inner_path}
  64. )
  65. if self.contents.db.cur.cursor.rowcount > 0:
  66. back = super(ContentManagerPlugin, self).optionalRemoved(inner_path, hash_id, size)
  67. # Re-add to hashfield if we have other file with the same hash_id
  68. if self.isDownloaded(hash_id=hash_id, force_check_db=True):
  69. self.hashfield.appendHashId(hash_id)
  70. return back
  71. def isDownloaded(self, inner_path=None, hash_id=None, force_check_db=False):
  72. if hash_id and not force_check_db and hash_id not in self.hashfield:
  73. return False
  74. if inner_path:
  75. res = self.contents.db.execute(
  76. "SELECT is_downloaded FROM file_optional WHERE site_id = :site_id AND inner_path = :inner_path LIMIT 1",
  77. {"site_id": self.contents.db.site_ids[self.site.address], "inner_path": inner_path}
  78. )
  79. else:
  80. res = self.contents.db.execute(
  81. "SELECT is_downloaded FROM file_optional WHERE site_id = :site_id AND hash_id = :hash_id AND is_downloaded = 1 LIMIT 1",
  82. {"site_id": self.contents.db.site_ids[self.site.address], "hash_id": hash_id}
  83. )
  84. row = res.fetchone()
  85. if row and row[0]:
  86. return True
  87. else:
  88. return False
  89. def isPinned(self, inner_path):
  90. if inner_path in self.cache_is_pinned:
  91. self.site.log.debug("Cached is pinned: %s" % inner_path)
  92. return self.cache_is_pinned[inner_path]
  93. res = self.contents.db.execute(
  94. "SELECT is_pinned FROM file_optional WHERE site_id = :site_id AND inner_path = :inner_path LIMIT 1",
  95. {"site_id": self.contents.db.site_ids[self.site.address], "inner_path": inner_path}
  96. )
  97. row = res.fetchone()
  98. if row and row[0]:
  99. is_pinned = True
  100. else:
  101. is_pinned = False
  102. self.cache_is_pinned[inner_path] = is_pinned
  103. self.site.log.debug("Cache set is pinned: %s %s" % (inner_path, is_pinned))
  104. return is_pinned
  105. def setPin(self, inner_path, is_pinned):
  106. content_db = self.contents.db
  107. site_id = content_db.site_ids[self.site.address]
  108. content_db.execute("UPDATE file_optional SET is_pinned = %d WHERE ?" % is_pinned, {"site_id": site_id, "inner_path": inner_path})
  109. self.cache_is_pinned = {}
  110. def optionalDelete(self, inner_path):
  111. if self.isPinned(inner_path):
  112. self.site.log.debug("Skip deleting pinned optional file: %s" % inner_path)
  113. return False
  114. else:
  115. return super(ContentManagerPlugin, self).optionalDelete(inner_path)
  116. @PluginManager.registerTo("WorkerManager")
  117. class WorkerManagerPlugin(object):
  118. def doneTask(self, task):
  119. super(WorkerManagerPlugin, self).doneTask(task)
  120. if task["optional_hash_id"] and not self.tasks: # Execute delayed queries immedietly after tasks finished
  121. ContentDbPlugin.content_db.processDelayed()
  122. @PluginManager.registerTo("UiRequest")
  123. class UiRequestPlugin(object):
  124. def parsePath(self, path):
  125. global access_log
  126. path_parts = super(UiRequestPlugin, self).parsePath(path)
  127. if path_parts:
  128. site_id = ContentDbPlugin.content_db.site_ids.get(path_parts["request_address"])
  129. if site_id:
  130. if ContentDbPlugin.content_db.isOptionalFile(site_id, path_parts["inner_path"]):
  131. access_log[site_id][path_parts["inner_path"]] = 1
  132. return path_parts
  133. @PluginManager.registerTo("FileRequest")
  134. class FileRequestPlugin(object):
  135. def actionGetFile(self, params):
  136. stats = super(FileRequestPlugin, self).actionGetFile(params)
  137. self.recordFileRequest(params["site"], params["inner_path"], stats)
  138. return stats
  139. def actionStreamFile(self, params):
  140. stats = super(FileRequestPlugin, self).actionStreamFile(params)
  141. self.recordFileRequest(params["site"], params["inner_path"], stats)
  142. return stats
  143. def recordFileRequest(self, site_address, inner_path, stats):
  144. if not stats:
  145. # Only track the last request of files
  146. return False
  147. site_id = ContentDbPlugin.content_db.site_ids[site_address]
  148. if site_id and ContentDbPlugin.content_db.isOptionalFile(site_id, inner_path):
  149. request_log[site_id][inner_path] += stats["bytes_sent"]
  150. @PluginManager.registerTo("Site")
  151. class SitePlugin(object):
  152. def isDownloadable(self, inner_path):
  153. is_downloadable = super(SitePlugin, self).isDownloadable(inner_path)
  154. if is_downloadable:
  155. return is_downloadable
  156. for path in self.settings.get("optional_help", {}).iterkeys():
  157. if inner_path.startswith(path):
  158. return True
  159. return False
  160. def fileForgot(self, inner_path):
  161. if "|" in inner_path and self.content_manager.isPinned(re.sub(r"\|.*", "", inner_path)):
  162. self.log.debug("File %s is pinned, no fileForgot" % inner_path)
  163. return False
  164. else:
  165. return super(SitePlugin, self).fileForgot(inner_path)
  166. def fileDone(self, inner_path):
  167. if "|" in inner_path and self.bad_files.get(inner_path, 0) > 5: # Idle optional file done
  168. inner_path_file = re.sub(r"\|.*", "", inner_path)
  169. num_changed = 0
  170. for key, val in self.bad_files.items():
  171. if key.startswith(inner_path_file) and val > 1:
  172. self.bad_files[key] = 1
  173. num_changed += 1
  174. self.log.debug("Idle optional file piece done, changed retry number of %s pieces." % num_changed)
  175. if num_changed:
  176. gevent.spawn(self.retryBadFiles)
  177. return super(SitePlugin, self).fileDone(inner_path)
  178. @PluginManager.registerTo("ConfigPlugin")
  179. class ConfigPlugin(object):
  180. def createArguments(self):
  181. group = self.parser.add_argument_group("OptionalManager plugin")
  182. group.add_argument('--optional_limit', help='Limit total size of optional files', default="10%", metavar="GB or free space %")
  183. group.add_argument('--optional_limit_exclude_minsize', help='Exclude files larger than this limit from optional size limit calculation', default=20, metavar="MB", type=int)
  184. return super(ConfigPlugin, self).createArguments()