// live-manager.ts
  1. import { readdir, readFile } from 'fs-extra'
  2. import { createServer, Server } from 'net'
  3. import { join } from 'path'
  4. import { createServer as createServerTLS, Server as ServerTLS } from 'tls'
  5. import {
  6. computeResolutionsToTranscode,
  7. ffprobePromise,
  8. getLiveSegmentTime,
  9. getVideoStreamBitrate,
  10. getVideoStreamDimensionsInfo,
  11. getVideoStreamFPS
  12. } from '@server/helpers/ffmpeg'
  13. import { logger, loggerTagsFactory } from '@server/helpers/logger'
  14. import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config'
  15. import { P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE } from '@server/initializers/constants'
  16. import { UserModel } from '@server/models/user/user'
  17. import { VideoModel } from '@server/models/video/video'
  18. import { VideoLiveModel } from '@server/models/video/video-live'
  19. import { VideoLiveSessionModel } from '@server/models/video/video-live-session'
  20. import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist'
  21. import { MStreamingPlaylistVideo, MVideo, MVideoLiveSession, MVideoLiveVideo } from '@server/types/models'
  22. import { wait } from '@shared/core-utils'
  23. import { LiveVideoError, VideoState, VideoStreamingPlaylistType } from '@shared/models'
  24. import { federateVideoIfNeeded } from '../activitypub/videos'
  25. import { JobQueue } from '../job-queue'
  26. import { generateHLSMasterPlaylistFilename, generateHlsSha256SegmentsFilename, getLiveReplayBaseDirectory } from '../paths'
  27. import { PeerTubeSocket } from '../peertube-socket'
  28. import { Hooks } from '../plugins/hooks'
  29. import { LiveQuotaStore } from './live-quota-store'
  30. import { cleanupPermanentLive } from './live-utils'
  31. import { MuxingSession } from './shared'
  32. const NodeRtmpSession = require('node-media-server/src/node_rtmp_session')
  33. const context = require('node-media-server/src/node_core_ctx')
  34. const nodeMediaServerLogger = require('node-media-server/src/node_core_logger')
  // Silence node-media-server's own logger
  nodeMediaServerLogger.setLogType(0)

  // RTMP options passed to each NodeRtmpSession created by the live manager
  const rtmpOptions = {
    port: CONFIG.LIVE.RTMP.PORT,
    chunk_size: VIDEO_LIVE.RTMP.CHUNK_SIZE,
    gop_cache: VIDEO_LIVE.RTMP.GOP_CACHE,
    ping: VIDEO_LIVE.RTMP.PING,
    ping_timeout: VIDEO_LIVE.RTMP.PING_TIMEOUT
  }

  const config = { rtmp: rtmpOptions }

  // Logger tags builder used for every log entry emitted from this file
  const lTags = loggerTagsFactory('live')
  /**
   * Singleton that runs the RTMP/RTMPS ingest servers and, for every published
   * stream, creates and tracks the muxing session that processes it.
   *
   * Access it through `LiveManager.Instance`; the constructor is private.
   */
  class LiveManager {

    private static instance: LiveManager

    // RTMP session id -> muxing session processing that stream
    private readonly muxingSessions = new Map<string, MuxingSession>()
    // Video id -> RTMP session id currently publishing to that video
    private readonly videoSessions = new Map<number, string>()

    private rtmpServer: Server
    private rtmpsServer: ServerTLS

    // Set by run()/stop(); read by the config-changed handler registered in init()
    private running = false

    private constructor () {
    }

    /**
     * Wire the manager to its event sources:
     *  - node-media-server `postPublish`/`donePublish` events
     *  - configuration changes (start/stop the servers when live is toggled)
     * and trigger the cleanup of lives that are still flagged as published.
     */
    init () {
      const events = this.getContext().nodeEvent

      events.on('postPublish', (sessionId: string, streamPath: string) => {
        logger.debug('RTMP received stream', { id: sessionId, streamPath, ...lTags(sessionId) })

        // Expect exactly 3 '/'-separated parts: the second must be the base path, the third is the stream key
        const splittedPath = streamPath.split('/')
        if (splittedPath.length !== 3 || splittedPath[1] !== VIDEO_LIVE.RTMP.BASE_PATH) {
          logger.warn('Live path is incorrect.', { streamPath, ...lTags(sessionId) })
          return this.abortSession(sessionId)
        }

        const session = this.getContext().sessions.get(sessionId)

        // Without this guard a missing session would throw an uncaught TypeError inside the event handler
        if (!session) {
          logger.warn('Cannot find RTMP session of published stream.', { streamPath, ...lTags(sessionId) })
          return this.abortSession(sessionId)
        }

        this.handleSession(sessionId, session.inputOriginUrl + streamPath, splittedPath[2])
          .catch(err => logger.error('Cannot handle sessions.', { err, ...lTags(sessionId) }))
      })

      events.on('donePublish', (sessionId: string) => {
        logger.info('Live session ended.', { sessionId, ...lTags(sessionId) })
      })

      registerConfigChangedHandler(() => {
        if (!this.running && CONFIG.LIVE.ENABLED === true) {
          this.run().catch(err => logger.error('Cannot run live server.', { err }))
          return
        }

        if (this.running && CONFIG.LIVE.ENABLED === false) {
          this.stop()
        }
      })

      // Cleanup broken lives, that were terminated by a server restart for example
      this.handleBrokenLives()
        .catch(err => logger.error('Cannot handle broken lives.', { err, ...lTags() }))
    }

    /**
     * Start the RTMP and/or RTMPS servers, depending on the configuration.
     *
     * The returned promise rejects if the RTMPS key or certificate file cannot be read.
     * Listen errors are reported through the servers' 'error' event and only logged.
     */
    async run () {
      this.running = true

      if (CONFIG.LIVE.RTMP.ENABLED) {
        logger.info('Running RTMP server on port %d', CONFIG.LIVE.RTMP.PORT, lTags())

        this.rtmpServer = createServer(socket => {
          const session = new NodeRtmpSession(config, socket)

          // Read back in the 'postPublish' handler to build the input URL of the stream
          session.inputOriginUrl = 'rtmp://127.0.0.1:' + CONFIG.LIVE.RTMP.PORT
          session.run()
        })

        this.rtmpServer.on('error', err => {
          logger.error('Cannot run RTMP server.', { err, ...lTags() })
        })

        this.rtmpServer.listen(CONFIG.LIVE.RTMP.PORT, CONFIG.LIVE.RTMP.HOSTNAME)
      }

      if (CONFIG.LIVE.RTMPS.ENABLED) {
        logger.info('Running RTMPS server on port %d', CONFIG.LIVE.RTMPS.PORT, lTags())

        const [ key, cert ] = await Promise.all([
          readFile(CONFIG.LIVE.RTMPS.KEY_FILE),
          readFile(CONFIG.LIVE.RTMPS.CERT_FILE)
        ])
        const serverOptions = { key, cert }

        this.rtmpsServer = createServerTLS(serverOptions, socket => {
          const session = new NodeRtmpSession(config, socket)

          session.inputOriginUrl = 'rtmps://127.0.0.1:' + CONFIG.LIVE.RTMPS.PORT
          session.run()
        })

        this.rtmpsServer.on('error', err => {
          logger.error('Cannot run RTMPS server.', { err, ...lTags() })
        })

        this.rtmpsServer.listen(CONFIG.LIVE.RTMPS.PORT, CONFIG.LIVE.RTMPS.HOSTNAME)
      }
    }

    /**
     * Close the RTMP/RTMPS servers (if any) and stop every RTMP session known to node-media-server.
     */
    stop () {
      this.running = false

      if (this.rtmpServer) {
        logger.info('Stopping RTMP server.', lTags())

        this.rtmpServer.close()
        this.rtmpServer = undefined
      }

      if (this.rtmpsServer) {
        logger.info('Stopping RTMPS server.', lTags())

        this.rtmpsServer.close()
        this.rtmpsServer = undefined
      }

      // Sessions is a keyed collection (used through get/delete/forEach), not a plain object
      this.getContext().sessions.forEach((session: any) => {
        if (session instanceof NodeRtmpSession) {
          session.stop()
        }
      })
    }

    /**
     * @returns true when the plain RTMP server has been created.
     *
     * NOTE(review): the RTMPS server is not taken into account, so this returns false
     * when only RTMPS is enabled. Kept as is because callers may rely on it; confirm intent.
     */
    isRunning () {
      return !!this.rtmpServer
    }

    /**
     * Stop the RTMP session currently publishing to the given video, if any.
     *
     * @param videoId id of the live video
     * @param error reason stored on the live session row, or null
     */
    stopSessionOf (videoId: number, error: LiveVideoError | null) {
      const sessionId = this.videoSessions.get(videoId)
      if (!sessionId) return

      // Fire and forget: a failure to persist the end of the session must not prevent the abort
      this.saveEndingSession(videoId, error)
        .catch(err => logger.error('Cannot save ending session.', { err, ...lTags(sessionId) }))

      this.videoSessions.delete(videoId)
      this.abortSession(sessionId)
    }

    /** node-media-server shared context (holds `sessions` and `nodeEvent`). */
    private getContext () {
      return context
    }

    /**
     * Stop and forget the RTMP session, then abort and forget its muxing session.
     * Does nothing for the parts that do not exist.
     */
    private abortSession (sessionId: string) {
      const session = this.getContext().sessions.get(sessionId)
      if (session) {
        session.stop()
        this.getContext().sessions.delete(sessionId)
      }

      const muxingSession = this.muxingSessions.get(sessionId)
      if (muxingSession) {
        // Muxing session will fire an event so we correctly cleanup the session
        muxingSession.abort()
        this.muxingSessions.delete(sessionId)
      }
    }

    /**
     * Validate an incoming stream, probe it and start its muxing session.
     *
     * The stream is refused (RTMP session aborted) when the stream key is unknown or the video is blacklisted.
     * If anything fails after that, the RTMP session is aborted, the video/session mapping is removed
     * and the error is rethrown to the caller.
     *
     * @param sessionId RTMP session id
     * @param inputUrl URL ffprobe/ffmpeg read the stream from
     * @param streamKey key extracted from the stream path
     */
    private async handleSession (sessionId: string, inputUrl: string, streamKey: string) {
      const videoLive = await VideoLiveModel.loadByStreamKey(streamKey)
      if (!videoLive) {
        logger.warn('Unknown live video with stream key %s.', streamKey, lTags(sessionId))
        return this.abortSession(sessionId)
      }

      const video = videoLive.Video
      if (video.isBlacklisted()) {
        logger.warn('Video is blacklisted. Refusing stream %s.', streamKey, lTags(sessionId, video.uuid))
        return this.abortSession(sessionId)
      }

      try {
        // Cleanup old potential live (could happen with a permanent live)
        const oldStreamingPlaylist = await VideoStreamingPlaylistModel.loadHLSPlaylistByVideo(video.id)
        if (oldStreamingPlaylist) {
          if (!videoLive.permanentLive) throw new Error('Found previous session in a non permanent live: ' + video.uuid)

          await cleanupPermanentLive(video, oldStreamingPlaylist)
        }

        this.videoSessions.set(video.id, sessionId)

        const now = Date.now()
        const probe = await ffprobePromise(inputUrl)

        const [ { resolution, ratio }, fps, bitrate ] = await Promise.all([
          getVideoStreamDimensionsInfo(inputUrl, probe),
          getVideoStreamFPS(inputUrl, probe),
          getVideoStreamBitrate(inputUrl, probe)
        ])

        logger.info(
          '%s probing took %d ms (bitrate: %d, fps: %d, resolution: %d)',
          inputUrl, Date.now() - now, bitrate, fps, resolution, lTags(sessionId, video.uuid)
        )

        const allResolutions = await Hooks.wrapObject(
          this.buildAllResolutionsToTranscode(resolution),
          'filter:transcoding.auto.lower-resolutions-to-transcode.result',
          { video }
        )

        logger.info(
          'Will mux/transcode live video of original resolution %d.', resolution,
          { allResolutions, ...lTags(sessionId, video.uuid) }
        )

        const streamingPlaylist = await this.createLivePlaylist(video, allResolutions)

        // `await` is required so a rejection is handled by the catch block below
        return await this.runMuxingSession({
          sessionId,
          videoLive,
          streamingPlaylist,
          inputUrl,
          fps,
          bitrate,
          ratio,
          allResolutions
        })
      } catch (err) {
        // Do not leave a stale mapping or an RTMP session nobody consumes.
        // Only drop the mapping if it still points to this session.
        if (this.videoSessions.get(video.id) === sessionId) {
          this.videoSessions.delete(video.id)
        }

        this.abortSession(sessionId)

        throw err
      }
    }

    /**
     * Persist the start of the live session, create the muxing session, subscribe to its events
     * and start muxing. A muxing failure is logged and aborts the RTMP session.
     */
    private async runMuxingSession (options: {
      sessionId: string
      videoLive: MVideoLiveVideo
      streamingPlaylist: MStreamingPlaylistVideo
      inputUrl: string
      fps: number
      bitrate: number
      ratio: number
      allResolutions: number[]
    }) {
      const { sessionId, videoLive, streamingPlaylist, allResolutions, fps, bitrate, ratio, inputUrl } = options
      const videoUUID = videoLive.Video.uuid
      const localLTags = lTags(sessionId, videoUUID)

      const liveSession = await this.saveStartingSession(videoLive)

      const user = await UserModel.loadByLiveId(videoLive.id)
      LiveQuotaStore.Instance.addNewLive(user.id, videoLive.id)

      const muxingSession = new MuxingSession({
        context: this.getContext(),
        user,
        sessionId,
        videoLive,
        streamingPlaylist,
        inputUrl,
        bitrate,
        ratio,
        fps,
        allResolutions
      })

      muxingSession.on('master-playlist-created', () => this.publishAndFederateLive(videoLive, localLTags))

      muxingSession.on('bad-socket-health', ({ videoId }) => {
        logger.error(
          'Too much data in client socket stream (ffmpeg is too slow to transcode the video).' +
          ' Stopping session of video %s.', videoUUID,
          localLTags
        )

        this.stopSessionOf(videoId, LiveVideoError.BAD_SOCKET_HEALTH)
      })

      muxingSession.on('duration-exceeded', ({ videoId }) => {
        logger.info('Stopping session of %s: max duration exceeded.', videoUUID, localLTags)

        this.stopSessionOf(videoId, LiveVideoError.DURATION_EXCEEDED)
      })

      muxingSession.on('quota-exceeded', ({ videoId }) => {
        logger.info('Stopping session of %s: user quota exceeded.', videoUUID, localLTags)

        this.stopSessionOf(videoId, LiveVideoError.QUOTA_EXCEEDED)
      })

      muxingSession.on('ffmpeg-error', ({ videoId }) => {
        this.stopSessionOf(videoId, LiveVideoError.FFMPEG_ERROR)
      })

      muxingSession.on('ffmpeg-end', ({ videoId }) => {
        this.onMuxingFFmpegEnd(videoId, sessionId)
      })

      muxingSession.on('after-cleanup', ({ videoId }) => {
        this.muxingSessions.delete(sessionId)

        LiveQuotaStore.Instance.removeLive(user.id, videoLive.id)

        muxingSession.destroy()

        return this.onAfterMuxingCleanup({ videoId, liveSession })
          .catch(err => logger.error('Error in end transmuxing.', { err, ...localLTags }))
      })

      this.muxingSessions.set(sessionId, muxingSession)

      muxingSession.runMuxing()
        .catch(err => {
          logger.error('Cannot run muxing.', { err, ...localLTags })
          this.abortSession(sessionId)
        })
    }

    /**
     * Mark the live video as published, wait a few segment durations, then federate it
     * and notify clients of the new state. Errors are logged, never thrown.
     */
    private async publishAndFederateLive (live: MVideoLiveVideo, localLTags: { tags: string[] }) {
      const videoId = live.videoId

      try {
        const video = await VideoModel.loadFull(videoId)

        logger.info('Will publish and federate live %s.', video.url, localLTags)

        video.state = VideoState.PUBLISHED
        video.publishedAt = new Date()
        await video.save()

        live.Video = video

        // Delay the notification by EDGE_LIVE_DELAY_SEGMENTS_NOTIFICATION segment durations
        await wait(getLiveSegmentTime(live.latencyMode) * 1000 * VIDEO_LIVE.EDGE_LIVE_DELAY_SEGMENTS_NOTIFICATION)

        try {
          await federateVideoIfNeeded(video, false)
        } catch (err) {
          // A federation failure must not prevent the local state notification
          logger.error('Cannot federate live video %s.', video.url, { err, ...localLTags })
        }

        PeerTubeSocket.Instance.sendVideoLiveNewState(video)
      } catch (err) {
        logger.error('Cannot save/federate live video %d.', videoId, { err, ...localLTags })
      }
    }

    /** ffmpeg ended by itself: forget the video/session mapping and store the end date without error. */
    private onMuxingFFmpegEnd (videoId: number, sessionId: string) {
      this.videoSessions.delete(videoId)

      this.saveEndingSession(videoId, null)
        .catch(err => logger.error('Cannot save ending session.', { err, ...lTags(sessionId) }))
    }

    /**
     * Finalize a live once its muxing session is cleaned up: close the live session row if needed,
     * schedule the 'video-live-ending' job, update the video state, notify clients and federate.
     * Errors are logged, never thrown.
     *
     * @param options.videoId numeric id or UUID of the video
     * @param options.liveSession session to close; the latest session of the video is loaded when omitted
     * @param options.cleanupNow schedule the ending job without delay (default false)
     */
    private async onAfterMuxingCleanup (options: {
      videoId: number | string
      liveSession?: MVideoLiveSession
      cleanupNow?: boolean // Default false
    }) {
      const { videoId, liveSession: liveSessionArg, cleanupNow = false } = options

      try {
        const fullVideo = await VideoModel.loadFull(videoId)
        if (!fullVideo) return

        const live = await VideoLiveModel.loadByVideoId(fullVideo.id)

        const liveSession = liveSessionArg ?? await VideoLiveSessionModel.findLatestSessionOf(fullVideo.id)

        // On server restart during a live
        if (!liveSession.endDate) {
          liveSession.endDate = new Date()
          await liveSession.save()
        }

        const replayDirectory = live.saveReplay
          ? await this.findReplayDirectory(fullVideo)
          : undefined

        JobQueue.Instance.createJob({
          type: 'video-live-ending',
          payload: {
            videoId: fullVideo.id,

            replayDirectory,

            liveSessionId: liveSession.id,
            streamingPlaylistId: fullVideo.getHLSPlaylist()?.id,

            publishedAt: fullVideo.publishedAt.toISOString()
          }
        }, { delay: cleanupNow ? 0 : VIDEO_LIVE.CLEANUP_DELAY })

        fullVideo.state = live.permanentLive
          ? VideoState.WAITING_FOR_LIVE
          : VideoState.LIVE_ENDED

        await fullVideo.save()

        PeerTubeSocket.Instance.sendVideoLiveNewState(fullVideo)

        await federateVideoIfNeeded(fullVideo, false)
      } catch (err) {
        // videoId can be a UUID string (see handleBrokenLives): %d would print NaN
        logger.error('Cannot save/federate new video state of live streaming of video %s.', videoId, { err, ...lTags(videoId + '') })
      }
    }

    /** Run the end-of-live cleanup, without delay, for every video returned by listPublishedLiveUUIDs(). */
    private async handleBrokenLives () {
      const videoUUIDs = await VideoModel.listPublishedLiveUUIDs()

      for (const uuid of videoUUIDs) {
        await this.onAfterMuxingCleanup({ videoId: uuid, cleanupNow: true })
      }
    }

    /**
     * @returns the path of the entry of the video replay base directory that sorts last
     * (default string ordering), or undefined when the directory is empty.
     * NOTE(review): presumably entry names sort chronologically so this is the latest replay; confirm.
     */
    private async findReplayDirectory (video: MVideo) {
      const directory = getLiveReplayBaseDirectory(video)
      const files = await readdir(directory)

      if (files.length === 0) return undefined

      return join(directory, files.sort().reverse()[0])
    }

    /**
     * @returns the resolutions computed from the live transcoding configuration,
     * or only the origin resolution when transcoding is disabled or yields nothing.
     */
    private buildAllResolutionsToTranscode (originResolution: number) {
      const includeInputResolution = CONFIG.LIVE.TRANSCODING.ALWAYS_TRANSCODE_ORIGINAL_RESOLUTION

      const resolutionsEnabled = CONFIG.LIVE.TRANSCODING.ENABLED
        ? computeResolutionsToTranscode({ inputResolution: originResolution, type: 'live', includeInputResolution })
        : []

      if (resolutionsEnabled.length === 0) {
        return [ originResolution ]
      }

      return resolutionsEnabled
    }

    /** Load or generate the HLS playlist row of the video, (re)initialize its live fields and save it. */
    private async createLivePlaylist (video: MVideo, allResolutions: number[]): Promise<MStreamingPlaylistVideo> {
      const playlist = await VideoStreamingPlaylistModel.loadOrGenerate(video)

      playlist.playlistFilename = generateHLSMasterPlaylistFilename(true)
      playlist.segmentsSha256Filename = generateHlsSha256SegmentsFilename(true)

      playlist.p2pMediaLoaderPeerVersion = P2P_MEDIA_LOADER_PEER_VERSION
      playlist.type = VideoStreamingPlaylistType.HLS

      playlist.assignP2PMediaLoaderInfoHashes(video, allResolutions)

      return playlist.save()
    }

    /** Insert a new live session row starting now for the given live. */
    private saveStartingSession (videoLive: MVideoLiveVideo) {
      const liveSession = new VideoLiveSessionModel({
        startDate: new Date(),
        liveVideoId: videoLive.videoId,
        saveReplay: videoLive.saveReplay,
        endingProcessed: false
      })

      return liveSession.save()
    }

    /** Set the end date and the error of the current live session of the video, if there is one. */
    private async saveEndingSession (videoId: number, error: LiveVideoError | null) {
      const liveSession = await VideoLiveSessionModel.findCurrentSessionOf(videoId)
      if (!liveSession) return

      liveSession.endDate = new Date()
      liveSession.error = error

      return liveSession.save()
    }

    /** Lazily created unique instance. */
    static get Instance () {
      return this.instance || (this.instance = new this())
    }
  }
  // ---------------------------------------------------------------------------
  // Public API of this module

  export { LiveManager }