send-utils.ts 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280
  1. import { Transaction } from 'sequelize'
  2. import { ActorFollowHealthCache } from '@server/lib/actor-follow-health-cache'
  3. import { getServerActor } from '@server/models/application/application'
  4. import { Activity, ActivityAudience, ActivitypubHttpBroadcastPayload } from '@shared/models'
  5. import { ContextType } from '@shared/models/activitypub/context'
  6. import { afterCommitIfTransaction } from '../../../../helpers/database-utils'
  7. import { logger } from '../../../../helpers/logger'
  8. import { ActorModel } from '../../../../models/actor/actor'
  9. import { ActorFollowModel } from '../../../../models/actor/actor-follow'
  10. import { MActor, MActorId, MActorLight, MActorWithInboxes, MVideoAccountLight, MVideoId, MVideoImmutable } from '../../../../types/models'
  11. import { JobQueue } from '../../../job-queue'
  12. import { getActorsInvolvedInVideo, getAudienceFromFollowersOf, getOriginVideoAudience } from './audience-utils'
  13. async function sendVideoRelatedActivity (activityBuilder: (audience: ActivityAudience) => Activity, options: {
  14. byActor: MActorLight
  15. video: MVideoImmutable | MVideoAccountLight
  16. contextType: ContextType
  17. transaction?: Transaction
  18. }) {
  19. const { byActor, video, transaction, contextType } = options
  20. // Send to origin
  21. if (video.isOwned() === false) {
  22. return sendVideoActivityToOrigin(activityBuilder, options)
  23. }
  24. const actorsInvolvedInVideo = await getActorsInvolvedInVideo(video, transaction)
  25. // Send to followers
  26. const audience = getAudienceFromFollowersOf(actorsInvolvedInVideo)
  27. const activity = activityBuilder(audience)
  28. const actorsException = [ byActor ]
  29. return broadcastToFollowers({
  30. data: activity,
  31. byActor,
  32. toFollowersOf: actorsInvolvedInVideo,
  33. transaction,
  34. actorsException,
  35. contextType
  36. })
  37. }
  38. async function sendVideoActivityToOrigin (activityBuilder: (audience: ActivityAudience) => Activity, options: {
  39. byActor: MActorLight
  40. video: MVideoImmutable | MVideoAccountLight
  41. contextType: ContextType
  42. actorsInvolvedInVideo?: MActorLight[]
  43. transaction?: Transaction
  44. }) {
  45. const { byActor, video, actorsInvolvedInVideo, transaction, contextType } = options
  46. if (video.isOwned()) throw new Error('Cannot send activity to owned video origin ' + video.url)
  47. let accountActor: MActorLight = (video as MVideoAccountLight).VideoChannel?.Account?.Actor
  48. if (!accountActor) accountActor = await ActorModel.loadAccountActorByVideoId(video.id, transaction)
  49. const audience = getOriginVideoAudience(accountActor, actorsInvolvedInVideo)
  50. const activity = activityBuilder(audience)
  51. return afterCommitIfTransaction(transaction, () => {
  52. return unicastTo({
  53. data: activity,
  54. byActor,
  55. toActorUrl: accountActor.getSharedInbox(),
  56. contextType
  57. })
  58. })
  59. }
  60. // ---------------------------------------------------------------------------
  61. async function forwardVideoRelatedActivity (
  62. activity: Activity,
  63. t: Transaction,
  64. followersException: MActorWithInboxes[],
  65. video: MVideoId
  66. ) {
  67. // Mastodon does not add our announces in audience, so we forward to them manually
  68. const additionalActors = await getActorsInvolvedInVideo(video, t)
  69. const additionalFollowerUrls = additionalActors.map(a => a.followersUrl)
  70. return forwardActivity(activity, t, followersException, additionalFollowerUrls)
  71. }
  72. async function forwardActivity (
  73. activity: Activity,
  74. t: Transaction,
  75. followersException: MActorWithInboxes[] = [],
  76. additionalFollowerUrls: string[] = []
  77. ) {
  78. logger.info('Forwarding activity %s.', activity.id)
  79. const to = activity.to || []
  80. const cc = activity.cc || []
  81. const followersUrls = additionalFollowerUrls
  82. for (const dest of to.concat(cc)) {
  83. if (dest.endsWith('/followers')) {
  84. followersUrls.push(dest)
  85. }
  86. }
  87. const toActorFollowers = await ActorModel.listByFollowersUrls(followersUrls, t)
  88. const uris = await computeFollowerUris(toActorFollowers, followersException, t)
  89. if (uris.length === 0) {
  90. logger.info('0 followers for %s, no forwarding.', toActorFollowers.map(a => a.id).join(', '))
  91. return undefined
  92. }
  93. logger.debug('Creating forwarding job.', { uris })
  94. const payload: ActivitypubHttpBroadcastPayload = {
  95. uris,
  96. body: activity,
  97. contextType: null
  98. }
  99. return afterCommitIfTransaction(t, () => JobQueue.Instance.createJob({ type: 'activitypub-http-broadcast', payload }))
  100. }
  101. // ---------------------------------------------------------------------------
  102. async function broadcastToFollowers (options: {
  103. data: any
  104. byActor: MActorId
  105. toFollowersOf: MActorId[]
  106. transaction: Transaction
  107. contextType: ContextType
  108. actorsException?: MActorWithInboxes[]
  109. }) {
  110. const { data, byActor, toFollowersOf, transaction, contextType, actorsException = [] } = options
  111. const uris = await computeFollowerUris(toFollowersOf, actorsException, transaction)
  112. return afterCommitIfTransaction(transaction, () => {
  113. return broadcastTo({
  114. uris,
  115. data,
  116. byActor,
  117. contextType
  118. })
  119. })
  120. }
  121. async function broadcastToActors (options: {
  122. data: any
  123. byActor: MActorId
  124. toActors: MActor[]
  125. transaction: Transaction
  126. contextType: ContextType
  127. actorsException?: MActorWithInboxes[]
  128. }) {
  129. const { data, byActor, toActors, transaction, contextType, actorsException = [] } = options
  130. const uris = await computeUris(toActors, actorsException)
  131. return afterCommitIfTransaction(transaction, () => {
  132. return broadcastTo({
  133. uris,
  134. data,
  135. byActor,
  136. contextType
  137. })
  138. })
  139. }
  140. function broadcastTo (options: {
  141. uris: string[]
  142. data: any
  143. byActor: MActorId
  144. contextType: ContextType
  145. }) {
  146. const { uris, data, byActor, contextType } = options
  147. if (uris.length === 0) return undefined
  148. const broadcastUris: string[] = []
  149. const unicastUris: string[] = []
  150. // Bad URIs could be slow to respond, prefer to process them in a dedicated queue
  151. for (const uri of uris) {
  152. if (ActorFollowHealthCache.Instance.isBadInbox(uri)) {
  153. unicastUris.push(uri)
  154. } else {
  155. broadcastUris.push(uri)
  156. }
  157. }
  158. logger.debug('Creating broadcast job.', { broadcastUris, unicastUris })
  159. if (broadcastUris.length !== 0) {
  160. const payload = {
  161. uris: broadcastUris,
  162. signatureActorId: byActor.id,
  163. body: data,
  164. contextType
  165. }
  166. JobQueue.Instance.createJob({ type: 'activitypub-http-broadcast', payload })
  167. }
  168. for (const unicastUri of unicastUris) {
  169. const payload = {
  170. uri: unicastUri,
  171. signatureActorId: byActor.id,
  172. body: data,
  173. contextType
  174. }
  175. JobQueue.Instance.createJob({ type: 'activitypub-http-unicast', payload })
  176. }
  177. }
  178. function unicastTo (options: {
  179. data: any
  180. byActor: MActorId
  181. toActorUrl: string
  182. contextType: ContextType
  183. }) {
  184. const { data, byActor, toActorUrl, contextType } = options
  185. logger.debug('Creating unicast job.', { uri: toActorUrl })
  186. const payload = {
  187. uri: toActorUrl,
  188. signatureActorId: byActor.id,
  189. body: data,
  190. contextType
  191. }
  192. JobQueue.Instance.createJob({ type: 'activitypub-http-unicast', payload })
  193. }
  194. // ---------------------------------------------------------------------------
  195. export {
  196. broadcastToFollowers,
  197. unicastTo,
  198. forwardActivity,
  199. broadcastToActors,
  200. sendVideoActivityToOrigin,
  201. forwardVideoRelatedActivity,
  202. sendVideoRelatedActivity
  203. }
  204. // ---------------------------------------------------------------------------
  205. async function computeFollowerUris (toFollowersOf: MActorId[], actorsException: MActorWithInboxes[], t: Transaction) {
  206. const toActorFollowerIds = toFollowersOf.map(a => a.id)
  207. const result = await ActorFollowModel.listAcceptedFollowerSharedInboxUrls(toActorFollowerIds, t)
  208. const sharedInboxesException = await buildSharedInboxesException(actorsException)
  209. return result.data.filter(sharedInbox => sharedInboxesException.includes(sharedInbox) === false)
  210. }
  211. async function computeUris (toActors: MActor[], actorsException: MActorWithInboxes[] = []) {
  212. const serverActor = await getServerActor()
  213. const targetUrls = toActors
  214. .filter(a => a.id !== serverActor.id) // Don't send to ourselves
  215. .map(a => a.getSharedInbox())
  216. const toActorSharedInboxesSet = new Set(targetUrls)
  217. const sharedInboxesException = await buildSharedInboxesException(actorsException)
  218. return Array.from(toActorSharedInboxesSet)
  219. .filter(sharedInbox => sharedInboxesException.includes(sharedInbox) === false)
  220. }
  221. async function buildSharedInboxesException (actorsException: MActorWithInboxes[]) {
  222. const serverActor = await getServerActor()
  223. return actorsException
  224. .map(f => f.getSharedInbox())
  225. .concat([ serverActor.sharedInboxUrl ])
  226. }