123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114 |
- # frozen_string_literal: true
- require_relative './connection_pool/shared_connection_pool'
# Pool of persistent HTTP connections keyed by site.
#
# A shared instance is available through {RequestPool.current}. Connections
# are checked out with {#with}, and a {Reaper} thread periodically flushes
# the underlying pool.
class RequestPool
  # Returns the shared pool, building it on first access.
  #
  # @return [RequestPool]
  def self.current
    @current ||= RequestPool.new
  end

  # Background worker that calls +flush+ on a pool at a fixed interval.
  class Reaper
    attr_reader :pool, :frequency

    # @param pool [#flush] object to flush periodically
    # @param frequency [Numeric, nil] pause between flushes, handed to +sleep+
    def initialize(pool, frequency)
      @pool = pool
      @frequency = frequency
    end

    # Starts the flushing thread.
    #
    # NOTE: nothing inside the loop rescues errors, so an exception raised
    # by +flush+ ends the thread and no further flushes happen.
    #
    # @return [Thread, nil] the started thread, or nil when +frequency+ is
    #   nil or not positive (no thread is started in that case)
    def run
      return unless frequency&.positive?

      # The values are passed in as thread arguments; the block parameters
      # avoid the name +p+, which would shadow Kernel#p.
      Thread.new(frequency, pool) do |interval, target|
        loop do
          sleep interval
          target.flush
        end
      end
    end
  end

  # Passed as the +timeout:+ option of each persistent HTTP client.
  MAX_IDLE_TIME = 30
  # Passed as the +timeout:+ option of the shared connection pool.
  WAIT_TIMEOUT = 5
  # Passed as the +size:+ option of the shared connection pool; overridable
  # through the MAX_REQUEST_POOL_SIZE environment variable.
  MAX_POOL_SIZE = ENV.fetch('MAX_REQUEST_POOL_SIZE', 512).to_i

  # One persistent HTTP client bound to a site, plus the bookkeeping flags
  # (+in_use+, +dead+, +fresh+) and timestamps exposed through the readers.
  class Connection
    attr_reader :site, :last_used_at, :created_at, :in_use, :dead, :fresh

    # @param site [Object] value handed to the persistent HTTP client
    #   (presumably a base URL such as "https://example.com" -- confirm
    #   against callers)
    def initialize(site)
      @site = site
      @http_client = http_client
      @last_used_at = nil
      @created_at = current_time
      # Set explicitly so the +in_use+ reader returns false, not nil,
      # before the first call to #use.
      @in_use = false
      @dead = false
      @fresh = true
    end

    # Yields the HTTP client to the block and returns the block's value.
    #
    # On HTTP::ConnectionError the client is closed; unless this connection
    # is still fresh (never used before) or a retry already happened, a new
    # client is opened and the block is run one more time. Any other
    # StandardError closes the client, marks the connection dead and is
    # re-raised.
    #
    # @yieldparam client [Object] the persistent HTTP client
    # @return [Object] the value returned by the block
    # @raise [HTTP::ConnectionError] when the error occurs on a fresh
    #   connection or again after the single retry
    # @raise [StandardError] any other error raised by the block
    def use
      @last_used_at = current_time
      @in_use = true
      retries = 0

      begin
        yield @http_client
      rescue HTTP::ConnectionError
        # It's possible the connection was closed, so try re-opening it once.
        close
        raise if @fresh || retries.positive?

        @http_client = http_client
        retries += 1
        retry
      rescue StandardError
        # If this connection raises errors of any kind, it's better if it
        # gets reaped as soon as possible.
        close
        @dead = true
        raise
      end
    ensure
      @fresh = false
      @in_use = false
    end

    # Time elapsed on the monotonic clock since the last use, or since
    # creation when the connection has never been used.
    #
    # @return [Float]
    def seconds_idle
      current_time - (@last_used_at || @created_at)
    end

    # Closes the underlying HTTP client.
    def close
      @http_client.close
    end

    private

    # Builds a new persistent client for +@site+.
    def http_client
      Request.http_client.persistent(@site, timeout: MAX_IDLE_TIME)
    end

    # Monotonic clock reading, used for idle-time arithmetic.
    def current_time
      Process.clock_gettime(Process::CLOCK_MONOTONIC)
    end
  end

  # Builds the shared connection pool and starts the reaper thread.
  def initialize
    @pool = ConnectionPool::SharedConnectionPool.new(size: MAX_POOL_SIZE, timeout: WAIT_TIMEOUT) do |site|
      Connection.new(site)
    end
    @reaper = Reaper.new(self, 30)
    @reaper.run
  end

  # Checks out a connection for +site+ and runs the block with its HTTP
  # client, wrapped in a 'with.request_pool' notification whose payload
  # carries +miss+ (the connection's +fresh+ flag) and +host+ (its site).
  #
  # @param site [Object] key used to look up the connection in the pool
  # @yieldparam client [Object] the persistent HTTP client
  # @return [Object] whatever the pool's +with+ returns (presumably the
  #   block's value -- confirm against SharedConnectionPool)
  def with(site, &block)
    @pool.with(site) do |connection|
      payload = { miss: connection.fresh, host: connection.site }
      ActiveSupport::Notifications.instrument('with.request_pool', payload) do
        connection.use(&block)
      end
    end
  end

  # Forwards to the underlying pool's +size+.
  def size
    @pool.size
  end

  # Forwards to the underlying pool's +flush+; this is what the reaper calls.
  def flush
    @pool.flush
  end
end
|