live-manager.ts 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571
  1. import * as Bluebird from 'bluebird'
  2. import * as chokidar from 'chokidar'
  3. import { FfmpegCommand } from 'fluent-ffmpeg'
  4. import { appendFile, ensureDir, readFile, stat } from 'fs-extra'
  5. import { basename, join } from 'path'
  6. import { isTestInstance } from '@server/helpers/core-utils'
  7. import { getLiveMuxingCommand, getLiveTranscodingCommand } from '@server/helpers/ffmpeg-utils'
  8. import { computeResolutionsToTranscode, getVideoFileFPS, getVideoFileResolution } from '@server/helpers/ffprobe-utils'
  9. import { logger } from '@server/helpers/logger'
  10. import { CONFIG, registerConfigChangedHandler } from '@server/initializers/config'
  11. import { MEMOIZE_TTL, P2P_MEDIA_LOADER_PEER_VERSION, VIDEO_LIVE, VIEW_LIFETIME, WEBSERVER } from '@server/initializers/constants'
  12. import { UserModel } from '@server/models/account/user'
  13. import { VideoModel } from '@server/models/video/video'
  14. import { VideoFileModel } from '@server/models/video/video-file'
  15. import { VideoLiveModel } from '@server/models/video/video-live'
  16. import { VideoStreamingPlaylistModel } from '@server/models/video/video-streaming-playlist'
  17. import { MStreamingPlaylist, MUserId, MVideoLive, MVideoLiveVideo } from '@server/types/models'
  18. import { VideoState, VideoStreamingPlaylistType } from '@shared/models'
  19. import { federateVideoIfNeeded } from './activitypub/videos'
  20. import { buildSha256Segment } from './hls'
  21. import { JobQueue } from './job-queue'
  22. import { cleanupLive } from './job-queue/handlers/video-live-ending'
  23. import { PeerTubeSocket } from './peertube-socket'
  24. import { isAbleToUploadVideo } from './user'
  25. import { getHLSDirectory } from './video-paths'
  26. import { availableEncoders } from './video-transcoding-profiles'
  27. import memoizee = require('memoizee')
  // Internals of node-media-server: the RTMP server class, its shared context
  // (used below for the sessions and the event emitter) and its logger
  const NodeRtmpServer = require('node-media-server/node_rtmp_server')
  const context = require('node-media-server/node_core_ctx')
  const nodeMediaServerLogger = require('node-media-server/node_core_logger')

  // Disable node media server logs
  nodeMediaServerLogger.setLogType(0)

  // RTMP listener settings: the port comes from the instance configuration,
  // everything else from the application constants
  const rtmpSettings = {
    port: CONFIG.LIVE.RTMP.PORT,
    chunk_size: VIDEO_LIVE.RTMP.CHUNK_SIZE,
    gop_cache: VIDEO_LIVE.RTMP.GOP_CACHE,
    ping: VIDEO_LIVE.RTMP.PING,
    ping_timeout: VIDEO_LIVE.RTMP.PING_TIMEOUT
  }

  const transcodingSettings = {
    ffmpeg: 'ffmpeg'
  }

  // Object given to the NodeRtmpServer constructor
  const config = {
    rtmp: rtmpSettings,
    transcoding: transcodingSettings
  }
  /**
   * Singleton that drives live streaming.
   *
   * It listens to the RTMP server events, runs one ffmpeg muxing/transcoding
   * command per published stream, keeps the sha256 of the generated HLS segments,
   * optionally appends segments to replay files, counts watchers and stops
   * sessions that exceed the max duration or the user quota.
   */
  class LiveManager {

    private static instance: LiveManager

    // RTMP session id -> ffmpeg command of this session
    private readonly transSessions = new Map<string, FfmpegCommand>()
    // Video id -> RTMP session id
    private readonly videoSessions = new Map<number, string>()
    // Values are Date().getTime()
    private readonly watchersPerVideo = new Map<number, number[]>()
    // Video UUID -> (segment file name -> sha256)
    private readonly segmentsSha256 = new Map<string, Map<string, string>>()
    // User id -> lives currently streamed by this user, with the size of the segments seen so far
    private readonly livesPerUser = new Map<number, { liveId: number, videoId: number, size: number }[]>()

    // Quota check is memoized because it is requested for every new segment
    private readonly isAbleToUploadVideoWithCache = memoizee((userId: number) => {
      return isAbleToUploadVideo(userId, 1000)
    }, { maxAge: MEMOIZE_TTL.LIVE_ABLE_TO_UPLOAD })

    private rtmpServer: any

    private constructor () {
    }

    /**
     * Register the RTMP event handlers, the configuration change handler,
     * cleanup the lives that are still flagged as published and schedule the periodic views update.
     * It does not start the RTMP server by itself: see run().
     */
    init () {
      const events = this.getContext().nodeEvent

      events.on('postPublish', (sessionId: string, streamPath: string) => {
        logger.debug('RTMP received stream', { id: sessionId, streamPath })

        // Expected path: /<base path>/<stream key>
        const splittedPath = streamPath.split('/')
        if (splittedPath.length !== 3 || splittedPath[1] !== VIDEO_LIVE.RTMP.BASE_PATH) {
          logger.warn('Live path is incorrect.', { streamPath })
          return this.abortSession(sessionId)
        }

        this.handleSession(sessionId, streamPath, splittedPath[2])
          .catch(err => logger.error('Cannot handle sessions.', { err }))
      })

      events.on('donePublish', sessionId => {
        logger.info('Live session ended.', { sessionId })
      })

      // Start/stop the RTMP server when the admin enables/disables lives
      registerConfigChangedHandler(() => {
        if (!this.rtmpServer && CONFIG.LIVE.ENABLED === true) {
          this.run()
          return
        }

        if (this.rtmpServer && CONFIG.LIVE.ENABLED === false) {
          this.stop()
        }
      })

      // Cleanup broken lives, that were terminated by a server restart for example
      this.handleBrokenLives()
        .catch(err => logger.error('Cannot handle broken lives.', { err }))

      // updateLiveViews() is async: catch here so a failure never becomes an unhandled rejection
      setInterval(() => {
        this.updateLiveViews()
          .catch(err => logger.error('Cannot update live video views.', { err }))
      }, VIEW_LIFETIME.LIVE)
    }

    /** Create and start the RTMP server. */
    run () {
      logger.info('Running RTMP server on port %d', config.rtmp.port)

      this.rtmpServer = new NodeRtmpServer(config)
      this.rtmpServer.tcpServer.on('error', err => {
        logger.error('Cannot run RTMP server.', { err })
      })

      this.rtmpServer.run()
    }

    /** Stop the RTMP server and forget it, so isRunning() returns false. */
    stop () {
      logger.info('Stopping RTMP server.')

      this.rtmpServer.stop()
      this.rtmpServer = undefined
    }

    /** @returns true if an RTMP server instance currently exists */
    isRunning () {
      return !!this.rtmpServer
    }

    /** @returns the map "segment file name -> sha256" of the video, or undefined if there is none */
    getSegmentsSha256 (videoUUID: string) {
      return this.segmentsSha256.get(videoUUID)
    }

    /** Abort the RTMP session (and its ffmpeg command) attached to the video, if any. */
    stopSessionOf (videoId: number) {
      const sessionId = this.videoSessions.get(videoId)
      if (!sessionId) return

      this.videoSessions.delete(videoId)
      this.abortSession(sessionId)
    }

    /** @returns the sum of the segment sizes recorded for the lives the user is currently streaming */
    getLiveQuotaUsedByUser (userId: number) {
      const currentLives = this.livesPerUser.get(userId)
      if (!currentLives) return 0

      return currentLives.reduce((sum, obj) => sum + obj.size, 0)
    }

    /** Record a new watcher (timestamped with now) for the video. Ignored if the video has no running session. */
    addViewTo (videoId: number) {
      if (this.videoSessions.has(videoId) === false) return

      let watchers = this.watchersPerVideo.get(videoId)

      if (!watchers) {
        watchers = []
        this.watchersPerVideo.set(videoId, watchers)
      }

      watchers.push(new Date().getTime())
    }

    /** Forget all the segment hashes of the video. */
    cleanupShaSegments (videoUUID: string) {
      this.segmentsSha256.delete(videoUUID)
    }

    /**
     * Append the content of a segment to its concatenated file in the replay directory.
     * Best effort: any failure (unexpected segment name included) is logged, the returned promise never rejects.
     */
    async addSegmentToReplay (hlsVideoPath: string, segmentPath: string) {
      try {
        const segmentName = basename(segmentPath)
        const dest = join(hlsVideoPath, VIDEO_LIVE.REPLAY_DIRECTORY, this.buildConcatenatedName(segmentName))

        const data = await readFile(segmentPath)
        await appendFile(dest, data)
      } catch (err) {
        logger.error('Cannot copy segment %s to replay directory.', segmentPath, { err })
      }
    }

    /**
     * Build the name of the concatenated replay file from a segment or playlist path.
     * The base name has to start with digits followed by "-" or ".": these digits are reused in the result.
     *
     * @throws Error if the base name does not start with such a numeric prefix
     */
    buildConcatenatedName (segmentOrPlaylistPath: string) {
      const num = basename(segmentOrPlaylistPath).match(/^(\d+)(-|\.)/)
      if (!num) {
        throw new Error('Cannot build concatenated name of ' + segmentOrPlaylistPath + ': no numeric prefix found.')
      }

      return 'concat-' + num[1] + '.ts'
    }

    /**
     * Hash the given segments one after the other and, if the live saves its replay, append them to the replay files.
     * Fire and forget: errors are only logged.
     */
    private processSegments (hlsVideoPath: string, videoUUID: string, videoLive: MVideoLive, segmentPaths: string[]) {
      Bluebird.mapSeries(segmentPaths, async previousSegment => {
        // Add sha hash of previous segments, because ffmpeg should have finished generating them
        await this.addSegmentSha(videoUUID, previousSegment)

        if (videoLive.saveReplay) {
          await this.addSegmentToReplay(hlsVideoPath, previousSegment)
        }
      }).catch(err => logger.error('Cannot process segments in %s', hlsVideoPath, { err }))
    }

    private getContext () {
      return context
    }

    /** Stop the RTMP session and kill the ffmpeg command registered under this session id, if they exist. */
    private abortSession (id: string) {
      const session = this.getContext().sessions.get(id)
      if (session) {
        session.stop()
        this.getContext().sessions.delete(id)
      }

      const transSession = this.transSessions.get(id)
      if (transSession) {
        transSession.kill('SIGINT')
        this.transSessions.delete(id)
      }
    }

    /**
     * Validate a freshly published stream (known stream key, video not blacklisted),
     * cleanup the files of a previous session, create the HLS playlist and start the muxing.
     */
    private async handleSession (sessionId: string, streamPath: string, streamKey: string) {
      const videoLive = await VideoLiveModel.loadByStreamKey(streamKey)
      if (!videoLive) {
        logger.warn('Unknown live video with stream key %s.', streamKey)
        return this.abortSession(sessionId)
      }

      const video = videoLive.Video
      if (video.isBlacklisted()) {
        logger.warn('Video is blacklisted. Refusing stream %s.', streamKey)
        return this.abortSession(sessionId)
      }

      // Cleanup old potential live files (could happen with a permanent live)
      this.cleanupShaSegments(video.uuid)

      const oldStreamingPlaylist = await VideoStreamingPlaylistModel.loadHLSPlaylistByVideo(video.id)
      if (oldStreamingPlaylist) {
        await cleanupLive(video, oldStreamingPlaylist)
      }

      // The session may have disappeared while we were awaiting above:
      // check it before registering the video, instead of crashing later on session.videoHeight
      const session = this.getContext().sessions.get(sessionId)
      if (!session) {
        logger.warn('Cannot find RTMP session %s to start the live of stream %s.', sessionId, streamKey)
        return
      }

      this.videoSessions.set(video.id, sessionId)

      const playlistUrl = WEBSERVER.URL + VideoStreamingPlaylistModel.getHlsMasterPlaylistStaticPath(video.uuid)

      // ffprobe reads the stream back from the local RTMP server
      const rtmpUrl = 'rtmp://127.0.0.1:' + config.rtmp.port + streamPath

      const [ resolutionResult, fps ] = await Promise.all([
        getVideoFileResolution(rtmpUrl),
        getVideoFileFPS(rtmpUrl)
      ])

      const resolutionsEnabled = CONFIG.LIVE.TRANSCODING.ENABLED
        ? computeResolutionsToTranscode(resolutionResult.videoFileResolution, 'live')
        : []

      // Always keep the original resolution
      const allResolutions = resolutionsEnabled.concat([ session.videoHeight ])

      logger.info('Will mux/transcode live video of original resolution %d.', session.videoHeight, { allResolutions })

      const [ videoStreamingPlaylist ] = await VideoStreamingPlaylistModel.upsert({
        videoId: video.id,
        playlistUrl,
        segmentsSha256Url: WEBSERVER.URL + VideoStreamingPlaylistModel.getHlsSha256SegmentsStaticPath(video.uuid, video.isLive),
        p2pMediaLoaderInfohashes: VideoStreamingPlaylistModel.buildP2PMediaLoaderInfoHashes(playlistUrl, allResolutions),
        p2pMediaLoaderPeerVersion: P2P_MEDIA_LOADER_PEER_VERSION,

        type: VideoStreamingPlaylistType.HLS
      }, { returning: true }) as [ MStreamingPlaylist, boolean ]

      return this.runMuxing({
        sessionId,
        videoLive,
        playlist: videoStreamingPlaylist,
        rtmpUrl,
        fps,
        allResolutions
      })
    }

    /**
     * Create the video files of the playlist, run ffmpeg on the RTMP stream and watch its output directory:
     *  - every new .ts segment triggers the processing of the previous segment of the same playlist,
     *    then the duration and quota checks
     *  - the creation of master.m3u8 publishes the video
     *  - the end (or failure) of ffmpeg releases the session and schedules the end of the live
     */
    private async runMuxing (options: {
      sessionId: string
      videoLive: MVideoLiveVideo
      playlist: MStreamingPlaylist
      rtmpUrl: string
      fps: number
      allResolutions: number[]
    }) {
      const { sessionId, videoLive, playlist, allResolutions, fps, rtmpUrl } = options
      const startStreamDateTime = new Date().getTime()

      const user = await UserModel.loadByLiveId(videoLive.id)
      if (!this.livesPerUser.has(user.id)) {
        this.livesPerUser.set(user.id, [])
      }

      // Mutated by the segment handler below so getLiveQuotaUsedByUser() sees the current size
      const currentUserLive = { liveId: videoLive.id, videoId: videoLive.videoId, size: 0 }
      const livesOfUser = this.livesPerUser.get(user.id)
      livesOfUser.push(currentUserLive)

      for (const resolution of allResolutions) {
        const file = new VideoFileModel({
          resolution,
          size: -1,
          extname: '.ts',
          infoHash: null,
          fps,
          videoStreamingPlaylistId: playlist.id
        })

        // Fire and forget: a failure here must not prevent the live from starting
        VideoFileModel.customUpsert(file, 'streaming-playlist', null)
          .catch(err => logger.error('Cannot create file for live streaming.', { err }))
      }

      const outPath = getHLSDirectory(videoLive.Video)
      await ensureDir(outPath)

      const replayDirectory = join(outPath, VIDEO_LIVE.REPLAY_DIRECTORY)

      if (videoLive.saveReplay === true) {
        await ensureDir(replayDirectory)
      }

      const videoUUID = videoLive.Video.uuid

      const ffmpegExec = CONFIG.LIVE.TRANSCODING.ENABLED
        ? await getLiveTranscodingCommand({
          rtmpUrl,
          outPath,
          resolutions: allResolutions,
          fps,
          availableEncoders,
          profile: 'default'
        })
        : getLiveMuxingCommand(rtmpUrl, outPath)

      logger.info('Running live muxing/transcoding for %s.', videoUUID)
      this.transSessions.set(sessionId, ffmpegExec)

      const tsWatcher = chokidar.watch(outPath + '/*.ts')

      // Per playlist, the latest segment seen: it is processed when the next one of the same playlist appears
      const segmentsToProcessPerPlaylist: { [playlistId: string]: string[] } = {}
      // Segment names start with the playlist id: "<playlist id>-..."
      const playlistIdMatcher = /^(\d+)-/

      const addHandler = (segmentPath: string) => {
        logger.debug('Live add handler of %s.', segmentPath)

        const matched = basename(segmentPath).match(playlistIdMatcher)

        if (matched) {
          const playlistId = matched[1]

          const segmentsToProcess = segmentsToProcessPerPlaylist[playlistId] || []
          this.processSegments(outPath, videoUUID, videoLive, segmentsToProcess)

          segmentsToProcessPerPlaylist[playlistId] = [ segmentPath ]
        } else {
          // Do not throw in the watcher callback: the constraints below still have to be checked
          logger.warn('Cannot find playlist id of live segment %s.', segmentPath)
        }

        // Duration constraint check
        if (this.isDurationConstraintValid(startStreamDateTime) !== true) {
          logger.info('Stopping session of %s: max duration exceeded.', videoUUID)

          this.stopSessionOf(videoLive.videoId)
        }

        // Check user quota if the user enabled replay saving
        if (videoLive.saveReplay === true) {
          stat(segmentPath)
            .then(segmentStat => {
              currentUserLive.size += segmentStat.size
            })
            .then(() => this.isQuotaConstraintValid(user, videoLive))
            .then(quotaValid => {
              if (quotaValid !== true) {
                logger.info('Stopping session of %s: user quota exceeded.', videoUUID)

                this.stopSessionOf(videoLive.videoId)
              }
            })
            .catch(err => logger.error('Cannot stat %s or check quota of %d.', segmentPath, user.id, { err }))
        }
      }

      const deleteHandler = (segmentPath: string) => this.removeSegmentSha(videoUUID, segmentPath)

      tsWatcher.on('add', p => addHandler(p))
      tsWatcher.on('unlink', p => deleteHandler(p))

      const masterWatcher = chokidar.watch(outPath + '/master.m3u8')
      masterWatcher.on('add', async () => {
        try {
          const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoLive.videoId)

          video.state = VideoState.PUBLISHED
          await video.save()
          videoLive.Video = video

          // Delay the notifications by a few segments
          setTimeout(() => {
            federateVideoIfNeeded(video, false)
              .catch(err => logger.error('Cannot federate live video %s.', video.url, { err }))

            PeerTubeSocket.Instance.sendVideoLiveNewState(video)
          }, VIDEO_LIVE.SEGMENT_TIME_SECONDS * 1000 * VIDEO_LIVE.EDGE_LIVE_DELAY_SEGMENTS_NOTIFICATION)

        } catch (err) {
          logger.error('Cannot save/federate live video %d.', videoLive.videoId, { err })
        } finally {
          // The master playlist is only expected once
          masterWatcher.close()
            .catch(err => logger.error('Cannot close master watcher of %s.', outPath, { err }))
        }
      })

      const onFFmpegEnded = () => {
        logger.info('RTMP transmuxing for video %s ended. Scheduling cleanup', rtmpUrl)

        this.transSessions.delete(sessionId)

        this.watchersPerVideo.delete(videoLive.videoId)
        this.videoSessions.delete(videoLive.videoId)

        const newLivesPerUser = this.livesPerUser.get(user.id)
                                    .filter(o => o.liveId !== videoLive.id)
        this.livesPerUser.set(user.id, newLivesPerUser)

        setTimeout(() => {
          // Wait latest segments generation, and close watchers

          Promise.all([ tsWatcher.close(), masterWatcher.close() ])
            .then(() => {
              // Process remaining segments hash
              for (const key of Object.keys(segmentsToProcessPerPlaylist)) {
                this.processSegments(outPath, videoUUID, videoLive, segmentsToProcessPerPlaylist[key])
              }
            })
            .catch(err => logger.error('Cannot close watchers of %s or process remaining hash segments.', outPath, { err }))

          this.onEndTransmuxing(videoLive.Video.id)
            .catch(err => logger.error('Error in closed transmuxing.', { err }))
        }, 1000)
      }

      ffmpegExec.on('error', (err, stdout, stderr) => {
        onFFmpegEnded()

        // Don't care that we killed the ffmpeg process
        if (err?.message?.includes('Exiting normally')) return

        logger.error('Live transcoding error.', { err, stdout, stderr })

        this.abortSession(sessionId)
      })

      ffmpegExec.on('end', () => onFFmpegEnded())

      ffmpegExec.run()
    }

    /**
     * Update the state of the video once its stream ended, then notify and federate it.
     * A non permanent live gets a "video-live-ending" job (delayed unless cleanupNow is true) and the LIVE_ENDED state,
     * a permanent live goes back to WAITING_FOR_LIVE.
     * Errors are logged, never thrown.
     */
    private async onEndTransmuxing (videoId: number, cleanupNow = false) {
      try {
        const fullVideo = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId)
        if (!fullVideo) return

        const live = await VideoLiveModel.loadByVideoId(videoId)

        if (!live.permanentLive) {
          JobQueue.Instance.createJob({
            type: 'video-live-ending',
            payload: {
              videoId: fullVideo.id
            }
          }, { delay: cleanupNow ? 0 : VIDEO_LIVE.CLEANUP_DELAY })

          fullVideo.state = VideoState.LIVE_ENDED
        } else {
          fullVideo.state = VideoState.WAITING_FOR_LIVE
        }

        await fullVideo.save()

        PeerTubeSocket.Instance.sendVideoLiveNewState(fullVideo)

        await federateVideoIfNeeded(fullVideo, false)
      } catch (err) {
        logger.error('Cannot save/federate new video state of live streaming.', { err })
      }
    }

    /** Compute the sha256 of the segment and store it under the segment file name. */
    private async addSegmentSha (videoUUID: string, segmentPath: string) {
      const segmentName = basename(segmentPath)
      logger.debug('Adding live sha segment %s.', segmentPath)

      const shaResult = await buildSha256Segment(segmentPath)

      if (!this.segmentsSha256.has(videoUUID)) {
        this.segmentsSha256.set(videoUUID, new Map())
      }

      const filesMap = this.segmentsSha256.get(videoUUID)
      filesMap.set(segmentName, shaResult)
    }

    /** Remove the stored sha256 of the segment. Only warns if the video or the segment is unknown. */
    private removeSegmentSha (videoUUID: string, segmentPath: string) {
      const segmentName = basename(segmentPath)

      logger.debug('Removing live sha segment %s.', segmentPath)

      const filesMap = this.segmentsSha256.get(videoUUID)
      if (!filesMap) {
        logger.warn('Unknown files map to remove sha for %s.', videoUUID)
        return
      }

      if (!filesMap.has(segmentName)) {
        logger.warn('Unknown segment in files map for video %s and segment %s.', videoUUID, segmentPath)
        return
      }

      filesMap.delete(segmentName)
    }

    /**
     * @param streamingStartTime timestamp (Date().getTime()) of the beginning of the stream
     * @returns false once the stream lasts longer than CONFIG.LIVE.MAX_DURATION
     */
    private isDurationConstraintValid (streamingStartTime: number) {
      const maxDuration = CONFIG.LIVE.MAX_DURATION
      // No limit
      if (maxDuration < 0) return true

      // maxDuration is added as is to a timestamp in ms (presumably already converted to ms by the config loader)
      const now = new Date().getTime()
      const max = streamingStartTime + maxDuration

      return now <= max
    }

    /** The quota only matters when the replay is saved. The check itself is memoized. */
    private async isQuotaConstraintValid (user: MUserId, live: MVideoLive) {
      if (live.saveReplay !== true) return true

      return this.isAbleToUploadVideoWithCache(user.id)
    }

    /**
     * Set the views of every watched live to its number of recorded watchers, save, federate and notify,
     * then drop the watchers older than VIEW_LIFETIME.LIVE.
     * A failure on one video is logged and does not prevent the update of the others.
     */
    private async updateLiveViews () {
      if (!this.isRunning()) return

      if (!isTestInstance()) logger.info('Updating live video views.')

      for (const videoId of this.watchersPerVideo.keys()) {
        const notBefore = new Date().getTime() - VIEW_LIFETIME.LIVE

        const watchers = this.watchersPerVideo.get(videoId)
        const numWatchers = watchers.length

        try {
          const video = await VideoModel.loadAndPopulateAccountAndServerAndTags(videoId)
          if (!video) {
            // The video does not exist anymore: nothing to update
            this.watchersPerVideo.delete(videoId)
            continue
          }

          video.views = numWatchers
          await video.save()

          await federateVideoIfNeeded(video, false)

          PeerTubeSocket.Instance.sendVideoViewsUpdate(video)

          logger.debug('New live video views for %s is %d.', video.url, numWatchers)
        } catch (err) {
          logger.error('Cannot update live views of video %d.', videoId, { err })
        }

        // Only keep not expired watchers
        // The entry may have been removed while we were awaiting (live ended): do not re-create it
        const currentWatchers = this.watchersPerVideo.get(videoId)
        if (currentWatchers) {
          this.watchersPerVideo.set(videoId, currentWatchers.filter(w => w > notBefore))
        }
      }
    }

    /** End, without cleanup delay, every live listed by VideoModel.listPublishedLiveIds(). */
    private async handleBrokenLives () {
      const videoIds = await VideoModel.listPublishedLiveIds()

      for (const id of videoIds) {
        await this.onEndTransmuxing(id, true)
      }
    }

    static get Instance () {
      return this.instance || (this.instance = new this())
    }
  }
  432. // ---------------------------------------------------------------------------
  433. export {
  434. LiveManager
  435. }