OptionalManagerPlugin.py 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171
  1. import time
  2. import collections
  3. from util import helper
  4. from Plugin import PluginManager
  5. import ContentDbPlugin
  6. # We can only import plugin host clases after the plugins are loaded
  7. @PluginManager.afterLoad
  8. def importPluginnedClasses():
  9. global config
  10. from Config import config
  11. def processAccessLog():
  12. if access_log:
  13. content_db = ContentDbPlugin.content_db
  14. now = int(time.time())
  15. num = 0
  16. for site_id in access_log:
  17. content_db.execute(
  18. "UPDATE file_optional SET time_accessed = %s WHERE ?" % now,
  19. {"site_id": site_id, "inner_path": access_log[site_id].keys()}
  20. )
  21. num += len(access_log[site_id])
  22. access_log.clear()
  23. def processRequestLog():
  24. if request_log:
  25. content_db = ContentDbPlugin.content_db
  26. cur = content_db.getCursor()
  27. num = 0
  28. cur.execute("BEGIN")
  29. for site_id in request_log:
  30. for inner_path, uploaded in request_log[site_id].iteritems():
  31. content_db.execute(
  32. "UPDATE file_optional SET uploaded = uploaded + %s WHERE ?" % uploaded,
  33. {"site_id": site_id, "inner_path": inner_path}
  34. )
  35. num += 1
  36. cur.execute("END")
  37. request_log.clear()
  38. if "access_log" not in locals().keys(): # To keep between module reloads
  39. access_log = collections.defaultdict(dict) # {site_id: {inner_path1: 1, inner_path2: 1...}}
  40. request_log = collections.defaultdict(lambda: collections.defaultdict(int)) # {site_id: {inner_path1: 1, inner_path2: 1...}}
  41. helper.timer(61, processAccessLog)
  42. helper.timer(60, processRequestLog)
  43. @PluginManager.registerTo("ContentManager")
  44. class ContentManagerPlugin(object):
  45. def optionalDownloaded(self, inner_path, hash_id, size=None, own=False):
  46. is_pinned = 0
  47. if "|" in inner_path: # Big file piece
  48. file_inner_path, file_range = inner_path.split("|")
  49. # Auto-pin bigfiles
  50. if size and config.pin_bigfile and size > 1024 * 1024 * config.pin_bigfile:
  51. is_pinned = 1
  52. else:
  53. file_inner_path = inner_path
  54. self.contents.db.executeDelayed(
  55. "UPDATE file_optional SET time_downloaded = :now, is_downloaded = 1, peer = peer + 1, is_pinned = :is_pinned WHERE site_id = :site_id AND inner_path = :inner_path AND is_downloaded = 0",
  56. {"now": int(time.time()), "site_id": self.contents.db.site_ids[self.site.address], "inner_path": file_inner_path, "is_pinned": is_pinned}
  57. )
  58. return super(ContentManagerPlugin, self).optionalDownloaded(inner_path, hash_id, size, own)
  59. def optionalRemoved(self, inner_path, hash_id, size=None):
  60. self.contents.db.execute(
  61. "UPDATE file_optional SET is_downloaded = 0, peer = peer - 1 WHERE site_id = :site_id AND inner_path = :inner_path AND is_downloaded = 1",
  62. {"site_id": self.contents.db.site_ids[self.site.address], "inner_path": inner_path}
  63. )
  64. print "Removed hash_id: %s" % hash_id, self.contents.db.cur.cursor.rowcount
  65. if self.contents.db.cur.cursor.rowcount > 0:
  66. back = super(ContentManagerPlugin, self).optionalRemoved(inner_path, hash_id, size)
  67. # Re-add to hashfield if we have other file with the same hash_id
  68. if self.isDownloaded(hash_id=hash_id, force_check_db=True):
  69. self.hashfield.appendHashId(hash_id)
  70. def isDownloaded(self, inner_path=None, hash_id=None, force_check_db=False):
  71. if hash_id and not force_check_db and hash_id not in self.hashfield:
  72. return False
  73. if inner_path:
  74. res = self.contents.db.execute(
  75. "SELECT is_downloaded FROM file_optional WHERE site_id = :site_id AND inner_path = :inner_path LIMIT 1",
  76. {"site_id": self.contents.db.site_ids[self.site.address], "inner_path": inner_path}
  77. )
  78. else:
  79. res = self.contents.db.execute(
  80. "SELECT is_downloaded FROM file_optional WHERE site_id = :site_id AND hash_id = :hash_id AND is_downloaded = 1 LIMIT 1",
  81. {"site_id": self.contents.db.site_ids[self.site.address], "hash_id": hash_id}
  82. )
  83. row = res.fetchone()
  84. if row and row[0]:
  85. return True
  86. else:
  87. return False
  88. @PluginManager.registerTo("WorkerManager")
  89. class WorkerManagerPlugin(object):
  90. def doneTask(self, task):
  91. super(WorkerManagerPlugin, self).doneTask(task)
  92. if task["optional_hash_id"] and not self.tasks: # Execute delayed queries immedietly after tasks finished
  93. ContentDbPlugin.content_db.processDelayed()
  94. @PluginManager.registerTo("UiRequest")
  95. class UiRequestPlugin(object):
  96. def parsePath(self, path):
  97. global access_log
  98. path_parts = super(UiRequestPlugin, self).parsePath(path)
  99. if path_parts:
  100. site_id = ContentDbPlugin.content_db.site_ids.get(path_parts["request_address"])
  101. if site_id:
  102. if ContentDbPlugin.content_db.isOptionalFile(site_id, path_parts["inner_path"]):
  103. access_log[site_id][path_parts["inner_path"]] = 1
  104. return path_parts
  105. @PluginManager.registerTo("FileRequest")
  106. class FileRequestPlugin(object):
  107. def actionGetFile(self, params):
  108. stats = super(FileRequestPlugin, self).actionGetFile(params)
  109. self.recordFileRequest(params["site"], params["inner_path"], stats)
  110. return stats
  111. def actionStreamFile(self, params):
  112. stats = super(FileRequestPlugin, self).actionStreamFile(params)
  113. self.recordFileRequest(params["site"], params["inner_path"], stats)
  114. return stats
  115. def recordFileRequest(self, site_address, inner_path, stats):
  116. if not stats:
  117. # Only track the last request of files
  118. return False
  119. site_id = ContentDbPlugin.content_db.site_ids[site_address]
  120. if site_id and ContentDbPlugin.content_db.isOptionalFile(site_id, inner_path):
  121. request_log[site_id][inner_path] += stats["bytes_sent"]
  122. @PluginManager.registerTo("Site")
  123. class SitePlugin(object):
  124. def isDownloadable(self, inner_path):
  125. is_downloadable = super(SitePlugin, self).isDownloadable(inner_path)
  126. if is_downloadable:
  127. return is_downloadable
  128. for path in self.settings.get("optional_help", {}).iterkeys():
  129. if inner_path.startswith(path):
  130. return True
  131. return False
  132. @PluginManager.registerTo("ConfigPlugin")
  133. class ConfigPlugin(object):
  134. def createArguments(self):
  135. group = self.parser.add_argument_group("OptionalManager plugin")
  136. group.add_argument('--optional_limit', help='Limit total size of optional files', default="10%", metavar="GB or free space %")
  137. group.add_argument('--pin_bigfile', help='Automatically pin files larger than this limit', default=20, metavar="MB", type=int)
  138. return super(ConfigPlugin, self).createArguments()