# frozen_string_literal: true

class Scheduler::AccountsStatusesCleanupScheduler
  include Sidekiq::Worker
  include Redisable

  # This limit is mostly to be nice to the fediverse at large and not
  # generate too much traffic.
  # This also helps limit the running time of the scheduler itself.
  MAX_BUDGET = 300

  # This is an attempt to spread the load across remote servers, as
  # spreading deletions across diverse accounts is likely to spread
  # them across diverse followers. It also helps each individual
  # user see some effect sooner.
  PER_ACCOUNT_BUDGET = 5

  # This is an attempt to limit the workload generated by status removal
  # jobs to something the particular server can handle.
  PER_THREAD_BUDGET = 5

  # These are latency limits on various queues above which a server is
  # considered to be under load, causing the auto-deletion to be entirely
  # skipped for that run.
  LOAD_LATENCY_THRESHOLDS = {
    default: 5,
    push: 10,
    # The `pull` queue has lower-priority jobs, and it's unlikely that
    # pushing deletes would cause much of an issue with this queue if it
    # didn't cause issues with `default` and `push`. Yet, do not enqueue
    # deletes if the instance is lagging behind too much.
    pull: 5.minutes.to_i,
  }.freeze
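
  # For example, if the oldest pending job in the `default` queue has been
  # waiting for more than 5 seconds (or `push` for more than 10 seconds, or
  # `pull` for more than 5 minutes), `under_load?` returns true and the
  # whole run is skipped (see `perform`).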

  sidekiq_options retry: 0, lock: :until_executed, lock_ttl: 1.day.to_i
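  # The `lock:`/`lock_ttl:` options above presumably come from a Sidekiq
  # uniqueness plugin (e.g. sidekiq-unique-jobs), so that only one instance
  # of this scheduler runs at a time.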

  def perform
    return if under_load?

    budget = compute_budget

    # If the budget allows it, we want to consider all accounts with auto
    # cleanup enabled at least once.
    #
    # We start from `first_policy_id` (the last processed id in the previous
    # run) and process each policy until we loop back to `first_policy_id`,
    # recording into `affected_policies` any policy that caused posts to be
    # deleted.
    #
    # After that, we set `full_iteration` to `false` and continue looping on
    # policies from `affected_policies`.
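    #
    # For example, if the previous run saved policy id 42 as its cursor, the
    # first pass examines enabled policies with ids 42 and above; the next
    # pass wraps around to ids up to 42, plus any policy that already
    # yielded deletions; after that, only `affected_policies` are revisited
    # until the budget runs out or a pass deletes nothing.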

    first_policy_id = last_processed_id || 0
    first_iteration = true
    full_iteration = true
    affected_policies = []

    loop do
      num_processed_accounts = 0

      scope = cleanup_policies(first_policy_id, affected_policies, first_iteration, full_iteration)
      scope.find_each(order: :asc) do |policy|
        num_deleted = AccountStatusesCleanupService.new.call(policy, [budget, PER_ACCOUNT_BUDGET].min)
        budget -= num_deleted

        unless num_deleted.zero?
          num_processed_accounts += 1
          affected_policies << policy.id if full_iteration
        end

        full_iteration = false if !first_iteration && policy.id >= first_policy_id

        if budget.zero?
          save_last_processed_id(policy.id)
          break
        end
      end

      # The idea is to loop through all policies at least once: keep going
      # until the budget is exhausted, or until a pass that is no longer part
      # of the full iteration deletes nothing. If the budget runs out
      # mid-pass, the saved id lets the next run resume from the last
      # processed policy.
      break if budget.zero? || (num_processed_accounts.zero? && !full_iteration)

      full_iteration = false unless first_iteration
      first_iteration = false
    end
  end

  def compute_budget
    # Each post deletion is a `RemovalWorker` job (on the `default` queue),
    # each potentially spawning many `ActivityPub::DeliveryWorker` jobs
    # (on the `push` queue).
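    #
    # For example, with 50 Sidekiq threads across all processes serving the
    # `push` queue, the budget is [5 * 50, 300].min, i.e. 250 statuses per run.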
    threads = Sidekiq::ProcessSet.new.select { |x| x['queues'].include?('push') }.pluck('concurrency').sum
    [PER_THREAD_BUDGET * threads, MAX_BUDGET].min
  end

  def under_load?
    LOAD_LATENCY_THRESHOLDS.any? { |queue, max_latency| queue_under_load?(queue, max_latency) }
  end

  private
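
  # Builds the scope of policies to examine on this pass. Three cases:
  # - first pass of a full iteration: enabled policies at or after the
  #   resume cursor (`first_policy_id`);
  # - wrap-around pass: enabled policies at or before the cursor, plus any
  #   policy that already yielded deletions;
  # - later passes: only the policies recorded in `affected_policies`.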
  def cleanup_policies(first_policy_id, affected_policies, first_iteration, full_iteration)
    scope = AccountStatusesCleanupPolicy.where(enabled: true)

    if full_iteration
      # If we are doing a full iteration, examine all policies we have not examined yet
      if first_iteration
        scope.where(id: first_policy_id...)
      else
        scope.where(id: ..first_policy_id).or(scope.where(id: affected_policies))
      end
    else
      # Otherwise, examine only policies that previously yielded posts to delete
      scope.where(id: affected_policies)
    end
  end

  def queue_under_load?(name, max_latency)
    Sidekiq::Queue.new(name).latency > max_latency
  end
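
  # The resume cursor is stored in Redis with a 1-hour expiry, presumably so
  # that a stale cursor from a much older run gets discarded rather than
  # reused.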
  def last_processed_id
    redis.get('account_statuses_cleanup_scheduler:last_policy_id')&.to_i
  end

  def save_last_processed_id(id)
    if id.nil?
      redis.del('account_statuses_cleanup_scheduler:last_policy_id')
    else
      redis.set('account_statuses_cleanup_scheduler:last_policy_id', id, ex: 1.hour.seconds)
    end
  end
end
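
# NOTE: As a Sidekiq scheduler worker, this class is normally triggered
# periodically by sidekiq-scheduler (via an entry in the Sidekiq schedule
# configuration) rather than enqueued manually; the exact schedule is not
# shown here. For ad-hoc testing it can be invoked directly, e.g.:
#
#   Scheduler::AccountsStatusesCleanupScheduler.new.perform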