peertube-socket.ts 4.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. import { Server as HTTPServer } from 'http'
  2. import { Namespace, Server as SocketServer, Socket } from 'socket.io'
  3. import { isIdValid } from '@server/helpers/custom-validators/misc.js'
  4. import { Debounce } from '@server/helpers/debounce.js'
  5. import { MVideo, MVideoImmutable } from '@server/types/models/index.js'
  6. import { MRunner } from '@server/types/models/runners/index.js'
  7. import { UserNotificationModelForApi } from '@server/types/models/user/index.js'
  8. import { LiveVideoEventPayload, LiveVideoEventType } from '@peertube/peertube-models'
  9. import { logger } from '../helpers/logger.js'
  10. import { authenticateRunnerSocket, authenticateSocket } from '../middlewares/index.js'
  11. class PeerTubeSocket {
  12. private static instance: PeerTubeSocket
  13. private userNotificationSockets: { [ userId: number ]: Socket[] } = {}
  14. private liveVideosNamespace: Namespace
  15. private readonly runnerSockets = new Set<Socket>()
  16. private constructor () {}
  17. init (server: HTTPServer) {
  18. const io = new SocketServer(server)
  19. io.of('/user-notifications')
  20. .use(authenticateSocket)
  21. .on('connection', socket => {
  22. const userId = socket.handshake.auth.user.id
  23. logger.debug('User %d connected to the notification system.', userId)
  24. if (!this.userNotificationSockets[userId]) this.userNotificationSockets[userId] = []
  25. this.userNotificationSockets[userId].push(socket)
  26. socket.on('disconnect', () => {
  27. logger.debug('User %d disconnected from SocketIO notifications.', userId)
  28. this.userNotificationSockets[userId] = this.userNotificationSockets[userId].filter(s => s !== socket)
  29. })
  30. })
  31. this.liveVideosNamespace = io.of('/live-videos')
  32. .on('connection', socket => {
  33. socket.on('subscribe', params => {
  34. const videoId = params.videoId + ''
  35. if (!isIdValid(videoId)) return
  36. /* eslint-disable @typescript-eslint/no-floating-promises */
  37. socket.join(videoId)
  38. })
  39. socket.on('unsubscribe', params => {
  40. const videoId = params.videoId + ''
  41. if (!isIdValid(videoId)) return
  42. /* eslint-disable @typescript-eslint/no-floating-promises */
  43. socket.leave(videoId)
  44. })
  45. })
  46. io.of('/runners')
  47. .use(authenticateRunnerSocket)
  48. .on('connection', socket => {
  49. const runner: MRunner = socket.handshake.auth.runner
  50. logger.debug(`New runner "${runner.name}" connected to the notification system.`)
  51. this.runnerSockets.add(socket)
  52. socket.on('disconnect', () => {
  53. logger.debug(`Runner "${runner.name}" disconnected from the notification system.`)
  54. this.runnerSockets.delete(socket)
  55. })
  56. })
  57. }
  58. sendNotification (userId: number, notification: UserNotificationModelForApi) {
  59. const sockets = this.userNotificationSockets[userId]
  60. if (!sockets) return
  61. logger.debug('Sending user notification to user %d.', userId)
  62. const notificationMessage = notification.toFormattedJSON()
  63. for (const socket of sockets) {
  64. socket.emit('new-notification', notificationMessage)
  65. }
  66. }
  67. sendVideoLiveNewState (video: MVideo) {
  68. const data: LiveVideoEventPayload = { state: video.state }
  69. const type: LiveVideoEventType = 'state-change'
  70. logger.debug('Sending video live new state notification of %s.', video.url, { state: video.state })
  71. this.liveVideosNamespace
  72. .in(video.id + '')
  73. .emit(type, data)
  74. }
  75. sendVideoViewsUpdate (video: MVideoImmutable, numViewers: number) {
  76. const data: LiveVideoEventPayload = { viewers: numViewers }
  77. const type: LiveVideoEventType = 'views-change'
  78. logger.debug('Sending video live views update notification of %s.', video.url, { viewers: numViewers })
  79. this.liveVideosNamespace
  80. .in(video.id + '')
  81. .emit(type, data)
  82. }
  83. @Debounce({ timeoutMS: 1000 })
  84. sendAvailableJobsPingToRunners () {
  85. logger.debug(`Sending available-jobs notification to ${this.runnerSockets.size} runner sockets`)
  86. for (const runners of this.runnerSockets) {
  87. runners.emit('available-jobs')
  88. }
  89. }
  90. static get Instance () {
  91. return this.instance || (this.instance = new this())
  92. }
  93. }
  94. // ---------------------------------------------------------------------------
  95. export {
  96. PeerTubeSocket
  97. }