live-manager.ts 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592
  1. import { readdir, readFile } from 'fs/promises'
  2. import { createServer, Server } from 'net'
  3. import context from 'node-media-server/src/node_core_ctx.js'
  4. import nodeMediaServerLogger from 'node-media-server/src/node_core_logger.js'
  5. import NodeRtmpSession from 'node-media-server/src/node_rtmp_session.js'
  6. import { join } from 'path'
  7. import { createServer as createServerTLS, Server as ServerTLS } from 'tls'
  8. import { pick, wait } from '@peertube/peertube-core-utils'
  9. import { LiveVideoError, LiveVideoErrorType, VideoState } from '@peertube/peertube-models'
  10. import { logger, loggerTagsFactory } from '@server/helpers/logger.js'
  11. import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config.js'
  12. import { VIDEO_LIVE, WEBSERVER } from '@server/initializers/constants.js'
  13. import { sequelizeTypescript } from '@server/initializers/database.js'
  14. import { RunnerJobModel } from '@server/models/runner/runner-job.js'
  15. import { UserModel } from '@server/models/user/user.js'
  16. import { VideoLiveReplaySettingModel } from '@server/models/video/video-live-replay-setting.js'
  17. import { VideoLiveSessionModel } from '@server/models/video/video-live-session.js'
  18. import { VideoLiveModel } from '@server/models/video/video-live.js'
  19. import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist.js'
  20. import { VideoModel } from '@server/models/video/video.js'
  21. import { MUser, MVideo, MVideoLiveSession, MVideoLiveVideo, MVideoLiveVideoWithSetting } from '@server/types/models/index.js'
  22. import {
  23. ffprobePromise,
  24. getVideoStreamBitrate,
  25. getVideoStreamDimensionsInfo,
  26. getVideoStreamFPS,
  27. hasAudioStream
  28. } from '@peertube/peertube-ffmpeg'
  29. import { federateVideoIfNeeded } from '../activitypub/videos/index.js'
  30. import { JobQueue } from '../job-queue/index.js'
  31. import { getLiveReplayBaseDirectory } from '../paths.js'
  32. import { PeerTubeSocket } from '../peertube-socket.js'
  33. import { Hooks } from '../plugins/hooks.js'
  34. import { computeResolutionsToTranscode } from '../transcoding/transcoding-resolutions.js'
  35. import { LiveQuotaStore } from './live-quota-store.js'
  36. import { cleanupAndDestroyPermanentLive, getLiveSegmentTime } from './live-utils.js'
  37. import { MuxingSession } from './shared/index.js'
  38. import { Notifier } from '../notifier/notifier.js'
  // Turn off the logs written by node-media-server itself
  nodeMediaServerLogger.setLogType(0)

  // Options given to every NodeRtmpSession: the listening port comes from the
  // instance configuration, the protocol tuning values from the constants
  const rtmpConstants = VIDEO_LIVE.RTMP

  const config = {
    rtmp: {
      port: CONFIG.LIVE.RTMP.PORT,
      chunk_size: rtmpConstants.CHUNK_SIZE,
      gop_cache: rtmpConstants.GOP_CACHE,
      ping: rtmpConstants.PING,
      ping_timeout: rtmpConstants.PING_TIMEOUT
    }
  }

  // Builds logger tags that always include the 'live' tag
  const lTags = loggerTagsFactory('live')
  /**
   * Singleton driving the live streaming ingest.
   *
   * It runs the RTMP and RTMPS servers, listens to the node-media-server
   * publish events, validates incoming streams against the live videos stored
   * in database, starts a MuxingSession for each accepted stream and moves the
   * video to its next state once the stream has been cleaned up.
   */
  class LiveManager {

    private static instance: LiveManager

    // Muxing sessions indexed by RTMP session id
    private readonly muxingSessions = new Map<string, MuxingSession>()
    // RTMP session id indexed by video UUID
    private readonly videoSessions = new Map<string, string>()

    private rtmpServer: Server
    private rtmpsServer: ServerTLS

    private running = false

    private constructor () {
    }

    /**
     * Registers the RTMP event listeners and the configuration change handler,
     * then schedules the cleanup of lives that are still marked as published.
     * It does not start the servers: see run().
     */
    init () {
      const events = this.getContext().nodeEvent

      events.on('postPublish', (sessionId: string, streamPath: string) => {
        logger.debug('RTMP received stream', { id: sessionId, streamPath, ...lTags(sessionId) })

        // Expected stream path: /<VIDEO_LIVE.RTMP.BASE_PATH>/<stream key>
        const splittedPath = streamPath.split('/')
        if (splittedPath.length !== 3 || splittedPath[1] !== VIDEO_LIVE.RTMP.BASE_PATH) {
          logger.warn('Live path is incorrect.', { streamPath, ...lTags(sessionId) })
          return this.abortSession(sessionId)
        }

        const session = this.getContext().sessions.get(sessionId)

        const inputLocalUrl = session.inputOriginLocalUrl + streamPath
        const inputPublicUrl = session.inputOriginPublicUrl + streamPath

        this.handleSession({ sessionId, inputPublicUrl, inputLocalUrl, streamKey: splittedPath[2] })
          .catch(err => logger.error('Cannot handle sessions.', { err, ...lTags(sessionId) }))
      })

      events.on('donePublish', (sessionId: string) => {
        logger.info('Live session ended.', { sessionId, ...lTags(sessionId) })

        // Force session aborting, so we kill ffmpeg even if it still has data to process (slow CPU)
        setTimeout(() => this.abortSession(sessionId), 2000)
      })

      registerConfigChangedHandler(() => {
        if (!this.running && CONFIG.LIVE.ENABLED === true) {
          this.run().catch(err => logger.error('Cannot run live server.', { err }))
          return
        }

        if (this.running && CONFIG.LIVE.ENABLED === false) {
          this.stop()
        }
      })

      // Cleanup broken lives, that were terminated by a server restart for example
      this.handleBrokenLives()
        .catch(err => logger.error('Cannot handle broken lives.', { err, ...lTags() }))
    }

    /**
     * Starts the RTMP and/or RTMPS servers, depending on the configuration.
     * Rejects if the RTMPS key or certificate file cannot be read.
     */
    async run () {
      this.running = true

      if (CONFIG.LIVE.RTMP.ENABLED) {
        logger.info('Running RTMP server on port %d', CONFIG.LIVE.RTMP.PORT, lTags())

        this.rtmpServer = createServer(socket => {
          const session = new NodeRtmpSession(config, socket)

          session.inputOriginLocalUrl = 'rtmp://127.0.0.1:' + CONFIG.LIVE.RTMP.PORT
          session.inputOriginPublicUrl = WEBSERVER.RTMP_URL
          session.run()
        })

        this.rtmpServer.on('error', err => {
          logger.error('Cannot run RTMP server.', { err, ...lTags() })
        })

        this.rtmpServer.listen(CONFIG.LIVE.RTMP.PORT, CONFIG.LIVE.RTMP.HOSTNAME)
      }

      if (CONFIG.LIVE.RTMPS.ENABLED) {
        logger.info('Running RTMPS server on port %d', CONFIG.LIVE.RTMPS.PORT, lTags())

        const [ key, cert ] = await Promise.all([
          readFile(CONFIG.LIVE.RTMPS.KEY_FILE),
          readFile(CONFIG.LIVE.RTMPS.CERT_FILE)
        ])
        const serverOptions = { key, cert }

        this.rtmpsServer = createServerTLS(serverOptions, socket => {
          const session = new NodeRtmpSession(config, socket)

          session.inputOriginLocalUrl = 'rtmps://127.0.0.1:' + CONFIG.LIVE.RTMPS.PORT
          session.inputOriginPublicUrl = WEBSERVER.RTMPS_URL
          session.run()
        })

        this.rtmpsServer.on('error', err => {
          logger.error('Cannot run RTMPS server.', { err, ...lTags() })
        })

        this.rtmpsServer.listen(CONFIG.LIVE.RTMPS.PORT, CONFIG.LIVE.RTMPS.HOSTNAME)
      }
    }

    /**
     * Closes the RTMP/RTMPS servers and stops every RTMP session registered in
     * the node-media-server context.
     */
    stop () {
      this.running = false

      if (this.rtmpServer) {
        logger.info('Stopping RTMP server.', lTags())

        this.rtmpServer.close()
        this.rtmpServer = undefined
      }

      if (this.rtmpsServer) {
        logger.info('Stopping RTMPS server.', lTags())

        this.rtmpsServer.close()
        this.rtmpsServer = undefined
      }

      // The context may hold other kinds of sessions: only stop the RTMP ones
      this.getContext().sessions.forEach((session: any) => {
        if (session instanceof NodeRtmpSession) {
          session.stop()
        }
      })
    }

    /**
     * @returns true if at least one ingest server (RTMP or RTMPS) has been created
     */
    isRunning () {
      // An instance may enable RTMPS only, so checking the RTMP server alone is not enough
      return !!this.rtmpServer || !!this.rtmpsServer
    }

    /**
     * @returns true if the node-media-server context knows this RTMP session id
     */
    hasSession (sessionId: string) {
      return this.getContext().sessions.has(sessionId)
    }

    /**
     * Stops the live session currently attached to a video, if any.
     *
     * @param options.videoUUID UUID of the live video
     * @param options.error error stored on the live session, or null on a regular end
     * @param options.errorOnReplay when true, the live session is flagged as already processed
     */
    stopSessionOf (options: {
      videoUUID: string
      error: LiveVideoErrorType | null
      errorOnReplay?: boolean
    }) {
      const { videoUUID, error } = options

      const sessionId = this.videoSessions.get(videoUUID)

      if (!sessionId) {
        // There is no session id to tag the log with in this branch
        logger.debug('No live session to stop for video %s', videoUUID, lTags(videoUUID))
        return
      }

      logger.info('Stopping live session of video %s', videoUUID, { error, ...lTags(sessionId, videoUUID) })

      // Fire and forget: the session is aborted without waiting for the database update
      this.saveEndingSession(options)
        .catch(err => logger.error('Cannot save ending session.', { err, ...lTags(sessionId, videoUUID) }))

      this.videoSessions.delete(videoUUID)
      this.abortSession(sessionId)
    }

    /** Returns the shared node-media-server context (sessions, event emitter). */
    private getContext () {
      return context
    }

    /**
     * Stops the RTMP session and aborts the muxing session bound to this session id.
     * Does nothing for the parts that are already gone.
     */
    private abortSession (sessionId: string) {
      const session = this.getContext().sessions.get(sessionId)
      if (session) {
        session.stop()
        this.getContext().sessions.delete(sessionId)
      }

      const muxingSession = this.muxingSessions.get(sessionId)
      if (muxingSession) {
        // Muxing session will fire an event so we correctly cleanup the session
        muxingSession.abort()

        this.muxingSessions.delete(sessionId)
      }
    }

    /**
     * Validates an incoming stream and, when accepted, probes it and starts its muxing session.
     *
     * The stream is refused (RTMP session aborted) when the stream key is unknown,
     * the video is blacklisted, the owner is missing or blocked, or the video
     * already has a running session.
     *
     * @throws if a previous HLS playlist exists for a non permanent live, or if
     * probing the input or starting the muxing session fails
     */
    private async handleSession (options: {
      sessionId: string
      inputLocalUrl: string
      inputPublicUrl: string
      streamKey: string
    }) {
      const { inputLocalUrl, inputPublicUrl, sessionId, streamKey } = options

      const videoLive = await VideoLiveModel.loadByStreamKey(streamKey)
      if (!videoLive) {
        logger.warn('Unknown live video with stream key %s.', streamKey, lTags(sessionId))
        return this.abortSession(sessionId)
      }

      const video = videoLive.Video
      if (video.isBlacklisted()) {
        logger.warn('Video is blacklisted. Refusing stream %s.', streamKey, lTags(sessionId, video.uuid))
        return this.abortSession(sessionId)
      }

      const user = await UserModel.loadByLiveId(videoLive.id)
      if (!user) {
        // Without this guard, reading user.blocked would throw and leave the RTMP session open
        logger.warn('Cannot find the owner of the live. Refusing stream %s.', streamKey, lTags(sessionId, video.uuid))
        return this.abortSession(sessionId)
      }

      if (user.blocked) {
        logger.warn('User is blocked. Refusing stream %s.', streamKey, lTags(sessionId, video.uuid))
        return this.abortSession(sessionId)
      }

      if (this.videoSessions.has(video.uuid)) {
        logger.warn('Video %s has already a live session. Refusing stream %s.', video.uuid, streamKey, lTags(sessionId, video.uuid))
        return this.abortSession(sessionId)
      }

      // Cleanup old potential live (could happen with a permanent live)
      const oldStreamingPlaylist = await VideoStreamingPlaylistModel.loadHLSPlaylistByVideo(video.id)
      if (oldStreamingPlaylist) {
        if (!videoLive.permanentLive) throw new Error('Found previous session in a non permanent live: ' + video.uuid)

        await cleanupAndDestroyPermanentLive(video, oldStreamingPlaylist)
      }

      // Reserve the video: from now on, other streams for this video are refused
      this.videoSessions.set(video.uuid, sessionId)

      try {
        const now = Date.now()

        const probe = await ffprobePromise(inputLocalUrl)

        const [ { resolution, ratio }, fps, bitrate, hasAudio ] = await Promise.all([
          getVideoStreamDimensionsInfo(inputLocalUrl, probe),
          getVideoStreamFPS(inputLocalUrl, probe),
          getVideoStreamBitrate(inputLocalUrl, probe),
          hasAudioStream(inputLocalUrl, probe)
        ])

        logger.info(
          '%s probing took %d ms (bitrate: %d, fps: %d, resolution: %d)',
          inputLocalUrl, Date.now() - now, bitrate, fps, resolution, lTags(sessionId, video.uuid)
        )

        const allResolutions = await Hooks.wrapObject(
          this.buildAllResolutionsToTranscode(resolution, hasAudio),
          'filter:transcoding.auto.resolutions-to-transcode.result',
          { video }
        )

        logger.info(
          'Handling live video of original resolution %d.', resolution,
          { allResolutions, ...lTags(sessionId, video.uuid) }
        )

        // Await here so a rejection goes through the cleanup below
        return await this.runMuxingSession({
          sessionId,
          videoLive,
          user,

          inputLocalUrl,
          inputPublicUrl,
          fps,
          bitrate,
          ratio,
          allResolutions,
          hasAudio
        })
      } catch (err) {
        // Release the reservation, otherwise every later stream of this video would be
        // refused with "has already a live session".
        // Only release it if it is still ours: stopSessionOf() may have removed it meanwhile.
        if (this.videoSessions.get(video.uuid) === sessionId) {
          this.videoSessions.delete(video.uuid)
        }

        this.abortSession(sessionId)

        throw err
      }
    }

    /**
     * Creates the live session in database, builds the MuxingSession, wires its
     * events to the manager and starts the muxing without waiting for its end.
     */
    private async runMuxingSession (options: {
      sessionId: string
      videoLive: MVideoLiveVideoWithSetting
      user: MUser

      inputLocalUrl: string
      inputPublicUrl: string

      fps: number
      bitrate: number
      ratio: number
      allResolutions: number[]
      hasAudio: boolean
    }) {
      const { sessionId, videoLive, user } = options
      const videoUUID = videoLive.Video.uuid
      const localLTags = lTags(sessionId, videoUUID)

      const liveSession = await this.saveStartingSession(videoLive)

      LiveQuotaStore.Instance.addNewLive(user.id, sessionId)

      const muxingSession = new MuxingSession({
        context: this.getContext(),
        sessionId,
        videoLive,
        user,

        ...pick(options, [ 'inputLocalUrl', 'inputPublicUrl', 'bitrate', 'ratio', 'fps', 'allResolutions', 'hasAudio' ])
      })

      muxingSession.on('live-ready', () => this.publishAndFederateLive(videoLive, localLTags))

      muxingSession.on('bad-socket-health', ({ videoUUID }) => {
        logger.error(
          'Too much data in client socket stream (ffmpeg is too slow to transcode the video).' +
          ' Stopping session of video %s.', videoUUID,
          localLTags
        )

        this.stopSessionOf({ videoUUID, error: LiveVideoError.BAD_SOCKET_HEALTH })
      })

      muxingSession.on('duration-exceeded', ({ videoUUID }) => {
        logger.info('Stopping session of %s: max duration exceeded.', videoUUID, localLTags)

        this.stopSessionOf({ videoUUID, error: LiveVideoError.DURATION_EXCEEDED })
      })

      muxingSession.on('quota-exceeded', ({ videoUUID }) => {
        logger.info('Stopping session of %s: user quota exceeded.', videoUUID, localLTags)

        this.stopSessionOf({ videoUUID, error: LiveVideoError.QUOTA_EXCEEDED })
      })

      muxingSession.on('transcoding-error', ({ videoUUID }) => {
        this.stopSessionOf({ videoUUID, error: LiveVideoError.FFMPEG_ERROR })
      })

      muxingSession.on('transcoding-end', ({ videoUUID }) => {
        this.onMuxingFFmpegEnd(videoUUID, sessionId)
      })

      muxingSession.on('after-cleanup', ({ videoUUID }) => {
        this.muxingSessions.delete(sessionId)

        LiveQuotaStore.Instance.removeLive(user.id, sessionId)

        muxingSession.destroy()

        return this.onAfterMuxingCleanup({ videoUUID, liveSession })
          .catch(err => logger.error('Error in end transmuxing.', { err, ...localLTags }))
      })

      this.muxingSessions.set(sessionId, muxingSession)

      // Not awaited: failures are handled in the catch handler
      muxingSession.runMuxing()
        .catch(err => {
          logger.error('Cannot run muxing.', { err, ...localLTags })

          this.muxingSessions.delete(sessionId)
          muxingSession.destroy()

          this.stopSessionOf({
            videoUUID,
            error: err.liveVideoErrorCode || LiveVideoError.UNKNOWN_ERROR,
            errorOnReplay: true // Replay cannot be processed as muxing session failed directly
          })
        })
    }

    /**
     * Marks the video as published, waits for a delay derived from the live
     * segment duration, then federates the video and notifies users, sockets and plugins.
     * Errors are logged, never thrown.
     */
    private async publishAndFederateLive (live: MVideoLiveVideo, localLTags: { tags: string[] }) {
      const videoId = live.videoId

      try {
        const video = await VideoModel.loadFull(videoId)

        logger.info('Will publish and federate live %s.', video.url, localLTags)

        video.state = VideoState.PUBLISHED
        video.publishedAt = new Date()
        await video.save()

        live.Video = video

        await wait(getLiveSegmentTime(live.latencyMode) * 1000 * VIDEO_LIVE.EDGE_LIVE_DELAY_SEGMENTS_NOTIFICATION)

        // A federation failure must not prevent the local notifications
        try {
          await federateVideoIfNeeded(video, false)
        } catch (err) {
          logger.error('Cannot federate live video %s.', video.url, { err, ...localLTags })
        }

        Notifier.Instance.notifyOnNewVideoOrLiveIfNeeded(video)

        PeerTubeSocket.Instance.sendVideoLiveNewState(video)

        Hooks.runAction('action:live.video.state.updated', { video })
      } catch (err) {
        logger.error('Cannot save/federate live video %d.', videoId, { err, ...localLTags })
      }
    }

    /**
     * Called when ffmpeg ended by itself: releases the video session and saves
     * the end of the live session without error.
     */
    private onMuxingFFmpegEnd (videoUUID: string, sessionId: string) {
      // Session already cleaned up
      if (!this.videoSessions.has(videoUUID)) return

      this.videoSessions.delete(videoUUID)

      this.saveEndingSession({ videoUUID, error: null })
        .catch(err => logger.error('Cannot save ending session.', { err, ...lTags(sessionId) }))
    }

    /**
     * Creates the 'video-live-ending' job and moves the video to its next state
     * (WAITING_FOR_LIVE for a permanent live, LIVE_ENDED otherwise).
     * Errors are logged, never thrown.
     *
     * @param options.liveSession session to end; the latest session of the video is loaded when omitted
     * @param options.cleanupNow when true the job has no delay, otherwise VIDEO_LIVE.CLEANUP_DELAY is used
     */
    private async onAfterMuxingCleanup (options: {
      videoUUID: string
      liveSession?: MVideoLiveSession
      cleanupNow?: boolean // Default false
    }) {
      const { videoUUID, liveSession: liveSessionArg, cleanupNow = false } = options

      logger.debug('Live of video %s has been cleaned up. Moving to its next state.', videoUUID, lTags(videoUUID))

      try {
        const fullVideo = await VideoModel.loadFull(videoUUID)
        if (!fullVideo) return

        const live = await VideoLiveModel.loadByVideoId(fullVideo.id)

        const liveSession = liveSessionArg ?? await VideoLiveSessionModel.findLatestSessionOf(fullVideo.id)

        // On server restart during a live
        if (!liveSession.endDate) {
          liveSession.endDate = new Date()
          await liveSession.save()
        }

        JobQueue.Instance.createJobAsync({
          type: 'video-live-ending',
          payload: {
            videoId: fullVideo.id,

            replayDirectory: live.saveReplay
              ? await this.findReplayDirectory(fullVideo)
              : undefined,

            liveSessionId: liveSession.id,
            streamingPlaylistId: fullVideo.getHLSPlaylist()?.id,

            publishedAt: fullVideo.publishedAt.toISOString()
          },

          delay: cleanupNow
            ? 0
            : VIDEO_LIVE.CLEANUP_DELAY
        })

        fullVideo.state = live.permanentLive
          ? VideoState.WAITING_FOR_LIVE
          : VideoState.LIVE_ENDED

        await fullVideo.save()

        PeerTubeSocket.Instance.sendVideoLiveNewState(fullVideo)

        await federateVideoIfNeeded(fullVideo, false)

        Hooks.runAction('action:live.video.state.updated', { video: fullVideo })
      } catch (err) {
        logger.error('Cannot save/federate new video state of live streaming of video %s.', videoUUID, { err, ...lTags(videoUUID) })
      }
    }

    /**
     * Cancels the pending live transcoding runner jobs and immediately ends
     * every live still listed as published.
     */
    private async handleBrokenLives () {
      await RunnerJobModel.cancelAllJobs({ type: 'live-rtmp-hls-transcoding' })

      const videoUUIDs = await VideoModel.listPublishedLiveUUIDs()

      // Sequential on purpose: one cleanup at a time
      for (const uuid of videoUUIDs) {
        await this.onAfterMuxingCleanup({ videoUUID: uuid, cleanupNow: true })
      }
    }

    /**
     * @returns the path of the entry of the live replay base directory whose name
     * sorts last, or undefined if the directory is empty
     */
    private async findReplayDirectory (video: MVideo) {
      const directory = getLiveReplayBaseDirectory(video)
      const files = await readdir(directory)

      if (files.length === 0) return undefined

      return join(directory, files.sort().reverse()[0])
    }

    /**
     * @returns the resolutions to transcode the live to; falls back to the origin
     * resolution only when transcoding is disabled or no resolution is enabled
     */
    private buildAllResolutionsToTranscode (originResolution: number, hasAudio: boolean) {
      const includeInput = CONFIG.LIVE.TRANSCODING.ALWAYS_TRANSCODE_ORIGINAL_RESOLUTION

      const resolutionsEnabled = CONFIG.LIVE.TRANSCODING.ENABLED
        ? computeResolutionsToTranscode({ input: originResolution, type: 'live', includeInput, strictLower: false, hasAudio })
        : []

      if (resolutionsEnabled.length === 0) {
        return [ originResolution ]
      }

      return resolutionsEnabled
    }

    /**
     * Creates, in a single transaction, the live session row and, when the
     * replay must be saved, a copy of the replay settings attached to it.
     */
    private async saveStartingSession (videoLive: MVideoLiveVideoWithSetting) {
      const replaySettings = videoLive.saveReplay
        ? new VideoLiveReplaySettingModel({
          privacy: videoLive.ReplaySetting.privacy
        })
        : null

      return sequelizeTypescript.transaction(async t => {
        if (videoLive.saveReplay) {
          await replaySettings.save({ transaction: t })
        }

        return VideoLiveSessionModel.create({
          startDate: new Date(),
          liveVideoId: videoLive.videoId,
          saveReplay: videoLive.saveReplay,
          replaySettingId: videoLive.saveReplay ? replaySettings.id : null,
          endingProcessed: false
        }, { transaction: t })
      })
    }

    /**
     * Stores the end date and the error on the current live session of the video.
     * Does nothing if the video has no current session.
     */
    private async saveEndingSession (options: {
      videoUUID: string
      error: LiveVideoErrorType | null
      errorOnReplay?: boolean
    }) {
      const { videoUUID, error, errorOnReplay } = options

      const liveSession = await VideoLiveSessionModel.findCurrentSessionOf(videoUUID)
      if (!liveSession) return

      liveSession.endDate = new Date()
      liveSession.error = error

      if (errorOnReplay === true) {
        liveSession.endingProcessed = true
      }

      return liveSession.save()
    }

    /** Lazily created unique instance of the manager. */
    static get Instance () {
      return this.instance || (this.instance = new this())
    }
  }
  // ---------------------------------------------------------------------------
  // Module exports

  export { LiveManager }