WorkerManager.py 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442
  1. import time
  2. import logging
  3. import random
  4. import collections
  5. import gevent
  6. from Worker import Worker
  7. from Config import config
  8. from util import helper
  9. from Plugin import PluginManager
  10. import util
  11. @PluginManager.acceptPlugins
  12. class WorkerManager(object):
def __init__(self, site):
    """Per-site download coordinator: owns the task queue and the worker pool.

    site: the Site instance whose files this manager downloads.
    Spawns checkTasks() as a background greenlet immediately.
    """
    self.site = site
    self.workers = {}  # Key: ip:port, Value: Worker.Worker
    self.tasks = []
    # {"evt": evt, "workers_num": 0, "site": self.site, "inner_path": inner_path, "done": False, "optional_hash_id": None,
    # "time_started": None, "time_added": time.time(), "peers": peers, "priority": 0, "failed": peer_ids}
    self.started_task_num = 0  # Last added task num
    self.running = True  # checkTasks() loops while this is True
    self.log = logging.getLogger("WorkerManager:%s" % self.site.address_short)
    self.process_taskchecker = gevent.spawn(self.checkTasks)  # Background timeout/cleanup loop
  23. def __str__(self):
  24. return "WorkerManager %s" % self.site.address_short
  25. def __repr__(self):
  26. return "<%s>" % self.__str__()
# Check expired tasks
# NOTE(review): indentation reconstructed from an unindented source dump - verify nesting against upstream.
def checkTasks(self):
    """Background loop (spawned in __init__): every 15 seconds stop workers whose
    task finished, fail/skip tasks that timed out, and look for more peers for
    tasks that are progressing too slowly. Runs until self.running is False.
    """
    while self.running:
        tasks = task = worker = workers = None  # Cleanup local variables
        time.sleep(15)  # Check every 15 sec

        # Clean up workers
        for worker in self.workers.values():
            if worker.task and worker.task["done"]:
                worker.skip()  # Stop workers with task done

        if not self.tasks:
            continue

        tasks = self.tasks[:]  # Copy it so removing elements wont cause any problem
        for task in tasks:
            # Larger files get proportionally more time before timing out
            size_extra_time = task["size"] / (1024 * 100)  # 1 second for every 100k
            if task["time_started"] and time.time() >= task["time_started"] + 60 + size_extra_time:
                self.log.debug("Timeout, Skipping: %s" % task)  # Task taking too long time, skip it
                # Skip to next file workers
                workers = self.findWorkers(task)
                if workers:
                    for worker in workers:
                        worker.skip()
                else:
                    # No worker is on it anymore: give up on the task
                    self.failTask(task)
            elif time.time() >= task["time_added"] + 60 + size_extra_time and not self.workers:  # No workers left
                self.log.debug("Timeout, Cleanup task: %s" % task)
                # Remove task
                self.failTask(task)
            elif (task["time_started"] and time.time() >= task["time_started"] + 15) or not self.workers:
                # Find more workers: Task started more than 15 sec ago or no workers
                workers = self.findWorkers(task)
                self.log.debug(
                    "Task taking more than 15+%s secs, workers: %s find more peers: %s (optional_hash_id: %s)" %
                    (size_extra_time, len(workers), task["inner_path"], task["optional_hash_id"])
                )
                task["site"].announce(mode="more")  # Find more peers
                if task["optional_hash_id"]:
                    # Optional file: search hashfields for peers advertising the hash
                    self.startFindOptional(find_more=True)
                else:
                    if task["peers"]:  # Release the peer lock
                        self.log.debug("Task peer lock release: %s" % task["inner_path"])
                        task["peers"] = []
                        self.startWorkers()
                break  # One reannounce per loop
    self.log.debug("checkTasks stopped running")
  71. # Returns the next free or less worked task
  72. def getTask(self, peer):
  73. # Sort tasks by priority and worker numbers
  74. self.tasks.sort(key=lambda task: task["priority"] - task["workers_num"] * 5, reverse=True)
  75. for task in self.tasks: # Find a task
  76. if task["peers"] and peer not in task["peers"]:
  77. continue # This peer not allowed to pick this task
  78. if peer in task["failed"]:
  79. continue # Peer already tried to solve this, but failed
  80. if task["optional_hash_id"] and task["peers"] is None:
  81. continue # No peers found yet for the optional task
  82. return task
  83. def removeGoodFileTasks(self):
  84. for task in self.tasks[:]:
  85. if task["inner_path"] not in self.site.bad_files:
  86. self.log.debug("No longer in bad_files, marking as good: %s" % task["inner_path"])
  87. task["done"] = True
  88. task["evt"].set(True)
  89. self.tasks.remove(task)
  90. if not self.tasks:
  91. self.started_task_num = 0
  92. self.site.updateWebsocket()
  93. # New peers added to site
  94. def onPeers(self):
  95. self.startWorkers()
  96. def getMaxWorkers(self):
  97. if len(self.tasks) > 100:
  98. return config.connected_limit * 2
  99. else:
  100. return config.connected_limit
  101. # Add new worker
  102. def addWorker(self, peer):
  103. key = peer.key
  104. if key not in self.workers and len(self.workers) < self.getMaxWorkers():
  105. # We dont have worker for that peer and workers num less than max
  106. worker = Worker(self, peer)
  107. self.workers[key] = worker
  108. worker.key = key
  109. worker.start()
  110. return worker
  111. else: # We have woker for this peer or its over the limit
  112. return False
  113. # Start workers to process tasks
  114. def startWorkers(self, peers=None):
  115. if not self.tasks:
  116. return False # No task for workers
  117. if len(self.workers) >= self.getMaxWorkers() and not peers:
  118. return False # Workers number already maxed and no starting peers defined
  119. if not peers:
  120. peers = self.site.getConnectedPeers()
  121. if len(peers) < self.getMaxWorkers():
  122. peers += self.site.peers.values()[0:self.getMaxWorkers()]
  123. if type(peers) is set:
  124. peers = list(peers)
  125. random.shuffle(peers)
  126. for peer in peers: # One worker for every peer
  127. if peers and peer not in peers:
  128. continue # If peers defined and peer not valid
  129. worker = self.addWorker(peer)
  130. if worker:
  131. self.log.debug("Added worker: %s, workers: %s/%s" % (peer.key, len(self.workers), self.getMaxWorkers()))
  132. # Find peers for optional hash in local hash tables and add to task peers
  133. def findOptionalTasks(self, optional_tasks, reset_task=False):
  134. found = collections.defaultdict(list) # { found_hash: [peer1, peer2...], ...}
  135. for peer in self.site.peers.values():
  136. if not peer.has_hashfield:
  137. continue
  138. hashfield_set = set(peer.hashfield) # Finding in set is much faster
  139. for task in optional_tasks:
  140. optional_hash_id = task["optional_hash_id"]
  141. if optional_hash_id in hashfield_set:
  142. found[optional_hash_id].append(peer)
  143. if task["peers"] and peer not in task["peers"]:
  144. task["peers"].append(peer)
  145. else:
  146. task["peers"] = [peer]
  147. if reset_task and len(task["failed"]) > 0:
  148. task["failed"] = []
  149. return found
  150. # Find peers for optional hash ids in local hash tables
  151. def findOptionalHashIds(self, optional_hash_ids, limit=0):
  152. found = collections.defaultdict(list) # { found_hash_id: [peer1, peer2...], ...}
  153. for peer in self.site.peers.values():
  154. if not peer.has_hashfield:
  155. continue
  156. hashfield_set = set(peer.hashfield) # Finding in set is much faster
  157. for optional_hash_id in optional_hash_ids:
  158. if optional_hash_id in hashfield_set:
  159. found[optional_hash_id].append(peer)
  160. if limit and len(found[optional_hash_id]) >= limit:
  161. optional_hash_ids.remove(optional_hash_id)
  162. return found
  163. # Add peers to tasks from found result
  164. def addOptionalPeers(self, found_ips):
  165. found = collections.defaultdict(list)
  166. for hash_id, peer_ips in found_ips.iteritems():
  167. task = [task for task in self.tasks if task["optional_hash_id"] == hash_id]
  168. if task: # Found task, lets take the first
  169. task = task[0]
  170. else:
  171. continue
  172. for peer_ip in peer_ips:
  173. peer = self.site.addPeer(peer_ip[0], peer_ip[1], return_peer=True)
  174. if not peer:
  175. continue
  176. if task["peers"] is None:
  177. task["peers"] = []
  178. if peer not in task["peers"]:
  179. task["peers"].append(peer)
  180. if peer.hashfield.appendHashId(hash_id): # Peer has this file
  181. peer.time_hashfield = None # Peer hashfield probably outdated
  182. found[hash_id].append(peer)
  183. return found
# Start find peers for optional files
# NOTE(review): indentation reconstructed from an unindented source dump - verify nesting against upstream.
@util.Noparallel(blocking=False)
def startFindOptional(self, reset_task=False, find_more=False):
    """Locate peers that have the queued optional files, escalating through
    wider searches: local hashfield tables, hashfield refresh from connected
    peers, findHashIds on connected peers, then up to 5 random peers.

    reset_task: clear failed-peer lists of the matching tasks so peers retry.
    find_more: keep searching even when every hash already has a local result.
    Runs as a non-blocking greenlet; util.Noparallel merges concurrent calls.
    """
    time.sleep(0.01)  # Wait for more file requests
    optional_tasks = [task for task in self.tasks if task["optional_hash_id"]]
    optional_hash_ids = set([task["optional_hash_id"] for task in optional_tasks])
    self.log.debug(
        "Finding peers for optional files: %s (reset_task: %s, find_more: %s)" %
        (optional_hash_ids, reset_task, find_more)
    )
    # Step 1: check the hashfields we already know locally
    found = self.findOptionalTasks(optional_tasks, reset_task=reset_task)

    if found:
        found_peers = set([peer for peers in found.values() for peer in peers])
        self.startWorkers(found_peers)

    if len(found) < len(optional_hash_ids) or find_more:
        self.log.debug("No local result for optional files: %s" % (optional_hash_ids - set(found)))

        # Step 2: query hashfield from connected peers
        threads = []
        peers = self.site.getConnectedPeers()
        if not peers:
            peers = self.site.getConnectablePeers()
        for peer in peers:
            if not peer.time_hashfield:  # Only peers whose hashfield was never fetched
                threads.append(gevent.spawn(peer.updateHashfield))
        gevent.joinall(threads, timeout=5)

        found = self.findOptionalTasks(optional_tasks)
        self.log.debug("Found optional files after query hashtable connected peers: %s/%s" % (
            len(found), len(optional_hash_ids)
        ))

        if found:
            found_peers = set([peer for hash_id_peers in found.values() for peer in hash_id_peers])
            self.startWorkers(found_peers)

    if len(found) < len(optional_hash_ids) or find_more:
        self.log.debug("No connected hashtable result for optional files: %s" % (optional_hash_ids - set(found)))

        # Step 3: ask connected peers to find the hash ids for us
        threads = []
        peers = self.site.getConnectedPeers()
        if not peers:
            peers = self.site.getConnectablePeers()
        for peer in peers:
            threads.append(gevent.spawn(peer.findHashIds, list(optional_hash_ids)))

        # Poll for up to 5 seconds, acting on partial results as they arrive
        for i in range(5):
            time.sleep(1)
            thread_values = [thread.value for thread in threads if thread.value]
            if not thread_values:
                continue

            found_ips = helper.mergeDicts(thread_values)
            found = self.addOptionalPeers(found_ips)
            self.log.debug("Found optional files after findhash connected peers: %s/%s" % (
                len(found), len(optional_hash_ids)
            ))

            if found:
                found_peers = set([peer for hash_id_peers in found.values() for peer in hash_id_peers])
                self.startWorkers(found_peers)

            if len(thread_values) == len(threads):
                # Got result from all started thread
                break

    if len(found) < len(optional_hash_ids):
        self.log.debug("No findHash result, try random peers: %s" % (optional_hash_ids - set(found)))
        # Step 4: last resort - query up to 5 random connectable peers
        threads = []
        peers = self.site.getConnectablePeers()
        for peer in peers[0:5]:
            threads.append(gevent.spawn(peer.findHashIds, list(optional_hash_ids)))

        gevent.joinall(threads, timeout=15)
        found_ips = helper.mergeDicts([thread.value for thread in threads if thread.value])
        found = self.addOptionalPeers(found_ips)
        self.log.debug("Found optional files after findhash random peers: %s/%s" % (len(found), len(optional_hash_ids)))

        if found:
            found_peers = set([peer for hash_id_peers in found.values() for peer in hash_id_peers])
            self.startWorkers(found_peers)

    if len(found) < len(optional_hash_ids):
        self.log.debug("No findhash result for optional files: %s" % (optional_hash_ids - set(found)))
  257. # Stop all worker
  258. def stopWorkers(self):
  259. for worker in self.workers.values():
  260. worker.stop()
  261. tasks = self.tasks[:] # Copy
  262. for task in tasks: # Mark all current task as failed
  263. self.failTask(task)
  264. # Find workers by task
  265. def findWorkers(self, task):
  266. workers = []
  267. for worker in self.workers.values():
  268. if worker.task == task:
  269. workers.append(worker)
  270. return workers
  271. # Ends and remove a worker
  272. def removeWorker(self, worker):
  273. worker.running = False
  274. if worker.key in self.workers:
  275. del(self.workers[worker.key])
  276. self.log.debug("Removed worker, workers: %s/%s" % (len(self.workers), self.getMaxWorkers()))
  277. if len(self.workers) <= self.getMaxWorkers()/2 and any(task["optional_hash_id"] for task in self.tasks):
  278. self.startFindOptional(find_more=True)
  279. # Tasks sorted by this
  280. def getPriorityBoost(self, inner_path):
  281. if inner_path == "content.json":
  282. return 9999 # Content.json always priority
  283. if inner_path == "index.html":
  284. return 9998 # index.html also important
  285. if "-default" in inner_path:
  286. return -4 # Default files are cloning not important
  287. elif inner_path.endswith(".css"):
  288. return 5 # boost css files priority
  289. elif inner_path.endswith(".js"):
  290. return 4 # boost js files priority
  291. elif inner_path.endswith("dbschema.json"):
  292. return 3 # boost database specification
  293. elif inner_path.endswith("content.json"):
  294. return 1 # boost included content.json files priority a bit
  295. elif inner_path.endswith(".json"):
  296. return 2 # boost data json files priority more
  297. return 0
# Create new task and return asyncresult
# NOTE(review): indentation reconstructed from an unindented source dump - verify nesting against upstream.
def addTask(self, inner_path, peer=None, priority=0):
    """Queue a download task for inner_path and return its AsyncResult.

    peer: when given, only this peer is allowed to serve the file (peer lock).
    priority: extra priority added on top of getPriorityBoost(inner_path).
    When a task for the file already exists it is updated instead of duplicated.
    """
    self.site.onFileStart(inner_path)  # First task, trigger site download started
    task = self.findTask(inner_path)
    if task:  # Already has task for that file
        if peer and task["peers"]:  # This peer also has new version, add it to task possible peers
            task["peers"].append(peer)
            self.log.debug("Added peer %s to %s" % (peer.key, task["inner_path"]))
            self.startWorkers([peer])
        elif peer and peer in task["failed"]:
            task["failed"].remove(peer)  # New update arrived, remove the peer from failed peers
            self.log.debug("Removed peer %s from failed %s" % (peer.key, task["inner_path"]))
            self.startWorkers([peer])
        if priority:
            task["priority"] += priority  # Boost on priority
        return task["evt"]
    else:  # No task for that file yet
        evt = gevent.event.AsyncResult()
        if peer:
            peers = [peer]  # Only download from this peer
        else:
            peers = None
        file_info = self.site.content_manager.getFileInfo(inner_path)
        if file_info and file_info["optional"]:
            # Optional files are located via their hash id, not via content.json
            optional_hash_id = helper.toHashId(file_info["sha512"])
        else:
            optional_hash_id = None
        if file_info:
            size = file_info.get("size", 0)
        else:
            size = 0
        priority += self.getPriorityBoost(inner_path)
        task = {
            "evt": evt, "workers_num": 0, "site": self.site, "inner_path": inner_path, "done": False,
            "optional_hash_id": optional_hash_id, "time_added": time.time(), "time_started": None,
            "time_action": None, "peers": peers, "priority": priority, "failed": [], "size": size
        }
        self.tasks.append(task)
        self.started_task_num += 1
        self.log.debug(
            "New task: %s, peer lock: %s, priority: %s, optional_hash_id: %s, tasks started: %s" %
            (task["inner_path"], peers, priority, optional_hash_id, self.started_task_num)
        )
        if optional_hash_id:
            # Optional file: first look up peers advertising the hash
            self.startFindOptional()
            if peers:
                self.startWorkers(peers)
        else:
            self.startWorkers(peers)
        return evt
  348. # Find a task using inner_path
  349. def findTask(self, inner_path):
  350. for task in self.tasks:
  351. if task["inner_path"] == inner_path:
  352. return task
  353. return None # Not found
  354. # Wait for other tasks
  355. def checkComplete(self):
  356. time.sleep(0.1)
  357. if not self.tasks:
  358. self.log.debug("Check compelte: No tasks")
  359. self.started_task_num = 0
  360. self.site.onComplete() # No more task trigger site complete
  361. # Mark a task done
  362. def doneTask(self, task):
  363. task["done"] = True
  364. self.tasks.remove(task) # Remove from queue
  365. if task["optional_hash_id"]:
  366. self.log.debug("Downloaded optional file, adding to hashfield: %s" % task["inner_path"])
  367. self.site.content_manager.hashfield.appendHashId(task["optional_hash_id"])
  368. self.site.onFileDone(task["inner_path"])
  369. task["evt"].set(True)
  370. if not self.tasks:
  371. gevent.spawn(self.checkComplete)
  372. # Mark a task failed
  373. def failTask(self, task):
  374. if task in self.tasks:
  375. task["done"] = True
  376. self.tasks.remove(task) # Remove from queue
  377. self.site.onFileFail(task["inner_path"])
  378. task["evt"].set(False)
  379. if not self.tasks:
  380. self.started_task_num = 0