job-queue.ts 18 KB


  1. import {
  2. FlowJob,
  3. FlowProducer,
  4. Job,
  5. JobsOptions,
  6. Queue,
  7. QueueEvents,
  8. QueueEventsOptions,
  9. QueueOptions,
  10. Worker,
  11. WorkerOptions
  12. } from 'bullmq'
  13. import { pick, timeoutPromise } from '@peertube/peertube-core-utils'
  14. import {
  15. ActivitypubFollowPayload,
  16. ActivitypubHttpBroadcastPayload,
  17. ActivitypubHttpFetcherPayload,
  18. ActivitypubHttpUnicastPayload,
  19. ActorKeysPayload,
  20. AfterVideoChannelImportPayload,
  21. DeleteResumableUploadMetaFilePayload,
  22. EmailPayload,
  23. FederateVideoPayload,
  24. GenerateStoryboardPayload,
  25. JobState,
  26. JobType,
  27. ManageVideoTorrentPayload,
  28. MoveStoragePayload,
  29. NotifyPayload,
  30. RefreshPayload,
  31. TranscodingJobBuilderPayload,
  32. VideoChannelImportPayload,
  33. VideoFileImportPayload,
  34. VideoImportPayload,
  35. VideoLiveEndingPayload,
  36. VideoRedundancyPayload,
  37. VideoStudioEditionPayload,
  38. VideoTranscodingPayload
  39. } from '@peertube/peertube-models'
  40. import { parseDurationToMs } from '@server/helpers/core-utils.js'
  41. import { jobStates } from '@server/helpers/custom-validators/jobs.js'
  42. import { CONFIG } from '@server/initializers/config.js'
  43. import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy.js'
  44. import { logger } from '../../helpers/logger.js'
  45. import { JOB_ATTEMPTS, JOB_CONCURRENCY, JOB_REMOVAL_OPTIONS, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants.js'
  46. import { Hooks } from '../plugins/hooks.js'
  47. import { Redis } from '../redis.js'
  48. import { processActivityPubCleaner } from './handlers/activitypub-cleaner.js'
  49. import { processActivityPubFollow } from './handlers/activitypub-follow.js'
  50. import {
  51. processActivityPubHttpSequentialBroadcast,
  52. processActivityPubParallelHttpBroadcast
  53. } from './handlers/activitypub-http-broadcast.js'
  54. import { processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher.js'
  55. import { processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast.js'
  56. import { refreshAPObject } from './handlers/activitypub-refresher.js'
  57. import { processActorKeys } from './handlers/actor-keys.js'
  58. import { processAfterVideoChannelImport } from './handlers/after-video-channel-import.js'
  59. import { processEmail } from './handlers/email.js'
  60. import { processFederateVideo } from './handlers/federate-video.js'
  61. import { processGenerateStoryboard } from './handlers/generate-storyboard.js'
  62. import { processManageVideoTorrent } from './handlers/manage-video-torrent.js'
  63. import { onMoveToObjectStorageFailure, processMoveToObjectStorage } from './handlers/move-to-object-storage.js'
  64. import { processNotify } from './handlers/notify.js'
  65. import { processTranscodingJobBuilder } from './handlers/transcoding-job-builder.js'
  66. import { processVideoChannelImport } from './handlers/video-channel-import.js'
  67. import { processVideoFileImport } from './handlers/video-file-import.js'
  68. import { processVideoImport } from './handlers/video-import.js'
  69. import { processVideoLiveEnding } from './handlers/video-live-ending.js'
  70. import { processVideoStudioEdition } from './handlers/video-studio-edition.js'
  71. import { processVideoTranscoding } from './handlers/video-transcoding.js'
  72. import { processVideosViewsStats } from './handlers/video-views-stats.js'
  73. import { onMoveToFileSystemFailure, processMoveToFileSystem } from './handlers/move-to-file-system.js'
  74. export type CreateJobArgument =
  75. { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
  76. { type: 'activitypub-http-broadcast-parallel', payload: ActivitypubHttpBroadcastPayload } |
  77. { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } |
  78. { type: 'activitypub-http-fetcher', payload: ActivitypubHttpFetcherPayload } |
  79. { type: 'activitypub-cleaner', payload: {} } |
  80. { type: 'activitypub-follow', payload: ActivitypubFollowPayload } |
  81. { type: 'video-file-import', payload: VideoFileImportPayload } |
  82. { type: 'video-transcoding', payload: VideoTranscodingPayload } |
  83. { type: 'email', payload: EmailPayload } |
  84. { type: 'transcoding-job-builder', payload: TranscodingJobBuilderPayload } |
  85. { type: 'video-import', payload: VideoImportPayload } |
  86. { type: 'activitypub-refresher', payload: RefreshPayload } |
  87. { type: 'videos-views-stats', payload: {} } |
  88. { type: 'video-live-ending', payload: VideoLiveEndingPayload } |
  89. { type: 'actor-keys', payload: ActorKeysPayload } |
  90. { type: 'video-redundancy', payload: VideoRedundancyPayload } |
  91. { type: 'delete-resumable-upload-meta-file', payload: DeleteResumableUploadMetaFilePayload } |
  92. { type: 'video-studio-edition', payload: VideoStudioEditionPayload } |
  93. { type: 'manage-video-torrent', payload: ManageVideoTorrentPayload } |
  94. { type: 'move-to-object-storage', payload: MoveStoragePayload } |
  95. { type: 'move-to-file-system', payload: MoveStoragePayload } |
  96. { type: 'video-channel-import', payload: VideoChannelImportPayload } |
  97. { type: 'after-video-channel-import', payload: AfterVideoChannelImportPayload } |
  98. { type: 'notify', payload: NotifyPayload } |
  99. { type: 'federate-video', payload: FederateVideoPayload } |
  100. { type: 'generate-video-storyboard', payload: GenerateStoryboardPayload }
  101. export type CreateJobOptions = {
  102. delay?: number
  103. priority?: number
  104. failParentOnFailure?: boolean
  105. }
  106. const handlers: { [id in JobType]: (job: Job) => Promise<any> } = {
  107. 'activitypub-cleaner': processActivityPubCleaner,
  108. 'activitypub-follow': processActivityPubFollow,
  109. 'activitypub-http-broadcast-parallel': processActivityPubParallelHttpBroadcast,
  110. 'activitypub-http-broadcast': processActivityPubHttpSequentialBroadcast,
  111. 'activitypub-http-fetcher': processActivityPubHttpFetcher,
  112. 'activitypub-http-unicast': processActivityPubHttpUnicast,
  113. 'activitypub-refresher': refreshAPObject,
  114. 'actor-keys': processActorKeys,
  115. 'after-video-channel-import': processAfterVideoChannelImport,
  116. 'email': processEmail,
  117. 'federate-video': processFederateVideo,
  118. 'transcoding-job-builder': processTranscodingJobBuilder,
  119. 'manage-video-torrent': processManageVideoTorrent,
  120. 'move-to-object-storage': processMoveToObjectStorage,
  121. 'move-to-file-system': processMoveToFileSystem,
  122. 'notify': processNotify,
  123. 'video-channel-import': processVideoChannelImport,
  124. 'video-file-import': processVideoFileImport,
  125. 'video-import': processVideoImport,
  126. 'video-live-ending': processVideoLiveEnding,
  127. 'video-redundancy': processVideoRedundancy,
  128. 'video-studio-edition': processVideoStudioEdition,
  129. 'video-transcoding': processVideoTranscoding,
  130. 'videos-views-stats': processVideosViewsStats,
  131. 'generate-video-storyboard': processGenerateStoryboard
  132. }
  133. const errorHandlers: { [id in JobType]?: (job: Job, err: any) => Promise<any> } = {
  134. 'move-to-object-storage': onMoveToObjectStorageFailure,
  135. 'move-to-file-system': onMoveToFileSystemFailure
  136. }
  137. const jobTypes: JobType[] = [
  138. 'activitypub-cleaner',
  139. 'activitypub-follow',
  140. 'activitypub-http-broadcast-parallel',
  141. 'activitypub-http-broadcast',
  142. 'activitypub-http-fetcher',
  143. 'activitypub-http-unicast',
  144. 'activitypub-refresher',
  145. 'actor-keys',
  146. 'after-video-channel-import',
  147. 'email',
  148. 'federate-video',
  149. 'generate-video-storyboard',
  150. 'manage-video-torrent',
  151. 'move-to-object-storage',
  152. 'move-to-file-system',
  153. 'notify',
  154. 'transcoding-job-builder',
  155. 'video-channel-import',
  156. 'video-file-import',
  157. 'video-import',
  158. 'video-live-ending',
  159. 'video-redundancy',
  160. 'video-studio-edition',
  161. 'video-transcoding',
  162. 'videos-views-stats'
  163. ]
  164. const silentFailure = new Set<JobType>([ 'activitypub-http-unicast' ])
  165. class JobQueue {
  166. private static instance: JobQueue
  167. private workers: { [id in JobType]?: Worker } = {}
  168. private queues: { [id in JobType]?: Queue } = {}
  169. private queueEvents: { [id in JobType]?: QueueEvents } = {}
  170. private flowProducer: FlowProducer
  171. private initialized = false
  172. private jobRedisPrefix: string
  173. private constructor () {
  174. }
  175. init () {
  176. // Already initialized
  177. if (this.initialized === true) return
  178. this.initialized = true
  179. this.jobRedisPrefix = 'bull-' + WEBSERVER.HOST
  180. for (const handlerName of Object.keys(handlers)) {
  181. this.buildWorker(handlerName)
  182. this.buildQueue(handlerName)
  183. this.buildQueueEvent(handlerName)
  184. }
  185. this.flowProducer = new FlowProducer({
  186. connection: Redis.getRedisClientOptions('FlowProducer'),
  187. prefix: this.jobRedisPrefix
  188. })
  189. this.flowProducer.on('error', err => { logger.error('Error in flow producer', { err }) })
  190. this.addRepeatableJobs()
  191. }
  192. private buildWorker (handlerName: JobType) {
  193. const workerOptions: WorkerOptions = {
  194. autorun: false,
  195. concurrency: this.getJobConcurrency(handlerName),
  196. prefix: this.jobRedisPrefix,
  197. connection: Redis.getRedisClientOptions('Worker'),
  198. maxStalledCount: 10
  199. }
  200. const handler = function (job: Job) {
  201. const timeout = JOB_TTL[handlerName]
  202. const p = handlers[handlerName](job)
  203. if (!timeout) return p
  204. return timeoutPromise(p, timeout)
  205. }
  206. const processor = async (jobArg: Job<any>) => {
  207. const job = await Hooks.wrapObject(jobArg, 'filter:job-queue.process.params', { type: handlerName })
  208. return Hooks.wrapPromiseFun(handler, job, 'filter:job-queue.process.result')
  209. }
  210. const worker = new Worker(handlerName, processor, workerOptions)
  211. worker.on('failed', (job, err) => {
  212. const logLevel = silentFailure.has(handlerName)
  213. ? 'debug'
  214. : 'error'
  215. logger.log(logLevel, 'Cannot execute job %s in queue %s.', job.id, handlerName, { payload: job.data, err })
  216. if (errorHandlers[job.name]) {
  217. errorHandlers[job.name](job, err)
  218. .catch(err => logger.error('Cannot run error handler for job failure %d in queue %s.', job.id, handlerName, { err }))
  219. }
  220. })
  221. worker.on('error', err => { logger.error('Error in job worker %s.', handlerName, { err }) })
  222. this.workers[handlerName] = worker
  223. }
  224. private buildQueue (handlerName: JobType) {
  225. const queueOptions: QueueOptions = {
  226. connection: Redis.getRedisClientOptions('Queue'),
  227. prefix: this.jobRedisPrefix
  228. }
  229. const queue = new Queue(handlerName, queueOptions)
  230. queue.on('error', err => { logger.error('Error in job queue %s.', handlerName, { err }) })
  231. this.queues[handlerName] = queue
  232. queue.removeDeprecatedPriorityKey()
  233. .catch(err => logger.error('Cannot remove bullmq deprecated priority keys of ' + handlerName, { err }))
  234. }
  235. private buildQueueEvent (handlerName: JobType) {
  236. const queueEventsOptions: QueueEventsOptions = {
  237. autorun: false,
  238. connection: Redis.getRedisClientOptions('QueueEvent'),
  239. prefix: this.jobRedisPrefix
  240. }
  241. const queueEvents = new QueueEvents(handlerName, queueEventsOptions)
  242. queueEvents.on('error', err => { logger.error('Error in job queue events %s.', handlerName, { err }) })
  243. this.queueEvents[handlerName] = queueEvents
  244. }
  245. // ---------------------------------------------------------------------------
  246. async terminate () {
  247. const promises = Object.keys(this.workers)
  248. .map(handlerName => {
  249. const worker: Worker = this.workers[handlerName]
  250. const queue: Queue = this.queues[handlerName]
  251. const queueEvent: QueueEvents = this.queueEvents[handlerName]
  252. return Promise.all([
  253. worker.close(false),
  254. queue.close(),
  255. queueEvent.close()
  256. ])
  257. })
  258. return Promise.all(promises)
  259. }
  260. start () {
  261. const promises = Object.keys(this.workers)
  262. .map(handlerName => {
  263. const worker: Worker = this.workers[handlerName]
  264. const queueEvent: QueueEvents = this.queueEvents[handlerName]
  265. return Promise.all([
  266. worker.run(),
  267. queueEvent.run()
  268. ])
  269. })
  270. return Promise.all(promises)
  271. }
  272. async pause () {
  273. for (const handlerName of Object.keys(this.workers)) {
  274. const worker: Worker = this.workers[handlerName]
  275. await worker.pause()
  276. }
  277. }
  278. resume () {
  279. for (const handlerName of Object.keys(this.workers)) {
  280. const worker: Worker = this.workers[handlerName]
  281. worker.resume()
  282. }
  283. }
  284. // ---------------------------------------------------------------------------
  285. createJobAsync (options: CreateJobArgument & CreateJobOptions): void {
  286. this.createJob(options)
  287. .catch(err => logger.error('Cannot create job.', { err, options }))
  288. }
  289. createJob (options: CreateJobArgument & CreateJobOptions) {
  290. const queue: Queue = this.queues[options.type]
  291. if (queue === undefined) {
  292. logger.error('Unknown queue %s: cannot create job.', options.type)
  293. return
  294. }
  295. const jobOptions = this.buildJobOptions(options.type as JobType, pick(options, [ 'priority', 'delay' ]))
  296. return queue.add('job', options.payload, jobOptions)
  297. }
  298. createSequentialJobFlow (...jobs: ((CreateJobArgument & CreateJobOptions) | undefined)[]) {
  299. let lastJob: FlowJob
  300. for (const job of jobs) {
  301. if (!job) continue
  302. lastJob = {
  303. ...this.buildJobFlowOption(job),
  304. children: lastJob
  305. ? [ lastJob ]
  306. : []
  307. }
  308. }
  309. return this.flowProducer.add(lastJob)
  310. }
  311. createJobWithChildren (parent: CreateJobArgument & CreateJobOptions, children: (CreateJobArgument & CreateJobOptions)[]) {
  312. return this.flowProducer.add({
  313. ...this.buildJobFlowOption(parent),
  314. children: children.map(c => this.buildJobFlowOption(c))
  315. })
  316. }
  317. private buildJobFlowOption (job: CreateJobArgument & CreateJobOptions): FlowJob {
  318. return {
  319. name: 'job',
  320. data: job.payload,
  321. queueName: job.type,
  322. opts: {
  323. failParentOnFailure: true,
  324. ...this.buildJobOptions(job.type as JobType, pick(job, [ 'priority', 'delay', 'failParentOnFailure' ]))
  325. }
  326. }
  327. }
  328. private buildJobOptions (type: JobType, options: CreateJobOptions = {}): JobsOptions {
  329. return {
  330. backoff: { delay: 60 * 1000, type: 'exponential' },
  331. attempts: JOB_ATTEMPTS[type],
  332. priority: options.priority,
  333. delay: options.delay,
  334. ...this.buildJobRemovalOptions(type)
  335. }
  336. }
  337. // ---------------------------------------------------------------------------
  338. async listForApi (options: {
  339. state?: JobState
  340. start: number
  341. count: number
  342. asc?: boolean
  343. jobType: JobType
  344. }): Promise<Job[]> {
  345. const { state, start, count, asc, jobType } = options
  346. const states = this.buildStateFilter(state)
  347. const filteredJobTypes = this.buildTypeFilter(jobType)
  348. let results: Job[] = []
  349. for (const jobType of filteredJobTypes) {
  350. const queue: Queue = this.queues[jobType]
  351. if (queue === undefined) {
  352. logger.error('Unknown queue %s to list jobs.', jobType)
  353. continue
  354. }
  355. let jobs = await queue.getJobs(states, 0, start + count, asc)
  356. // FIXME: we have sometimes undefined values https://github.com/taskforcesh/bullmq/issues/248
  357. jobs = jobs.filter(j => !!j)
  358. results = results.concat(jobs)
  359. }
  360. results.sort((j1: any, j2: any) => {
  361. if (j1.timestamp < j2.timestamp) return -1
  362. else if (j1.timestamp === j2.timestamp) return 0
  363. return 1
  364. })
  365. if (asc === false) results.reverse()
  366. return results.slice(start, start + count)
  367. }
  368. async count (state: JobState, jobType?: JobType): Promise<number> {
  369. const states = state ? [ state ] : jobStates
  370. const filteredJobTypes = this.buildTypeFilter(jobType)
  371. let total = 0
  372. for (const type of filteredJobTypes) {
  373. const queue = this.queues[type]
  374. if (queue === undefined) {
  375. logger.error('Unknown queue %s to count jobs.', type)
  376. continue
  377. }
  378. const counts = await queue.getJobCounts()
  379. for (const s of states) {
  380. total += counts[s]
  381. }
  382. }
  383. return total
  384. }
  385. private buildStateFilter (state?: JobState) {
  386. if (!state) return Array.from(jobStates)
  387. const states = [ state ]
  388. // Include parent and prioritized if filtering on waiting
  389. if (state === 'waiting') {
  390. states.push('waiting-children')
  391. states.push('prioritized')
  392. }
  393. return states
  394. }
  395. private buildTypeFilter (jobType?: JobType) {
  396. if (!jobType) return jobTypes
  397. return jobTypes.filter(t => t === jobType)
  398. }
  399. async getStats () {
  400. const promises = jobTypes.map(async t => ({ jobType: t, counts: await this.queues[t].getJobCounts() }))
  401. return Promise.all(promises)
  402. }
  403. // ---------------------------------------------------------------------------
  404. async removeOldJobs () {
  405. for (const key of Object.keys(this.queues)) {
  406. const queue: Queue = this.queues[key]
  407. await queue.clean(parseDurationToMs('7 days'), 1000, 'completed')
  408. await queue.clean(parseDurationToMs('7 days'), 1000, 'failed')
  409. }
  410. }
  411. private addRepeatableJobs () {
  412. this.queues['videos-views-stats'].add('job', {}, {
  413. repeat: REPEAT_JOBS['videos-views-stats'],
  414. ...this.buildJobRemovalOptions('videos-views-stats')
  415. }).catch(err => logger.error('Cannot add repeatable job.', { err }))
  416. if (CONFIG.FEDERATION.VIDEOS.CLEANUP_REMOTE_INTERACTIONS) {
  417. this.queues['activitypub-cleaner'].add('job', {}, {
  418. repeat: REPEAT_JOBS['activitypub-cleaner'],
  419. ...this.buildJobRemovalOptions('activitypub-cleaner')
  420. }).catch(err => logger.error('Cannot add repeatable job.', { err }))
  421. }
  422. }
  423. private getJobConcurrency (jobType: JobType) {
  424. if (jobType === 'video-transcoding') return CONFIG.TRANSCODING.CONCURRENCY
  425. if (jobType === 'video-import') return CONFIG.IMPORT.VIDEOS.CONCURRENCY
  426. return JOB_CONCURRENCY[jobType]
  427. }
  428. private buildJobRemovalOptions (queueName: string) {
  429. return {
  430. removeOnComplete: {
  431. // Wants seconds
  432. age: (JOB_REMOVAL_OPTIONS.SUCCESS[queueName] || JOB_REMOVAL_OPTIONS.SUCCESS.DEFAULT) / 1000,
  433. count: JOB_REMOVAL_OPTIONS.COUNT
  434. },
  435. removeOnFail: {
  436. // Wants seconds
  437. age: (JOB_REMOVAL_OPTIONS.FAILURE[queueName] || JOB_REMOVAL_OPTIONS.FAILURE.DEFAULT) / 1000,
  438. count: JOB_REMOVAL_OPTIONS.COUNT / 1000
  439. }
  440. }
  441. }
  442. static get Instance () {
  443. return this.instance || (this.instance = new this())
  444. }
  445. }
  446. // ---------------------------------------------------------------------------
  447. export {
  448. jobTypes,
  449. JobQueue
  450. }