wheel_timer.py 2.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192
# -*- coding: utf-8 -*-
# Copyright 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
  15. class _Entry(object):
  16. __slots__ = ["end_key", "queue"]
  17. def __init__(self, end_key):
  18. self.end_key = end_key
  19. self.queue = []
  20. class WheelTimer(object):
  21. """Stores arbitrary objects that will be returned after their timers have
  22. expired.
  23. """
  24. def __init__(self, bucket_size=5000):
  25. """
  26. Args:
  27. bucket_size (int): Size of buckets in ms. Corresponds roughly to the
  28. accuracy of the timer.
  29. """
  30. self.bucket_size = bucket_size
  31. self.entries = []
  32. self.current_tick = 0
  33. def insert(self, now, obj, then):
  34. """Inserts object into timer.
  35. Args:
  36. now (int): Current time in msec
  37. obj (object): Object to be inserted
  38. then (int): When to return the object strictly after.
  39. """
  40. then_key = int(then / self.bucket_size) + 1
  41. if self.entries:
  42. min_key = self.entries[0].end_key
  43. max_key = self.entries[-1].end_key
  44. if then_key <= max_key:
  45. # The max here is to protect against inserts for times in the past
  46. self.entries[max(min_key, then_key) - min_key].queue.append(obj)
  47. return
  48. next_key = int(now / self.bucket_size) + 1
  49. if self.entries:
  50. last_key = self.entries[-1].end_key
  51. else:
  52. last_key = next_key
  53. # Handle the case when `then` is in the past and `entries` is empty.
  54. then_key = max(last_key, then_key)
  55. # Add empty entries between the end of the current list and when we want
  56. # to insert. This ensures there are no gaps.
  57. self.entries.extend(_Entry(key) for key in range(last_key, then_key + 1))
  58. self.entries[-1].queue.append(obj)
  59. def fetch(self, now):
  60. """Fetch any objects that have timed out
  61. Args:
  62. now (ms): Current time in msec
  63. Returns:
  64. list: List of objects that have timed out
  65. """
  66. now_key = int(now / self.bucket_size)
  67. ret = []
  68. while self.entries and self.entries[0].end_key <= now_key:
  69. ret.extend(self.entries.pop(0).queue)
  70. return ret
  71. def __len__(self):
  72. return sum(len(entry.queue) for entry in self.entries)