inbox-manager.ts 1.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455
  1. import { AsyncQueue, queue } from 'async'
  2. import { logger } from '@server/helpers/logger'
  3. import { SCHEDULER_INTERVALS_MS } from '@server/initializers/constants'
  4. import { MActorDefault, MActorSignature } from '@server/types/models'
  5. import { Activity } from '@shared/models'
  6. import { processActivities } from './process'
  7. import { StatsManager } from '../stat-manager'
  8. type QueueParam = {
  9. activities: Activity[]
  10. signatureActor?: MActorSignature
  11. inboxActor?: MActorDefault
  12. }
  13. class InboxManager {
  14. private static instance: InboxManager
  15. private readonly inboxQueue: AsyncQueue<QueueParam>
  16. private messagesProcessed = 0
  17. private constructor () {
  18. this.inboxQueue = queue<QueueParam, Error>((task, cb) => {
  19. const options = { signatureActor: task.signatureActor, inboxActor: task.inboxActor }
  20. this.messagesProcessed++
  21. processActivities(task.activities, options)
  22. .then(() => cb())
  23. .catch(err => {
  24. logger.error('Error in process activities.', { err })
  25. cb()
  26. })
  27. })
  28. setInterval(() => {
  29. StatsManager.Instance.updateInboxStats(this.messagesProcessed, this.inboxQueue.length())
  30. }, SCHEDULER_INTERVALS_MS.updateInboxStats)
  31. }
  32. addInboxMessage (options: QueueParam) {
  33. this.inboxQueue.push(options)
  34. }
  35. static get Instance () {
  36. return this.instance || (this.instance = new this())
  37. }
  38. }
  39. // ---------------------------------------------------------------------------
  40. export {
  41. InboxManager
  42. }