hlssink3: fix segment paths in playlist file

This commit is contained in:
Rafael Caricio 2021-12-07 00:38:34 +01:00 committed by Sebastian Dröge
parent 86021d637b
commit 9ae8f0d330
4 changed files with 227 additions and 23 deletions

View file

@ -11,7 +11,6 @@ rust-version = "1.56"
[dependencies] [dependencies]
gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_10"] } gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_10"] }
gst-base = { package = "gstreamer-base", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } gst-base = { package = "gstreamer-base", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
gst-app = { package = "gstreamer-app", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
glib = { git = "https://github.com/gtk-rs/gtk-rs-core" } glib = { git = "https://github.com/gtk-rs/gtk-rs-core" }
gio = { git = "https://github.com/gtk-rs/gtk-rs-core" } gio = { git = "https://github.com/gtk-rs/gtk-rs-core" }
once_cell = "1.7.2" once_cell = "1.7.2"

View file

@ -319,16 +319,11 @@ impl HlsSink3 {
fn segment_filename(&self, state: &mut StartedState) -> String { fn segment_filename(&self, state: &mut StartedState) -> String {
assert!(state.current_segment_location.is_some()); assert!(state.current_segment_location.is_some());
let segment_filename = state.current_segment_location.take().unwrap(); let segment_filename = path_basename(state.current_segment_location.take().unwrap());
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
if let Some(playlist_root) = &settings.playlist_root { if let Some(playlist_root) = &settings.playlist_root {
format!( format!("{}/{}", playlist_root, segment_filename)
"{}{}{}",
playlist_root,
std::path::MAIN_SEPARATOR,
segment_filename
)
} else { } else {
segment_filename segment_filename
} }
@ -427,7 +422,7 @@ impl ObjectImpl for HlsSink3 {
glib::ParamSpecString::new( glib::ParamSpecString::new(
"playlist-root", "playlist-root",
"Playlist Root", "Playlist Root",
"Location of the playlist to write.", "Base path for the segments in the playlist file.",
None, None,
glib::ParamFlags::READWRITE, glib::ParamFlags::READWRITE,
), ),
@ -492,7 +487,9 @@ impl ObjectImpl for HlsSink3 {
.get::<Option<String>>() .get::<Option<String>>()
.expect("type checked upstream") .expect("type checked upstream")
.unwrap_or_else(|| DEFAULT_LOCATION.into()); .unwrap_or_else(|| DEFAULT_LOCATION.into());
settings.segment_formatter = SegmentFormatter::new(&settings.location).unwrap(); settings.segment_formatter = SegmentFormatter::new(&settings.location).expect(
"A string containing `%03d` pattern must be used (can be any number from 0-9)",
);
settings settings
.splitmuxsink .splitmuxsink
.set_property("location", &settings.location); .set_property("location", &settings.location);
@ -869,3 +866,26 @@ impl ElementImpl for HlsSink3 {
} }
} }
} }
/// The content of the last item of a path separated by `/` character.
fn path_basename(name: impl AsRef<str>) -> String {
name.as_ref().split('/').last().unwrap().to_string()
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn can_extract_basenames() {
for (input, output) in [
("", ""),
("value", "value"),
("/my/nice/path.ts", "path.ts"),
("file.ts", "file.ts"),
("https://localhost/output/file.vtt", "file.vtt"),
] {
assert_eq!(path_basename(input), output);
}
}
}

View file

@ -163,10 +163,10 @@ pub enum PlaylistRenderState {
/// part003.ts /// part003.ts
/// part004.ts /// part004.ts
/// ``` /// ```
/// Then we can use the segment pattern value as `"part%03.ts"`: /// Then we can use the segment pattern value as `"part%03d.ts"`:
/// ///
/// ```rust,ignore /// ```rust,ignore
/// let formatter = SegmentFormatter::new("part%03.ts").unwrap(); /// let formatter = SegmentFormatter::new("part%03d.ts").unwrap();
/// assert_eq!(formatter.segment(1), "part001.ts"); /// assert_eq!(formatter.segment(1), "part001.ts");
/// assert_eq!(formatter.segment(2), "part002.ts"); /// assert_eq!(formatter.segment(2), "part002.ts");
/// assert_eq!(formatter.segment(3), "part003.ts"); /// assert_eq!(formatter.segment(3), "part003.ts");

View file

@ -11,7 +11,8 @@ use gio::prelude::*;
use gst::gst_info; use gst::gst_info;
use gst::prelude::*; use gst::prelude::*;
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use std::sync::mpsc; use std::io::Write;
use std::sync::{mpsc, Arc, Mutex};
use std::time::Duration; use std::time::Duration;
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| { static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
@ -67,6 +68,38 @@ enum HlsSinkEvent {
DeleteFragment(String), DeleteFragment(String),
} }
/// Represents a HLS playlist file that writes to a shared string.
///
/// Only used in tests to check the resulting playlist content without the need to write
/// to actual filesystem.
struct MemoryPlaylistFile {
/// Reference to the content handler.
handler: Arc<Mutex<String>>,
}
impl MemoryPlaylistFile {
/// Whenever we know the file will be overridden we call this method to clear the contents.
fn clear_content(&self) {
let mut string = self.handler.lock().unwrap();
string.clear();
}
}
impl Write for MemoryPlaylistFile {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
// We don't need to handle errors here as this is used only in tests
let value = std::str::from_utf8(buf).unwrap();
let mut string = self.handler.lock().unwrap();
string.push_str(value);
Ok(buf.len())
}
fn flush(&mut self) -> std::io::Result<()> {
// We are writing to a `String` object, no need to flush content
Ok(())
}
}
#[test] #[test]
fn test_hlssink3_element_with_video_content() -> Result<(), ()> { fn test_hlssink3_element_with_video_content() -> Result<(), ()> {
init(); init();
@ -76,22 +109,24 @@ fn test_hlssink3_element_with_video_content() -> Result<(), ()> {
let pipeline = gst::Pipeline::new(Some("video_pipeline")); let pipeline = gst::Pipeline::new(Some("video_pipeline"));
let video_src = try_create_element!("videotestsrc"); let video_src = try_create_element!("videotestsrc");
video_src.set_property("is-live", &true); video_src.set_property("is-live", true);
video_src.set_property("num-buffers", &BUFFER_NB); video_src.set_property("num-buffers", BUFFER_NB);
let x264enc = try_create_element!("x264enc"); let x264enc = try_create_element!("x264enc");
let h264parse = try_create_element!("h264parse"); let h264parse = try_create_element!("h264parse");
let hlssink3 = gst::ElementFactory::make("hlssink3", Some("test_hlssink3")) let hlssink3 = gst::ElementFactory::make("hlssink3", Some("test_hlssink3"))
.expect("Must be able to instantiate hlssink3"); .expect("Must be able to instantiate hlssink3");
hlssink3.set_property("target-duration", &2u32); hlssink3.set_property("target-duration", 2u32);
hlssink3.set_property("playlist-length", &2u32); hlssink3.set_property("playlist-length", 2u32);
hlssink3.set_property("max-files", &2u32); hlssink3.set_property("max-files", 2u32);
let (hls_events_sender, hls_events_receiver) = mpsc::sync_channel(20); let (hls_events_sender, hls_events_receiver) = mpsc::sync_channel(20);
let playlist_content = Arc::new(Mutex::new(String::from("")));
hlssink3.connect("get-playlist-stream", false, { hlssink3.connect("get-playlist-stream", false, {
let hls_events_sender = hls_events_sender.clone(); let hls_events_sender = hls_events_sender.clone();
let playlist_content = playlist_content.clone();
move |args| { move |args| {
let location = args[1].get::<String>().expect("No location given"); let location = args[1].get::<String>().expect("No location given");
@ -99,8 +134,14 @@ fn test_hlssink3_element_with_video_content() -> Result<(), ()> {
.try_send(HlsSinkEvent::GetPlaylistStream(location)) .try_send(HlsSinkEvent::GetPlaylistStream(location))
.expect("Send playlist event"); .expect("Send playlist event");
let stream = gio::MemoryOutputStream::new_resizable(); // We need an owned type to pass to `gio::WriteOutputStream`
Some(stream.to_value()) let playlist = MemoryPlaylistFile {
handler: Arc::clone(&playlist_content),
};
// Since here a new file will be created, the content is cleared
playlist.clear_content();
let output = gio::WriteOutputStream::new(playlist);
Some(output.to_value())
} }
}); });
@ -182,6 +223,20 @@ fn test_hlssink3_element_with_video_content() -> Result<(), ()> {
}; };
assert_eq!(expected_ordering_of_events, actual_events); assert_eq!(expected_ordering_of_events, actual_events);
let contents = playlist_content.lock().unwrap();
assert_eq!(
r###"#EXTM3U
#EXT-X-VERSION:3
#EXT-X-TARGETDURATION:2
#EXT-X-MEDIA-SEQUENCE:4
#EXTINF:2,
segment00003.ts
#EXTINF:0.3,
segment00004.ts
"###,
contents.to_string()
);
Ok(()) Ok(())
} }
@ -194,8 +249,8 @@ fn test_hlssink3_element_with_audio_content() -> Result<(), ()> {
let pipeline = gst::Pipeline::new(Some("audio_pipeline")); let pipeline = gst::Pipeline::new(Some("audio_pipeline"));
let audio_src = try_create_element!("audiotestsrc"); let audio_src = try_create_element!("audiotestsrc");
audio_src.set_property("is-live", &true); audio_src.set_property("is-live", true);
audio_src.set_property("num-buffers", &BUFFER_NB); audio_src.set_property("num-buffers", BUFFER_NB);
let hls_avenc_aac = try_or_pause!(gst::ElementFactory::make( let hls_avenc_aac = try_or_pause!(gst::ElementFactory::make(
"avenc_aac", "avenc_aac",
@ -203,7 +258,7 @@ fn test_hlssink3_element_with_audio_content() -> Result<(), ()> {
)); ));
let hlssink3 = gst::ElementFactory::make("hlssink3", Some("hlssink3")) let hlssink3 = gst::ElementFactory::make("hlssink3", Some("hlssink3"))
.expect("Must be able to instantiate hlssink3"); .expect("Must be able to instantiate hlssink3");
hlssink3.set_property("target-duration", &6u32); hlssink3.set_property("target-duration", 6u32);
hlssink3.connect("get-playlist-stream", false, move |_args| { hlssink3.connect("get-playlist-stream", false, move |_args| {
let stream = gio::MemoryOutputStream::new_resizable(); let stream = gio::MemoryOutputStream::new_resizable();
@ -227,6 +282,7 @@ fn test_hlssink3_element_with_audio_content() -> Result<(), ()> {
pipeline.set_state(gst::State::Playing).unwrap(); pipeline.set_state(gst::State::Playing).unwrap();
gst_info!(CAT, "audio_pipeline: waiting for {} buffers", BUFFER_NB); gst_info!(CAT, "audio_pipeline: waiting for {} buffers", BUFFER_NB);
let mut eos = false; let mut eos = false;
let bus = pipeline.bus().unwrap(); let bus = pipeline.bus().unwrap();
while let Some(msg) = bus.timed_pop(gst::ClockTime::NONE) { while let Some(msg) = bus.timed_pop(gst::ClockTime::NONE) {
@ -245,3 +301,132 @@ fn test_hlssink3_element_with_audio_content() -> Result<(), ()> {
assert!(eos); assert!(eos);
Ok(()) Ok(())
} }
#[test]
fn test_hlssink3_write_correct_playlist_content() -> Result<(), ()> {
init();
const BUFFER_NB: i32 = 50;
let pipeline = gst::Pipeline::new(Some("video_pipeline"));
let video_src = try_create_element!("videotestsrc");
video_src.set_property("is-live", true);
video_src.set_property("num-buffers", BUFFER_NB);
let x264enc = try_create_element!("x264enc");
let h264parse = try_create_element!("h264parse");
let hlssink3 = gst::ElementFactory::make("hlssink3", Some("test_hlssink3"))
.expect("Must be able to instantiate hlssink3");
hlssink3.set_properties(&[
("location", &"/www/media/segments/my-own-filename-%03d.ts"),
("playlist-location", &"/www/media/main.m3u8"),
("playlist-root", &"segments"),
]);
let (hls_events_sender, hls_events_receiver) = mpsc::sync_channel(20);
let playlist_content = Arc::new(Mutex::new(String::from("")));
hlssink3.connect("get-playlist-stream", false, {
let hls_events_sender = hls_events_sender.clone();
let playlist_content = playlist_content.clone();
move |args| {
let location = args[1].get::<String>().expect("No location given");
hls_events_sender
.try_send(HlsSinkEvent::GetPlaylistStream(location))
.expect("Send playlist event");
// We need an owned type to pass to `gio::WriteOutputStream`
let playlist = MemoryPlaylistFile {
handler: Arc::clone(&playlist_content),
};
// Since here a new file will be created, the content is cleared
playlist.clear_content();
let output = gio::WriteOutputStream::new(playlist);
Some(output.to_value())
}
});
hlssink3.connect("get-fragment-stream", false, {
let hls_events_sender = hls_events_sender.clone();
move |args| {
let location = args[1].get::<String>().expect("No location given");
hls_events_sender
.try_send(HlsSinkEvent::GetFragmentStream(location))
.expect("Send fragment event");
let stream = gio::MemoryOutputStream::new_resizable();
Some(stream.to_value())
}
});
hlssink3.connect("delete-fragment", false, move |args| {
let location = args[1].get::<String>().expect("No location given");
hls_events_sender
.try_send(HlsSinkEvent::DeleteFragment(location))
.expect("Send delete fragment event");
Some(true.to_value())
});
try_or_pause!(pipeline.add_many(&[&video_src, &x264enc, &h264parse, &hlssink3,]));
try_or_pause!(gst::Element::link_many(&[
&video_src, &x264enc, &h264parse, &hlssink3
]));
pipeline.set_state(gst::State::Playing).unwrap();
gst_info!(
CAT,
"hlssink3_video_pipeline: waiting for {} buffers",
BUFFER_NB
);
let mut eos = false;
let bus = pipeline.bus().unwrap();
while let Some(msg) = bus.timed_pop(gst::ClockTime::NONE) {
use gst::MessageView;
match msg.view() {
MessageView::Eos(..) => {
eos = true;
break;
}
MessageView::Error(..) => unreachable!(),
_ => (),
}
}
pipeline.set_state(gst::State::Null).unwrap();
assert!(eos);
// Collect all events triggered during execution of the pipeline
let mut actual_events = Vec::new();
while let Ok(event) = hls_events_receiver.recv_timeout(Duration::from_millis(1)) {
actual_events.push(event);
}
let expected_ordering_of_events = {
use self::HlsSinkEvent::*;
vec![
GetFragmentStream("/www/media/segments/my-own-filename-000.ts".to_string()),
GetPlaylistStream("/www/media/main.m3u8".to_string()),
GetPlaylistStream("/www/media/main.m3u8".to_string()),
]
};
assert_eq!(expected_ordering_of_events, actual_events);
let contents = playlist_content.lock().unwrap();
assert_eq!(
r###"#EXTM3U
#EXT-X-VERSION:3
#EXT-X-TARGETDURATION:15
#EXT-X-MEDIA-SEQUENCE:1
#EXTINF:1.633,
segments/my-own-filename-000.ts
"###,
contents.to_string()
);
Ok(())
}