// jobs.ts
  1. import * as request from 'supertest'
  2. import { HttpStatusCode } from '../../../shared/core-utils/miscs/http-error-codes'
  3. import { makeGetRequest } from '../../../shared/extra-utils'
  4. import { Job, JobState, JobType } from '../../models'
  5. import { wait } from '../miscs/miscs'
  6. import { ServerInfo } from './servers'
  /**
   * Build the REST path of the jobs listing endpoint.
   *
   * @param state - optional job state; when truthy it is appended as a trailing path segment
   * @returns `/api/v1/jobs`, or `/api/v1/jobs/<state>` when a state is provided
   */
  function buildJobsUrl (state?: JobState): string {
    const basePath = '/api/v1/jobs'

    return state ? basePath + '/' + state : basePath
  }
  /**
   * Issue a GET on the jobs listing endpoint of a server.
   *
   * The request is sent with an `Accept: application/json` header and a bearer
   * token, and carries two expectations: an HTTP 200 status
   * (`HttpStatusCode.OK_200`) and a response `Content-Type` matching `/json/`.
   *
   * @param url - base URL handed to supertest
   * @param accessToken - token placed in the `Authorization: Bearer …` header
   * @param state - optional job state used to narrow the endpoint path
   * @returns the supertest request chain; it is not awaited here
   */
  function getJobsList (url: string, accessToken: string, state?: JobState) {
    const path = buildJobsUrl(state)

    return request(url)
      .get(path)
      .set('Accept', 'application/json')
      .set('Authorization', 'Bearer ' + accessToken)
      .expect(HttpStatusCode.OK_200)
      .expect('Content-Type', /json/)
  }
  /**
   * Query the jobs listing endpoint with pagination, sorting and an optional
   * job type filter.
   *
   * `start`, `count`, `sort` and `jobType` are forwarded as query parameters;
   * `state`, when given, selects the endpoint path. The request is delegated to
   * `makeGetRequest` with an expected status of `HttpStatusCode.OK_200`.
   *
   * @param options - target server (`url`, `accessToken`) plus listing options
   * @returns whatever `makeGetRequest` returns for this request
   */
  function getJobsListPaginationAndSort (options: {
    url: string
    accessToken: string
    start: number
    count: number
    sort: string
    state?: JobState
    jobType?: JobType
  }) {
    const { url, accessToken, state, start, count, sort, jobType } = options

    return makeGetRequest({
      url,
      path: buildJobsUrl(state),
      token: accessToken,
      statusCodeExpected: HttpStatusCode.OK_200,
      query: { start, count, sort, jobType }
    })
  }
  /**
   * Resolve once none of the given servers report pending jobs.
   *
   * For every server, the 'waiting', 'active' and 'delayed' job lists are
   * queried (first 10 entries, sorted by `-createdAt`). Jobs of type
   * 'videos-views' are ignored. When a round finds nothing pending, the
   * function waits and checks a second time before returning, in case new
   * jobs were created in the meantime.
   *
   * The delay before that second check is read from the
   * `NODE_PENDING_JOB_WAIT` environment variable (value passed to `wait()`,
   * presumably milliseconds) and defaults to 2000 when the variable is unset,
   * empty or not a number.
   *
   * @param serversArg - a single server or an array of servers to poll
   */
  async function waitJobs (serversArg: ServerInfo[] | ServerInfo) {
    // parseInt yields NaN for a non-numeric value; fall back to the default
    // instead of handing NaN to wait()
    const envWait = parseInt(process.env.NODE_PENDING_JOB_WAIT || '', 10)
    const pendingJobWait = Number.isNaN(envWait) ? 2000 : envWait

    const servers = Array.isArray(serversArg) ? serversArg : [ serversArg ]
    const states: JobState[] = [ 'waiting', 'active', 'delayed' ]

    // Query every (server, state) pair in parallel and report whether at least
    // one of them still lists a job other than 'videos-views'
    async function hasPendingJobs (): Promise<boolean> {
      const checks: Promise<boolean>[] = []

      for (const server of servers) {
        for (const state of states) {
          const check = getJobsListPaginationAndSort({
            url: server.url,
            accessToken: server.accessToken,
            state,
            start: 0,
            count: 10,
            sort: '-createdAt'
          }).then(res => {
            const jobs: Job[] = res.body.data

            return jobs.some(j => j.type !== 'videos-views')
          })

          checks.push(check)
        }
      }

      const results = await Promise.all(checks)
      return results.some(isPending => isPending)
    }

    let pendingRequests: boolean

    do {
      pendingRequests = await hasPendingJobs()

      // Retry, in case new jobs were created
      if (pendingRequests === false) {
        await wait(pendingJobWait)
        pendingRequests = await hasPendingJobs()
      }

      // Something is still pending: pause before polling again
      if (pendingRequests) {
        await wait(1000)
      }
    } while (pendingRequests)
  }
  // ---------------------------------------------------------------------------
  // Helpers exposed by this module
  export {
    getJobsList,
    waitJobs,
    getJobsListPaginationAndSort
  }