TestRateLimit.py 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596
  1. import time
  2. import gevent
  3. from gevent import monkey
  4. monkey.patch_all()
  5. from util import RateLimit
  6. # Time is around limit +/- 0.05 sec
  7. def around(t, limit):
  8. return t >= limit - 0.05 and t <= limit + 0.05
  9. class ExampleClass(object):
  10. def __init__(self):
  11. self.counted = 0
  12. self.last_called = None
  13. def count(self, back="counted"):
  14. self.counted += 1
  15. self.last_called = back
  16. return back
  17. class TestRateLimit:
  18. def testCall(self):
  19. obj1 = ExampleClass()
  20. obj2 = ExampleClass()
  21. s = time.time()
  22. assert RateLimit.call("counting", allowed_again=0.1, func=obj1.count) == "counted"
  23. assert around(time.time() - s, 0.0) # First allow to call instantly
  24. assert obj1.counted == 1
  25. # Call again
  26. assert not RateLimit.isAllowed("counting", 0.1)
  27. assert RateLimit.isAllowed("something else", 0.1)
  28. assert RateLimit.call("counting", allowed_again=0.1, func=obj1.count) == "counted"
  29. assert around(time.time() - s, 0.1) # Delays second call within interval
  30. assert obj1.counted == 2
  31. # Call 3 times async
  32. s = time.time()
  33. assert obj2.counted == 0
  34. threads = [
  35. gevent.spawn(lambda: RateLimit.call("counting", allowed_again=0.1, func=obj2.count)), # Instant
  36. gevent.spawn(lambda: RateLimit.call("counting", allowed_again=0.1, func=obj2.count)), # 0.1s delay
  37. gevent.spawn(lambda: RateLimit.call("counting", allowed_again=0.1, func=obj2.count)) # 0.2s delay
  38. ]
  39. gevent.joinall(threads)
  40. assert [thread.value for thread in threads] == ["counted", "counted", "counted"]
  41. assert around(time.time() - s, 0.2)
  42. # No queue = instant again
  43. s = time.time()
  44. assert RateLimit.isAllowed("counting", 0.1)
  45. assert RateLimit.call("counting", allowed_again=0.1, func=obj2.count) == "counted"
  46. assert around(time.time() - s, 0.0)
  47. assert obj2.counted == 4
  48. def testCallAsync(self):
  49. obj1 = ExampleClass()
  50. obj2 = ExampleClass()
  51. s = time.time()
  52. RateLimit.callAsync("counting async", allowed_again=0.1, func=obj1.count, back="call #1").join()
  53. assert obj1.counted == 1 # First instant
  54. assert around(time.time() - s, 0.0)
  55. # After that the calls delayed
  56. s = time.time()
  57. t1 = RateLimit.callAsync("counting async", allowed_again=0.1, func=obj1.count, back="call #2") # Dumped by the next call
  58. time.sleep(0.03)
  59. t2 = RateLimit.callAsync("counting async", allowed_again=0.1, func=obj1.count, back="call #3") # Dumped by the next call
  60. time.sleep(0.03)
  61. t3 = RateLimit.callAsync("counting async", allowed_again=0.1, func=obj1.count, back="call #4") # Will be called
  62. assert obj1.counted == 1 # Delay still in progress: Not called yet
  63. t3.join()
  64. assert t3.value == "call #4"
  65. assert around(time.time() - s, 0.1)
  66. # Only the last one called
  67. assert obj1.counted == 2
  68. assert obj1.last_called == "call #4"
  69. # Allowed again instantly
  70. assert RateLimit.isAllowed("counting async", 0.1)
  71. s = time.time()
  72. RateLimit.callAsync("counting async", allowed_again=0.1, func=obj1.count, back="call #5").join()
  73. assert obj1.counted == 3
  74. assert around(time.time() - s, 0.0)
  75. assert not RateLimit.isAllowed("counting async", 0.1)
  76. time.sleep(0.11)
  77. assert RateLimit.isAllowed("counting async", 0.1)