Browse Source

Support progress for ffmpeg tasks

Chocobozzz 3 years ago
parent
commit
3b01f4c0ac

+ 10 - 3
client/src/app/+admin/system/jobs/jobs.component.html

@@ -40,7 +40,8 @@
       <th style="width: 40px"></th>
       <th style="width: calc(100% - 390px)" class="job-id" i18n>ID</th>
       <th style="width: 200px" class="job-type" i18n>Type</th>
-      <th style="width: 200px" class="job-type" i18n *ngIf="jobState === 'all'">State</th>
+      <th style="width: 200px" class="job-state" i18n *ngIf="jobState === 'all'">State</th>
+      <th style="width: 100px" class="job-progress" i18n *ngIf="hasProgress()">Progress</th>
       <th style="width: 150px" class="job-date" i18n pSortableColumn="createdAt">Created <p-sortIcon field="createdAt"></p-sortIcon></th>
     </tr>
   </ng-template>
@@ -55,9 +56,15 @@
 
       <td class="job-id c-hand" [pRowToggler]="job" [title]="job.id">{{ job.id }}</td>
       <td class="job-type c-hand" [pRowToggler]="job">{{ job.type }}</td>
-      <td class="job-type c-hand" [pRowToggler]="job" *ngIf="jobState === 'all'">
+
+      <td class="job-state c-hand" [pRowToggler]="job" *ngIf="jobState === 'all'">
         <span class="badge" [ngClass]="getJobStateClass(job.state)">{{ job.state }}</span>
       </td>
+
+      <td class="job-state" [pRowToggler]="job" *ngIf="hasProgress()">
+        {{ getProgress(job) }}
+      </td>
+
       <td class="job-date c-hand" [pRowToggler]="job">{{ job.createdAt | date: 'short' }}</td>
     </tr>
   </ng-template>
@@ -94,7 +101,7 @@
               <ng-container *ngIf="jobType === 'all'" i18n>No jobs found.</ng-container>
               <ng-container *ngIf="jobType !== 'all'" i18n>No <code>{{ jobType }}</code> jobs found.</ng-container>
             </ng-container>
-            <ng-container *ngIf="jobState !== 'all'"> 
+            <ng-container *ngIf="jobState !== 'all'">
               <ng-container *ngIf="jobType === 'all'" i18n>No <span class="badge" [ngClass]="getJobStateClass(jobState)">{{ jobState }}</span> jobs found.</ng-container>
               <ng-container *ngIf="jobType !== 'all'" i18n>No <code>{{ jobType }}</code> jobs found that are <span class="badge" [ngClass]="getJobStateClass(jobState)">{{ jobState }}</span>.</ng-container>
             </ng-container>

+ 2 - 1
client/src/app/+admin/system/jobs/jobs.component.scss

@@ -9,7 +9,8 @@
   max-width: 30vw !important;
 }
 
-.job-type {
+.job-type,
+.job-state {
   width: 150px !important;
 }
 

+ 10 - 0
client/src/app/+admin/system/jobs/jobs.component.ts

@@ -83,6 +83,16 @@ export class JobsComponent extends RestTable implements OnInit {
     this.saveJobStateAndType()
   }
 
+  hasProgress () {
+    return this.jobType === 'all' || this.jobType === 'video-transcoding'
+  }
+
+  getProgress (job: Job) {
+    if (job.state === 'active') return job.progress + '%'
+
+    return ''
+  }
+
   protected loadData () {
     let jobState = this.jobState as JobState
     if (this.jobState === 'all') jobState = null

+ 4 - 9
server/controllers/api/jobs.ts

@@ -52,28 +52,23 @@ async function listJobs (req: express.Request, res: express.Response) {
 
   const result: ResultList<Job> = {
     total,
-    data: state
-      ? jobs.map(j => formatJob(j, state))
-      : await Promise.all(jobs.map(j => formatJobWithUnknownState(j)))
+    data: await Promise.all(jobs.map(j => formatJob(j, state)))
   }
 
   return res.json(result)
 }
 
-async function formatJobWithUnknownState (job: any) {
-  return formatJob(job, await job.getState())
-}
-
-function formatJob (job: any, state: JobState): Job {
+async function formatJob (job: any, state?: JobState): Promise<Job> {
   const error = isArray(job.stacktrace) && job.stacktrace.length !== 0
     ? job.stacktrace[0]
     : null
 
   return {
     id: job.id,
-    state: state,
+    state: state || await job.getState(),
     type: job.queue.name as JobType,
     data: job.data,
+    progress: await job.progress(),
     error,
     createdAt: new Date(job.timestamp),
     finishedOn: new Date(job.finishedOn),

+ 14 - 6
server/helpers/ffmpeg-utils.ts

@@ -1,3 +1,4 @@
+import { Job } from 'bull'
 import * as ffmpeg from 'fluent-ffmpeg'
 import { readFile, remove, writeFile } from 'fs-extra'
 import { dirname, join } from 'path'
@@ -124,6 +125,8 @@ interface BaseTranscodeOptions {
   resolution: VideoResolution
 
   isPortraitMode?: boolean
+
+  job?: Job
 }
 
 interface HLSTranscodeOptions extends BaseTranscodeOptions {
@@ -188,7 +191,7 @@ async function transcode (options: TranscodeOptions) {
 
   command = await builders[options.type](command, options)
 
-  await runCommand(command)
+  await runCommand(command, options.job)
 
   await fixHLSPlaylistIfNeeded(options)
 }
@@ -611,11 +614,9 @@ function getFFmpeg (input: string, type: 'live' | 'vod') {
   return command
 }
 
-async function runCommand (command: ffmpeg.FfmpegCommand, onEnd?: Function) {
+async function runCommand (command: ffmpeg.FfmpegCommand, job?: Job) {
   return new Promise<void>((res, rej) => {
     command.on('error', (err, stdout, stderr) => {
-      if (onEnd) onEnd()
-
       logger.error('Error in transcoding job.', { stdout, stderr })
       rej(err)
     })
@@ -623,11 +624,18 @@ async function runCommand (command: ffmpeg.FfmpegCommand, onEnd?: Function) {
     command.on('end', (stdout, stderr) => {
       logger.debug('FFmpeg command ended.', { stdout, stderr })
 
-      if (onEnd) onEnd()
-
       res()
     })
 
+    if (job) {
+      command.on('progress', progress => {
+        if (!progress.percent) return
+
+        job.progress(Math.round(progress.percent))
+          .catch(err => logger.warn('Cannot set ffmpeg job progress.', { err }))
+      })
+    }
+
     command.run()
   })
 }

+ 5 - 4
server/lib/job-queue/handlers/video-transcoding.ts

@@ -44,20 +44,21 @@ async function processVideoTranscoding (job: Bull.Job) {
       videoInputPath,
       resolution: payload.resolution,
       copyCodecs: payload.copyCodecs,
-      isPortraitMode: payload.isPortraitMode || false
+      isPortraitMode: payload.isPortraitMode || false,
+      job
     })
 
     await retryTransactionWrapper(onHlsPlaylistGenerationSuccess, video)
   } else if (payload.type === 'new-resolution') {
-    await transcodeNewResolution(video, payload.resolution, payload.isPortraitMode || false)
+    await transcodeNewResolution(video, payload.resolution, payload.isPortraitMode || false, job)
 
     await retryTransactionWrapper(publishNewResolutionIfNeeded, video, payload)
   } else if (payload.type === 'merge-audio') {
-    await mergeAudioVideofile(video, payload.resolution)
+    await mergeAudioVideofile(video, payload.resolution, job)
 
     await retryTransactionWrapper(publishNewResolutionIfNeeded, video, payload)
   } else {
-    const transcodeType = await optimizeOriginalVideofile(video)
+    const transcodeType = await optimizeOriginalVideofile(video, video.getMaxQualityFile(), job)
 
     await retryTransactionWrapper(onVideoFileOptimizerSuccess, video, payload, transcodeType)
   }

+ 25 - 11
server/lib/video-transcoding.ts

@@ -1,3 +1,4 @@
+import { Job } from 'bull'
 import { copyFile, ensureDir, move, remove, stat } from 'fs-extra'
 import { basename, extname as extnameUtil, join } from 'path'
 import { createTorrentAndSetInfoHash } from '@server/helpers/webtorrent'
@@ -23,11 +24,10 @@ import { availableEncoders } from './video-transcoding-profiles'
  */
 
 // Optimize the original video file and replace it. The resolution is not changed.
-async function optimizeOriginalVideofile (video: MVideoWithFile, inputVideoFileArg?: MVideoFile) {
+async function optimizeOriginalVideofile (video: MVideoWithFile, inputVideoFile: MVideoFile, job?: Job) {
   const transcodeDirectory = CONFIG.STORAGE.TMP_DIR
   const newExtname = '.mp4'
 
-  const inputVideoFile = inputVideoFileArg || video.getMaxQualityFile()
   const videoInputPath = getVideoFilePath(video, inputVideoFile)
   const videoTranscodedPath = join(transcodeDirectory, video.id + '-transcoded' + newExtname)
 
@@ -44,7 +44,9 @@ async function optimizeOriginalVideofile (video: MVideoWithFile, inputVideoFileA
     availableEncoders,
     profile: 'default',
 
-    resolution: inputVideoFile.resolution
+    resolution: inputVideoFile.resolution,
+
+    job
   }
 
   // Could be very long!
@@ -70,7 +72,7 @@ async function optimizeOriginalVideofile (video: MVideoWithFile, inputVideoFileA
 }
 
 // Transcode the original video file to a lower resolution.
-async function transcodeNewResolution (video: MVideoWithFile, resolution: VideoResolution, isPortrait: boolean) {
+async function transcodeNewResolution (video: MVideoWithFile, resolution: VideoResolution, isPortrait: boolean, job: Job) {
   const transcodeDirectory = CONFIG.STORAGE.TMP_DIR
   const extname = '.mp4'
 
@@ -96,7 +98,9 @@ async function transcodeNewResolution (video: MVideoWithFile, resolution: VideoR
       availableEncoders,
       profile: 'default',
 
-      resolution
+      resolution,
+
+      job
     }
     : {
       type: 'video' as 'video',
@@ -107,7 +111,9 @@ async function transcodeNewResolution (video: MVideoWithFile, resolution: VideoR
       profile: 'default',
 
       resolution,
-      isPortraitMode: isPortrait
+      isPortraitMode: isPortrait,
+
+      job
     }
 
   await transcode(transcodeOptions)
@@ -116,7 +122,7 @@ async function transcodeNewResolution (video: MVideoWithFile, resolution: VideoR
 }
 
 // Merge an image with an audio file to create a video
-async function mergeAudioVideofile (video: MVideoWithAllFiles, resolution: VideoResolution) {
+async function mergeAudioVideofile (video: MVideoWithAllFiles, resolution: VideoResolution, job: Job) {
   const transcodeDirectory = CONFIG.STORAGE.TMP_DIR
   const newExtname = '.mp4'
 
@@ -140,7 +146,9 @@ async function mergeAudioVideofile (video: MVideoWithAllFiles, resolution: Video
     profile: 'default',
 
     audioPath: audioInputPath,
-    resolution
+    resolution,
+
+    job
   }
 
   try {
@@ -190,6 +198,7 @@ function generateHlsPlaylist (options: {
   resolution: VideoResolution
   copyCodecs: boolean
   isPortraitMode: boolean
+  job?: Job
 }) {
   return generateHlsPlaylistCommon({
     video: options.video,
@@ -197,7 +206,8 @@ function generateHlsPlaylist (options: {
     copyCodecs: options.copyCodecs,
     isPortraitMode: options.isPortraitMode,
     inputPath: options.videoInputPath,
-    type: 'hls' as 'hls'
+    type: 'hls' as 'hls',
+    job: options.job
   })
 }
 
@@ -251,8 +261,10 @@ async function generateHlsPlaylistCommon (options: {
   copyCodecs?: boolean
   isAAC?: boolean
   isPortraitMode: boolean
+
+  job?: Job
 }) {
-  const { type, video, inputPath, resolution, copyCodecs, isPortraitMode, isAAC } = options
+  const { type, video, inputPath, resolution, copyCodecs, isPortraitMode, isAAC, job } = options
 
   const baseHlsDirectory = join(HLS_STREAMING_PLAYLIST_DIRECTORY, video.uuid)
   await ensureDir(join(HLS_STREAMING_PLAYLIST_DIRECTORY, video.uuid))
@@ -277,7 +289,9 @@ async function generateHlsPlaylistCommon (options: {
 
     hlsPlaylist: {
       videoFilename
-    }
+    },
+
+    job
   }
 
   await transcode(transcodeOptions)

+ 1 - 0
shared/models/server/job.model.ts

@@ -23,6 +23,7 @@ export interface Job {
   state: JobState
   type: JobType
   data: any
+  progress: number
   error: any
   createdAt: Date | string
   finishedOn: Date | string