// muxing-session.ts
  1. import Bluebird from 'bluebird'
  2. import { FSWatcher, watch } from 'chokidar'
  3. import { EventEmitter } from 'events'
  4. import { ensureDir } from 'fs-extra/esm'
  5. import { appendFile, readFile, stat } from 'fs/promises'
  6. import memoizee from 'memoizee'
  7. import PQueue from 'p-queue'
  8. import { basename, join } from 'path'
  9. import { computeOutputFPS } from '@server/helpers/ffmpeg/index.js'
  10. import { logger, loggerTagsFactory, LoggerTagsFn } from '@server/helpers/logger.js'
  11. import { CONFIG } from '@server/initializers/config.js'
  12. import { MEMOIZE_TTL, P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE } from '@server/initializers/constants.js'
  13. import { removeHLSFileObjectStorageByPath, storeHLSFileFromContent, storeHLSFileFromPath } from '@server/lib/object-storage/index.js'
  14. import { VideoFileModel } from '@server/models/video/video-file.js'
  15. import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist.js'
  16. import { MStreamingPlaylistVideo, MUserId, MVideoLiveVideo } from '@server/types/models/index.js'
  17. import { LiveVideoError, FileStorage, VideoStreamingPlaylistType } from '@peertube/peertube-models'
  18. import {
  19. generateHLSMasterPlaylistFilename,
  20. generateHlsSha256SegmentsFilename,
  21. getLiveDirectory,
  22. getLiveReplayBaseDirectory
  23. } from '../../paths.js'
  24. import { isUserQuotaValid } from '../../user.js'
  25. import { LiveQuotaStore } from '../live-quota-store.js'
  26. import { LiveSegmentShaStore } from '../live-segment-sha-store.js'
  27. import { buildConcatenatedName, getLiveSegmentTime } from '../live-utils.js'
  28. import { AbstractTranscodingWrapper, FFmpegTranscodingWrapper, RemoteTranscodingWrapper } from './transcoding-wrapper/index.js'
  29. import { wait } from '@peertube/peertube-core-utils'
  30. interface MuxingSessionEvents {
  31. 'live-ready': (options: { videoUUID: string }) => void
  32. 'bad-socket-health': (options: { videoUUID: string }) => void
  33. 'duration-exceeded': (options: { videoUUID: string }) => void
  34. 'quota-exceeded': (options: { videoUUID: string }) => void
  35. 'transcoding-end': (options: { videoUUID: string }) => void
  36. 'transcoding-error': (options: { videoUUID: string }) => void
  37. 'after-cleanup': (options: { videoUUID: string }) => void
  38. }
  39. declare interface MuxingSession {
  40. on<U extends keyof MuxingSessionEvents>(
  41. event: U, listener: MuxingSessionEvents[U]
  42. ): this
  43. emit<U extends keyof MuxingSessionEvents>(
  44. event: U, ...args: Parameters<MuxingSessionEvents[U]>
  45. ): boolean
  46. }
  47. class MuxingSession extends EventEmitter {
  48. private transcodingWrapper: AbstractTranscodingWrapper
  49. private readonly context: any
  50. private readonly user: MUserId
  51. private readonly sessionId: string
  52. private readonly videoLive: MVideoLiveVideo
  53. private readonly inputLocalUrl: string
  54. private readonly inputPublicUrl: string
  55. private readonly fps: number
  56. private readonly allResolutions: number[]
  57. private readonly bitrate: number
  58. private readonly ratio: number
  59. private readonly hasAudio: boolean
  60. private readonly videoUUID: string
  61. private readonly saveReplay: boolean
  62. private readonly outDirectory: string
  63. private readonly replayDirectory: string
  64. private readonly lTags: LoggerTagsFn
  65. // Path -> Queue
  66. private readonly objectStorageSendQueues = new Map<string, PQueue>()
  67. private segmentsToProcessPerPlaylist: { [playlistId: string]: string[] } = {}
  68. private streamingPlaylist: MStreamingPlaylistVideo
  69. private liveSegmentShaStore: LiveSegmentShaStore
  70. private filesWatcher: FSWatcher
  71. private masterPlaylistCreated = false
  72. private liveReady = false
  73. private aborted = false
  74. private readonly isAbleToUploadVideoWithCache = memoizee((userId: number) => {
  75. return isUserQuotaValid({ userId, uploadSize: 1000 })
  76. }, { maxAge: MEMOIZE_TTL.LIVE_ABLE_TO_UPLOAD })
  77. private readonly hasClientSocketInBadHealthWithCache = memoizee((sessionId: string) => {
  78. return this.hasClientSocketInBadHealth(sessionId)
  79. }, { maxAge: MEMOIZE_TTL.LIVE_CHECK_SOCKET_HEALTH })
  80. constructor (options: {
  81. context: any
  82. user: MUserId
  83. sessionId: string
  84. videoLive: MVideoLiveVideo
  85. inputLocalUrl: string
  86. inputPublicUrl: string
  87. fps: number
  88. bitrate: number
  89. ratio: number
  90. allResolutions: number[]
  91. hasAudio: boolean
  92. }) {
  93. super()
  94. this.context = options.context
  95. this.user = options.user
  96. this.sessionId = options.sessionId
  97. this.videoLive = options.videoLive
  98. this.inputLocalUrl = options.inputLocalUrl
  99. this.inputPublicUrl = options.inputPublicUrl
  100. this.fps = options.fps
  101. this.bitrate = options.bitrate
  102. this.ratio = options.ratio
  103. this.hasAudio = options.hasAudio
  104. this.allResolutions = options.allResolutions
  105. this.videoUUID = this.videoLive.Video.uuid
  106. this.saveReplay = this.videoLive.saveReplay
  107. this.outDirectory = getLiveDirectory(this.videoLive.Video)
  108. this.replayDirectory = join(getLiveReplayBaseDirectory(this.videoLive.Video), new Date().toISOString())
  109. this.lTags = loggerTagsFactory('live', this.sessionId, this.videoUUID)
  110. }
  111. async runMuxing () {
  112. this.streamingPlaylist = await this.createLivePlaylist()
  113. this.createLiveShaStore()
  114. this.createFiles()
  115. await this.prepareDirectories()
  116. this.transcodingWrapper = this.buildTranscodingWrapper()
  117. this.transcodingWrapper.on('end', () => this.onTranscodedEnded())
  118. this.transcodingWrapper.on('error', () => this.onTranscodingError())
  119. await this.transcodingWrapper.run()
  120. this.filesWatcher = watch(this.outDirectory, { depth: 0 })
  121. this.watchMasterFile()
  122. this.watchTSFiles()
  123. }
  124. abort () {
  125. if (!this.transcodingWrapper) return
  126. this.aborted = true
  127. this.transcodingWrapper.abort()
  128. }
  129. destroy () {
  130. this.removeAllListeners()
  131. this.isAbleToUploadVideoWithCache.clear()
  132. this.hasClientSocketInBadHealthWithCache.clear()
  133. }
  134. private watchMasterFile () {
  135. this.filesWatcher.on('add', async path => {
  136. if (path !== join(this.outDirectory, this.streamingPlaylist.playlistFilename)) return
  137. if (this.masterPlaylistCreated === true) return
  138. try {
  139. if (this.streamingPlaylist.storage === FileStorage.OBJECT_STORAGE) {
  140. let masterContent = await readFile(path, 'utf-8')
  141. // If the disk sync is slow, don't upload an empty master playlist on object storage
  142. // Wait for ffmpeg to correctly fill it
  143. while (!masterContent) {
  144. await wait(100)
  145. masterContent = await readFile(path, 'utf-8')
  146. }
  147. logger.debug('Uploading live master playlist on object storage for %s', this.videoUUID, { masterContent, ...this.lTags() })
  148. const url = await storeHLSFileFromContent(this.streamingPlaylist, this.streamingPlaylist.playlistFilename, masterContent)
  149. this.streamingPlaylist.playlistUrl = url
  150. }
  151. this.streamingPlaylist.assignP2PMediaLoaderInfoHashes(this.videoLive.Video, this.allResolutions)
  152. await this.streamingPlaylist.save()
  153. } catch (err) {
  154. logger.error('Cannot update streaming playlist.', { err, ...this.lTags() })
  155. }
  156. this.masterPlaylistCreated = true
  157. logger.info('Master playlist file for %s has been created', this.videoUUID, this.lTags())
  158. })
  159. }
  160. private watchTSFiles () {
  161. const startStreamDateTime = new Date().getTime()
  162. const addHandler = async (segmentPath: string) => {
  163. if (segmentPath.endsWith('.ts') !== true) return
  164. logger.debug('Live add handler of TS file %s.', segmentPath, this.lTags())
  165. const playlistId = this.getPlaylistIdFromTS(segmentPath)
  166. const segmentsToProcess = this.segmentsToProcessPerPlaylist[playlistId] || []
  167. this.processSegments(segmentsToProcess)
  168. this.segmentsToProcessPerPlaylist[playlistId] = [ segmentPath ]
  169. if (this.hasClientSocketInBadHealthWithCache(this.sessionId)) {
  170. this.emit('bad-socket-health', { videoUUID: this.videoUUID })
  171. return
  172. }
  173. // Duration constraint check
  174. if (this.isDurationConstraintValid(startStreamDateTime) !== true) {
  175. this.emit('duration-exceeded', { videoUUID: this.videoUUID })
  176. return
  177. }
  178. // Check user quota if the user enabled replay saving
  179. if (await this.isQuotaExceeded(segmentPath) === true) {
  180. this.emit('quota-exceeded', { videoUUID: this.videoUUID })
  181. }
  182. }
  183. const deleteHandler = async (segmentPath: string) => {
  184. if (segmentPath.endsWith('.ts') !== true) return
  185. logger.debug('Live delete handler of TS file %s.', segmentPath, this.lTags())
  186. try {
  187. await this.liveSegmentShaStore.removeSegmentSha(segmentPath)
  188. } catch (err) {
  189. logger.warn('Cannot remove segment sha %s from sha store', segmentPath, { err, ...this.lTags() })
  190. }
  191. if (this.streamingPlaylist.storage === FileStorage.OBJECT_STORAGE) {
  192. try {
  193. await removeHLSFileObjectStorageByPath(this.streamingPlaylist, segmentPath)
  194. } catch (err) {
  195. logger.error('Cannot remove segment %s from object storage', segmentPath, { err, ...this.lTags() })
  196. }
  197. }
  198. }
  199. this.filesWatcher.on('add', p => addHandler(p))
  200. this.filesWatcher.on('unlink', p => deleteHandler(p))
  201. }
  202. private async isQuotaExceeded (segmentPath: string) {
  203. if (this.saveReplay !== true) return false
  204. if (this.aborted) return false
  205. try {
  206. const segmentStat = await stat(segmentPath)
  207. LiveQuotaStore.Instance.addQuotaTo(this.user.id, this.sessionId, segmentStat.size)
  208. const canUpload = await this.isAbleToUploadVideoWithCache(this.user.id)
  209. return canUpload !== true
  210. } catch (err) {
  211. logger.error('Cannot stat %s or check quota of %d.', segmentPath, this.user.id, { err, ...this.lTags() })
  212. }
  213. }
  214. private createFiles () {
  215. for (let i = 0; i < this.allResolutions.length; i++) {
  216. const resolution = this.allResolutions[i]
  217. const file = new VideoFileModel({
  218. resolution,
  219. size: -1,
  220. extname: '.ts',
  221. infoHash: null,
  222. fps: this.fps,
  223. storage: this.streamingPlaylist.storage,
  224. videoStreamingPlaylistId: this.streamingPlaylist.id
  225. })
  226. VideoFileModel.customUpsert(file, 'streaming-playlist', null)
  227. .catch(err => logger.error('Cannot create file for live streaming.', { err, ...this.lTags() }))
  228. }
  229. }
  230. private async prepareDirectories () {
  231. await ensureDir(this.outDirectory)
  232. if (this.videoLive.saveReplay === true) {
  233. await ensureDir(this.replayDirectory)
  234. }
  235. }
  236. private isDurationConstraintValid (streamingStartTime: number) {
  237. const maxDuration = CONFIG.LIVE.MAX_DURATION
  238. // No limit
  239. if (maxDuration < 0) return true
  240. const now = new Date().getTime()
  241. const max = streamingStartTime + maxDuration
  242. return now <= max
  243. }
  244. private processSegments (segmentPaths: string[]) {
  245. Bluebird.mapSeries(segmentPaths, previousSegment => this.processSegment(previousSegment))
  246. .catch(err => {
  247. if (this.aborted) return
  248. logger.error('Cannot process segments', { err, ...this.lTags() })
  249. })
  250. }
  251. private async processSegment (segmentPath: string) {
  252. // Add sha hash of previous segments, because ffmpeg should have finished generating them
  253. await this.liveSegmentShaStore.addSegmentSha(segmentPath)
  254. if (this.saveReplay) {
  255. await this.addSegmentToReplay(segmentPath)
  256. }
  257. if (this.streamingPlaylist.storage === FileStorage.OBJECT_STORAGE) {
  258. try {
  259. await storeHLSFileFromPath(this.streamingPlaylist, segmentPath)
  260. await this.processM3U8ToObjectStorage(segmentPath)
  261. } catch (err) {
  262. logger.error('Cannot store TS segment %s in object storage', segmentPath, { err, ...this.lTags() })
  263. }
  264. }
  265. // Master playlist and segment JSON file are created, live is ready
  266. if (this.masterPlaylistCreated && !this.liveReady) {
  267. this.liveReady = true
  268. this.emit('live-ready', { videoUUID: this.videoUUID })
  269. }
  270. }
  271. private async processM3U8ToObjectStorage (segmentPath: string) {
  272. const m3u8Path = join(this.outDirectory, this.getPlaylistNameFromTS(segmentPath))
  273. logger.debug('Process M3U8 file %s.', m3u8Path, this.lTags())
  274. const segmentName = basename(segmentPath)
  275. const playlistContent = await readFile(m3u8Path, 'utf-8')
  276. // Remove new chunk references, that will be processed later
  277. const filteredPlaylistContent = playlistContent.substring(0, playlistContent.lastIndexOf(segmentName) + segmentName.length) + '\n'
  278. try {
  279. if (!this.objectStorageSendQueues.has(m3u8Path)) {
  280. this.objectStorageSendQueues.set(m3u8Path, new PQueue({ concurrency: 1 }))
  281. }
  282. const queue = this.objectStorageSendQueues.get(m3u8Path)
  283. await queue.add(() => storeHLSFileFromContent(this.streamingPlaylist, m3u8Path, filteredPlaylistContent))
  284. } catch (err) {
  285. logger.error('Cannot store in object storage m3u8 file %s', m3u8Path, { err, ...this.lTags() })
  286. }
  287. }
  288. private onTranscodingError () {
  289. this.emit('transcoding-error', ({ videoUUID: this.videoUUID }))
  290. }
  291. private onTranscodedEnded () {
  292. this.emit('transcoding-end', ({ videoUUID: this.videoUUID }))
  293. logger.info('RTMP transmuxing for video %s ended. Scheduling cleanup', this.inputLocalUrl, this.lTags())
  294. setTimeout(() => {
  295. // Wait latest segments generation, and close watchers
  296. const promise = this.filesWatcher?.close() || Promise.resolve()
  297. promise
  298. .then(() => {
  299. // Process remaining segments hash
  300. for (const key of Object.keys(this.segmentsToProcessPerPlaylist)) {
  301. this.processSegments(this.segmentsToProcessPerPlaylist[key])
  302. }
  303. })
  304. .catch(err => {
  305. logger.error(
  306. 'Cannot close watchers of %s or process remaining hash segments.', this.outDirectory,
  307. { err, ...this.lTags() }
  308. )
  309. })
  310. this.emit('after-cleanup', { videoUUID: this.videoUUID })
  311. }, 1000)
  312. }
  313. private hasClientSocketInBadHealth (sessionId: string) {
  314. const rtmpSession = this.context.sessions.get(sessionId)
  315. if (!rtmpSession) {
  316. logger.warn('Cannot get session %s to check players socket health.', sessionId, this.lTags())
  317. return
  318. }
  319. for (const playerSessionId of rtmpSession.players) {
  320. const playerSession = this.context.sessions.get(playerSessionId)
  321. if (!playerSession) {
  322. logger.error('Cannot get player session %s to check socket health.', playerSession, this.lTags())
  323. continue
  324. }
  325. if (playerSession.socket.writableLength > VIDEO_LIVE.MAX_SOCKET_WAITING_DATA) {
  326. return true
  327. }
  328. }
  329. return false
  330. }
  331. private async addSegmentToReplay (segmentPath: string) {
  332. const segmentName = basename(segmentPath)
  333. const dest = join(this.replayDirectory, buildConcatenatedName(segmentName))
  334. try {
  335. const data = await readFile(segmentPath)
  336. await appendFile(dest, data)
  337. } catch (err) {
  338. logger.error('Cannot copy segment %s to replay directory.', segmentPath, { err, ...this.lTags() })
  339. }
  340. }
  341. private async createLivePlaylist (): Promise<MStreamingPlaylistVideo> {
  342. const playlist = await VideoStreamingPlaylistModel.loadOrGenerate(this.videoLive.Video)
  343. playlist.playlistFilename = generateHLSMasterPlaylistFilename(true)
  344. playlist.segmentsSha256Filename = generateHlsSha256SegmentsFilename(true)
  345. playlist.p2pMediaLoaderPeerVersion = P2P_MEDIA_LOADER_PEER_VERSION
  346. playlist.type = VideoStreamingPlaylistType.HLS
  347. playlist.storage = CONFIG.OBJECT_STORAGE.ENABLED
  348. ? FileStorage.OBJECT_STORAGE
  349. : FileStorage.FILE_SYSTEM
  350. return playlist.save()
  351. }
  352. private createLiveShaStore () {
  353. this.liveSegmentShaStore = new LiveSegmentShaStore({
  354. videoUUID: this.videoLive.Video.uuid,
  355. sha256Path: join(this.outDirectory, this.streamingPlaylist.segmentsSha256Filename),
  356. streamingPlaylist: this.streamingPlaylist,
  357. sendToObjectStorage: CONFIG.OBJECT_STORAGE.ENABLED
  358. })
  359. }
  360. private buildTranscodingWrapper () {
  361. const options = {
  362. streamingPlaylist: this.streamingPlaylist,
  363. videoLive: this.videoLive,
  364. lTags: this.lTags,
  365. sessionId: this.sessionId,
  366. inputLocalUrl: this.inputLocalUrl,
  367. inputPublicUrl: this.inputPublicUrl,
  368. toTranscode: this.allResolutions.map(resolution => {
  369. let toTranscodeFPS: number
  370. try {
  371. toTranscodeFPS = computeOutputFPS({ inputFPS: this.fps, resolution })
  372. } catch (err) {
  373. err.liveVideoErrorCode = LiveVideoError.INVALID_INPUT_VIDEO_STREAM
  374. throw err
  375. }
  376. return {
  377. resolution,
  378. fps: toTranscodeFPS
  379. }
  380. }),
  381. fps: this.fps,
  382. bitrate: this.bitrate,
  383. ratio: this.ratio,
  384. hasAudio: this.hasAudio,
  385. segmentListSize: VIDEO_LIVE.SEGMENTS_LIST_SIZE,
  386. segmentDuration: getLiveSegmentTime(this.videoLive.latencyMode),
  387. outDirectory: this.outDirectory
  388. }
  389. return CONFIG.LIVE.TRANSCODING.ENABLED && CONFIG.LIVE.TRANSCODING.REMOTE_RUNNERS.ENABLED
  390. ? new RemoteTranscodingWrapper(options)
  391. : new FFmpegTranscodingWrapper(options)
  392. }
  393. private getPlaylistIdFromTS (segmentPath: string) {
  394. const playlistIdMatcher = /^([\d+])-/
  395. return basename(segmentPath).match(playlistIdMatcher)[1]
  396. }
  397. private getPlaylistNameFromTS (segmentPath: string) {
  398. return `${this.getPlaylistIdFromTS(segmentPath)}.m3u8`
  399. }
  400. }
  401. // ---------------------------------------------------------------------------
  402. export {
  403. MuxingSession
  404. }