1
0

batched_remove_status_service.rb 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. # frozen_string_literal: true
  # Service object that destroys a batch of statuses together with every
  # reblog of them, then propagates the deletions to feeds, the Redis-backed
  # public channels and the PuSH / Salmon workers.
  class BatchedRemoveStatusService < BaseService
    include StreamEntryRenderer
    include Redisable

    # Delete given statuses and reblogs of them
    # Dispatch PuSH updates of the deleted statuses, but only local ones
    # Dispatch Salmon deletes, unique per domain, of the deleted statuses, but only local ones
    # Remove statuses from home feeds
    # Push delete events to streaming API for home feeds and public feeds
    # @param [Enumerable<Status>] statuses A preferably batched array of statuses
    # @param [Hash] options
    # @option [Boolean] :skip_side_effects When truthy, the records are
    #   destroyed but no feed, streaming or federation work is performed
    def call(statuses, **options)
      # Reload the requested statuses and expand each one with its reblogs.
      statuses = Status.where(id: statuses.map(&:id)).includes(:account, :stream_entry).flat_map do |status|
        [status] + status.reblogs.includes(:account, :stream_entry).to_a
      end

      # A reblog that is itself part of the requested batch gets loaded twice
      # (once directly, once through the status it reblogs). Keep one copy per
      # id so nothing is destroyed, unpushed or announced more than once.
      statuses = statuses.uniq(&:id)

      # Everything the side effects need is captured before the records are
      # destroyed, keyed by status id.
      @mentions = statuses.each_with_object({}) { |s, h| h[s.id] = s.active_mentions.includes(:account).to_a }
      @tags     = statuses.each_with_object({}) { |s, h| h[s.id] = s.tags.pluck(:name) }

      @stream_entry_batches = []
      @salmon_batches       = []
      @json_payloads        = statuses.each_with_object({}) { |s, h| h[s.id] = Oj.dump(event: :delete, payload: s.id.to_s) }
      @activity_xml         = {}

      # Ensure that rendered XML reflects destroyed state
      statuses.each do |status|
        status.mark_for_mass_destruction!
        status.destroy
      end

      return if options[:skip_side_effects]

      # Batch by source account
      statuses.group_by(&:account_id).each_value do |account_statuses|
        account = account_statuses.first.account

        # Nothing can be fanned out for a status whose account is missing.
        next unless account

        unpush_from_home_timelines(account, account_statuses)
        unpush_from_list_timelines(account, account_statuses)

        batch_stream_entries(account, account_statuses) if account.local?
      end

      # Cannot be batched
      statuses.each do |status|
        unpush_from_public_timelines(status)
        batch_salmon_slaps(status) if status.local?
      end

      # The blocks return each collected entry unchanged, so the entries
      # themselves are handed to the workers as-is.
      Pubsubhubbub::RawDistributionWorker.push_bulk(@stream_entry_batches) { |batch| batch }
      NotificationWorker.push_bulk(@salmon_batches) { |batch| batch }
    end

    private

    # Queues one [rendered XML, account id] entry per status for the PuSH
    # distribution worker.
    def batch_stream_entries(account, statuses)
      statuses.each do |status|
        @stream_entry_batches << [build_xml(status.stream_entry), account.id]
      end
    end

    # Removes the statuses from the home feed of every follower returned by
    # `followers_for_local_distribution`, and from the author's own home feed
    # when the author is local.
    def unpush_from_home_timelines(account, statuses)
      recipients = account.followers_for_local_distribution.to_a

      recipients << account if account.local?

      recipients.each do |follower|
        statuses.each do |status|
          FeedManager.instance.unpush_from_home(follower, status)
        end
      end
    end

    # Removes the statuses from every list returned by
    # `lists_for_local_distribution` for the account.
    def unpush_from_list_timelines(account, statuses)
      account.lists_for_local_distribution.select(:id, :account_id).each do |list|
        statuses.each do |status|
          FeedManager.instance.unpush_from_list(list, status)
        end
      end
    end

    # Publishes the precomputed delete payload for a publicly visible status
    # to the public, media and hashtag channels (plus their ":local" variants
    # when the status is local). Does nothing for non-public statuses.
    def unpush_from_public_timelines(status)
      return unless status.public_visibility?

      payload = @json_payloads[status.id]

      redis.pipelined do
        redis.publish('timeline:public', payload)
        redis.publish('timeline:public:local', payload) if status.local?

        if status.media_attachments.any?
          redis.publish('timeline:public:media', payload)
          redis.publish('timeline:public:local:media', payload) if status.local?
        end

        # Tag names were captured in #call before the status was destroyed.
        @tags[status.id].each do |hashtag|
          redis.publish("timeline:hashtag:#{hashtag}", payload)
          redis.publish("timeline:hashtag:#{hashtag}:local", payload) if status.local?
        end
      end
    end

    # Queues one [rendered XML, author id, recipient id] entry per mentioned
    # account that is remote and answers true to `ostatus?`, keeping a single
    # recipient per domain.
    def batch_salmon_slaps(status)
      return if @mentions[status.id].empty?

      recipients = @mentions[status.id].map(&:account).reject(&:local?).select(&:ostatus?).uniq(&:domain).map(&:id)

      recipients.each do |recipient_id|
        @salmon_batches << [build_xml(status.stream_entry), status.account_id, recipient_id]
      end
    end

    # Renders a stream entry to XML, caching the result per stream entry id so
    # the same entry is rendered only once per service call.
    def build_xml(stream_entry)
      return @activity_xml[stream_entry.id] if @activity_xml.key?(stream_entry.id)

      @activity_xml[stream_entry.id] = stream_entry_to_xml(stream_entry)
    end
  end