create.rb 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433
  1. # frozen_string_literal: true
  2. class ActivityPub::Activity::Create < ActivityPub::Activity
  3. include FormattingHelper
  4. def perform
  5. @account.schedule_refresh_if_stale!
  6. dereference_object!
  7. create_status
  8. end
  9. private
  10. def create_status
  11. return reject_payload! if unsupported_object_type? || non_matching_uri_hosts?(@account.uri, object_uri) || tombstone_exists? || !related_to_local_activity?
  12. with_redis_lock("create:#{object_uri}") do
  13. return if delete_arrived_first?(object_uri) || poll_vote?
  14. @status = find_existing_status
  15. if @status.nil?
  16. process_status
  17. elsif @options[:delivered_to_account_id].present?
  18. postprocess_audience_and_deliver
  19. end
  20. end
  21. @status
  22. end
  23. def audience_to
  24. as_array(@object['to'] || @json['to']).map { |x| value_or_id(x) }
  25. end
  26. def audience_cc
  27. as_array(@object['cc'] || @json['cc']).map { |x| value_or_id(x) }
  28. end
  29. def process_status
  30. @tags = []
  31. @mentions = []
  32. @unresolved_mentions = []
  33. @silenced_account_ids = []
  34. @params = {}
  35. process_status_params
  36. process_tags
  37. process_audience
  38. ApplicationRecord.transaction do
  39. @status = Status.create!(@params)
  40. attach_tags(@status)
  41. attach_counts(@status)
  42. end
  43. resolve_thread(@status)
  44. resolve_unresolved_mentions(@status)
  45. fetch_replies(@status)
  46. distribute
  47. forward_for_reply
  48. end
  49. def distribute
  50. # Spread out crawling randomly to avoid DDoSing the link
  51. LinkCrawlWorker.perform_in(rand(1..59).seconds, @status.id)
  52. # Distribute into home and list feeds and notify mentioned accounts
  53. ::DistributionWorker.perform_async(@status.id, { 'silenced_account_ids' => @silenced_account_ids }) if @options[:override_timestamps] || @status.within_realtime_window?
  54. end
  55. def find_existing_status
  56. status = status_from_uri(object_uri)
  57. status ||= Status.find_by(uri: @object['atomUri']) if @object['atomUri'].present?
  58. status if status&.account_id == @account.id
  59. end
  60. def process_status_params
  61. @status_parser = ActivityPub::Parser::StatusParser.new(@json, followers_collection: @account.followers_url, object: @object)
  62. attachment_ids = process_attachments.take(Status::MEDIA_ATTACHMENTS_LIMIT).map(&:id)
  63. @params = {
  64. uri: @status_parser.uri,
  65. url: @status_parser.url || @status_parser.uri,
  66. account: @account,
  67. text: converted_object_type? ? converted_text : (@status_parser.text || ''),
  68. language: @status_parser.language,
  69. spoiler_text: converted_object_type? ? '' : (@status_parser.spoiler_text || ''),
  70. created_at: @status_parser.created_at,
  71. edited_at: @status_parser.edited_at && @status_parser.edited_at != @status_parser.created_at ? @status_parser.edited_at : nil,
  72. override_timestamps: @options[:override_timestamps],
  73. reply: @status_parser.reply,
  74. sensitive: @account.sensitized? || @status_parser.sensitive || false,
  75. visibility: @status_parser.visibility,
  76. thread: replied_to_status,
  77. conversation: conversation_from_uri(@object['conversation']),
  78. media_attachment_ids: attachment_ids,
  79. ordered_media_attachment_ids: attachment_ids,
  80. poll: process_poll,
  81. }
  82. end
  83. def process_audience
  84. # Unlike with tags, there is no point in resolving accounts we don't already
  85. # know here, because silent mentions would only be used for local access control anyway
  86. accounts_in_audience = (audience_to + audience_cc).uniq.filter_map do |audience|
  87. account_from_uri(audience) unless ActivityPub::TagManager.instance.public_collection?(audience)
  88. end
  89. # If the payload was delivered to a specific inbox, the inbox owner must have
  90. # access to it, unless they already have access to it anyway
  91. if @options[:delivered_to_account_id]
  92. accounts_in_audience << delivered_to_account
  93. accounts_in_audience.uniq!
  94. end
  95. accounts_in_audience.each do |account|
  96. # This runs after tags are processed, and those translate into non-silent
  97. # mentions, which take precedence
  98. next if @mentions.any? { |mention| mention.account_id == account.id }
  99. @mentions << Mention.new(account: account, silent: true)
  100. # If there is at least one silent mention, then the status can be considered
  101. # as a limited-audience status, and not strictly a direct message, but only
  102. # if we considered a direct message in the first place
  103. @params[:visibility] = :limited if @params[:visibility] == :direct
  104. end
  105. # Accounts that are tagged but are not in the audience are not
  106. # supposed to be notified explicitly
  107. @silenced_account_ids = @mentions.map(&:account_id) - accounts_in_audience.map(&:id)
  108. end
  109. def postprocess_audience_and_deliver
  110. return if @status.mentions.find_by(account_id: @options[:delivered_to_account_id])
  111. @status.mentions.create(account: delivered_to_account, silent: true)
  112. @status.update(visibility: :limited) if @status.direct_visibility?
  113. return unless delivered_to_account.following?(@account)
  114. FeedInsertWorker.perform_async(@status.id, delivered_to_account.id, 'home')
  115. end
  116. def delivered_to_account
  117. @delivered_to_account ||= Account.find(@options[:delivered_to_account_id])
  118. end
  119. def attach_tags(status)
  120. @tags.each do |tag|
  121. status.tags << tag
  122. tag.update(last_status_at: status.created_at) if tag.last_status_at.nil? || (tag.last_status_at < status.created_at && tag.last_status_at < 12.hours.ago)
  123. end
  124. # If we're processing an old status, this may register tags as being used now
  125. # as opposed to when the status was really published, but this is probably
  126. # not a big deal
  127. Trends.tags.register(status)
  128. @mentions.each do |mention|
  129. mention.status = status
  130. mention.save
  131. end
  132. end
  133. def attach_counts(status)
  134. likes = @status_parser.favourites_count
  135. shares = @status_parser.reblogs_count
  136. return if likes.nil? && shares.nil?
  137. status.status_stat.tap do |status_stat|
  138. status_stat.untrusted_reblogs_count = shares unless shares.nil?
  139. status_stat.untrusted_favourites_count = likes unless likes.nil?
  140. status_stat.save if status_stat.changed?
  141. end
  142. end
  143. def process_tags
  144. return if @object['tag'].nil?
  145. as_array(@object['tag']).each do |tag|
  146. if equals_or_includes?(tag['type'], 'Hashtag')
  147. process_hashtag tag
  148. elsif equals_or_includes?(tag['type'], 'Mention')
  149. process_mention tag
  150. elsif equals_or_includes?(tag['type'], 'Emoji')
  151. process_emoji tag
  152. end
  153. end
  154. end
  155. def process_hashtag(tag)
  156. return if tag['name'].blank?
  157. Tag.find_or_create_by_names(tag['name']) do |hashtag|
  158. @tags << hashtag unless @tags.include?(hashtag) || !hashtag.valid?
  159. end
  160. rescue ActiveRecord::RecordInvalid
  161. nil
  162. end
  163. def process_mention(tag)
  164. return if tag['href'].blank?
  165. account = account_from_uri(tag['href'])
  166. account = ActivityPub::FetchRemoteAccountService.new.call(tag['href'], request_id: @options[:request_id]) if account.nil?
  167. return if account.nil?
  168. @mentions << Mention.new(account: account, silent: false)
  169. rescue Mastodon::UnexpectedResponseError, *Mastodon::HTTP_CONNECTION_ERRORS
  170. @unresolved_mentions << tag['href']
  171. end
  172. def process_emoji(tag)
  173. return if skip_download?
  174. custom_emoji_parser = ActivityPub::Parser::CustomEmojiParser.new(tag)
  175. return if custom_emoji_parser.shortcode.blank? || custom_emoji_parser.image_remote_url.blank?
  176. emoji = CustomEmoji.find_by(shortcode: custom_emoji_parser.shortcode, domain: @account.domain)
  177. return unless emoji.nil? || custom_emoji_parser.image_remote_url != emoji.image_remote_url || (custom_emoji_parser.updated_at && custom_emoji_parser.updated_at >= emoji.updated_at)
  178. begin
  179. emoji ||= CustomEmoji.new(domain: @account.domain, shortcode: custom_emoji_parser.shortcode, uri: custom_emoji_parser.uri)
  180. emoji.image_remote_url = custom_emoji_parser.image_remote_url
  181. emoji.save
  182. rescue Seahorse::Client::NetworkingError => e
  183. Rails.logger.warn "Error storing emoji: #{e}"
  184. end
  185. end
  186. def process_attachments
  187. return [] if @object['attachment'].nil?
  188. media_attachments = []
  189. as_array(@object['attachment']).each do |attachment|
  190. media_attachment_parser = ActivityPub::Parser::MediaAttachmentParser.new(attachment)
  191. next if media_attachment_parser.remote_url.blank? || media_attachments.size >= Status::MEDIA_ATTACHMENTS_LIMIT
  192. begin
  193. media_attachment = MediaAttachment.create(
  194. account: @account,
  195. remote_url: media_attachment_parser.remote_url,
  196. thumbnail_remote_url: media_attachment_parser.thumbnail_remote_url,
  197. description: media_attachment_parser.description,
  198. focus: media_attachment_parser.focus,
  199. blurhash: media_attachment_parser.blurhash
  200. )
  201. media_attachments << media_attachment
  202. next if unsupported_media_type?(media_attachment_parser.file_content_type) || skip_download?
  203. media_attachment.download_file!
  204. media_attachment.download_thumbnail!
  205. media_attachment.save
  206. rescue Mastodon::UnexpectedResponseError, *Mastodon::HTTP_CONNECTION_ERRORS
  207. RedownloadMediaWorker.perform_in(rand(30..600).seconds, media_attachment.id)
  208. rescue Seahorse::Client::NetworkingError => e
  209. Rails.logger.warn "Error storing media attachment: #{e}"
  210. RedownloadMediaWorker.perform_async(media_attachment.id)
  211. end
  212. end
  213. media_attachments
  214. rescue Addressable::URI::InvalidURIError => e
  215. Rails.logger.debug { "Invalid URL in attachment: #{e}" }
  216. media_attachments
  217. end
  218. def process_poll
  219. poll_parser = ActivityPub::Parser::PollParser.new(@object)
  220. return unless poll_parser.valid?
  221. @account.polls.new(
  222. multiple: poll_parser.multiple,
  223. expires_at: poll_parser.expires_at,
  224. options: poll_parser.options,
  225. cached_tallies: poll_parser.cached_tallies,
  226. voters_count: poll_parser.voters_count
  227. )
  228. end
  229. def poll_vote?
  230. return false if replied_to_status.nil? || replied_to_status.preloadable_poll.nil? || !replied_to_status.local? || !replied_to_status.preloadable_poll.options.include?(@object['name'])
  231. poll_vote! unless replied_to_status.preloadable_poll.expired?
  232. true
  233. end
  234. def poll_vote!
  235. poll = replied_to_status.preloadable_poll
  236. already_voted = true
  237. with_redis_lock("vote:#{replied_to_status.poll_id}:#{@account.id}") do
  238. already_voted = poll.votes.exists?(account: @account)
  239. poll.votes.create!(account: @account, choice: poll.options.index(@object['name']), uri: object_uri)
  240. end
  241. increment_voters_count! unless already_voted
  242. ActivityPub::DistributePollUpdateWorker.perform_in(3.minutes, replied_to_status.id) unless replied_to_status.preloadable_poll.hide_totals?
  243. end
  244. def resolve_thread(status)
  245. return unless status.reply? && status.thread.nil? && Request.valid_url?(in_reply_to_uri)
  246. ThreadResolveWorker.perform_async(status.id, in_reply_to_uri, { 'request_id' => @options[:request_id] })
  247. end
  248. def resolve_unresolved_mentions(status)
  249. @unresolved_mentions.uniq.each do |uri|
  250. MentionResolveWorker.perform_in(rand(30...600).seconds, status.id, uri, { 'request_id' => @options[:request_id] })
  251. end
  252. end
  253. def fetch_replies(status)
  254. collection = @object['replies']
  255. return if collection.blank?
  256. replies = ActivityPub::FetchRepliesService.new.call(status, collection, allow_synchronous_requests: false, request_id: @options[:request_id])
  257. return unless replies.nil?
  258. uri = value_or_id(collection)
  259. ActivityPub::FetchRepliesWorker.perform_async(status.id, uri, { 'request_id' => @options[:request_id] }) unless uri.nil?
  260. rescue => e
  261. Rails.logger.warn "Error fetching replies: #{e}"
  262. end
  263. def conversation_from_uri(uri)
  264. return nil if uri.nil?
  265. return Conversation.find_by(id: OStatus::TagManager.instance.unique_tag_to_local_id(uri, 'Conversation')) if OStatus::TagManager.instance.local_id?(uri)
  266. begin
  267. Conversation.find_or_create_by!(uri: uri)
  268. rescue ActiveRecord::RecordInvalid, ActiveRecord::RecordNotUnique
  269. retry
  270. end
  271. end
  272. def replied_to_status
  273. return @replied_to_status if defined?(@replied_to_status)
  274. if in_reply_to_uri.blank?
  275. @replied_to_status = nil
  276. else
  277. @replied_to_status = status_from_uri(in_reply_to_uri)
  278. @replied_to_status ||= status_from_uri(@object['inReplyToAtomUri']) if @object['inReplyToAtomUri'].present?
  279. @replied_to_status
  280. end
  281. end
  282. def in_reply_to_uri
  283. value_or_id(@object['inReplyTo'])
  284. end
  285. def converted_text
  286. [formatted_title, @status_parser.spoiler_text.presence, formatted_url].compact.join("\n\n")
  287. end
  288. def formatted_title
  289. "<h2>#{@status_parser.title}</h2>" if @status_parser.title.present?
  290. end
  291. def formatted_url
  292. linkify(@status_parser.url || @status_parser.uri)
  293. end
  294. def unsupported_media_type?(mime_type)
  295. mime_type.present? && !MediaAttachment.supported_mime_types.include?(mime_type)
  296. end
  297. def skip_download?
  298. return @skip_download if defined?(@skip_download)
  299. @skip_download ||= DomainBlock.reject_media?(@account.domain)
  300. end
  301. def reply_to_local?
  302. !replied_to_status.nil? && replied_to_status.account.local?
  303. end
  304. def related_to_local_activity?
  305. fetch? || followed_by_local_accounts? || requested_through_relay? ||
  306. responds_to_followed_account? || addresses_local_accounts?
  307. end
  308. def responds_to_followed_account?
  309. !replied_to_status.nil? && (replied_to_status.account.local? || replied_to_status.account.passive_relationships.exists?)
  310. end
  311. def addresses_local_accounts?
  312. return true if @options[:delivered_to_account_id]
  313. local_usernames = (audience_to + audience_cc).uniq.select { |uri| ActivityPub::TagManager.instance.local_uri?(uri) }.map { |uri| ActivityPub::TagManager.instance.uri_to_local_id(uri, :username) }
  314. return false if local_usernames.empty?
  315. Account.local.exists?(username: local_usernames)
  316. end
  317. def tombstone_exists?
  318. Tombstone.exists?(uri: object_uri)
  319. end
  320. def forward_for_reply
  321. return unless @status.distributable? && @json['signature'].present? && reply_to_local?
  322. ActivityPub::RawDistributionWorker.perform_async(Oj.dump(@json), replied_to_status.account_id, [@account.preferred_inbox_url])
  323. end
  324. def increment_voters_count!
  325. poll = replied_to_status.preloadable_poll
  326. unless poll.voters_count.nil?
  327. poll.voters_count = poll.voters_count + 1
  328. poll.save
  329. end
  330. rescue ActiveRecord::StaleObjectError
  331. poll.reload
  332. retry
  333. end
  334. end