Worker.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. import time
  2. import gevent
  3. from Debug import Debug
  4. from Config import config
  5. class Worker(object):
  6. def __init__(self, manager, peer):
  7. self.manager = manager
  8. self.peer = peer
  9. self.task = None
  10. self.key = None
  11. self.running = False
  12. self.thread = None
  13. def __str__(self):
  14. return "Worker %s %s" % (self.manager.site.address_short, self.key)
  15. def __repr__(self):
  16. return "<%s>" % self.__str__()
  17. # Downloader thread
  18. def downloader(self):
  19. self.peer.hash_failed = 0 # Reset hash error counter
  20. while self.running:
  21. # Try to pickup free file download task
  22. task = self.manager.getTask(self.peer)
  23. if not task: # Die, no more task
  24. self.manager.log.debug("%s: No task found, stopping" % self.key)
  25. break
  26. if not task["time_started"]:
  27. task["time_started"] = time.time() # Task started now
  28. if task["workers_num"] > 0: # Wait a bit if someone already working on it
  29. if config.verbose:
  30. self.manager.log.debug("%s: Someone already working on %s, sleeping 1 sec..." % (self.key, task["inner_path"]))
  31. time.sleep(1)
  32. if config.verbose:
  33. self.manager.log.debug("%s: %s, task done after sleep: %s" % (self.key, task["inner_path"], task["done"]))
  34. if task["done"] is False:
  35. self.task = task
  36. site = task["site"]
  37. task["workers_num"] += 1
  38. try:
  39. buff = self.peer.getFile(site.address, task["inner_path"])
  40. except Exception, err:
  41. self.manager.log.debug("%s: getFile error: %s" % (self.key, err))
  42. buff = None
  43. if self.running is False: # Worker no longer needed or got killed
  44. self.manager.log.debug("%s: No longer needed, returning: %s" % (self.key, task["inner_path"]))
  45. break
  46. if task["done"] is True: # Task done, try to find new one
  47. continue
  48. if buff: # Download ok
  49. correct = site.content_manager.verifyFile(task["inner_path"], buff)
  50. else: # Download error
  51. correct = False
  52. if correct is True or correct is None: # Hash ok or same file
  53. self.manager.log.debug("%s: Hash correct: %s" % (self.key, task["inner_path"]))
  54. if correct is True and task["done"] is False: # Save if changed and task not done yet
  55. buff.seek(0)
  56. site.storage.write(task["inner_path"], buff)
  57. if task["done"] is False:
  58. self.manager.doneTask(task)
  59. task["workers_num"] -= 1
  60. self.task = None
  61. else: # Hash failed
  62. self.manager.log.debug(
  63. "%s: Hash failed: %s, failed peers: %s" %
  64. (self.key, task["inner_path"], len(task["failed"]))
  65. )
  66. task["failed"].append(self.peer)
  67. self.task = None
  68. self.peer.hash_failed += 1
  69. if self.peer.hash_failed >= max(len(self.manager.tasks), 3) or self.peer.connection_error > 10:
  70. # Broken peer: More fails than tasks number but atleast 3
  71. break
  72. task["workers_num"] -= 1
  73. time.sleep(1)
  74. self.peer.onWorkerDone()
  75. self.running = False
  76. self.manager.removeWorker(self)
  77. # Start the worker
  78. def start(self):
  79. self.running = True
  80. self.thread = gevent.spawn(self.downloader)
  81. # Skip current task
  82. def skip(self):
  83. self.manager.log.debug("%s: Force skipping" % self.key)
  84. if self.thread:
  85. self.thread.kill(exception=Debug.Notify("Worker stopped"))
  86. self.start()
  87. # Force stop the worker
  88. def stop(self):
  89. self.manager.log.debug("%s: Force stopping" % self.key)
  90. self.running = False
  91. if self.thread:
  92. self.thread.kill(exception=Debug.Notify("Worker stopped"))
  93. del self.thread
  94. self.manager.removeWorker(self)