Noparallel.py 5.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155
import functools
import time

import gevent
  3. class Noparallel(object): # Only allow function running once in same time
  4. def __init__(self, blocking=True, ignore_args=False, ignore_class=False, queue=False):
  5. self.threads = {}
  6. self.blocking = blocking # Blocking: Acts like normal function else thread returned
  7. self.queue = queue
  8. self.queued = False
  9. self.ignore_args = ignore_args
  10. self.ignore_class = ignore_class
  11. def __call__(self, func):
  12. def wrapper(*args, **kwargs):
  13. if self.ignore_class:
  14. key = func # Unique key only by function and class object
  15. elif self.ignore_args:
  16. key = (func, args[0]) # Unique key only by function and class object
  17. else:
  18. key = (func, tuple(args), str(kwargs)) # Unique key for function including parameters
  19. if key in self.threads: # Thread already running (if using blocking mode)
  20. if self.queue:
  21. self.queued = True
  22. thread = self.threads[key]
  23. if self.blocking:
  24. thread.join() # Blocking until its finished
  25. if self.queued:
  26. self.queued = False
  27. return wrapper(*args, **kwargs) # Run again after the end
  28. else:
  29. return thread.value # Return the value
  30. else: # No blocking
  31. if thread.ready(): # Its finished, create a new
  32. thread = gevent.spawn(func, *args, **kwargs)
  33. self.threads[key] = thread
  34. return thread
  35. else: # Still running
  36. return thread
  37. else: # Thread not running
  38. thread = gevent.spawn(func, *args, **kwargs) # Spawning new thread
  39. thread.link(lambda thread: self.cleanup(key, thread))
  40. self.threads[key] = thread
  41. if self.blocking: # Wait for finish
  42. thread.join()
  43. ret = thread.value
  44. return ret
  45. else: # No blocking just return the thread
  46. return thread
  47. wrapper.func_name = func.func_name
  48. return wrapper
  49. # Cleanup finished threads
  50. def cleanup(self, key, thread):
  51. if key in self.threads:
  52. del(self.threads[key])
  53. if __name__ == "__main__":
  54. class Test():
  55. @Noparallel()
  56. def count(self, num=5):
  57. for i in range(num):
  58. print self, i
  59. time.sleep(1)
  60. return "%s return:%s" % (self, i)
  61. class TestNoblock():
  62. @Noparallel(blocking=False)
  63. def count(self, num=5):
  64. for i in range(num):
  65. print self, i
  66. time.sleep(1)
  67. return "%s return:%s" % (self, i)
  68. def testBlocking():
  69. test = Test()
  70. test2 = Test()
  71. print "Counting..."
  72. print "Creating class1/thread1"
  73. thread1 = gevent.spawn(test.count)
  74. print "Creating class1/thread2 (ignored)"
  75. thread2 = gevent.spawn(test.count)
  76. print "Creating class2/thread3"
  77. thread3 = gevent.spawn(test2.count)
  78. print "Joining class1/thread1"
  79. thread1.join()
  80. print "Joining class1/thread2"
  81. thread2.join()
  82. print "Joining class2/thread3"
  83. thread3.join()
  84. print "Creating class1/thread4 (its finished, allowed again)"
  85. thread4 = gevent.spawn(test.count)
  86. print "Joining thread4"
  87. thread4.join()
  88. print thread1.value, thread2.value, thread3.value, thread4.value
  89. print "Done."
  90. def testNoblocking():
  91. test = TestNoblock()
  92. test2 = TestNoblock()
  93. print "Creating class1/thread1"
  94. thread1 = test.count()
  95. print "Creating class1/thread2 (ignored)"
  96. thread2 = test.count()
  97. print "Creating class2/thread3"
  98. thread3 = test2.count()
  99. print "Joining class1/thread1"
  100. thread1.join()
  101. print "Joining class1/thread2"
  102. thread2.join()
  103. print "Joining class2/thread3"
  104. thread3.join()
  105. print "Creating class1/thread4 (its finished, allowed again)"
  106. thread4 = test.count()
  107. print "Joining thread4"
  108. thread4.join()
  109. print thread1.value, thread2.value, thread3.value, thread4.value
  110. print "Done."
  111. def testBenchmark():
  112. import time
  113. def printThreadNum():
  114. import gc
  115. from greenlet import greenlet
  116. objs = [obj for obj in gc.get_objects() if isinstance(obj, greenlet)]
  117. print "Greenlets: %s" % len(objs)
  118. printThreadNum()
  119. test = TestNoblock()
  120. s = time.time()
  121. for i in range(3):
  122. gevent.spawn(test.count, i + 1)
  123. print "Created in %.3fs" % (time.time() - s)
  124. printThreadNum()
  125. time.sleep(5)
  126. from gevent import monkey
  127. monkey.patch_all()
  128. testBenchmark()
  129. print "Testing blocking mode..."
  130. testBlocking()
  131. print "Testing noblocking mode..."
  132. testNoblocking()