muxing-session.ts 11 KB

  1. import * as Bluebird from 'bluebird'
  2. import * as chokidar from 'chokidar'
  3. import { FfmpegCommand } from 'fluent-ffmpeg'
  4. import { appendFile, ensureDir, readFile, stat } from 'fs-extra'
  5. import { basename, join } from 'path'
  6. import { EventEmitter } from 'stream'
  7. import { getLiveMuxingCommand, getLiveTranscodingCommand } from '@server/helpers/ffmpeg-utils'
  8. import { logger, loggerTagsFactory, LoggerTagsFn } from '@server/helpers/logger'
  9. import { CONFIG } from '@server/initializers/config'
  10. import { MEMOIZE_TTL, VIDEO_LIVE } from '@server/initializers/constants'
  11. import { VideoFileModel } from '@server/models/video/video-file'
  12. import { MStreamingPlaylistVideo, MUserId, MVideoLiveVideo } from '@server/types/models'
  13. import { VideoTranscodingProfilesManager } from '../../transcoding/video-transcoding-profiles'
  14. import { isAbleToUploadVideo } from '../../user'
  15. import { getHLSDirectory } from '../../video-paths'
  16. import { LiveQuotaStore } from '../live-quota-store'
  17. import { LiveSegmentShaStore } from '../live-segment-sha-store'
  18. import { buildConcatenatedName } from '../live-utils'
  19. import memoizee = require('memoizee')
  /**
   * Map of the events a MuxingSession emits to the signature of their listeners.
   *
   * Each listener receives a single payload object. The payload is declared as a
   * named, typed parameter: writing `({ videoId: number }) => void` in a type
   * position is a destructuring pattern that renames `videoId` to a binding
   * called `number`, which leaves the payload implicitly typed as `any`.
   */
  interface MuxingSessionEvents {
    // The HLS master playlist file appeared in the output directory
    'master-playlist-created': (payload: { videoId: number }) => void

    // A player socket of the RTMP session has too much pending data
    'bad-socket-health': (payload: { videoId: number }) => void

    // The configured maximum live duration has been reached
    'duration-exceeded': (payload: { videoId: number }) => void

    // The user cannot upload more data while saving the replay
    'quota-exceeded': (payload: { videoId: number }) => void

    'ffmpeg-end': (payload: { videoId: number }) => void

    // ffmpeg failed for a reason other than a normal exit
    'ffmpeg-error': (payload: { sessionId: string }) => void

    // Emitted by the delayed cleanup scheduled once ffmpeg has stopped
    'after-cleanup': (payload: { videoId: number }) => void
  }
  /**
   * Typed `on`/`emit` overloads for MuxingSession.
   *
   * This interface shares its name with the MuxingSession class, so the event
   * name and the listener/argument types are checked against MuxingSessionEvents.
   */
  declare interface MuxingSession {
    on<EventName extends keyof MuxingSessionEvents>(
      event: EventName,
      listener: MuxingSessionEvents[EventName]
    ): this

    emit<EventName extends keyof MuxingSessionEvents>(
      event: EventName,
      ...args: Parameters<MuxingSessionEvents[EventName]>
    ): boolean
  }
  /**
   * Runs the ffmpeg command of one live session (transcoding or plain muxing,
   * depending on CONFIG.LIVE.TRANSCODING.ENABLED), watches the files ffmpeg
   * writes in the HLS directory and emits the events declared in
   * MuxingSessionEvents.
   *
   * NOTE(review): the reviewed copy of this file had several tokens stripped
   * (logger level calls, chokidar.watch calls, `.id` accesses, memoizee options,
   * the transcoding profile and the ffmpeg `run()` call). They are restored below
   * and each restored spot is marked with NOTE(review) - confirm against the
   * repository version.
   */
  class MuxingSession extends EventEmitter {

    private ffmpegCommand: FfmpegCommand

    private readonly context: any
    private readonly user: MUserId
    private readonly sessionId: string
    private readonly videoLive: MVideoLiveVideo
    private readonly streamingPlaylist: MStreamingPlaylistVideo
    private readonly rtmpUrl: string
    private readonly fps: number
    private readonly bitrate: number
    private readonly allResolutions: number[]

    private readonly videoId: number
    private readonly videoUUID: string
    private readonly saveReplay: boolean

    // NOTE(review): lTags is typed LoggerTagsFn; if that is a function type,
    // `...this.lTags` spreads nothing and passing `this.lTags` as log metadata
    // attaches no tags - confirm whether it should be called instead.
    private readonly lTags: LoggerTagsFn

    // Segments seen by the watcher but not processed yet, grouped by playlist id
    private segmentsToProcessPerPlaylist: { [playlistId: string]: string[] } = {}

    private tsWatcher: chokidar.FSWatcher
    private masterWatcher: chokidar.FSWatcher

    // NOTE(review): memoizee options restored - MEMOIZE_TTL key names to confirm
    private readonly isAbleToUploadVideoWithCache = memoizee((userId: number) => {
      return isAbleToUploadVideo(userId, 1000)
    }, { maxAge: MEMOIZE_TTL.LIVE_ABLE_TO_UPLOAD })

    // NOTE(review): memoizee options restored - MEMOIZE_TTL key names to confirm
    private readonly hasClientSocketInBadHealthWithCache = memoizee((sessionId: string) => {
      return this.hasClientSocketInBadHealth(sessionId)
    }, { maxAge: MEMOIZE_TTL.LIVE_CHECK_SOCKET_HEALTH })

    constructor (options: {
      context: any
      user: MUserId
      sessionId: string
      videoLive: MVideoLiveVideo
      streamingPlaylist: MStreamingPlaylistVideo
      rtmpUrl: string
      fps: number
      bitrate: number
      allResolutions: number[]
    }) {
      super()

      this.context = options.context
      this.user = options.user
      this.sessionId = options.sessionId
      this.videoLive = options.videoLive
      this.streamingPlaylist = options.streamingPlaylist
      this.rtmpUrl = options.rtmpUrl
      this.fps = options.fps
      this.bitrate = options.bitrate
      this.allResolutions = options.allResolutions

      // NOTE(review): right-hand side restored by analogy with videoUUID below
      this.videoId = this.videoLive.Video.id
      this.videoUUID = this.videoLive.Video.uuid

      this.saveReplay = this.videoLive.saveReplay

      this.lTags = loggerTagsFactory('live', this.sessionId, this.videoUUID)
    }

    /**
     * Creates the video file rows and the output directories, builds the ffmpeg
     * command, installs the file watchers and the ffmpeg listeners, then starts
     * ffmpeg.
     */
    async runMuxing () {
      this.createFiles()

      const outPath = await this.prepareDirectories()

      this.ffmpegCommand = CONFIG.LIVE.TRANSCODING.ENABLED
        ? await getLiveTranscodingCommand({
          rtmpUrl: this.rtmpUrl,
          outPath,
          masterPlaylistName: this.streamingPlaylist.playlistFilename,

          resolutions: this.allResolutions,
          fps: this.fps,
          bitrate: this.bitrate,

          availableEncoders: VideoTranscodingProfilesManager.Instance.getAvailableEncoders(),
          // NOTE(review): option restored from a line missing in the reviewed copy
          profile: CONFIG.LIVE.TRANSCODING.PROFILE
        })
        : getLiveMuxingCommand(this.rtmpUrl, outPath, this.streamingPlaylist.playlistFilename)

      // NOTE(review): log level restored, confirm it was `info`
      logger.info('Running live muxing/transcoding for %s.', this.videoUUID, this.lTags)

      // Watchers are installed before ffmpeg starts so no generated file is missed
      this.watchTSFiles(outPath)
      this.watchMasterFile(outPath)

      this.ffmpegCommand.on('error', (err, stdout, stderr) => {
        this.onFFmpegError(err, stdout, stderr, outPath)
      })

      this.ffmpegCommand.on('end', () => this.onFFmpegEnded(outPath))

      // NOTE(review): call restored from a line missing in the reviewed copy
      this.ffmpegCommand.run()
    }

    /** Sends SIGINT to the ffmpeg command, if one has been created. */
    abort () {
      if (!this.ffmpegCommand) return

      this.ffmpegCommand.kill('SIGINT')
    }

    /** Removes every listener of this emitter and clears both memoized caches. */
    destroy () {
      this.removeAllListeners()
      this.isAbleToUploadVideoWithCache.clear()
      this.hasClientSocketInBadHealthWithCache.clear()
    }

    /**
     * ffmpeg 'error' listener: schedules the cleanup, then logs the error and
     * emits 'ffmpeg-error' unless the message reports a normal exit.
     */
    private onFFmpegError (err: any, stdout: string, stderr: string, outPath: string) {
      this.onFFmpegEnded(outPath)

      // Don't care that we killed the ffmpeg process
      if (err?.message?.includes('Exiting normally')) return

      logger.error('Live transcoding error.', { err, stdout, stderr, ...this.lTags })

      this.emit('ffmpeg-error', ({ sessionId: this.sessionId }))
    }

    /**
     * Schedules the cleanup one second later: closes both watchers, processes
     * the segments still pending and emits 'after-cleanup'.
     */
    private onFFmpegEnded (outPath: string) {
      // NOTE(review): log level restored, confirm it was `info`
      logger.info('RTMP transmuxing for video %s ended. Scheduling cleanup', this.rtmpUrl, this.lTags)

      setTimeout(() => {
        // Wait latest segments generation, and close watchers

        Promise.all([ this.tsWatcher.close(), this.masterWatcher.close() ])
          .then(() => {
            // Process remaining segments hash
            for (const playlistId of Object.keys(this.segmentsToProcessPerPlaylist)) {
              this.processSegments(outPath, this.segmentsToProcessPerPlaylist[playlistId])
            }
          })
          .catch(err => {
            logger.error(
              'Cannot close watchers of %s or process remaining hash segments.', outPath,
              { err, ...this.lTags }
            )
          })

        // Emitted right away: neither the watchers closing nor the segments processing is awaited
        this.emit('after-cleanup', { videoId: this.videoId })
      }, 1000)
    }

    /**
     * Watches the master playlist path and emits 'master-playlist-created' when
     * the file is added, then closes that watcher.
     */
    private watchMasterFile (outPath: string) {
      // NOTE(review): `chokidar.watch(outPath` restored, the call was truncated
      this.masterWatcher = chokidar.watch(outPath + '/' + this.streamingPlaylist.playlistFilename)

      this.masterWatcher.on('add', () => {
        this.emit('master-playlist-created', { videoId: this.videoId })

        this.masterWatcher.close()
          .catch(err => logger.error('Cannot close master watcher of %s.', outPath, { err, ...this.lTags }))
      })
    }

    /**
     * Watches the `.ts` segments of the output directory.
     *
     * On every added segment, the segments previously recorded for the same
     * playlist are processed, the new one is recorded, and the socket health,
     * duration and quota constraints are checked (the first failing check emits
     * its event). On every removed segment, its sha is removed from the store.
     */
    private watchTSFiles (outPath: string) {
      const startStreamDateTime = new Date().getTime()

      // NOTE(review): `chokidar.watch(outPath` restored, the call was truncated
      this.tsWatcher = chokidar.watch(outPath + '/*.ts')

      // Segment base names are expected to start with "<playlistId>-".
      // `(\d+)` accepts ids of several digits; the former `[\d+]` character class
      // only matched one digit (or a "+") and made the handler throw otherwise.
      const playlistIdMatcher = /^(\d+)-/

      const addHandler = async (segmentPath: string) => {
        logger.debug('Live add handler of %s.', segmentPath, this.lTags)

        const matched = basename(segmentPath).match(playlistIdMatcher)

        if (matched) {
          const playlistId = matched[1]

          // The segments recorded before this one should be complete: process them
          const segmentsToProcess = this.segmentsToProcessPerPlaylist[playlistId] || []
          this.processSegments(outPath, segmentsToProcess)

          this.segmentsToProcessPerPlaylist[playlistId] = [ segmentPath ]
        } else {
          logger.warn('Cannot find the playlist id of live segment %s.', segmentPath, this.lTags)
        }

        if (this.hasClientSocketInBadHealthWithCache(this.sessionId)) {
          this.emit('bad-socket-health', { videoId: this.videoId })
          return
        }

        // Duration constraint check
        if (this.isDurationConstraintValid(startStreamDateTime) !== true) {
          this.emit('duration-exceeded', { videoId: this.videoId })
          return
        }

        // Check user quota if the user enabled replay saving
        if (await this.isQuotaExceeded(segmentPath) === true) {
          this.emit('quota-exceeded', { videoId: this.videoId })
        }
      }

      const deleteHandler = (segmentPath: string) => {
        return LiveSegmentShaStore.Instance.removeSegmentSha(this.videoUUID, segmentPath)
      }

      this.tsWatcher.on('add', p => {
        // The handler is async: catch its rejection instead of leaving it unhandled
        addHandler(p)
          .catch(err => logger.error('Cannot handle new live segment %s.', p, { err, ...this.lTags }))
      })
      this.tsWatcher.on('unlink', p => deleteHandler(p))
    }

    /**
     * Adds the segment size to the live quota store and tells whether the user
     * can no longer upload.
     *
     * @returns false when the replay is not saved or when the check failed
     */
    private async isQuotaExceeded (segmentPath: string): Promise<boolean> {
      if (this.saveReplay !== true) return false

      try {
        const segmentStat = await stat(segmentPath)

        // NOTE(review): the first two arguments were stripped and are restored - confirm
        LiveQuotaStore.Instance.addQuotaTo(this.user.id, this.videoLive.id, segmentStat.size)

        // NOTE(review): argument restored, the memoized function takes a user id
        const canUpload = await this.isAbleToUploadVideoWithCache(this.user.id)

        return canUpload !== true
      } catch (err) {
        // NOTE(review): `%d` argument restored - confirm
        logger.error('Cannot stat %s or check quota of %d.', segmentPath, this.user.id, { err, ...this.lTags })

        return false
      }
    }

    /**
     * Upserts one `.ts` video file row per resolution for the streaming playlist.
     * The upserts are not awaited; a failure is only logged.
     */
    private createFiles () {
      for (const resolution of this.allResolutions) {
        const file = new VideoFileModel({
          resolution,
          size: -1,
          extname: '.ts',
          infoHash: null,
          fps: this.fps,
          // NOTE(review): value restored, it was stripped from the reviewed copy
          videoStreamingPlaylistId: this.streamingPlaylist.id
        })

        VideoFileModel.customUpsert(file, 'streaming-playlist', null)
          .catch(err => logger.error('Cannot create file for live streaming.', { err, ...this.lTags }))
      }
    }

    /**
     * Ensures the HLS directory of the video exists, as well as the replay
     * directory when the replay is saved.
     *
     * @returns the HLS directory path
     */
    private async prepareDirectories () {
      const outPath = getHLSDirectory(this.videoLive.Video)
      await ensureDir(outPath)

      if (this.videoLive.saveReplay === true) {
        await ensureDir(join(outPath, VIDEO_LIVE.REPLAY_DIRECTORY))
      }

      return outPath
    }

    /**
     * Tells whether the live is still within CONFIG.LIVE.MAX_DURATION.
     *
     * @param streamingStartTime timestamp taken with `Date#getTime()` when the watch started
     */
    private isDurationConstraintValid (streamingStartTime: number) {
      const maxDuration = CONFIG.LIVE.MAX_DURATION
      // No limit
      if (maxDuration < 0) return true

      const now = new Date().getTime()
      const max = streamingStartTime + maxDuration

      return now <= max
    }

    /**
     * Processes the segments one after the other: stores their sha and, when the
     * replay is saved, appends them to the replay file. Not awaited by callers;
     * a failure is only logged.
     */
    private processSegments (hlsVideoPath: string, segmentPaths: string[]) {
      Bluebird.mapSeries(segmentPaths, async previousSegment => {
        // Add sha hash of previous segments, because ffmpeg should have finished generating them
        await LiveSegmentShaStore.Instance.addSegmentSha(this.videoUUID, previousSegment)

        if (this.saveReplay) {
          await this.addSegmentToReplay(hlsVideoPath, previousSegment)
        }
      }).catch(err => logger.error('Cannot process segments in %s', hlsVideoPath, { err, ...this.lTags }))
    }

    /**
     * Tells whether a player socket of the RTMP session has more pending data
     * than VIDEO_LIVE.MAX_SOCKET_WAITING_DATA.
     *
     * @returns false when the RTMP session cannot be found
     */
    private hasClientSocketInBadHealth (sessionId: string): boolean {
      const rtmpSession = this.context.sessions.get(sessionId)

      if (!rtmpSession) {
        logger.warn('Cannot get session %s to check players socket health.', sessionId, this.lTags)
        return false
      }

      for (const playerSessionId of rtmpSession.players) {
        const playerSession = this.context.sessions.get(playerSessionId)

        if (!playerSession) {
          // Log the id we looked up: the session itself is undefined here
          logger.error('Cannot get player session %s to check socket health.', playerSessionId, this.lTags)
          continue
        }

        if (playerSession.socket.writableLength > VIDEO_LIVE.MAX_SOCKET_WAITING_DATA) {
          return true
        }
      }

      return false
    }

    /**
     * Appends the content of the segment to its concatenated file in the replay
     * directory. A failure is only logged.
     */
    private async addSegmentToReplay (hlsVideoPath: string, segmentPath: string) {
      const segmentName = basename(segmentPath)
      const dest = join(hlsVideoPath, VIDEO_LIVE.REPLAY_DIRECTORY, buildConcatenatedName(segmentName))

      try {
        const data = await readFile(segmentPath)

        await appendFile(dest, data)
      } catch (err) {
        logger.error('Cannot copy segment %s to replay directory.', segmentPath, { err, ...this.lTags })
      }
    }
  }
  264. // ---------------------------------------------------------------------------
  // Public API of this module
  export { MuxingSession }