statuses.rb 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138
  1. # frozen_string_literal: true
  2. class Trends::Statuses < Trends::Base
  3. PREFIX = 'trending_statuses'
  4. BATCH_SIZE = 100
  5. self.default_options = {
  6. threshold: 5,
  7. review_threshold: 3,
  8. score_halflife: 1.hour.freeze,
  9. decay_threshold: 0.3,
  10. }
  11. class Query < Trends::Query
  12. def filtered_for!(account)
  13. @account = account
  14. self
  15. end
  16. def filtered_for(account)
  17. clone.filtered_for!(account)
  18. end
  19. def to_arel
  20. scope = Status.joins(:trend).reorder(score: :desc)
  21. scope = scope.reorder(language_order_clause.desc, score: :desc) if preferred_languages.present?
  22. scope = scope.merge(StatusTrend.allowed) if @allowed
  23. scope = scope.not_excluded_by_account(@account).not_domain_blocked_by_account(@account) if @account.present?
  24. scope = scope.offset(@offset) if @offset.present?
  25. scope = scope.limit(@limit) if @limit.present?
  26. scope
  27. end
  28. private
  29. def language_order_clause
  30. Arel::Nodes::Case.new.when(StatusTrend.arel_table[:language].in(preferred_languages)).then(1).else(0)
  31. end
  32. def preferred_languages
  33. if @account&.chosen_languages.present?
  34. @account.chosen_languages
  35. else
  36. @locale
  37. end
  38. end
  39. end
  40. def register(status, at_time = Time.now.utc)
  41. add(status.proper, status.account_id, at_time) if eligible?(status.proper)
  42. end
  43. def add(status, _account_id, at_time = Time.now.utc)
  44. record_used_id(status.id, at_time)
  45. end
  46. def query
  47. Query.new(key_prefix, klass)
  48. end
  49. def refresh(at_time = Time.now.utc)
  50. # First, recalculate scores for statuses that were trending previously. We split the queries
  51. # to avoid having to load all of the IDs into Ruby just to send them back into Postgres
  52. Status.where(id: StatusTrend.select(:status_id)).includes(:status_stat, :account).reorder(nil).find_in_batches(batch_size: BATCH_SIZE) do |statuses|
  53. calculate_scores(statuses, at_time)
  54. end
  55. # Then, calculate scores for statuses that were used today. There are potentially some
  56. # duplicate items here that we might process one more time, but that should be fine
  57. Status.where(id: recently_used_ids(at_time)).includes(:status_stat, :account).reorder(nil).find_in_batches(batch_size: BATCH_SIZE) do |statuses|
  58. calculate_scores(statuses, at_time)
  59. end
  60. # Now that all trends have up-to-date scores, and all the ones below the threshold have
  61. # been removed, we can recalculate their positions
  62. StatusTrend.connection.exec_update('UPDATE status_trends SET rank = t0.calculated_rank FROM (SELECT id, row_number() OVER w AS calculated_rank FROM status_trends WINDOW w AS (PARTITION BY language ORDER BY score DESC)) t0 WHERE status_trends.id = t0.id')
  63. end
  64. def request_review
  65. StatusTrend.pluck('distinct language').flat_map do |language|
  66. score_at_threshold = StatusTrend.where(language: language, allowed: true).order(rank: :desc).where('rank <= ?', options[:review_threshold]).first&.score || 0
  67. status_trends = StatusTrend.where(language: language, allowed: false).joins(:status).includes(status: :account)
  68. status_trends.filter_map do |trend|
  69. status = trend.status
  70. if trend.score > score_at_threshold && !status.trendable? && status.requires_review_notification?
  71. status.account.touch(:requested_review_at)
  72. status
  73. end
  74. end
  75. end
  76. end
  77. protected
  78. def key_prefix
  79. PREFIX
  80. end
  81. def klass
  82. Status
  83. end
  84. private
  85. def eligible?(status)
  86. status.public_visibility? && status.account.discoverable? && !status.account.silenced? && !status.account.sensitized? && status.spoiler_text.blank? && !status.sensitive? && !status.reply? && valid_locale?(status.language)
  87. end
  88. def calculate_scores(statuses, at_time)
  89. items = statuses.map do |status|
  90. expected = 1.0
  91. observed = (status.reblogs_count + status.favourites_count).to_f
  92. score = if expected > observed || observed < options[:threshold]
  93. 0
  94. else
  95. ((observed - expected)**2) / expected
  96. end
  97. decaying_score = if score.zero? || !eligible?(status)
  98. 0
  99. else
  100. score * (0.5**((at_time.to_f - status.created_at.to_f) / options[:score_halflife].to_f))
  101. end
  102. [decaying_score, status]
  103. end
  104. to_insert = items.filter { |(score, _)| score >= options[:decay_threshold] }
  105. to_delete = items.filter { |(score, _)| score < options[:decay_threshold] }
  106. StatusTrend.upsert_all(to_insert.map { |(score, status)| { status_id: status.id, account_id: status.account_id, score: score, language: status.language, allowed: status.trendable? || false } }, unique_by: :status_id) if to_insert.any?
  107. StatusTrend.where(status_id: to_delete.map { |(_, status)| status.id }).delete_all if to_delete.any?
  108. end
  109. end