# bulk_import_service.rb (4.8 KB)
# frozen_string_literal: true

  # Applies a bulk import to the account that owns it.
  #
  # Depending on the import type this handles follows, blocks, mutes, domain
  # blocks, bookmarks or lists. When the import is flagged `overwrite?`,
  # existing relationships that are absent from the import are removed and
  # rows that match an existing relationship are counted as imported right
  # away. Remaining rows are handed to `Import::RowWorker.push_bulk`, except
  # for domain blocks, which are applied directly.
  class BulkImportService < BaseService
    # Runs the import routine matching `import.type`.
    #
    # The import is marked as finished once `processed_items` equals
    # `total_items`. If anything raises, the import is still marked as
    # finished and the error is re-raised to the caller.
    #
    # @param import [Object] import record; must expose `account`, `type`,
    #   `rows`, `overwrite?`, `processed_items`, `imported_items`,
    #   `total_items`, `save!` and `update!`
    # @raise [StandardError] whatever the underlying import routine raised
    def call(import)
      @import  = import
      @account = @import.account

      # An unrecognised type matches no branch and falls through to the
      # completion check below.
      case @import.type.to_sym
      when :following
        import_follows!
      when :blocking
        import_blocks!
      when :muting
        import_mutes!
      when :domain_blocking
        import_domain_blocks!
      when :bookmarks
        import_bookmarks!
      when :lists
        import_lists!
      end

      finish_import! if @import.processed_items == @import.total_items
    rescue
      finish_import!

      raise
    end

    private

    # Flags the import as finished, stamped with the current UTC time.
    def finish_import!
      @import.update!(state: :finished, finished_at: Time.now.utc)
    end

    # Removes a row that is already satisfied by an existing relationship and
    # counts it as processed and imported. The counters are only changed in
    # memory; callers persist them with `@import.save!`.
    def count_row_as_imported!(row)
      row.destroy
      @import.processed_items += 1
      @import.imported_items += 1
    end

    # Hands the given rows to `Import::RowWorker.push_bulk`, with each row's
    # id as the only job argument.
    def enqueue_row_workers(rows)
      Import::RowWorker.push_bulk(rows) { |row| [row.id] }
    end

    # Indexes the import rows by their `acct` value, with the local domain
    # suffix stripped so that local accounts are keyed by bare username
    # (presumably to line up with `Account#acct` -- confirm against the model).
    #
    # @return [Hash] rows keyed by acct string
    def extract_rows_by_acct
      local_domain_suffix = "@#{Rails.configuration.x.local_domain}"
      @import.rows.to_a.index_by { |row| row.data['acct'].delete_suffix(local_domain_suffix) }
    end

    # Imports follows. In overwrite mode, accounts followed but absent from
    # the import are unfollowed, and follows present in both have their
    # settings refreshed from the row.
    def import_follows!
      rows_by_acct = extract_rows_by_acct

      if @import.overwrite?
        @account.following.reorder(nil).find_each do |followee|
          row = rows_by_acct.delete(followee.acct)

          if row.nil?
            UnfollowService.new.call(@account, followee)
          else
            count_row_as_imported!(row)

            # Since we're updating the settings of an existing relationship,
            # we can safely call FollowService directly
            FollowService.new.call(
              @account,
              followee,
              reblogs: row.data['show_reblogs'],
              notify: row.data['notify'],
              languages: row.data['languages']
            )
          end
        end

        # Save pending infos due to `overwrite?` handling
        @import.save!
      end

      enqueue_row_workers(rows_by_acct.values)
    end

    # Imports blocks. In overwrite mode, blocked accounts absent from the
    # import are unblocked; those present in both are re-blocked through
    # BlockService and counted as imported.
    def import_blocks!
      rows_by_acct = extract_rows_by_acct

      if @import.overwrite?
        @account.blocking.reorder(nil).find_each do |blocked_account|
          row = rows_by_acct.delete(blocked_account.acct)

          if row.nil?
            UnblockService.new.call(@account, blocked_account)
          else
            count_row_as_imported!(row)
            BlockService.new.call(@account, blocked_account)
          end
        end

        # Save pending infos due to `overwrite?` handling
        @import.save!
      end

      enqueue_row_workers(rows_by_acct.values)
    end

    # Imports mutes. In overwrite mode, muted accounts absent from the import
    # are unmuted; those present in both have their notification setting
    # refreshed from the row.
    def import_mutes!
      rows_by_acct = extract_rows_by_acct

      if @import.overwrite?
        @account.muting.reorder(nil).find_each do |muted_account|
          row = rows_by_acct.delete(muted_account.acct)

          if row.nil?
            UnmuteService.new.call(@account, muted_account)
          else
            count_row_as_imported!(row)
            MuteService.new.call(@account, muted_account, notifications: row.data['hide_notifications'])
          end
        end

        # Save pending infos due to `overwrite?` handling
        @import.save!
      end

      enqueue_row_workers(rows_by_acct.values)
    end

    # Imports domain blocks directly, without going through row workers.
    #
    # In overwrite mode, existing domain blocks that are absent from the
    # import are lifted, and domains that are already blocked are dropped
    # from the list so they are neither re-blocked nor re-queued.
    def import_domain_blocks!
      domains = @import.rows.map { |row| row.data['domain'] }

      if @import.overwrite?
        @account.domain_blocks.find_each do |domain_block|
          # `domains` holds plain strings, so the lookup has to use the
          # block's domain name; passing the record itself never matches and
          # would lift every existing block.
          already_listed = domains.delete(domain_block.domain)

          @account.unblock_domain!(domain_block.domain) if already_listed.nil?
        end
      end

      @import.rows.delete_all
      domains.each { |domain| @account.block_domain!(domain) }
      @import.update!(processed_items: @import.total_items, imported_items: @import.total_items)

      AfterAccountDomainBlockWorker.push_bulk(domains) do |domain|
        [@account.id, domain]
      end
    end

    # Imports bookmarks. In overwrite mode, bookmarks whose status URI is
    # absent from the import are destroyed; those present in both are
    # counted as imported.
    def import_bookmarks!
      rows_by_uri = @import.rows.index_by { |row| row.data['uri'] }

      if @import.overwrite?
        @account.bookmarks.includes(:status).find_each do |bookmark|
          row = rows_by_uri.delete(ActivityPub::TagManager.instance.uri_for(bookmark.status))

          if row.nil?
            bookmark.destroy!
          else
            count_row_as_imported!(row)
          end
        end

        # Save pending infos due to `overwrite?` handling
        @import.save!
      end

      enqueue_row_workers(rows_by_uri.values)
    end

    # Imports lists. In overwrite mode, owned lists not named in the import
    # are destroyed and the remaining lists are emptied. Every list named in
    # the import is then created if missing, and all rows are queued.
    def import_lists!
      rows = @import.rows.to_a
      included_lists = rows.map { |row| row.data['list_name'] }.uniq

      if @import.overwrite?
        @account.owned_lists.where.not(title: included_lists).destroy_all

        # As list membership changes do not retroactively change timeline
        # contents, simplify things by just clearing everything
        @account.owned_lists.find_each do |list|
          list.list_accounts.destroy_all
        end
      end

      included_lists.each do |title|
        @account.owned_lists.find_or_create_by!(title: title)
      end

      enqueue_row_workers(rows)
    end
  end