From 35d924748742f137d935578d7c668ed000f7850f Mon Sep 17 00:00:00 2001 From: Mathieu Duponchelle Date: Wed, 23 Feb 2022 00:52:09 +0100 Subject: [PATCH] fmp4mux: HLS VOD example Producing two audio playlists, a video playlist and a super manifest --- generic/fmp4/Cargo.toml | 2 + generic/fmp4/examples/hls_vod.rs | 479 +++++++++++++++++++++++++++++++ 2 files changed, 481 insertions(+) create mode 100644 generic/fmp4/examples/hls_vod.rs diff --git a/generic/fmp4/Cargo.toml b/generic/fmp4/Cargo.toml index 9d5a21563..cb55798a2 100644 --- a/generic/fmp4/Cargo.toml +++ b/generic/fmp4/Cargo.toml @@ -23,6 +23,8 @@ path = "src/lib.rs" [dev-dependencies] gst-app = { package = "gstreamer-app", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_18"] } gst-check = { package = "gstreamer-check", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_18"] } +gst-pbutils = { package = "gstreamer-pbutils", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_20"] } +m3u8-rs = "3.0" [build-dependencies] gst-plugin-version-helper = { path="../../version-helper" } diff --git a/generic/fmp4/examples/hls_vod.rs b/generic/fmp4/examples/hls_vod.rs new file mode 100644 index 000000000..11c9ec81d --- /dev/null +++ b/generic/fmp4/examples/hls_vod.rs @@ -0,0 +1,479 @@ +// Copyright (C) 2022 Mathieu Duponchelle +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +// This creates a 10 second VOD HLS stream with one video playlist and two audio +// playlists. Each segment is 2.5 second long. + +use gst::prelude::*; + +use std::path::{Path, PathBuf}; +use std::sync::{Arc, Mutex}; + +use anyhow::{anyhow, Error}; + +use m3u8_rs::{ + AlternativeMedia, AlternativeMediaType, MasterPlaylist, MediaPlaylist, MediaPlaylistType, + MediaSegment, VariantStream, +}; + +struct Segment { + duration: gst::ClockTime, + path: String, +} + +struct StreamState { + path: PathBuf, + segments: Vec, +} + +struct VideoStream { + name: String, + bitrate: u32, + width: i32, + height: i32, +} + +struct AudioStream { + name: String, + lang: String, + default: bool, + wave: String, +} + +struct State { + video_streams: Vec, + audio_streams: Vec, + all_mimes: Vec, + path: PathBuf, + wrote_manifest: bool, +} + +impl State { + fn maybe_write_manifest(&mut self) { + if self.wrote_manifest { + return; + } + + if self.all_mimes.len() < self.video_streams.len() + self.audio_streams.len() { + return; + } + + let mut all_mimes = self.all_mimes.clone(); + all_mimes.sort(); + all_mimes.dedup(); + + let playlist = MasterPlaylist { + version: 7, + variants: self + .video_streams + .iter() + .map(|stream| { + let mut path = PathBuf::new(); + + path.push(&stream.name); + path.push("manifest.m3u8"); + + VariantStream { + uri: path.as_path().display().to_string(), + bandwidth: stream.bitrate.to_string(), + codecs: Some(all_mimes.join(",")), + resolution: Some(format!("{}x{}", stream.width, stream.height)), + audio: Some("audio".to_string()), + ..Default::default() + } + }) + .collect(), + alternatives: self + .audio_streams + .iter() + .map(|stream| { + let mut path = PathBuf::new(); + path.push(&stream.name); + path.push("manifest.m3u8"); + + AlternativeMedia { + media_type: AlternativeMediaType::Audio, + uri: Some(path.as_path().display().to_string()), + group_id: "audio".to_string(), + language: Some(stream.lang.clone()), + name: stream.name.clone(), + default: stream.default, + autoselect: stream.default, + channels: Some("2".to_string()), + ..Default::default() + } + }) + .collect(), + independent_segments: true, + ..Default::default() + }; + + println!("Writing master manifest to {}", self.path.display()); + + let mut file = std::fs::File::create(&self.path).unwrap(); + playlist + .write_to(&mut file) + .expect("Failed to write master playlist"); + + self.wrote_manifest = true; + } +} + +pub fn make_element(element: &str, name: Option<&str>) -> Result { + gst::ElementFactory::make(element, name) + .map_err(|_| anyhow!("Failed to make element {}", element)) +} + +fn setup_appsink(appsink: &gst_app::AppSink, name: &str, path: &Path, is_video: bool) { + let mut path: PathBuf = path.into(); + path.push(name); + + let state = Arc::new(Mutex::new(StreamState { + segments: Vec::new(), + path, + })); + + appsink.set_buffer_list(true); + + let state_clone = state.clone(); + appsink.set_callbacks( + gst_app::AppSinkCallbacks::builder() + .new_sample(move |sink| { + let sample = sink.pull_sample().map_err(|_| gst::FlowError::Eos)?; + let mut state = state.lock().unwrap(); + + // The muxer only outputs non-empty buffer lists + let mut buffer_list = sample.buffer_list_owned().expect("no buffer list"); + assert!(!buffer_list.is_empty()); + + let mut first = buffer_list.get(0).unwrap(); + + // Each list contains a full segment, i.e. does not start with a DELTA_UNIT + assert!(!first.flags().contains(gst::BufferFlags::DELTA_UNIT)); + + // If the buffer has the DISCONT and HEADER flag set then it contains the media + // header, i.e. the `ftyp`, `moov` and other media boxes. + // + // This might be the initial header or the updated header at the end of the stream. + if first + .flags() + .contains(gst::BufferFlags::DISCONT | gst::BufferFlags::HEADER) + { + let mut path = state.path.clone(); + std::fs::create_dir_all(&path).expect("failed to create directory"); + path.push("init.cmfi"); + + println!("writing header to {}", path.display()); + let map = first.map_readable().unwrap(); + std::fs::write(path, &map).expect("failed to write header"); + drop(map); + + // Remove the header from the buffer list + buffer_list.make_mut().remove(0, 1); + + // If the list is now empty then it only contained the media header and nothing + // else. + if buffer_list.is_empty() { + return Ok(gst::FlowSuccess::Ok); + } + + // Otherwise get the next buffer and continue working with that. + first = buffer_list.get(0).unwrap(); + } + + // If the buffer only has the HEADER flag set then this is a segment header that is + // followed by one or more actual media buffers. + assert!(first.flags().contains(gst::BufferFlags::HEADER)); + + let mut path = state.path.clone(); + let basename = format!( + "segment_{}.{}", + state.segments.len() + 1, + if is_video { "cmfv" } else { "cmfa" } + ); + path.push(&basename); + println!("writing segment to {}", path.display()); + + let duration = first.duration().unwrap(); + + let mut file = std::fs::File::create(path).expect("failed to open fragment"); + for buffer in &*buffer_list { + use std::io::prelude::*; + + let map = buffer.map_readable().unwrap(); + file.write_all(&map).expect("failed to write fragment"); + } + + state.segments.push(Segment { + duration, + path: basename.to_string(), + }); + + Ok(gst::FlowSuccess::Ok) + }) + .eos(move |_sink| { + let state = state_clone.lock().unwrap(); + + // Now write the manifest + let mut path = state.path.clone(); + path.push("manifest.m3u8"); + + println!("writing manifest to {}", path.display()); + + let playlist = MediaPlaylist { + version: 7, + target_duration: 2.5, + media_sequence: 0, + segments: state + .segments + .iter() + .map(|segment| MediaSegment { + uri: segment.path.to_string(), + duration: (segment.duration.nseconds() as f64 + / gst::ClockTime::SECOND.nseconds() as f64) + as f32, + map: Some(m3u8_rs::Map { + uri: "init.cmfi".into(), + ..Default::default() + }), + ..Default::default() + }) + .collect(), + end_list: true, + playlist_type: Some(MediaPlaylistType::Vod), + i_frames_only: false, + start: None, + independent_segments: true, + ..Default::default() + }; + + let mut file = std::fs::File::create(path).unwrap(); + playlist + .write_to(&mut file) + .expect("Failed to write media playlist"); + }) + .build(), + ); +} + +fn probe_encoder(state: Arc>, enc: gst::Element) { + enc.static_pad("src").unwrap().add_probe( + gst::PadProbeType::EVENT_DOWNSTREAM, + move |_pad, info| match info.data { + Some(gst::PadProbeData::Event(ref ev)) => match ev.view() { + gst::EventView::Caps(e) => { + let mime = gst_pbutils::codec_utils_caps_get_mime_codec(&e.caps().to_owned()); + + let mut state = state.lock().unwrap(); + state.all_mimes.push(mime.unwrap().into()); + state.maybe_write_manifest(); + gst::PadProbeReturn::Remove + } + _ => gst::PadProbeReturn::Ok, + }, + _ => gst::PadProbeReturn::Ok, + }, + ); +} + +impl VideoStream { + fn setup( + &self, + state: Arc>, + pipeline: &gst::Pipeline, + path: &Path, + ) -> Result<(), Error> { + let src = make_element("videotestsrc", None)?; + let raw_capsfilter = make_element("capsfilter", None)?; + let timeoverlay = make_element("timeoverlay", None)?; + let enc = make_element("x264enc", None)?; + let h264_capsfilter = make_element("capsfilter", None)?; + let mux = make_element("cmafmux", None)?; + let appsink = make_element("appsink", None)?; + + pipeline.add_many(&[ + &src, + &raw_capsfilter, + &timeoverlay, + &enc, + &h264_capsfilter, + &mux, + &appsink, + ])?; + + gst::Element::link_many(&[ + &src, + &raw_capsfilter, + &timeoverlay, + &enc, + &h264_capsfilter, + &mux, + &appsink, + ])?; + + raw_capsfilter.set_property( + "caps", + gst::Caps::builder("video/x-raw") + .field("format", "I420") + .field("width", self.width) + .field("height", self.height) + .field("framerate", gst::Fraction::new(30, 1)) + .build(), + ); + + h264_capsfilter.set_property( + "caps", + gst::Caps::builder("video/x-h264") + .field("profile", "main") + .build(), + ); + + src.set_property("num-buffers", 300); + enc.set_property("bframes", 0u32); + enc.set_property("bitrate", self.bitrate / 1000u32); + mux.set_property("fragment-duration", gst::ClockTime::from_mseconds(2500)); + mux.set_property_from_str("header-update-mode", "update"); + mux.set_property("write-mehd", true); + + probe_encoder(state, enc); + + let appsink = appsink.downcast::().unwrap(); + + setup_appsink(&appsink, &self.name, path, true); + + Ok(()) + } +} + +impl AudioStream { + fn setup( + &self, + state: Arc>, + pipeline: &gst::Pipeline, + path: &Path, + ) -> Result<(), Error> { + let src = make_element("audiotestsrc", None)?; + let raw_capsfilter = make_element("capsfilter", None)?; + let enc = make_element("avenc_aac", None)?; + let mux = make_element("cmafmux", None)?; + let appsink = make_element("appsink", None)?; + + pipeline.add_many(&[&src, &raw_capsfilter, &enc, &mux, &appsink])?; + + gst::Element::link_many(&[&src, &raw_capsfilter, &enc, &mux, &appsink])?; + + src.set_property("num-buffers", 100); + src.set_property("samplesperbuffer", 4410); + src.set_property_from_str("wave", &self.wave); + raw_capsfilter.set_property( + "caps", + gst::Caps::builder("audio/x-raw") + .field("rate", 44100) + .build(), + ); + + mux.set_property("fragment-duration", gst::ClockTime::from_mseconds(2500)); + mux.set_property_from_str("header-update-mode", "update"); + mux.set_property("write-mehd", true); + + probe_encoder(state, enc); + + let appsink = appsink.downcast::().unwrap(); + + setup_appsink(&appsink, &self.name, path, false); + + Ok(()) + } +} + +fn main() -> Result<(), Error> { + gst::init()?; + + gstfmp4::plugin_register_static()?; + + let path = PathBuf::from("hls_vod_stream"); + + let pipeline = gst::Pipeline::new(None); + + std::fs::create_dir_all(&path).expect("failed to create directory"); + + let mut manifest_path = path.clone(); + manifest_path.push("manifest.m3u8"); + + let state = Arc::new(Mutex::new(State { + video_streams: vec![VideoStream { + name: "video_0".to_string(), + bitrate: 2_048_000, + width: 1280, + height: 720, + }], + audio_streams: vec![ + AudioStream { + name: "audio_0".to_string(), + lang: "eng".to_string(), + default: true, + wave: "sine".to_string(), + }, + AudioStream { + name: "audio_1".to_string(), + lang: "fre".to_string(), + default: false, + wave: "white-noise".to_string(), + }, + ], + all_mimes: vec![], + path: manifest_path.clone(), + wrote_manifest: false, + })); + + { + let state_lock = state.lock().unwrap(); + + for stream in &state_lock.video_streams { + stream.setup(state.clone(), &pipeline, &path)?; + } + + for stream in &state_lock.audio_streams { + stream.setup(state.clone(), &pipeline, &path)?; + } + } + + pipeline.set_state(gst::State::Playing)?; + + let bus = pipeline + .bus() + .expect("Pipeline without bus. Shouldn't happen!"); + + for msg in bus.iter_timed(gst::ClockTime::NONE) { + use gst::MessageView; + + match msg.view() { + MessageView::Eos(..) => { + println!("EOS"); + break; + } + MessageView::Error(err) => { + pipeline.set_state(gst::State::Null)?; + eprintln!( + "Got error from {}: {} ({})", + msg.src() + .map(|s| String::from(s.path_string())) + .unwrap_or_else(|| "None".into()), + err.error(), + err.debug().unwrap_or_else(|| "".into()), + ); + break; + } + _ => (), + } + } + + pipeline.set_state(gst::State::Null)?; + + Ok(()) +}