job-queue.ts 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246
  1. import * as Bull from 'bull'
  2. import { jobStates } from '@server/helpers/custom-validators/jobs'
  3. import { processVideoRedundancy } from '@server/lib/job-queue/handlers/video-redundancy'
  4. import {
  5. ActivitypubFollowPayload,
  6. ActivitypubHttpBroadcastPayload,
  7. ActivitypubHttpFetcherPayload,
  8. ActivitypubHttpUnicastPayload,
  9. EmailPayload,
  10. JobState,
  11. JobType,
  12. RefreshPayload,
  13. VideoFileImportPayload,
  14. VideoImportPayload,
  15. VideoLiveEndingPayload,
  16. VideoRedundancyPayload,
  17. VideoTranscodingPayload
  18. } from '../../../shared/models'
  19. import { logger } from '../../helpers/logger'
  20. import { JOB_ATTEMPTS, JOB_COMPLETED_LIFETIME, JOB_CONCURRENCY, JOB_TTL, REPEAT_JOBS, WEBSERVER } from '../../initializers/constants'
  21. import { Redis } from '../redis'
  22. import { processActivityPubFollow } from './handlers/activitypub-follow'
  23. import { processActivityPubHttpBroadcast } from './handlers/activitypub-http-broadcast'
  24. import { processActivityPubHttpFetcher } from './handlers/activitypub-http-fetcher'
  25. import { processActivityPubHttpUnicast } from './handlers/activitypub-http-unicast'
  26. import { refreshAPObject } from './handlers/activitypub-refresher'
  27. import { processEmail } from './handlers/email'
  28. import { processVideoFileImport } from './handlers/video-file-import'
  29. import { processVideoImport } from './handlers/video-import'
  30. import { processVideoLiveEnding } from './handlers/video-live-ending'
  31. import { processVideoTranscoding } from './handlers/video-transcoding'
  32. import { processVideosViews } from './handlers/video-views'
  33. type CreateJobArgument =
  34. { type: 'activitypub-http-broadcast', payload: ActivitypubHttpBroadcastPayload } |
  35. { type: 'activitypub-http-unicast', payload: ActivitypubHttpUnicastPayload } |
  36. { type: 'activitypub-http-fetcher', payload: ActivitypubHttpFetcherPayload } |
  37. { type: 'activitypub-follow', payload: ActivitypubFollowPayload } |
  38. { type: 'video-file-import', payload: VideoFileImportPayload } |
  39. { type: 'video-transcoding', payload: VideoTranscodingPayload } |
  40. { type: 'email', payload: EmailPayload } |
  41. { type: 'video-import', payload: VideoImportPayload } |
  42. { type: 'activitypub-refresher', payload: RefreshPayload } |
  43. { type: 'videos-views', payload: {} } |
  44. { type: 'video-live-ending', payload: VideoLiveEndingPayload } |
  45. { type: 'video-redundancy', payload: VideoRedundancyPayload }
  46. type CreateJobOptions = {
  47. delay?: number
  48. }
  49. const handlers: { [id in JobType]: (job: Bull.Job) => Promise<any> } = {
  50. 'activitypub-http-broadcast': processActivityPubHttpBroadcast,
  51. 'activitypub-http-unicast': processActivityPubHttpUnicast,
  52. 'activitypub-http-fetcher': processActivityPubHttpFetcher,
  53. 'activitypub-follow': processActivityPubFollow,
  54. 'video-file-import': processVideoFileImport,
  55. 'video-transcoding': processVideoTranscoding,
  56. 'email': processEmail,
  57. 'video-import': processVideoImport,
  58. 'videos-views': processVideosViews,
  59. 'activitypub-refresher': refreshAPObject,
  60. 'video-live-ending': processVideoLiveEnding,
  61. 'video-redundancy': processVideoRedundancy
  62. }
  63. const jobTypes: JobType[] = [
  64. 'activitypub-follow',
  65. 'activitypub-http-broadcast',
  66. 'activitypub-http-fetcher',
  67. 'activitypub-http-unicast',
  68. 'email',
  69. 'video-transcoding',
  70. 'video-file-import',
  71. 'video-import',
  72. 'videos-views',
  73. 'activitypub-refresher',
  74. 'video-redundancy',
  75. 'video-live-ending'
  76. ]
  77. class JobQueue {
  78. private static instance: JobQueue
  79. private queues: { [id in JobType]?: Bull.Queue } = {}
  80. private initialized = false
  81. private jobRedisPrefix: string
  82. private constructor () {
  83. }
  84. init () {
  85. // Already initialized
  86. if (this.initialized === true) return
  87. this.initialized = true
  88. this.jobRedisPrefix = 'bull-' + WEBSERVER.HOST
  89. const queueOptions = {
  90. prefix: this.jobRedisPrefix,
  91. redis: Redis.getRedisClientOptions(),
  92. settings: {
  93. maxStalledCount: 10 // transcoding could be long, so jobs can often be interrupted by restarts
  94. }
  95. }
  96. for (const handlerName of Object.keys(handlers)) {
  97. const queue = new Bull(handlerName, queueOptions)
  98. const handler = handlers[handlerName]
  99. queue.process(JOB_CONCURRENCY[handlerName], handler)
  100. .catch(err => logger.error('Error in job queue processor %s.', handlerName, { err }))
  101. queue.on('failed', (job, err) => {
  102. logger.error('Cannot execute job %d in queue %s.', job.id, handlerName, { payload: job.data, err })
  103. })
  104. queue.on('error', err => {
  105. logger.error('Error in job queue %s.', handlerName, { err })
  106. })
  107. this.queues[handlerName] = queue
  108. }
  109. this.addRepeatableJobs()
  110. }
  111. terminate () {
  112. for (const queueName of Object.keys(this.queues)) {
  113. const queue = this.queues[queueName]
  114. queue.close()
  115. }
  116. }
  117. createJob (obj: CreateJobArgument, options: CreateJobOptions = {}): void {
  118. this.createJobWithPromise(obj, options)
  119. .catch(err => logger.error('Cannot create job.', { err, obj }))
  120. }
  121. createJobWithPromise (obj: CreateJobArgument, options: CreateJobOptions = {}) {
  122. const queue = this.queues[obj.type]
  123. if (queue === undefined) {
  124. logger.error('Unknown queue %s: cannot create job.', obj.type)
  125. return
  126. }
  127. const jobArgs: Bull.JobOptions = {
  128. backoff: { delay: 60 * 1000, type: 'exponential' },
  129. attempts: JOB_ATTEMPTS[obj.type],
  130. timeout: JOB_TTL[obj.type],
  131. delay: options.delay
  132. }
  133. return queue.add(obj.payload, jobArgs)
  134. }
  135. async listForApi (options: {
  136. state?: JobState
  137. start: number
  138. count: number
  139. asc?: boolean
  140. jobType: JobType
  141. }): Promise<Bull.Job[]> {
  142. const { state, start, count, asc, jobType } = options
  143. const states = state ? [ state ] : jobStates
  144. let results: Bull.Job[] = []
  145. const filteredJobTypes = this.filterJobTypes(jobType)
  146. for (const jobType of filteredJobTypes) {
  147. const queue = this.queues[jobType]
  148. if (queue === undefined) {
  149. logger.error('Unknown queue %s to list jobs.', jobType)
  150. continue
  151. }
  152. const jobs = await queue.getJobs(states, 0, start + count, asc)
  153. results = results.concat(jobs)
  154. }
  155. results.sort((j1: any, j2: any) => {
  156. if (j1.timestamp < j2.timestamp) return -1
  157. else if (j1.timestamp === j2.timestamp) return 0
  158. return 1
  159. })
  160. if (asc === false) results.reverse()
  161. return results.slice(start, start + count)
  162. }
  163. async count (state: JobState, jobType?: JobType): Promise<number> {
  164. const states = state ? [ state ] : jobStates
  165. let total = 0
  166. const filteredJobTypes = this.filterJobTypes(jobType)
  167. for (const type of filteredJobTypes) {
  168. const queue = this.queues[type]
  169. if (queue === undefined) {
  170. logger.error('Unknown queue %s to count jobs.', type)
  171. continue
  172. }
  173. const counts = await queue.getJobCounts()
  174. for (const s of states) {
  175. total += counts[s]
  176. }
  177. }
  178. return total
  179. }
  180. async removeOldJobs () {
  181. for (const key of Object.keys(this.queues)) {
  182. const queue = this.queues[key]
  183. await queue.clean(JOB_COMPLETED_LIFETIME, 'completed')
  184. }
  185. }
  186. private addRepeatableJobs () {
  187. this.queues['videos-views'].add({}, {
  188. repeat: REPEAT_JOBS['videos-views']
  189. }).catch(err => logger.error('Cannot add repeatable job.', { err }))
  190. }
  191. private filterJobTypes (jobType?: JobType) {
  192. if (!jobType) return jobTypes
  193. return jobTypes.filter(t => t === jobType)
  194. }
  195. static get Instance () {
  196. return this.instance || (this.instance = new this())
  197. }
  198. }
  199. // ---------------------------------------------------------------------------
  200. export {
  201. jobTypes,
  202. JobQueue
  203. }