jobs.ts 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. import { Job as BullJob } from 'bullmq'
  2. import express from 'express'
  3. import { HttpStatusCode, Job, JobState, JobType, ResultList, UserRight } from '@peertube/peertube-models'
  4. import { isArray } from '../../helpers/custom-validators/misc.js'
  5. import { JobQueue } from '../../lib/job-queue/index.js'
  6. import {
  7. apiRateLimiter,
  8. asyncMiddleware,
  9. authenticate,
  10. ensureUserHasRight,
  11. jobsSortValidator,
  12. openapiOperationDoc,
  13. paginationValidatorBuilder,
  14. setDefaultPagination,
  15. setDefaultSort
  16. } from '../../middlewares/index.js'
  17. import { listJobsValidator } from '../../middlewares/validators/jobs.js'
  // Router for the job queue administration endpoints
  const jobsRouter = express.Router()

  // Mounted before any route so it applies to every endpoint declared below
  jobsRouter.use(apiRateLimiter)

  // POST /pause: authenticated users holding the MANAGE_JOBS right only
  jobsRouter.post(
    '/pause',
    authenticate,
    ensureUserHasRight(UserRight.MANAGE_JOBS),
    asyncMiddleware(pauseJobQueue)
  )

  // POST /resume: same guards as /pause; the handler is synchronous so it is registered without asyncMiddleware
  jobsRouter.post(
    '/resume',
    authenticate,
    ensureUserHasRight(UserRight.MANAGE_JOBS),
    resumeJobQueue
  )

  // GET /:state?: the state segment is optional.
  // Middlewares run in declaration order: API doc, authentication, right check,
  // pagination/sort validation, sort/pagination defaults, list validation, then the handler
  jobsRouter.get(
    '/:state?',
    openapiOperationDoc({ operationId: 'getJobs' }),
    authenticate,
    ensureUserHasRight(UserRight.MANAGE_JOBS),
    paginationValidatorBuilder([ 'jobs' ]),
    jobsSortValidator,
    setDefaultSort,
    setDefaultPagination,
    listJobsValidator,
    asyncMiddleware(listJobs)
  )

  // ---------------------------------------------------------------------------

  export {
    jobsRouter
  }
  45. // ---------------------------------------------------------------------------
  /**
   * Express handler: calls `pause()` on the job queue singleton, waits for it to settle,
   * then answers with an empty 204 response.
   *
   * @param req - incoming request (unused)
   * @param res - response used to send the status code
   */
  async function pauseJobQueue (req: express.Request, res: express.Response) {
    const queue = JobQueue.Instance
    await queue.pause()

    return res.sendStatus(HttpStatusCode.NO_CONTENT_204)
  }
  /**
   * Express handler: calls `resume()` on the job queue singleton (its result is not awaited),
   * then answers with an empty 204 response.
   *
   * @param req - incoming request (unused)
   * @param res - response used to send the status code
   */
  function resumeJobQueue (req: express.Request, res: express.Response) {
    const queue = JobQueue.Instance
    queue.resume()

    return res.sendStatus(HttpStatusCode.NO_CONTENT_204)
  }
  /**
   * Express handler: lists jobs, optionally filtered by the `state` route parameter
   * and the `jobType` query parameter.
   *
   * Responds with a JSON `ResultList<Job>` where `total` is the count for the same
   * state/type filter and `data` is the requested page of formatted jobs.
   *
   * @param req - request carrying `params.state` and the `sort`, `start`, `count`, `jobType` query values
   * @param res - response used to send the JSON payload
   */
  async function listJobs (req: express.Request, res: express.Response) {
    const state = req.params.state as JobState
    const jobType = req.query.jobType

    // `asc` is only true for an explicit `createdAt` sort; any other sort value yields false
    const asc = req.query.sort === 'createdAt'

    // The page and the total do not depend on each other: start both lookups at once
    // instead of waiting for the page before asking for the count
    const [ jobs, total ] = await Promise.all([
      JobQueue.Instance.listForApi({
        state,
        start: req.query.start,
        count: req.query.count,
        asc,
        jobType
      }),
      JobQueue.Instance.count(state, jobType)
    ])

    const result: ResultList<Job> = {
      total,
      // Formatting may need an async state lookup per job, so format the page concurrently
      data: await Promise.all(jobs.map(j => formatJob(j, state)))
    }

    return res.json(result)
  }
  /**
   * Converts a BullMQ job into the `Job` shape returned by the API.
   *
   * @param job - the BullMQ job to convert
   * @param state - state to report; when falsy, the state is fetched from the job with `getState()`
   * @returns the formatted job. `error` is the first stacktrace entry or `null`;
   * `finishedOn`/`processedOn` are `null` while the job has no such timestamp
   */
  async function formatJob (job: BullJob, state?: JobState): Promise<Job> {
    const error = isArray(job.stacktrace) && job.stacktrace.length !== 0
      ? job.stacktrace[0]
      : null

    return {
      id: job.id,
      state: state || await job.getState(),
      type: job.queueName as JobType,
      data: job.data,
      parent: job.parent
        ? { id: job.parent.id }
        : undefined,
      progress: job.progress as number,
      priority: job.opts.priority,
      error,
      createdAt: new Date(job.timestamp),
      finishedOn: timestampToDateOrNull(job.finishedOn),
      processedOn: timestampToDateOrNull(job.processedOn)
    }
  }

  // Builds a Date from an epoch timestamp, or returns null when the timestamp is missing.
  // `new Date(undefined)` would produce an Invalid Date object; JSON serialization turns that
  // into null anyway, so returning null keeps the API payload unchanged while avoiding the invalid object.
  function timestampToDateOrNull (timestamp: number | null | undefined): Date | null {
    return timestamp == null
      ? null
      : new Date(timestamp)
  }