live-socket-messages.ts 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181
  1. /* eslint-disable @typescript-eslint/no-unused-expressions,@typescript-eslint/require-await */
  2. import { wait } from '@peertube/peertube-core-utils'
  3. import { LiveVideoEventPayload, VideoPrivacy, VideoState, VideoStateType } from '@peertube/peertube-models'
  4. import {
  5. PeerTubeServer,
  6. cleanupTests,
  7. createMultipleServers,
  8. doubleFollow,
  9. setAccessTokensToServers,
  10. setDefaultVideoChannel,
  11. stopFfmpeg,
  12. waitJobs,
  13. waitUntilLivePublishedOnAllServers
  14. } from '@peertube/peertube-server-commands'
  15. import { expect } from 'chai'
  16. describe('Test live socket messages', function () {
  17. let servers: PeerTubeServer[] = []
  // Global setup: start two servers, configure live streaming on the first one and make them follow each other
  before(async function () {
    this.timeout(120000)

    servers = await createMultipleServers(2)

    // Fetch the access tokens, then set a default channel on every server
    await setAccessTokensToServers(servers)
    await setDefaultVideoChannel(servers)

    const [ origin, follower ] = servers

    // Only the first server streams: lives are enabled there with replay allowed and live transcoding off
    await origin.config.enableMinimumTranscoding()
    await origin.config.enableLive({ allowReplay: true, transcoding: false })

    // Server 1 and server 2 follow each other
    await doubleFollow(origin, follower)
  })
  29. describe('Live socket messages', function () {
  /**
   * Creates a public live named "live video" in the first server's stored channel.
   *
   * @returns the UUID of the created live video
   */
  async function createLiveWrapper () {
    const origin = servers[0]

    const created = await origin.live.create({
      fields: {
        name: 'live video',
        channelId: origin.store.channel.id,
        privacy: VideoPrivacy.PUBLIC
      }
    })

    return created.uuid
  }
  // Subscribes a socket on each server to the same live, then checks the "state-change" events
  // received when the stream is published and when it ends.
  it('Should correctly send a message when the live starts and ends', async function () {
    this.timeout(60000)

    const localStateChanges: VideoStateType[] = []
    const remoteStateChanges: VideoStateType[] = []

    const liveVideoUUID = await createLiveWrapper()
    await waitJobs(servers)

    const localSocket = servers[0].socketIO.getLiveNotificationSocket()
    const remoteSocket = servers[1].socketIO.getLiveNotificationSocket()

    try {
      {
        // Each server has its own numeric id for the video, so resolve it per server
        const videoId = await servers[0].videos.getId({ uuid: liveVideoUUID })

        localSocket.on('state-change', data => localStateChanges.push(data.state))
        localSocket.emit('subscribe', { videoId })
      }

      {
        const videoId = await servers[1].videos.getId({ uuid: liveVideoUUID })

        remoteSocket.on('state-change', data => remoteStateChanges.push(data.state))
        remoteSocket.emit('subscribe', { videoId })
      }

      const ffmpegCommand = await servers[0].live.sendRTMPStreamInVideo({ videoId: liveVideoUUID })

      await waitUntilLivePublishedOnAllServers(servers, liveVideoUUID)
      await waitJobs(servers)

      // Ensure remote server doesn't send multiple times the state change event to viewers
      await servers[0].videos.update({ id: liveVideoUUID, attributes: { name: 'my new live name' } })
      await waitJobs(servers)

      for (const stateChanges of [ localStateChanges, remoteStateChanges ]) {
        expect(stateChanges).to.have.lengthOf(1)
        expect(stateChanges[0]).to.equal(VideoState.PUBLISHED)
      }

      await stopFfmpeg(ffmpegCommand)

      for (const server of servers) {
        await server.live.waitUntilEnded({ videoId: liveVideoUUID })
      }

      await waitJobs(servers)

      // Only the last event is pinned: intermediate state changes may be emitted while the live is ending
      for (const stateChanges of [ localStateChanges, remoteStateChanges ]) {
        expect(stateChanges).to.have.length.at.least(2)
        expect(stateChanges[stateChanges.length - 1]).to.equal(VideoState.LIVE_ENDED)
      }
    } finally {
      // Always close the sockets so they do not stay connected after this test, even on failure
      localSocket.disconnect()
      remoteSocket.disconnect()
    }
  })
  // Subscribes a socket on each server to the same live and checks that the viewer count
  // carried by "views-change" events reflects the views simulated on both servers.
  it('Should correctly send views change notification', async function () {
    this.timeout(60000)

    // Last `viewers` value received on each socket
    let localLastViewers = 0
    let remoteLastViewers = 0

    const liveVideoUUID = await createLiveWrapper()
    await waitJobs(servers)

    const localSocket = servers[0].socketIO.getLiveNotificationSocket()
    const remoteSocket = servers[1].socketIO.getLiveNotificationSocket()

    try {
      {
        const videoId = await servers[0].videos.getId({ uuid: liveVideoUUID })

        localSocket.on('views-change', (data: LiveVideoEventPayload) => { localLastViewers = data.viewers })
        localSocket.emit('subscribe', { videoId })
      }

      {
        const videoId = await servers[1].videos.getId({ uuid: liveVideoUUID })

        remoteSocket.on('views-change', (data: LiveVideoEventPayload) => { remoteLastViewers = data.viewers })
        remoteSocket.emit('subscribe', { videoId })
      }

      const ffmpegCommand = await servers[0].live.sendRTMPStreamInVideo({ videoId: liveVideoUUID })

      try {
        await waitUntilLivePublishedOnAllServers(servers, liveVideoUUID)
        await waitJobs(servers)

        // No view has been simulated yet
        expect(localLastViewers).to.equal(0)
        expect(remoteLastViewers).to.equal(0)

        // One view from each server: both subscribers are expected to see the total
        await servers[0].views.simulateView({ id: liveVideoUUID })
        await servers[1].views.simulateView({ id: liveVideoUUID })

        await waitJobs(servers)

        expect(localLastViewers).to.equal(2)
        expect(remoteLastViewers).to.equal(2)
      } finally {
        // Stop the stream even when an assertion fails, so it does not keep running during later tests
        await stopFfmpeg(ffmpegCommand)
      }
    } finally {
      // Always close the sockets so they do not stay connected after this test, even on failure
      localSocket.disconnect()
      remoteSocket.disconnect()
    }
  })
  // Subscribes to a live, checks one "state-change" event arrives when it is published, then
  // unsubscribes and checks that ending the live does not deliver any further event.
  it('Should not receive a notification after unsubscribe', async function () {
    this.timeout(120000)

    const stateChanges: VideoStateType[] = []

    const liveVideoUUID = await createLiveWrapper()
    await waitJobs(servers)

    const socket = servers[0].socketIO.getLiveNotificationSocket()

    try {
      const videoId = await servers[0].videos.getId({ uuid: liveVideoUUID })

      socket.on('state-change', data => stateChanges.push(data.state))
      socket.emit('subscribe', { videoId })

      const command = await servers[0].live.sendRTMPStreamInVideo({ videoId: liveVideoUUID })

      await waitUntilLivePublishedOnAllServers(servers, liveVideoUUID)
      await waitJobs(servers)

      // Notifier waits before sending a notification
      await wait(10000)

      expect(stateChanges).to.have.lengthOf(1)

      socket.emit('unsubscribe', { videoId })

      await stopFfmpeg(command)

      // Same sequence as the start/end test: wait for the live to be reported as ended before asserting,
      // so the check below is not made before an end-of-live event could have been emitted
      await servers[0].live.waitUntilEnded({ videoId: liveVideoUUID })
      await waitJobs(servers)

      // Still a single event: nothing was delivered after the unsubscribe
      expect(stateChanges).to.have.lengthOf(1)
    } finally {
      // Always close the socket so it does not stay connected after this test, even on failure
      socket.disconnect()
    }
  })
  127. })
  // Global teardown: hand the servers started in the `before` hook to cleanupTests
  after(function () {
    return cleanupTests(servers)
  })
  131. })