123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155 |
- import gevent
- import time
class Noparallel(object):
    """Decorator that allows only one running greenlet per call key.

    Each decorated call is identified by a key built from the function and,
    depending on the flags, its arguments. While a greenlet is registered for
    a key in ``self.threads``, later calls with the same key reuse that
    greenlet instead of starting a second one.
    """

    def __init__(self, blocking=True, ignore_args=False, ignore_class=False, queue=False):
        """Configure the decorator.

        blocking: if True the decorated call joins the greenlet and returns
            its ``value`` (acts like a normal function); if False the
            greenlet itself is returned.
        ignore_args: build the key from the function and the first positional
            argument only.
        ignore_class: build the key from the function only (takes precedence
            over ignore_args).
        queue: if True, a call that finds a greenlet already registered sets
            ``self.queued``; in blocking mode the first waiter that sees the
            flag after the join clears it and calls the function again.
        """
        self.threads = {}  # key -> greenlet currently registered for that key
        self.blocking = blocking
        self.queue = queue
        self.queued = False  # NOTE: one flag shared by every key of this decorator
        self.ignore_args = ignore_args
        self.ignore_class = ignore_class

    def __call__(self, func):
        """Wrap ``func`` and return the wrapper."""
        def wrapper(*args, **kwargs):
            if self.ignore_class:
                key = func  # Unique key only by function
            elif self.ignore_args:
                key = (func, args[0])  # Unique key only by function and first argument (the instance for methods)
            else:
                key = (func, tuple(args), str(kwargs))  # Unique key for function including parameters

            # Membership test on purpose: a greenlet's truth value must not decide this.
            if key in self.threads:  # A greenlet is already registered for this key
                if self.queue:
                    self.queued = True
                thread = self.threads[key]
                if self.blocking:
                    thread.join()  # Block until it is finished
                    if self.queued:
                        self.queued = False
                        return wrapper(*args, **kwargs)  # Run again after the end
                    return thread.value  # Share the result of the finished run
                if thread.ready():  # Finished but not cleaned up yet: start a new one
                    thread = self._spawn(key, func, args, kwargs)
                return thread

            # Nothing registered for this key: start a new greenlet
            thread = self._spawn(key, func, args, kwargs)
            if self.blocking:  # Wait for finish
                thread.join()
                return thread.value
            return thread  # No blocking, just return the greenlet

        # __name__ exists on both Python 2 and 3 (func_name is Python 2 only)
        wrapper.__name__ = func.__name__
        return wrapper

    def _spawn(self, key, func, args, kwargs):
        """Spawn ``func`` in a greenlet, register it under ``key`` and link its cleanup."""
        thread = gevent.spawn(func, *args, **kwargs)
        self.threads[key] = thread
        thread.link(lambda thread: self.cleanup(key, thread))
        return thread

    def cleanup(self, key, thread):
        """Unregister ``thread`` from ``key`` once it has finished.

        The entry is removed only if ``thread`` is still the greenlet stored
        under ``key``, so a late callback from an older greenlet cannot
        unregister a newer one that replaced it.
        """
        if self.threads.get(key) is thread:
            del self.threads[key]
if __name__ == "__main__":
    # Manual demo / smoke test for Noparallel.
    # Every print is a single-argument print(...) call with % formatting, so
    # the module parses on both Python 2 and Python 3 and the output is the same.

    class Test():
        # Blocking mode: count() returns the counter's result string
        @Noparallel()
        def count(self, num=5):
            for i in range(num):
                print("%s %s" % (self, i))
                time.sleep(1)
            return "%s return:%s" % (self, i)

    class TestNoblock():
        # Non-blocking mode: count() returns the greenlet running the counter
        @Noparallel(blocking=False)
        def count(self, num=5):
            for i in range(num):
                print("%s %s" % (self, i))
                time.sleep(1)
            return "%s return:%s" % (self, i)

    def testBlocking():
        # Two calls on the same instance share one run; a second instance gets its own
        test = Test()
        test2 = Test()
        print("Counting...")
        print("Creating class1/thread1")
        thread1 = gevent.spawn(test.count)
        print("Creating class1/thread2 (ignored)")
        thread2 = gevent.spawn(test.count)
        print("Creating class2/thread3")
        thread3 = gevent.spawn(test2.count)

        print("Joining class1/thread1")
        thread1.join()
        print("Joining class1/thread2")
        thread2.join()
        print("Joining class2/thread3")
        thread3.join()

        print("Creating class1/thread4 (its finished, allowed again)")
        thread4 = gevent.spawn(test.count)
        print("Joining thread4")
        thread4.join()

        print("%s %s %s %s" % (thread1.value, thread2.value, thread3.value, thread4.value))
        print("Done.")

    def testNoblocking():
        # Same scenario, but count() hands back the greenlet directly
        test = TestNoblock()
        test2 = TestNoblock()
        print("Creating class1/thread1")
        thread1 = test.count()
        print("Creating class1/thread2 (ignored)")
        thread2 = test.count()
        print("Creating class2/thread3")
        thread3 = test2.count()
        print("Joining class1/thread1")
        thread1.join()
        print("Joining class1/thread2")
        thread2.join()
        print("Joining class2/thread3")
        thread3.join()

        print("Creating class1/thread4 (its finished, allowed again)")
        thread4 = test.count()
        print("Joining thread4")
        thread4.join()

        print("%s %s %s %s" % (thread1.value, thread2.value, thread3.value, thread4.value))
        print("Done.")

    def testBenchmark():
        def printThreadNum():
            # Count the greenlet objects currently tracked by the garbage collector
            import gc
            from greenlet import greenlet
            objs = [obj for obj in gc.get_objects() if isinstance(obj, greenlet)]
            print("Greenlets: %s" % len(objs))

        printThreadNum()
        test = TestNoblock()
        s = time.time()
        for i in range(3):
            gevent.spawn(test.count, i + 1)
        print("Created in %.3fs" % (time.time() - s))
        printThreadNum()
        time.sleep(5)

    # Patch the standard library before any test runs, so the time.sleep()
    # calls above go through gevent.
    from gevent import monkey
    monkey.patch_all()

    testBenchmark()
    print("Testing blocking mode...")
    testBlocking()
    print("Testing noblocking mode...")
    testNoblocking()
|