videos-redundancy-scheduler.ts 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221
  1. import { AbstractScheduler } from './abstract-scheduler'
  2. import { CONFIG, REDUNDANCY, VIDEO_IMPORT_TIMEOUT } from '../../initializers'
  3. import { logger } from '../../helpers/logger'
  4. import { VideosRedundancy } from '../../../shared/models/redundancy'
  5. import { VideoRedundancyModel } from '../../models/redundancy/video-redundancy'
  6. import { VideoFileModel } from '../../models/video/video-file'
  7. import { downloadWebTorrentVideo } from '../../helpers/webtorrent'
  8. import { join } from 'path'
  9. import { move } from 'fs-extra'
  10. import { getServerActor } from '../../helpers/utils'
  11. import { sendCreateCacheFile, sendUpdateCacheFile } from '../activitypub/send'
  12. import { getVideoCacheFileActivityPubUrl } from '../activitypub/url'
  13. import { removeVideoRedundancy } from '../redundancy'
  14. import { getOrCreateVideoAndAccountAndChannel } from '../activitypub'
  /**
   * Scheduler that maintains the local videos redundancy cache.
   *
   * On each run, for every strategy listed in CONFIG.REDUNDANCY.VIDEOS.STRATEGIES, it picks a video
   * to duplicate, makes room in the cache if needed, downloads the video files into
   * CONFIG.STORAGE.REDUNDANCY_DIR and sends a "create cache file" activity for each of them.
   * It then extends the expiration of expired local redundancies and removes expired remote ones.
   */
  export class VideosRedundancyScheduler extends AbstractScheduler {

    private static instance: AbstractScheduler

    // Guard flag: prevents a new run from starting while the previous one is still in progress
    private executing = false

    // NOTE(review): presumably read by AbstractScheduler to schedule execute() - confirm in the base class
    protected schedulerIntervalMs = CONFIG.REDUNDANCY.VIDEOS.CHECK_INTERVAL

    private constructor () {
      super()
    }

    /**
     * Runs one pass of the redundancy scheduler.
     *
     * Does nothing if a previous pass is still running. A failure for one strategy is logged and
     * does not prevent the other strategies from being processed. Errors thrown while extending
     * local expirations or purging remote redundancies are propagated to the caller.
     */
    async execute () {
      if (this.executing) return

      this.executing = true

      try {
        for (const obj of CONFIG.REDUNDANCY.VIDEOS.STRATEGIES) {
          logger.info('Running redundancy scheduler for strategy %s.', obj.strategy)

          try {
            const videoToDuplicate = await this.findVideoToDuplicate(obj)
            if (!videoToDuplicate) continue

            const videoFiles = videoToDuplicate.VideoFiles
            // The files need a back reference to their video (used later through file.Video.url)
            videoFiles.forEach(f => f.Video = videoToDuplicate)

            await this.purgeCacheIfNeeded(obj, videoFiles)

            if (await this.isTooHeavy(obj, videoFiles)) {
              logger.info('Video %s is too big for our cache, skipping.', videoToDuplicate.url)
              continue
            }

            logger.info('Will duplicate video %s in redundancy scheduler "%s".', videoToDuplicate.url, obj.strategy)

            await this.createVideoRedundancy(obj, videoFiles)
          } catch (err) {
            logger.error('Cannot run videos redundancy %s.', obj.strategy, { err })
          }
        }

        await this.extendsLocalExpiration()

        await this.purgeRemoteExpired()
      } finally {
        // Always release the guard, otherwise a single failure would block every future run
        this.executing = false
      }
    }

    /** Returns the singleton instance, creating it on first access. */
    static get Instance () {
      return this.instance || (this.instance = new this())
    }

    /**
     * Extends (or deletes, see extendsOrDeleteRedundancy) every expired local redundancy.
     * A failure on one entry is logged and the remaining entries are still processed.
     */
    private async extendsLocalExpiration () {
      const expired = await VideoRedundancyModel.listLocalExpired()

      for (const redundancyModel of expired) {
        try {
          await this.extendsOrDeleteRedundancy(redundancyModel)
        } catch (err) {
          logger.error(
            'Cannot extend expiration of %s video from our redundancy system.', this.buildEntryLogId(redundancyModel),
            { err }
          )
        }
      }
    }

    /**
     * Refreshes the video associated to the redundancy, then either destroys the redundancy
     * (the video does not exist anymore) or extends its expiration by the strategy minLifetime.
     *
     * @param redundancyModel redundancy entry whose VideoFile.Video.url is loaded
     * @throws Error if the strategy of the redundancy is not in CONFIG.REDUNDANCY.VIDEOS.STRATEGIES anymore
     */
    private async extendsOrDeleteRedundancy (redundancyModel: VideoRedundancyModel) {
      // Refresh the video, maybe it was deleted
      const video = await this.loadAndRefreshVideo(redundancyModel.VideoFile.Video.url)

      if (!video) {
        logger.info('Destroying existing redundancy %s, because the associated video does not exist anymore.', redundancyModel.url)

        await redundancyModel.destroy()
        return
      }

      const redundancy = CONFIG.REDUNDANCY.VIDEOS.STRATEGIES.find(s => s.strategy === redundancyModel.strategy)

      // The strategy could have been removed from the configuration since the redundancy was created
      if (!redundancy) {
        throw new Error(
          `Cannot extend expiration of ${redundancyModel.url}: strategy ${redundancyModel.strategy} is not configured.`
        )
      }

      await this.extendsExpirationOf(redundancyModel, redundancy.minLifetime)
    }

    /**
     * Removes every expired remote redundancy.
     * A failure on one entry is logged and the remaining entries are still processed.
     */
    private async purgeRemoteExpired () {
      const expired = await VideoRedundancyModel.listRemoteExpired()

      for (const redundancyModel of expired) {
        try {
          await removeVideoRedundancy(redundancyModel)
        } catch (err) {
          logger.error(
            'Cannot remove redundancy %s from our redundancy system.', this.buildEntryLogId(redundancyModel),
            { err }
          )
        }
      }
    }

    /**
     * Asks the redundancy model for a video to duplicate according to the given strategy.
     *
     * @param cache strategy configuration ('most-views', 'trending' or 'recently-added')
     * @returns the result of the matching model finder, or undefined for any other strategy
     */
    private findVideoToDuplicate (cache: VideosRedundancy) {
      if (cache.strategy === 'most-views') {
        return VideoRedundancyModel.findMostViewToDuplicate(REDUNDANCY.VIDEOS.RANDOMIZED_FACTOR)
      }

      if (cache.strategy === 'trending') {
        return VideoRedundancyModel.findTrendingToDuplicate(REDUNDANCY.VIDEOS.RANDOMIZED_FACTOR)
      }

      if (cache.strategy === 'recently-added') {
        const minViews = cache.minViews
        return VideoRedundancyModel.findRecentlyAddedToDuplicate(REDUNDANCY.VIDEOS.RANDOMIZED_FACTOR, minViews)
      }

      // Strategy not handled here: nothing to duplicate
      return undefined
    }

    /**
     * Duplicates each given file: downloads it, moves it into the redundancy directory,
     * creates the redundancy entry and sends the "create cache file" activity.
     *
     * Files that already have a local redundancy get their redundancy extended (or deleted) instead,
     * and files whose video cannot be loaded anymore are skipped.
     *
     * @param redundancy strategy configuration providing the strategy name and minLifetime
     * @param filesToDuplicate files to duplicate, each with its Video association set
     */
    private async createVideoRedundancy (redundancy: VideosRedundancy, filesToDuplicate: VideoFileModel[]) {
      const serverActor = await getServerActor()

      for (const file of filesToDuplicate) {
        const video = await this.loadAndRefreshVideo(file.Video.url)

        const existingRedundancy = await VideoRedundancyModel.loadLocalByFileId(file.id)
        if (existingRedundancy) {
          await this.extendsOrDeleteRedundancy(existingRedundancy)

          continue
        }

        if (!video) {
          logger.info('Video %s we want to duplicate does not existing anymore, skipping.', file.Video.url)

          continue
        }

        logger.info('Duplicating %s - %d in videos redundancy with "%s" strategy.', video.url, file.resolution, redundancy.strategy)

        const { baseUrlHttp, baseUrlWs } = video.getBaseUrls()
        const magnetUri = video.generateMagnetUri(file, baseUrlHttp, baseUrlWs)

        // Download to a temporary path first, then move the file to its final destination
        const tmpPath = await downloadWebTorrentVideo({ magnetUri }, VIDEO_IMPORT_TIMEOUT)

        const destPath = join(CONFIG.STORAGE.REDUNDANCY_DIR, video.getVideoFilename(file))
        await move(tmpPath, destPath)

        const createdModel = await VideoRedundancyModel.create({
          expiresOn: this.buildNewExpiration(redundancy.minLifetime),
          url: getVideoCacheFileActivityPubUrl(file),
          fileUrl: video.getVideoRedundancyUrl(file, CONFIG.WEBSERVER.URL),
          strategy: redundancy.strategy,
          videoFileId: file.id,
          actorId: serverActor.id
        })
        // Attach the file so the created model carries its VideoFile association when sent
        createdModel.VideoFile = file

        await sendCreateCacheFile(serverActor, createdModel)

        logger.info('Duplicated %s - %d -> %s.', video.url, file.resolution, createdModel.url)
      }
    }

    /**
     * Sets a new expiration date on the redundancy, saves it and sends the "update cache file" activity.
     *
     * @param redundancy redundancy entry to extend
     * @param expiresAfterMs delay, in milliseconds from now, of the new expiration date
     */
    private async extendsExpirationOf (redundancy: VideoRedundancyModel, expiresAfterMs: number) {
      logger.info('Extending expiration of %s.', redundancy.url)

      const serverActor = await getServerActor()

      redundancy.expiresOn = this.buildNewExpiration(expiresAfterMs)
      await redundancy.save()

      await sendUpdateCacheFile(serverActor, redundancy)
    }

    /**
     * Removes the oldest already expired local redundancies of the strategy, one by one,
     * until the files to duplicate fit in the cache or there is nothing left to remove.
     *
     * @param redundancy strategy configuration (strategy name, size and minLifetime are read)
     * @param filesToDuplicate files we want to add to the cache
     */
    private async purgeCacheIfNeeded (redundancy: VideosRedundancy, filesToDuplicate: VideoFileModel[]) {
      // isTooHeavy() is async: it must be awaited, a bare promise is always truthy
      while (await this.isTooHeavy(redundancy, filesToDuplicate)) {
        const toDelete = await VideoRedundancyModel.loadOldestLocalThatAlreadyExpired(redundancy.strategy, redundancy.minLifetime)
        if (!toDelete) return

        await removeVideoRedundancy(toDelete)
      }
    }

    /**
     * Checks whether adding the given files would exceed the size allowed for the strategy.
     *
     * @returns true if the already duplicated total plus the size of the files is greater than redundancy.size
     */
    private async isTooHeavy (redundancy: VideosRedundancy, filesToDuplicate: VideoFileModel[]) {
      const maxSize = redundancy.size

      const totalDuplicated = await VideoRedundancyModel.getTotalDuplicated(redundancy.strategy)
      const totalWillDuplicate = totalDuplicated + this.getTotalFileSizes(filesToDuplicate)

      return totalWillDuplicate > maxSize
    }

    /** Builds the date that is `expiresAfterMs` milliseconds after the current time. */
    private buildNewExpiration (expiresAfterMs: number) {
      return new Date(Date.now() + expiresAfterMs)
    }

    /** Builds the "<video url>-<resolution>" identifier used in log messages. */
    private buildEntryLogId (object: VideoRedundancyModel) {
      return `${object.VideoFile.Video.url}-${object.VideoFile.resolution}`
    }

    /** Sums the `size` attribute of the given files (0 for an empty list). */
    private getTotalFileSizes (files: VideoFileModel[]) {
      const fileReducer = (previous: number, current: VideoFileModel) => previous + current.size

      return files.reduce(fileReducer, 0)
    }

    /**
     * Loads the video behind the given URL, asking for a refresh of the video itself only.
     *
     * @param videoUrl URL passed as `videoObject` to getOrCreateVideoAndAccountAndChannel
     * @returns the `video` property of the helper result
     */
    private async loadAndRefreshVideo (videoUrl: string) {
      // We need more attributes and check if the video still exists
      const getVideoOptions = {
        videoObject: videoUrl,
        syncParam: { likes: false, dislikes: false, shares: false, comments: false, thumbnail: false, refreshVideo: true },
        fetchType: 'all' as 'all'
      }
      const { video } = await getOrCreateVideoAndAccountAndChannel(getVideoOptions)

      return video
    }
  }