# account_conversation.rb
  # frozen_string_literal: true

  # == Schema Information
  #
  # Table name: account_conversations
  #
  #  id                      :bigint(8)        not null, primary key
  #  account_id              :bigint(8)
  #  conversation_id         :bigint(8)
  #  participant_account_ids :bigint(8)        default([]), not null, is an Array
  #  status_ids              :bigint(8)        default([]), not null, is an Array
  #  last_status_id          :bigint(8)
  #  lock_version            :integer          default(0), not null
  #  unread                  :boolean          default(FALSE), not null
  #
  # One account's view of a conversation.
  #
  # A row records the sorted ids of the other participants, the sorted ids of
  # the statuses that belong to the conversation, the highest of those status
  # ids (+last_status_id+) and an unread flag. Writes rescue
  # ActiveRecord::StaleObjectError and start over, which pairs with the
  # +lock_version+ column (Rails optimistic locking).
  class AccountConversation < ApplicationRecord
    # NOTE(review): presumably the source of the +redis+ helper used in
    # #subscribed_to_timeline? - confirm against the Redisable concern.
    include Redisable

    # Allows already-loaded accounts to be injected (see
    # .to_a_paginated_by_id) so #participant_accounts does not query again.
    attr_writer :participant_accounts

    before_validation :set_last_status
    after_commit :push_to_streaming_api

    belongs_to :account
    belongs_to :conversation
    belongs_to :last_status, class_name: 'Status'

    # Stores the participant ids in sorted order and discards any memoized
    # participant accounts, since they may no longer match the new ids.
    #
    # @param arr [Array<Integer>] participant account ids, in any order
    def participant_account_ids=(arr)
      self[:participant_account_ids] = arr.sort
      @participant_accounts = nil
    end

    # Accounts taking part in the conversation, loaded once and memoized.
    #
    # @return [Array<Account>] the participants, or +[account]+ when the
    #   participant list is empty
    def participant_accounts
      loaded = (@participant_accounts ||= Account.where(id: participant_account_ids).to_a)
      loaded.present? ? loaded : [account]
    end

    class << self
      # Loads one page of conversations and attaches participant accounts to
      # every row using a single Account query for the whole page.
      #
      # When +options[:min_id]+ is set the page is queried in ascending
      # +last_status_id+ order and then reversed; otherwise it is queried in
      # descending order using +:max_id+ / +:since_id+.
      #
      # @param limit [Integer] maximum number of rows
      # @param options [Hash] may contain +:min_id+, +:max_id+, +:since_id+
      # @return [Array<AccountConversation>]
      def to_a_paginated_by_id(limit, options = {})
        page =
          if options[:min_id]
            paginate_by_min_id(limit, options[:min_id], options[:max_id]).reverse
          else
            paginate_by_max_id(limit, options[:max_id], options[:since_id]).to_a
          end

        # Preload participants for the whole page at once.
        accounts_by_id = Account.where(id: page.flat_map(&:participant_account_ids)).index_by(&:id)

        page.each do |entry|
          # Ids with no matching account come back as nil and are dropped.
          entry.participant_accounts = accounts_by_id.values_at(*entry.participant_account_ids).compact
        end

        page
      end

      # Relation ordered by ascending +last_status_id+, optionally bounded to
      # ids strictly greater than +min_id+ and strictly less than +max_id+.
      #
      # @return [ActiveRecord::Relation]
      def paginate_by_min_id(limit, min_id = nil, max_id = nil)
        cursor = arel_table[:last_status_id]

        scope = order(cursor.asc).limit(limit)
        scope = scope.where(cursor.gt(min_id)) if min_id.present?
        scope = scope.where(cursor.lt(max_id)) if max_id.present?
        scope
      end

      # Relation ordered by descending +last_status_id+, optionally bounded to
      # ids strictly less than +max_id+ and strictly greater than +since_id+.
      #
      # @return [ActiveRecord::Relation]
      def paginate_by_max_id(limit, max_id = nil, since_id = nil)
        cursor = arel_table[:last_status_id]

        scope = order(cursor.desc).limit(limit)
        scope = scope.where(cursor.lt(max_id)) if max_id.present?
        scope = scope.where(cursor.gt(since_id)) if since_id.present?
        scope
      end

      # Records +status+ in the recipient's conversation row, building the row
      # when none matches. A status that is already listed leaves the row
      # untouched. The row is flagged unread when the status was authored by
      # an account other than the recipient.
      #
      # @return [AccountConversation] the row; the result of +save+ is not checked
      def add_status(recipient, status)
        thread = find_or_initialize_by(
          account: recipient,
          conversation_id: status.conversation_id,
          participant_account_ids: participants_from_status(recipient, status)
        )

        unless thread.status_ids.include?(status.id)
          thread.status_ids << status.id
          thread.unread = status.account_id != recipient.id
          thread.save
        end

        thread
      rescue ActiveRecord::StaleObjectError
        # Start over so the row is looked up again before the next attempt.
        # NOTE(review): this retry has no attempt limit.
        retry
      end

      # Removes +status+ from the recipient's conversation row, destroying the
      # row once no statuses remain.
      #
      # @return [AccountConversation, nil] the row, or nil when none matches
      def remove_status(recipient, status)
        thread = find_by(
          account: recipient,
          conversation_id: status.conversation_id,
          participant_account_ids: participants_from_status(recipient, status)
        )
        return if thread.nil?

        thread.status_ids.delete(status.id)
        thread.status_ids.empty? ? thread.destroy : thread.save
        thread
      rescue ActiveRecord::StaleObjectError
        # Start over so the row is looked up again before the next attempt.
        # NOTE(review): this retry has no attempt limit.
        retry
      end

      private

      # Sorted, de-duplicated ids of the status author and the accounts in its
      # active mentions, with the recipient's own id removed.
      def participants_from_status(recipient, status)
        mentioned_ids = status.active_mentions.pluck(:account_id)
        involved_ids = (mentioned_ids + [status.account_id]).uniq
        (involved_ids - [recipient.id]).sort
      end
    end

    private

    # Keeps +status_ids+ sorted and points +last_status_id+ at its last
    # (highest) entry; nil when there are no statuses.
    def set_last_status
      ordered = status_ids.sort
      self.status_ids = ordered
      self.last_status_id = ordered.last
    end

    # Enqueues a PushConversationWorker job for this row after commit, unless
    # the row was destroyed or the subscription key is absent from Redis.
    def push_to_streaming_api
      return if destroyed?
      return unless subscribed_to_timeline?

      PushConversationWorker.perform_async(id)
    end

    # True when Redis holds the subscription key for this account's channel.
    def subscribed_to_timeline?
      subscription_key = "subscribed:#{streaming_channel}"
      redis.exists?(subscription_key)
    end

    # Channel name derived from the owning account's id.
    def streaming_channel
      "timeline:direct:#{account_id}"
    end
  end