videos-redundancy-scheduler.ts 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362
  1. import { move } from 'fs-extra'
  2. import { join } from 'path'
  3. import { getServerActor } from '@server/models/application/application'
  4. import { VideoModel } from '@server/models/video/video'
  5. import {
  6. MStreamingPlaylistFiles,
  7. MVideoAccountLight,
  8. MVideoFile,
  9. MVideoFileVideo,
  10. MVideoRedundancyFileVideo,
  11. MVideoRedundancyStreamingPlaylistVideo,
  12. MVideoRedundancyVideo,
  13. MVideoWithAllFiles
  14. } from '@server/types/models'
  15. import { VideosRedundancyStrategy } from '../../../shared/models/redundancy'
  16. import { logger, loggerTagsFactory } from '../../helpers/logger'
  17. import { downloadWebTorrentVideo } from '../../helpers/webtorrent'
  18. import { CONFIG } from '../../initializers/config'
  19. import { DIRECTORIES, REDUNDANCY, VIDEO_IMPORT_TIMEOUT } from '../../initializers/constants'
  20. import { VideoRedundancyModel } from '../../models/redundancy/video-redundancy'
  21. import { sendCreateCacheFile, sendUpdateCacheFile } from '../activitypub/send'
  22. import { getLocalVideoCacheFileActivityPubUrl, getLocalVideoCacheStreamingPlaylistActivityPubUrl } from '../activitypub/url'
  23. import { getOrCreateAPVideo } from '../activitypub/videos'
  24. import { downloadPlaylistSegments } from '../hls'
  25. import { removeVideoRedundancy } from '../redundancy'
  26. import { generateHLSRedundancyUrl, generateWebTorrentRedundancyUrl } from '../video-urls'
  27. import { AbstractScheduler } from './abstract-scheduler'
  28. const lTags = loggerTagsFactory('redundancy')
  29. type CandidateToDuplicate = {
  30. redundancy: VideosRedundancyStrategy
  31. video: MVideoWithAllFiles
  32. files: MVideoFile[]
  33. streamingPlaylists: MStreamingPlaylistFiles[]
  34. }
  35. function isMVideoRedundancyFileVideo (
  36. o: MVideoRedundancyFileVideo | MVideoRedundancyStreamingPlaylistVideo
  37. ): o is MVideoRedundancyFileVideo {
  38. return !!(o as MVideoRedundancyFileVideo).VideoFile
  39. }
  40. export class VideosRedundancyScheduler extends AbstractScheduler {
  41. private static instance: VideosRedundancyScheduler
  42. protected schedulerIntervalMs = CONFIG.REDUNDANCY.VIDEOS.CHECK_INTERVAL
  43. private constructor () {
  44. super()
  45. }
  46. async createManualRedundancy (videoId: number) {
  47. const videoToDuplicate = await VideoModel.loadWithFiles(videoId)
  48. if (!videoToDuplicate) {
  49. logger.warn('Video to manually duplicate %d does not exist anymore.', videoId, lTags())
  50. return
  51. }
  52. return this.createVideoRedundancies({
  53. video: videoToDuplicate,
  54. redundancy: null,
  55. files: videoToDuplicate.VideoFiles,
  56. streamingPlaylists: videoToDuplicate.VideoStreamingPlaylists
  57. })
  58. }
  59. protected async internalExecute () {
  60. for (const redundancyConfig of CONFIG.REDUNDANCY.VIDEOS.STRATEGIES) {
  61. logger.info('Running redundancy scheduler for strategy %s.', redundancyConfig.strategy, lTags())
  62. try {
  63. const videoToDuplicate = await this.findVideoToDuplicate(redundancyConfig)
  64. if (!videoToDuplicate) continue
  65. const candidateToDuplicate = {
  66. video: videoToDuplicate,
  67. redundancy: redundancyConfig,
  68. files: videoToDuplicate.VideoFiles,
  69. streamingPlaylists: videoToDuplicate.VideoStreamingPlaylists
  70. }
  71. await this.purgeCacheIfNeeded(candidateToDuplicate)
  72. if (await this.isTooHeavy(candidateToDuplicate)) {
  73. logger.info('Video %s is too big for our cache, skipping.', videoToDuplicate.url, lTags(videoToDuplicate.uuid))
  74. continue
  75. }
  76. logger.info(
  77. 'Will duplicate video %s in redundancy scheduler "%s".',
  78. videoToDuplicate.url, redundancyConfig.strategy, lTags(videoToDuplicate.uuid)
  79. )
  80. await this.createVideoRedundancies(candidateToDuplicate)
  81. } catch (err) {
  82. logger.error('Cannot run videos redundancy %s.', redundancyConfig.strategy, { err, ...lTags() })
  83. }
  84. }
  85. await this.extendsLocalExpiration()
  86. await this.purgeRemoteExpired()
  87. }
  88. static get Instance () {
  89. return this.instance || (this.instance = new this())
  90. }
  91. private async extendsLocalExpiration () {
  92. const expired = await VideoRedundancyModel.listLocalExpired()
  93. for (const redundancyModel of expired) {
  94. try {
  95. const redundancyConfig = CONFIG.REDUNDANCY.VIDEOS.STRATEGIES.find(s => s.strategy === redundancyModel.strategy)
  96. const { totalUsed } = await VideoRedundancyModel.getStats(redundancyConfig.strategy)
  97. // If the administrator disabled the redundancy or decreased the cache size, remove this redundancy instead of extending it
  98. if (!redundancyConfig || totalUsed > redundancyConfig.size) {
  99. logger.info('Destroying redundancy %s because the cache size %s is too heavy.', redundancyModel.url, redundancyModel.strategy)
  100. await removeVideoRedundancy(redundancyModel)
  101. } else {
  102. await this.extendsRedundancy(redundancyModel)
  103. }
  104. } catch (err) {
  105. logger.error(
  106. 'Cannot extend or remove expiration of %s video from our redundancy system.',
  107. this.buildEntryLogId(redundancyModel), { err, ...lTags(redundancyModel.getVideoUUID()) }
  108. )
  109. }
  110. }
  111. }
  112. private async extendsRedundancy (redundancyModel: MVideoRedundancyVideo) {
  113. const redundancy = CONFIG.REDUNDANCY.VIDEOS.STRATEGIES.find(s => s.strategy === redundancyModel.strategy)
  114. // Redundancy strategy disabled, remove our redundancy instead of extending expiration
  115. if (!redundancy) {
  116. await removeVideoRedundancy(redundancyModel)
  117. return
  118. }
  119. await this.extendsExpirationOf(redundancyModel, redundancy.minLifetime)
  120. }
  121. private async purgeRemoteExpired () {
  122. const expired = await VideoRedundancyModel.listRemoteExpired()
  123. for (const redundancyModel of expired) {
  124. try {
  125. await removeVideoRedundancy(redundancyModel)
  126. } catch (err) {
  127. logger.error(
  128. 'Cannot remove redundancy %s from our redundancy system.',
  129. this.buildEntryLogId(redundancyModel), lTags(redundancyModel.getVideoUUID())
  130. )
  131. }
  132. }
  133. }
  134. private findVideoToDuplicate (cache: VideosRedundancyStrategy) {
  135. if (cache.strategy === 'most-views') {
  136. return VideoRedundancyModel.findMostViewToDuplicate(REDUNDANCY.VIDEOS.RANDOMIZED_FACTOR)
  137. }
  138. if (cache.strategy === 'trending') {
  139. return VideoRedundancyModel.findTrendingToDuplicate(REDUNDANCY.VIDEOS.RANDOMIZED_FACTOR)
  140. }
  141. if (cache.strategy === 'recently-added') {
  142. const minViews = cache.minViews
  143. return VideoRedundancyModel.findRecentlyAddedToDuplicate(REDUNDANCY.VIDEOS.RANDOMIZED_FACTOR, minViews)
  144. }
  145. }
  146. private async createVideoRedundancies (data: CandidateToDuplicate) {
  147. const video = await this.loadAndRefreshVideo(data.video.url)
  148. if (!video) {
  149. logger.info('Video %s we want to duplicate does not existing anymore, skipping.', data.video.url, lTags(data.video.uuid))
  150. return
  151. }
  152. for (const file of data.files) {
  153. const existingRedundancy = await VideoRedundancyModel.loadLocalByFileId(file.id)
  154. if (existingRedundancy) {
  155. await this.extendsRedundancy(existingRedundancy)
  156. continue
  157. }
  158. await this.createVideoFileRedundancy(data.redundancy, video, file)
  159. }
  160. for (const streamingPlaylist of data.streamingPlaylists) {
  161. const existingRedundancy = await VideoRedundancyModel.loadLocalByStreamingPlaylistId(streamingPlaylist.id)
  162. if (existingRedundancy) {
  163. await this.extendsRedundancy(existingRedundancy)
  164. continue
  165. }
  166. await this.createStreamingPlaylistRedundancy(data.redundancy, video, streamingPlaylist)
  167. }
  168. }
  169. private async createVideoFileRedundancy (redundancy: VideosRedundancyStrategy | null, video: MVideoAccountLight, fileArg: MVideoFile) {
  170. let strategy = 'manual'
  171. let expiresOn: Date = null
  172. if (redundancy) {
  173. strategy = redundancy.strategy
  174. expiresOn = this.buildNewExpiration(redundancy.minLifetime)
  175. }
  176. const file = fileArg as MVideoFileVideo
  177. file.Video = video
  178. const serverActor = await getServerActor()
  179. logger.info('Duplicating %s - %d in videos redundancy with "%s" strategy.', video.url, file.resolution, strategy, lTags(video.uuid))
  180. const tmpPath = await downloadWebTorrentVideo({ uri: file.torrentUrl }, VIDEO_IMPORT_TIMEOUT)
  181. const destPath = join(CONFIG.STORAGE.REDUNDANCY_DIR, file.filename)
  182. await move(tmpPath, destPath, { overwrite: true })
  183. const createdModel: MVideoRedundancyFileVideo = await VideoRedundancyModel.create({
  184. expiresOn,
  185. url: getLocalVideoCacheFileActivityPubUrl(file),
  186. fileUrl: generateWebTorrentRedundancyUrl(file),
  187. strategy,
  188. videoFileId: file.id,
  189. actorId: serverActor.id
  190. })
  191. createdModel.VideoFile = file
  192. await sendCreateCacheFile(serverActor, video, createdModel)
  193. logger.info('Duplicated %s - %d -> %s.', video.url, file.resolution, createdModel.url, lTags(video.uuid))
  194. }
  195. private async createStreamingPlaylistRedundancy (
  196. redundancy: VideosRedundancyStrategy,
  197. video: MVideoAccountLight,
  198. playlistArg: MStreamingPlaylistFiles
  199. ) {
  200. let strategy = 'manual'
  201. let expiresOn: Date = null
  202. if (redundancy) {
  203. strategy = redundancy.strategy
  204. expiresOn = this.buildNewExpiration(redundancy.minLifetime)
  205. }
  206. const playlist = Object.assign(playlistArg, { Video: video })
  207. const serverActor = await getServerActor()
  208. logger.info('Duplicating %s streaming playlist in videos redundancy with "%s" strategy.', video.url, strategy, lTags(video.uuid))
  209. const destDirectory = join(DIRECTORIES.HLS_REDUNDANCY, video.uuid)
  210. const masterPlaylistUrl = playlist.getMasterPlaylistUrl(video)
  211. const maxSizeKB = this.getTotalFileSizes([], [ playlist ]) / 1000
  212. const toleranceKB = maxSizeKB + ((5 * maxSizeKB) / 100) // 5% more tolerance
  213. await downloadPlaylistSegments(masterPlaylistUrl, destDirectory, VIDEO_IMPORT_TIMEOUT, toleranceKB)
  214. const createdModel: MVideoRedundancyStreamingPlaylistVideo = await VideoRedundancyModel.create({
  215. expiresOn,
  216. url: getLocalVideoCacheStreamingPlaylistActivityPubUrl(video, playlist),
  217. fileUrl: generateHLSRedundancyUrl(video, playlistArg),
  218. strategy,
  219. videoStreamingPlaylistId: playlist.id,
  220. actorId: serverActor.id
  221. })
  222. createdModel.VideoStreamingPlaylist = playlist
  223. await sendCreateCacheFile(serverActor, video, createdModel)
  224. logger.info('Duplicated playlist %s -> %s.', masterPlaylistUrl, createdModel.url, lTags(video.uuid))
  225. }
  226. private async extendsExpirationOf (redundancy: MVideoRedundancyVideo, expiresAfterMs: number) {
  227. logger.info('Extending expiration of %s.', redundancy.url, lTags(redundancy.getVideoUUID()))
  228. const serverActor = await getServerActor()
  229. redundancy.expiresOn = this.buildNewExpiration(expiresAfterMs)
  230. await redundancy.save()
  231. await sendUpdateCacheFile(serverActor, redundancy)
  232. }
  233. private async purgeCacheIfNeeded (candidateToDuplicate: CandidateToDuplicate) {
  234. while (await this.isTooHeavy(candidateToDuplicate)) {
  235. const redundancy = candidateToDuplicate.redundancy
  236. const toDelete = await VideoRedundancyModel.loadOldestLocalExpired(redundancy.strategy, redundancy.minLifetime)
  237. if (!toDelete) return
  238. const videoId = toDelete.VideoFile
  239. ? toDelete.VideoFile.videoId
  240. : toDelete.VideoStreamingPlaylist.videoId
  241. const redundancies = await VideoRedundancyModel.listLocalByVideoId(videoId)
  242. for (const redundancy of redundancies) {
  243. await removeVideoRedundancy(redundancy)
  244. }
  245. }
  246. }
  247. private async isTooHeavy (candidateToDuplicate: CandidateToDuplicate) {
  248. const maxSize = candidateToDuplicate.redundancy.size
  249. const { totalUsed: alreadyUsed } = await VideoRedundancyModel.getStats(candidateToDuplicate.redundancy.strategy)
  250. const videoSize = this.getTotalFileSizes(candidateToDuplicate.files, candidateToDuplicate.streamingPlaylists)
  251. const willUse = alreadyUsed + videoSize
  252. logger.debug('Checking candidate size.', { maxSize, alreadyUsed, videoSize, willUse, ...lTags(candidateToDuplicate.video.uuid) })
  253. return willUse > maxSize
  254. }
  255. private buildNewExpiration (expiresAfterMs: number) {
  256. return new Date(Date.now() + expiresAfterMs)
  257. }
  258. private buildEntryLogId (object: MVideoRedundancyFileVideo | MVideoRedundancyStreamingPlaylistVideo) {
  259. if (isMVideoRedundancyFileVideo(object)) return `${object.VideoFile.Video.url}-${object.VideoFile.resolution}`
  260. return `${object.VideoStreamingPlaylist.getMasterPlaylistUrl(object.VideoStreamingPlaylist.Video)}`
  261. }
  262. private getTotalFileSizes (files: MVideoFile[], playlists: MStreamingPlaylistFiles[]): number {
  263. const fileReducer = (previous: number, current: MVideoFile) => previous + current.size
  264. let allFiles = files
  265. for (const p of playlists) {
  266. allFiles = allFiles.concat(p.VideoFiles)
  267. }
  268. return allFiles.reduce(fileReducer, 0)
  269. }
  270. private async loadAndRefreshVideo (videoUrl: string) {
  271. // We need more attributes and check if the video still exists
  272. const getVideoOptions = {
  273. videoObject: videoUrl,
  274. syncParam: { rates: false, shares: false, comments: false, thumbnail: false, refreshVideo: true },
  275. fetchType: 'all' as 'all'
  276. }
  277. const { video } = await getOrCreateAPVideo(getVideoOptions)
  278. return video
  279. }
  280. }