parent-process.ts 8.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220
  1. import { join } from 'path'
  2. import Piscina from 'piscina'
  3. import { JOB_CONCURRENCY, WORKER_THREADS } from '@server/initializers/constants.js'
  4. import type httpBroadcast from './workers/http-broadcast.js'
  5. import type downloadImage from './workers/image-downloader.js'
  6. import type processImage from './workers/image-processor.js'
  7. import type getImageSize from './workers/get-image-size.js'
  8. import type signJsonLDObject from './workers/sign-json-ld-object.js'
  9. import type buildDigest from './workers/build-digest.js'
  10. import type httpUnicast from './workers/http-unicast.js'
  11. import { logger } from '@server/helpers/logger.js'
  12. let downloadImageWorker: Piscina
  13. export function downloadImageFromWorker (options: Parameters<typeof downloadImage>[0]): Promise<ReturnType<typeof downloadImage>> {
  14. if (!downloadImageWorker) {
  15. downloadImageWorker = new Piscina({
  16. filename: new URL(join('workers', 'image-downloader.js'), import.meta.url).href,
  17. concurrentTasksPerWorker: WORKER_THREADS.DOWNLOAD_IMAGE.CONCURRENCY,
  18. maxThreads: WORKER_THREADS.DOWNLOAD_IMAGE.MAX_THREADS,
  19. minThreads: 1
  20. })
  21. downloadImageWorker.on('error', err => logger.error('Error in download image worker', { err }))
  22. }
  23. return downloadImageWorker.run(options)
  24. }
  25. // ---------------------------------------------------------------------------
  26. let processImageWorker: Piscina
  27. export function processImageFromWorker (options: Parameters<typeof processImage>[0]): Promise<ReturnType<typeof processImage>> {
  28. if (!processImageWorker) {
  29. processImageWorker = new Piscina({
  30. filename: new URL(join('workers', 'image-processor.js'), import.meta.url).href,
  31. concurrentTasksPerWorker: WORKER_THREADS.PROCESS_IMAGE.CONCURRENCY,
  32. maxThreads: WORKER_THREADS.PROCESS_IMAGE.MAX_THREADS,
  33. minThreads: 1
  34. })
  35. processImageWorker.on('error', err => logger.error('Error in process image worker', { err }))
  36. }
  37. return processImageWorker.run(options)
  38. }
  39. // ---------------------------------------------------------------------------
  40. let getImageSizeWorker: Piscina
  41. export function getImageSizeFromWorker (options: Parameters<typeof getImageSize>[0]): Promise<ReturnType<typeof getImageSize>> {
  42. if (!getImageSizeWorker) {
  43. getImageSizeWorker = new Piscina({
  44. filename: new URL(join('workers', 'get-image-size.js'), import.meta.url).href,
  45. concurrentTasksPerWorker: WORKER_THREADS.GET_IMAGE_SIZE.CONCURRENCY,
  46. maxThreads: WORKER_THREADS.GET_IMAGE_SIZE.MAX_THREADS,
  47. minThreads: 1
  48. })
  49. getImageSizeWorker.on('error', err => logger.error('Error in get image size worker', { err }))
  50. }
  51. return getImageSizeWorker.run(options)
  52. }
  53. // ---------------------------------------------------------------------------
  54. let parallelHTTPBroadcastWorker: Piscina
  55. export function parallelHTTPBroadcastFromWorker (options: Parameters<typeof httpBroadcast>[0]): Promise<ReturnType<typeof httpBroadcast>> {
  56. if (!parallelHTTPBroadcastWorker) {
  57. parallelHTTPBroadcastWorker = new Piscina({
  58. filename: new URL(join('workers', 'http-broadcast.js'), import.meta.url).href,
  59. // Keep it sync with job concurrency so the worker will accept all the requests sent by the parallelized jobs
  60. concurrentTasksPerWorker: JOB_CONCURRENCY['activitypub-http-broadcast-parallel'],
  61. maxThreads: 1,
  62. minThreads: 1
  63. })
  64. parallelHTTPBroadcastWorker.on('error', err => logger.error('Error in parallel HTTP broadcast worker', { err }))
  65. }
  66. return parallelHTTPBroadcastWorker.run(options)
  67. }
  68. // ---------------------------------------------------------------------------
  69. let sequentialHTTPBroadcastWorker: Piscina
  70. export function sequentialHTTPBroadcastFromWorker (
  71. options: Parameters<typeof httpBroadcast>[0]
  72. ): Promise<ReturnType<typeof httpBroadcast>> {
  73. if (!sequentialHTTPBroadcastWorker) {
  74. sequentialHTTPBroadcastWorker = new Piscina({
  75. filename: new URL(join('workers', 'http-broadcast.js'), import.meta.url).href,
  76. // Keep it sync with job concurrency so the worker will accept all the requests sent by the parallelized jobs
  77. concurrentTasksPerWorker: JOB_CONCURRENCY['activitypub-http-broadcast'],
  78. maxThreads: 1,
  79. minThreads: 1
  80. })
  81. sequentialHTTPBroadcastWorker.on('error', err => logger.error('Error in sequential HTTP broadcast image worker', { err }))
  82. }
  83. return sequentialHTTPBroadcastWorker.run(options)
  84. }
  85. // ---------------------------------------------------------------------------
  86. let httpUnicastWorker: Piscina
  87. export function httpUnicastFromWorker (
  88. options: Parameters<typeof httpUnicast>[0]
  89. ): Promise<ReturnType<typeof httpUnicast>> {
  90. if (!httpUnicastWorker) {
  91. httpUnicastWorker = new Piscina({
  92. filename: new URL(join('workers', 'http-unicast.js'), import.meta.url).href,
  93. // Keep it sync with job concurrency so the worker will accept all the requests sent by the parallelized jobs
  94. concurrentTasksPerWorker: JOB_CONCURRENCY['activitypub-http-unicast'],
  95. maxThreads: 1,
  96. minThreads: 1
  97. })
  98. httpUnicastWorker.on('error', err => logger.error('Error in HTTP unicast worker', { err }))
  99. }
  100. return httpUnicastWorker.run(options)
  101. }
  102. // ---------------------------------------------------------------------------
  103. let signJsonLDObjectWorker: Piscina
  104. export function signJsonLDObjectFromWorker <T> (
  105. options: Parameters<typeof signJsonLDObject<T>>[0]
  106. ): ReturnType<typeof signJsonLDObject<T>> {
  107. if (!signJsonLDObjectWorker) {
  108. signJsonLDObjectWorker = new Piscina({
  109. filename: new URL(join('workers', 'sign-json-ld-object.js'), import.meta.url).href,
  110. concurrentTasksPerWorker: WORKER_THREADS.SIGN_JSON_LD_OBJECT.CONCURRENCY,
  111. maxThreads: WORKER_THREADS.SIGN_JSON_LD_OBJECT.MAX_THREADS,
  112. minThreads: 1
  113. })
  114. signJsonLDObjectWorker.on('error', err => logger.error('Error in sign JSONLD object worker', { err }))
  115. }
  116. return signJsonLDObjectWorker.run(options)
  117. }
  118. // ---------------------------------------------------------------------------
  119. let buildDigestWorker: Piscina
  120. export function buildDigestFromWorker (
  121. options: Parameters<typeof buildDigest>[0]
  122. ): Promise<ReturnType<typeof buildDigest>> {
  123. if (!buildDigestWorker) {
  124. buildDigestWorker = new Piscina({
  125. filename: new URL(join('workers', 'build-digest.js'), import.meta.url).href,
  126. // Keep it sync with job concurrency so the worker will accept all the requests sent by the parallelized jobs
  127. concurrentTasksPerWorker: WORKER_THREADS.BUILD_DIGEST.CONCURRENCY,
  128. maxThreads: WORKER_THREADS.BUILD_DIGEST.MAX_THREADS,
  129. minThreads: 1
  130. })
  131. buildDigestWorker.on('error', err => logger.error('Error in build digest worker', { err }))
  132. }
  133. return buildDigestWorker.run(options)
  134. }
  135. // ---------------------------------------------------------------------------
  136. export function getWorkersStats () {
  137. return [
  138. {
  139. label: 'downloadImage',
  140. queueSize: downloadImageWorker?.queueSize || 0,
  141. completed: downloadImageWorker?.completed || 0
  142. },
  143. {
  144. label: 'processImageWorker',
  145. queueSize: processImageWorker?.queueSize || 0,
  146. completed: processImageWorker?.completed || 0
  147. },
  148. {
  149. label: 'getImageSizeWorker',
  150. queueSize: getImageSizeWorker?.queueSize || 0,
  151. completed: getImageSizeWorker?.completed || 0
  152. },
  153. {
  154. label: 'parallelHTTPBroadcastWorker',
  155. queueSize: parallelHTTPBroadcastWorker?.queueSize || 0,
  156. completed: parallelHTTPBroadcastWorker?.completed || 0
  157. },
  158. {
  159. label: 'sequentialHTTPBroadcastWorker',
  160. queueSize: sequentialHTTPBroadcastWorker?.queueSize || 0,
  161. completed: sequentialHTTPBroadcastWorker?.completed || 0
  162. },
  163. {
  164. label: 'httpUnicastWorker',
  165. queueSize: httpUnicastWorker?.queueSize || 0,
  166. completed: httpUnicastWorker?.completed || 0
  167. },
  168. {
  169. label: 'signJsonLDObjectWorker',
  170. queueSize: signJsonLDObjectWorker?.queueSize || 0,
  171. completed: signJsonLDObjectWorker?.completed || 0
  172. },
  173. {
  174. label: 'buildDigestWorker',
  175. queueSize: buildDigestWorker?.queueSize || 0,
  176. completed: buildDigestWorker?.completed || 0
  177. }
  178. ]
  179. }