shared_timed_stack.rb 2.1 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. # frozen_string_literal: true
  2. class ConnectionPool::SharedTimedStack
  3. def initialize(max = 0, &block)
  4. @create_block = block
  5. @max = max
  6. @created = 0
  7. @queue = []
  8. @tagged_queue = Hash.new { |hash, key| hash[key] = [] }
  9. @mutex = Mutex.new
  10. @resource = ConditionVariable.new
  11. end
  12. def push(connection)
  13. @mutex.synchronize do
  14. store_connection(connection)
  15. @resource.broadcast
  16. end
  17. end
  18. alias << push
  19. def pop(preferred_tag, timeout = 5.0)
  20. deadline = current_time + timeout
  21. @mutex.synchronize do
  22. loop do
  23. return fetch_preferred_connection(preferred_tag) unless @tagged_queue[preferred_tag].empty?
  24. connection = try_create(preferred_tag)
  25. return connection if connection
  26. to_wait = deadline - current_time
  27. raise Timeout::Error, "Waited #{timeout} sec" if to_wait <= 0
  28. @resource.wait(@mutex, to_wait)
  29. end
  30. end
  31. end
  32. def empty?
  33. size.zero?
  34. end
  35. def size
  36. @mutex.synchronize do
  37. @queue.size
  38. end
  39. end
  40. def flush
  41. @mutex.synchronize do
  42. @queue.delete_if do |connection|
  43. delete = !connection.in_use && (connection.dead || connection.seconds_idle >= RequestPool::MAX_IDLE_TIME)
  44. if delete
  45. @tagged_queue[connection.site].delete(connection)
  46. connection.close
  47. @created -= 1
  48. end
  49. delete
  50. end
  51. end
  52. end
  53. private
  54. def try_create(preferred_tag)
  55. if @created == @max && !@queue.empty?
  56. throw_away_connection = @queue.pop
  57. @tagged_queue[throw_away_connection.site].delete(throw_away_connection)
  58. @create_block.call(preferred_tag)
  59. elsif @created != @max
  60. connection = @create_block.call(preferred_tag)
  61. @created += 1
  62. connection
  63. end
  64. end
  65. def fetch_preferred_connection(preferred_tag)
  66. connection = @tagged_queue[preferred_tag].pop
  67. @queue.delete(connection)
  68. connection
  69. end
  70. def current_time
  71. Process.clock_gettime(Process::CLOCK_MONOTONIC)
  72. end
  73. def store_connection(connection)
  74. @tagged_queue[connection.site].push(connection)
  75. @queue.push(connection)
  76. end
  77. end