WorkerManager.py 22 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511
  1. import time
  2. import logging
  3. import random
  4. import collections
  5. import gevent
  6. from Worker import Worker
  7. from Config import config
  8. from util import helper
  9. from Plugin import PluginManager
  10. import util
  11. @PluginManager.acceptPlugins
  12. class WorkerManager(object):
    def __init__(self, site):
        """Create the per-site download manager and start its background task checker."""
        self.site = site
        self.workers = {}  # Key: ip:port, Value: Worker.Worker
        self.tasks = []  # Queue of task dicts, see shape below
        # {"evt": evt, "workers_num": 0, "site": self.site, "inner_path": inner_path, "done": False, "optional_hash_id": None,
        #  "time_started": None, "time_added": time.time(), "peers": peers, "priority": 0, "failed": peer_ids}
        self.started_task_num = 0  # Last added task num
        self.asked_peers = []  # Peers already queried via findHashIds (reset per task batch)
        self.running = True  # checkTasks loops while True
        self.time_task_added = 0  # Timestamp of the most recent addTask
        self.log = logging.getLogger("WorkerManager:%s" % self.site.address_short)
        # Background greenlet: periodic timeout / cleanup / reannounce sweep
        self.process_taskchecker = gevent.spawn(self.checkTasks)
  25. def __str__(self):
  26. return "WorkerManager %s" % self.site.address_short
  27. def __repr__(self):
  28. return "<%s>" % self.__str__()
    # Check expired tasks
    def checkTasks(self):
        """Background loop: every 15s stop finished workers, time out stalled
        tasks, and look for more peers for slow tasks. Runs until self.running
        is cleared."""
        while self.running:
            tasks = task = worker = workers = None  # Cleanup local variables
            time.sleep(15)  # Check every 15 sec

            # Clean up workers whose task already finished
            for worker in self.workers.values():
                if worker.task and worker.task["done"]:
                    worker.skip()  # Stop workers with task done

            if not self.tasks:
                continue

            tasks = self.tasks[:]  # Copy it so removing elements wont cause any problem
            for task in tasks:
                # Larger files get a proportionally longer deadline
                size_extra_time = task["size"] / (1024 * 100)  # 1 second for every 100k
                if task["time_started"] and time.time() >= task["time_started"] + 60 + size_extra_time:
                    self.log.debug("Timeout, Skipping: %s" % task)  # Task taking too long time, skip it
                    # Skip to next file workers
                    workers = self.findWorkers(task)
                    if workers:
                        for worker in workers:
                            worker.skip()
                    else:
                        self.failTask(task)
                elif time.time() >= task["time_added"] + 60 + size_extra_time and not self.workers:  # No workers left
                    self.log.debug("Timeout, Cleanup task: %s" % task)
                    # Remove task
                    self.failTask(task)
                elif (task["time_started"] and time.time() >= task["time_started"] + 15) or not self.workers:
                    # Find more workers: Task started more than 15 sec ago or no workers
                    workers = self.findWorkers(task)
                    self.log.debug(
                        "Slow task: %s 15+%ss, (workers: %s, optional_hash_id: %s, peers: %s, failed: %s, asked: %s)" %
                        (
                            task["inner_path"], size_extra_time, len(workers), task["optional_hash_id"],
                            len(task["peers"] or []), len(task["failed"]), len(self.asked_peers)
                        )
                    )
                    task["site"].announce(mode="more")  # Find more peers
                    if task["optional_hash_id"]:
                        if self.workers:
                            # Ask limit grows with how long the task has been running;
                            # priority tasks are allowed to ask twice as fast
                            if not task["time_started"]:
                                ask_limit = 20
                            elif task["priority"] > 0:
                                ask_limit = max(10, time.time() - task["time_started"])
                            else:
                                ask_limit = max(10, (time.time() - task["time_started"]) / 2)
                            if len(self.asked_peers) < ask_limit and len(task["peers"] or []) <= len(task["failed"]) * 2:
                                # Re-search for high priority
                                self.startFindOptional(find_more=True)
                        elif task["peers"]:
                            # No workers: retry with known peers that have not failed yet
                            peers_try = [peer for peer in task["peers"] if peer not in task["failed"]]
                            if peers_try:
                                self.startWorkers(peers_try)
                            else:
                                self.startFindOptional(find_more=True)
                    else:
                        if task["peers"]:  # Release the peer lock
                            self.log.debug("Task peer lock release: %s" % task["inner_path"])
                            task["peers"] = []
                        self.startWorkers()
                    break  # One reannounce per loop

        self.log.debug("checkTasks stopped running")
  91. # Returns the next free or less worked task
  92. def getTask(self, peer):
  93. # Sort tasks by priority and worker numbers
  94. self.tasks.sort(key=lambda task: task["priority"] - task["workers_num"] * 5, reverse=True)
  95. for task in self.tasks: # Find a task
  96. if task["peers"] and peer not in task["peers"]:
  97. continue # This peer not allowed to pick this task
  98. if peer in task["failed"]:
  99. continue # Peer already tried to solve this, but failed
  100. if task["optional_hash_id"] and task["peers"] is None:
  101. continue # No peers found yet for the optional task
  102. return task
  103. def removeGoodFileTasks(self):
  104. for task in self.tasks[:]:
  105. if task["inner_path"] not in self.site.bad_files:
  106. self.log.debug("No longer in bad_files, marking as good: %s" % task["inner_path"])
  107. task["done"] = True
  108. task["evt"].set(True)
  109. self.tasks.remove(task)
  110. if not self.tasks:
  111. self.started_task_num = 0
  112. self.site.updateWebsocket()
  113. # New peers added to site
  114. def onPeers(self):
  115. self.startWorkers()
  116. def getMaxWorkers(self):
  117. if len(self.tasks) > 50:
  118. return config.workers * 3
  119. else:
  120. return config.workers
  121. # Add new worker
  122. def addWorker(self, peer):
  123. key = peer.key
  124. if key not in self.workers and len(self.workers) < self.getMaxWorkers():
  125. # We dont have worker for that peer and workers num less than max
  126. worker = Worker(self, peer)
  127. self.workers[key] = worker
  128. worker.key = key
  129. worker.start()
  130. return worker
  131. else: # We have woker for this peer or its over the limit
  132. return False
  133. # Start workers to process tasks
  134. def startWorkers(self, peers=None):
  135. if not self.tasks:
  136. return False # No task for workers
  137. if len(self.workers) >= self.getMaxWorkers() and not peers:
  138. return False # Workers number already maxed and no starting peers defined
  139. self.log.debug("Starting workers, tasks: %s, peers: %s, workers: %s" % (len(self.tasks), len(peers or []), len(self.workers)))
  140. if not peers:
  141. peers = self.site.getConnectedPeers()
  142. if len(peers) < self.getMaxWorkers():
  143. peers += self.site.peers.values()[0:self.getMaxWorkers()]
  144. if type(peers) is set:
  145. peers = list(peers)
  146. # Sort by ping
  147. peers.sort(key = lambda peer: peer.connection.last_ping_delay if peer.connection and len(peer.connection.waiting_requests) == 0 else 9999)
  148. for peer in peers: # One worker for every peer
  149. if peers and peer not in peers:
  150. continue # If peers defined and peer not valid
  151. worker = self.addWorker(peer)
  152. if worker:
  153. self.log.debug("Added worker: %s, workers: %s/%s" % (peer.key, len(self.workers), self.getMaxWorkers()))
  154. # Find peers for optional hash in local hash tables and add to task peers
  155. def findOptionalTasks(self, optional_tasks, reset_task=False):
  156. found = collections.defaultdict(list) # { found_hash: [peer1, peer2...], ...}
  157. for peer in self.site.peers.values():
  158. if not peer.has_hashfield:
  159. continue
  160. hashfield_set = set(peer.hashfield) # Finding in set is much faster
  161. for task in optional_tasks:
  162. optional_hash_id = task["optional_hash_id"]
  163. if optional_hash_id in hashfield_set:
  164. if reset_task and len(task["failed"]) > 0:
  165. task["failed"] = []
  166. if peer in task["failed"]:
  167. continue
  168. found[optional_hash_id].append(peer)
  169. if task["peers"] and peer not in task["peers"]:
  170. task["peers"].append(peer)
  171. else:
  172. task["peers"] = [peer]
  173. return found
  174. # Find peers for optional hash ids in local hash tables
  175. def findOptionalHashIds(self, optional_hash_ids, limit=0):
  176. found = collections.defaultdict(list) # { found_hash_id: [peer1, peer2...], ...}
  177. for peer in self.site.peers.values():
  178. if not peer.has_hashfield:
  179. continue
  180. hashfield_set = set(peer.hashfield) # Finding in set is much faster
  181. for optional_hash_id in optional_hash_ids:
  182. if optional_hash_id in hashfield_set:
  183. found[optional_hash_id].append(peer)
  184. if limit and len(found[optional_hash_id]) >= limit:
  185. optional_hash_ids.remove(optional_hash_id)
  186. return found
  187. # Add peers to tasks from found result
  188. def addOptionalPeers(self, found_ips):
  189. found = collections.defaultdict(list)
  190. for hash_id, peer_ips in found_ips.iteritems():
  191. task = [task for task in self.tasks if task["optional_hash_id"] == hash_id]
  192. if task: # Found task, lets take the first
  193. task = task[0]
  194. else:
  195. continue
  196. for peer_ip in peer_ips:
  197. peer = self.site.addPeer(peer_ip[0], peer_ip[1], return_peer=True)
  198. if not peer:
  199. continue
  200. if task["peers"] is None:
  201. task["peers"] = []
  202. if peer not in task["peers"]:
  203. task["peers"].append(peer)
  204. found[hash_id].append(peer)
  205. if peer.hashfield.appendHashId(hash_id): # Peer has this file
  206. peer.time_hashfield = None # Peer hashfield probably outdated
  207. return found
    # Start find peers for optional files
    @util.Noparallel(blocking=False, ignore_args=True)
    def startFindOptional(self, reset_task=False, find_more=False, high_priority=False):
        """Locate peers for every queued optional-file task.

        Escalates through three phases, each only if the previous one left
        hashes unresolved (or find_more is set):
        1. match tasks against already-known peer hashfields,
        2. refresh hashfields from connected peers and re-match,
        3. ask connected peers (then random connectable peers) via findHashIds.
        Matched peers are handed to startWorkers() as soon as they are found.
        Decorated with Noparallel: only one instance runs at a time.
        """
        # Wait for more file requests so several tasks can be batched together
        if len(self.tasks) < 20 or high_priority:
            time.sleep(0.01)
        if len(self.tasks) > 90:
            time.sleep(5)
        else:
            time.sleep(0.5)

        optional_tasks = [task for task in self.tasks if task["optional_hash_id"]]
        if not optional_tasks:
            return False
        optional_hash_ids = set([task["optional_hash_id"] for task in optional_tasks])
        time_tasks = self.time_task_added  # Snapshot to detect tasks added mid-search

        self.log.debug(
            "Finding peers for optional files: %s (reset_task: %s, find_more: %s)" %
            (optional_hash_ids, reset_task, find_more)
        )
        # Phase 1: local hashfield match
        found = self.findOptionalTasks(optional_tasks, reset_task=reset_task)

        if found:
            found_peers = set([peer for peers in found.values() for peer in peers])
            self.startWorkers(found_peers)

        if len(found) < len(optional_hash_ids) or find_more or (high_priority and any(len(peers) < 10 for peers in found.itervalues())):
            self.log.debug("No local result for optional files: %s" % (optional_hash_ids - set(found)))

            # Phase 2: query hashfield from connected peers
            threads = []
            peers = self.site.getConnectedPeers()
            if not peers:
                peers = self.site.getConnectablePeers()
            for peer in peers:
                if not peer.time_hashfield:  # Only peers with no fresh hashfield
                    threads.append(gevent.spawn(peer.updateHashfield))
            gevent.joinall(threads, timeout=5)

            if time_tasks != self.time_task_added:  # New task added since start
                optional_tasks = [task for task in self.tasks if task["optional_hash_id"]]
                optional_hash_ids = set([task["optional_hash_id"] for task in optional_tasks])

            found = self.findOptionalTasks(optional_tasks)
            self.log.debug("Found optional files after query hashtable connected peers: %s/%s" % (
                len(found), len(optional_hash_ids)
            ))

            if found:
                found_peers = set([peer for hash_id_peers in found.values() for peer in hash_id_peers])
                self.startWorkers(found_peers)

        if len(found) < len(optional_hash_ids) or find_more:
            self.log.debug("No connected hashtable result for optional files: %s" % (optional_hash_ids - set(found)))

            # Phase 3a: ask connected peers we have not asked yet
            threads = []
            peers = [peer for peer in self.site.getConnectedPeers() if peer not in self.asked_peers]
            if not peers:
                peers = self.site.getConnectablePeers()

            for peer in peers:
                threads.append(gevent.spawn(peer.findHashIds, list(optional_hash_ids)))
                self.asked_peers.append(peer)

            # Poll results for up to 5 seconds, feeding workers as answers arrive
            for i in range(5):
                time.sleep(1)
                thread_values = [thread.value for thread in threads if thread.value]
                if not thread_values:
                    continue

                found_ips = helper.mergeDicts(thread_values)
                found = self.addOptionalPeers(found_ips)
                self.log.debug("Found optional files after findhash connected peers: %s/%s (asked: %s)" % (
                    len(found), len(optional_hash_ids), len(threads)
                ))

                if found:
                    found_peers = set([peer for hash_id_peers in found.values() for peer in hash_id_peers])
                    self.startWorkers(found_peers)

                if len(thread_values) == len(threads):
                    break  # Got result from all started thread

        if len(found) < len(optional_hash_ids):
            self.log.debug("No findHash result, try random peers: %s" % (optional_hash_ids - set(found)))
            # Phase 3b: ask random connectable peers not asked before

            if time_tasks != self.time_task_added:  # New task added since start
                optional_tasks = [task for task in self.tasks if task["optional_hash_id"]]
                optional_hash_ids = set([task["optional_hash_id"] for task in optional_tasks])

            threads = []
            peers = self.site.getConnectablePeers(ignore=self.asked_peers)

            for peer in peers:
                threads.append(gevent.spawn(peer.findHashIds, list(optional_hash_ids)))
                self.asked_peers.append(peer)

            gevent.joinall(threads, timeout=15)

            found_ips = helper.mergeDicts([thread.value for thread in threads if thread.value])
            found = self.addOptionalPeers(found_ips)
            self.log.debug("Found optional files after findhash random peers: %s/%s" % (len(found), len(optional_hash_ids)))

            if found:
                found_peers = set([peer for hash_id_peers in found.values() for peer in hash_id_peers])
                self.startWorkers(found_peers)

        if len(found) < len(optional_hash_ids):
            self.log.debug("No findhash result for optional files: %s" % (optional_hash_ids - set(found)))
  298. # Stop all worker
  299. def stopWorkers(self):
  300. for worker in self.workers.values():
  301. worker.stop()
  302. tasks = self.tasks[:] # Copy
  303. for task in tasks: # Mark all current task as failed
  304. self.failTask(task)
  305. # Find workers by task
  306. def findWorkers(self, task):
  307. workers = []
  308. for worker in self.workers.values():
  309. if worker.task == task:
  310. workers.append(worker)
  311. return workers
    # Ends and remove a worker
    def removeWorker(self, worker):
        """Stop the worker, drop it from the pool, and if the pool got thin,
        kick off another optional-peer search."""
        worker.running = False
        if worker.key in self.workers:
            del(self.workers[worker.key])  # Remove worker from the pool
            self.log.debug("Removed worker, workers: %s/%s" % (len(self.workers), self.getMaxWorkers()))
            # Pool shrank to a third of the limit and we have not asked many
            # peers yet: look for more sources
            if len(self.workers) <= self.getMaxWorkers() / 3 and len(self.asked_peers) < 10:
                important_task = (task for task in self.tasks if task["priority"] > 0)
                if next(important_task, None) or len(self.asked_peers) == 0:
                    self.startFindOptional(find_more=True)
                else:
                    self.startFindOptional()
  324. # Tasks sorted by this
  325. def getPriorityBoost(self, inner_path):
  326. if inner_path == "content.json":
  327. return 9999 # Content.json always priority
  328. if inner_path == "index.html":
  329. return 9998 # index.html also important
  330. if "-default" in inner_path:
  331. return -4 # Default files are cloning not important
  332. elif inner_path.endswith("all.css"):
  333. return 13 # boost css files priority
  334. elif inner_path.endswith("all.js"):
  335. return 12 # boost js files priority
  336. elif inner_path.endswith("dbschema.json"):
  337. return 11 # boost database specification
  338. elif inner_path.endswith("content.json"):
  339. return 1 # boost included content.json files priority a bit
  340. elif inner_path.endswith(".json"):
  341. if len(inner_path) < 50: # Boost non-user json files
  342. return 10
  343. else:
  344. return 2
  345. return 0
    # Create new task and return asyncresult
    def addTask(self, inner_path, peer=None, priority=0):
        """Queue a download task for inner_path and return its AsyncResult.

        peer: if given, the task is initially locked to this peer (or the peer
        is added / un-failed on an existing task).
        priority: extra priority on top of the path-based boost.
        """
        self.site.onFileStart(inner_path)  # First task, trigger site download started
        task = self.findTask(inner_path)
        if task:  # Already has task for that file
            if peer and task["peers"]:  # This peer also has new version, add it to task possible peers
                task["peers"].append(peer)
                self.log.debug("Added peer %s to %s" % (peer.key, task["inner_path"]))
                self.startWorkers([peer])
            elif peer and peer in task["failed"]:
                task["failed"].remove(peer)  # New update arrived, remove the peer from failed peers
                self.log.debug("Removed peer %s from failed %s" % (peer.key, task["inner_path"]))
                self.startWorkers([peer])

            if priority:
                task["priority"] += priority  # Boost on priority
            return task["evt"]
        else:  # No task for that file yet
            evt = gevent.event.AsyncResult()
            if peer:
                peers = [peer]  # Only download from this peer
            else:
                peers = None

            file_info = self.site.content_manager.getFileInfo(inner_path)
            if file_info and file_info["optional"]:
                optional_hash_id = helper.toHashId(file_info["sha512"])
            else:
                optional_hash_id = None
            if file_info:
                size = file_info.get("size", 0)
            else:
                size = 0

            priority += self.getPriorityBoost(inner_path)

            if self.started_task_num == 0:  # Boost priority for first requested file
                priority += 1

            task = {
                "evt": evt, "workers_num": 0, "site": self.site, "inner_path": inner_path, "done": False,
                "optional_hash_id": optional_hash_id, "time_added": time.time(), "time_started": None,
                "time_action": None, "peers": peers, "priority": priority, "failed": [], "size": size
            }

            self.tasks.append(task)

            self.started_task_num += 1
            self.log.debug(
                "New task: %s, peer lock: %s, priority: %s, optional_hash_id: %s, tasks started: %s" %
                (task["inner_path"], peers, priority, optional_hash_id, self.started_task_num)
            )
            self.time_task_added = time.time()

            if optional_hash_id:
                # Optional file: search for peers advertising this hash
                if self.asked_peers:
                    del self.asked_peers[:]  # Reset asked peers
                self.startFindOptional(high_priority=priority > 0)

                if peers:
                    self.startWorkers(peers)
            else:
                self.startWorkers(peers)
            return evt
  401. # Find a task using inner_path
  402. def findTask(self, inner_path):
  403. for task in self.tasks:
  404. if task["inner_path"] == inner_path:
  405. return task
  406. return None # Not found
  407. # Wait for other tasks
  408. def checkComplete(self):
  409. time.sleep(0.1)
  410. if not self.tasks:
  411. self.log.debug("Check compelte: No tasks")
  412. self.onComplete()
  413. def onComplete(self):
  414. self.started_task_num = 0
  415. del self.asked_peers[:]
  416. self.site.onComplete() # No more task trigger site complete
    # Mark a task done
    def doneTask(self, task):
        """Complete a task: dequeue it, record optional downloads, wake waiters."""
        task["done"] = True
        self.tasks.remove(task)  # Remove from queue
        if task["optional_hash_id"]:
            self.log.debug("Downloaded optional file, adding to hashfield: %s" % task["inner_path"])
            self.site.content_manager.optionalDownloaded(task["inner_path"], task["optional_hash_id"], task["size"])
        self.site.onFileDone(task["inner_path"])
        task["evt"].set(True)  # Wake callers blocked on the AsyncResult
        if not self.tasks:
            # Queue drained: verify completion shortly (new tasks may still arrive)
            gevent.spawn(self.checkComplete)
  428. # Mark a task failed
  429. def failTask(self, task):
  430. if task in self.tasks:
  431. task["done"] = True
  432. self.tasks.remove(task) # Remove from queue
  433. self.site.onFileFail(task["inner_path"])
  434. task["evt"].set(False)
  435. if not self.tasks:
  436. self.started_task_num = 0