diff --git a/net/hlssink3/Cargo.toml b/net/hlssink3/Cargo.toml index 7e1bde32..4161e099 100644 --- a/net/hlssink3/Cargo.toml +++ b/net/hlssink3/Cargo.toml @@ -11,7 +11,6 @@ rust-version = "1.56" [dependencies] gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_10"] } gst-base = { package = "gstreamer-base", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } -gst-app = { package = "gstreamer-app", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } glib = { git = "https://github.com/gtk-rs/gtk-rs-core" } gio = { git = "https://github.com/gtk-rs/gtk-rs-core" } once_cell = "1.7.2" diff --git a/net/hlssink3/src/imp.rs b/net/hlssink3/src/imp.rs index 7900dcc7..7c51b006 100644 --- a/net/hlssink3/src/imp.rs +++ b/net/hlssink3/src/imp.rs @@ -319,16 +319,11 @@ impl HlsSink3 { fn segment_filename(&self, state: &mut StartedState) -> String { assert!(state.current_segment_location.is_some()); - let segment_filename = state.current_segment_location.take().unwrap(); + let segment_filename = path_basename(state.current_segment_location.take().unwrap()); let settings = self.settings.lock().unwrap(); if let Some(playlist_root) = &settings.playlist_root { - format!( - "{}{}{}", - playlist_root, - std::path::MAIN_SEPARATOR, - segment_filename - ) + format!("{}/{}", playlist_root, segment_filename) } else { segment_filename } @@ -427,7 +422,7 @@ impl ObjectImpl for HlsSink3 { glib::ParamSpecString::new( "playlist-root", "Playlist Root", - "Location of the playlist to write.", + "Base path for the segments in the playlist file.", None, glib::ParamFlags::READWRITE, ), @@ -492,7 +487,9 @@ impl ObjectImpl for HlsSink3 { .get::>() .expect("type checked upstream") .unwrap_or_else(|| DEFAULT_LOCATION.into()); - settings.segment_formatter = SegmentFormatter::new(&settings.location).unwrap(); + settings.segment_formatter = SegmentFormatter::new(&settings.location).expect( + "A string containing `%03d` pattern must be used (can be any number from 0-9)", + ); settings .splitmuxsink .set_property("location", &settings.location); @@ -869,3 +866,26 @@ impl ElementImpl for HlsSink3 { } } } + +/// The content of the last item of a path separated by `/` character. +fn path_basename(name: impl AsRef) -> String { + name.as_ref().split('/').last().unwrap().to_string() +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn can_extract_basenames() { + for (input, output) in [ + ("", ""), + ("value", "value"), + ("/my/nice/path.ts", "path.ts"), + ("file.ts", "file.ts"), + ("https://localhost/output/file.vtt", "file.vtt"), + ] { + assert_eq!(path_basename(input), output); + } + } +} diff --git a/net/hlssink3/src/playlist.rs b/net/hlssink3/src/playlist.rs index a5d228f1..40ca4b18 100644 --- a/net/hlssink3/src/playlist.rs +++ b/net/hlssink3/src/playlist.rs @@ -163,10 +163,10 @@ pub enum PlaylistRenderState { /// part003.ts /// part004.ts /// ``` -/// Then we can use the segment pattern value as `"part%03.ts"`: +/// Then we can use the segment pattern value as `"part%03d.ts"`: /// /// ```rust,ignore -/// let formatter = SegmentFormatter::new("part%03.ts").unwrap(); +/// let formatter = SegmentFormatter::new("part%03d.ts").unwrap(); /// assert_eq!(formatter.segment(1), "part001.ts"); /// assert_eq!(formatter.segment(2), "part002.ts"); /// assert_eq!(formatter.segment(3), "part003.ts"); diff --git a/net/hlssink3/tests/hlssink3.rs b/net/hlssink3/tests/hlssink3.rs index 9aaf094e..21db7fe8 100644 --- a/net/hlssink3/tests/hlssink3.rs +++ b/net/hlssink3/tests/hlssink3.rs @@ -11,7 +11,8 @@ use gio::prelude::*; use gst::gst_info; use gst::prelude::*; use once_cell::sync::Lazy; -use std::sync::mpsc; +use std::io::Write; +use std::sync::{mpsc, Arc, Mutex}; use std::time::Duration; static CAT: Lazy = Lazy::new(|| { @@ -67,6 +68,38 @@ enum HlsSinkEvent { DeleteFragment(String), } +/// Represents a HLS playlist file that writes to a shared string. +/// +/// Only used in tests to check the resulting playlist content without the need to write +/// to actual filesystem. +struct MemoryPlaylistFile { + /// Reference to the content handler. + handler: Arc>, +} + +impl MemoryPlaylistFile { + /// Whenever we know the file will be overridden we call this method to clear the contents. + fn clear_content(&self) { + let mut string = self.handler.lock().unwrap(); + string.clear(); + } +} + +impl Write for MemoryPlaylistFile { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + // We don't need to handle errors here as this is used only in tests + let value = std::str::from_utf8(buf).unwrap(); + let mut string = self.handler.lock().unwrap(); + string.push_str(value); + Ok(buf.len()) + } + + fn flush(&mut self) -> std::io::Result<()> { + // We are writing to a `String` object, no need to flush content + Ok(()) + } +} + #[test] fn test_hlssink3_element_with_video_content() -> Result<(), ()> { init(); @@ -76,22 +109,24 @@ fn test_hlssink3_element_with_video_content() -> Result<(), ()> { let pipeline = gst::Pipeline::new(Some("video_pipeline")); let video_src = try_create_element!("videotestsrc"); - video_src.set_property("is-live", &true); - video_src.set_property("num-buffers", &BUFFER_NB); + video_src.set_property("is-live", true); + video_src.set_property("num-buffers", BUFFER_NB); let x264enc = try_create_element!("x264enc"); let h264parse = try_create_element!("h264parse"); let hlssink3 = gst::ElementFactory::make("hlssink3", Some("test_hlssink3")) .expect("Must be able to instantiate hlssink3"); - hlssink3.set_property("target-duration", &2u32); - hlssink3.set_property("playlist-length", &2u32); - hlssink3.set_property("max-files", &2u32); + hlssink3.set_property("target-duration", 2u32); + hlssink3.set_property("playlist-length", 2u32); + hlssink3.set_property("max-files", 2u32); let (hls_events_sender, hls_events_receiver) = mpsc::sync_channel(20); + let playlist_content = Arc::new(Mutex::new(String::from(""))); hlssink3.connect("get-playlist-stream", false, { let hls_events_sender = hls_events_sender.clone(); + let playlist_content = playlist_content.clone(); move |args| { let location = args[1].get::().expect("No location given"); @@ -99,8 +134,14 @@ fn test_hlssink3_element_with_video_content() -> Result<(), ()> { .try_send(HlsSinkEvent::GetPlaylistStream(location)) .expect("Send playlist event"); - let stream = gio::MemoryOutputStream::new_resizable(); - Some(stream.to_value()) + // We need an owned type to pass to `gio::WriteOutputStream` + let playlist = MemoryPlaylistFile { + handler: Arc::clone(&playlist_content), + }; + // Since here a new file will be created, the content is cleared + playlist.clear_content(); + let output = gio::WriteOutputStream::new(playlist); + Some(output.to_value()) } }); @@ -182,6 +223,20 @@ fn test_hlssink3_element_with_video_content() -> Result<(), ()> { }; assert_eq!(expected_ordering_of_events, actual_events); + let contents = playlist_content.lock().unwrap(); + assert_eq!( + r###"#EXTM3U +#EXT-X-VERSION:3 +#EXT-X-TARGETDURATION:2 +#EXT-X-MEDIA-SEQUENCE:4 +#EXTINF:2, +segment00003.ts +#EXTINF:0.3, +segment00004.ts +"###, + contents.to_string() + ); + Ok(()) } @@ -194,8 +249,8 @@ fn test_hlssink3_element_with_audio_content() -> Result<(), ()> { let pipeline = gst::Pipeline::new(Some("audio_pipeline")); let audio_src = try_create_element!("audiotestsrc"); - audio_src.set_property("is-live", &true); - audio_src.set_property("num-buffers", &BUFFER_NB); + audio_src.set_property("is-live", true); + audio_src.set_property("num-buffers", BUFFER_NB); let hls_avenc_aac = try_or_pause!(gst::ElementFactory::make( "avenc_aac", @@ -203,7 +258,7 @@ fn test_hlssink3_element_with_audio_content() -> Result<(), ()> { )); let hlssink3 = gst::ElementFactory::make("hlssink3", Some("hlssink3")) .expect("Must be able to instantiate hlssink3"); - hlssink3.set_property("target-duration", &6u32); + hlssink3.set_property("target-duration", 6u32); hlssink3.connect("get-playlist-stream", false, move |_args| { let stream = gio::MemoryOutputStream::new_resizable(); @@ -227,6 +282,7 @@ fn test_hlssink3_element_with_audio_content() -> Result<(), ()> { pipeline.set_state(gst::State::Playing).unwrap(); gst_info!(CAT, "audio_pipeline: waiting for {} buffers", BUFFER_NB); + let mut eos = false; let bus = pipeline.bus().unwrap(); while let Some(msg) = bus.timed_pop(gst::ClockTime::NONE) { @@ -245,3 +301,132 @@ fn test_hlssink3_element_with_audio_content() -> Result<(), ()> { assert!(eos); Ok(()) } + +#[test] +fn test_hlssink3_write_correct_playlist_content() -> Result<(), ()> { + init(); + + const BUFFER_NB: i32 = 50; + + let pipeline = gst::Pipeline::new(Some("video_pipeline")); + + let video_src = try_create_element!("videotestsrc"); + video_src.set_property("is-live", true); + video_src.set_property("num-buffers", BUFFER_NB); + + let x264enc = try_create_element!("x264enc"); + let h264parse = try_create_element!("h264parse"); + + let hlssink3 = gst::ElementFactory::make("hlssink3", Some("test_hlssink3")) + .expect("Must be able to instantiate hlssink3"); + hlssink3.set_properties(&[ + ("location", &"/www/media/segments/my-own-filename-%03d.ts"), + ("playlist-location", &"/www/media/main.m3u8"), + ("playlist-root", &"segments"), + ]); + + let (hls_events_sender, hls_events_receiver) = mpsc::sync_channel(20); + let playlist_content = Arc::new(Mutex::new(String::from(""))); + + hlssink3.connect("get-playlist-stream", false, { + let hls_events_sender = hls_events_sender.clone(); + let playlist_content = playlist_content.clone(); + move |args| { + let location = args[1].get::().expect("No location given"); + + hls_events_sender + .try_send(HlsSinkEvent::GetPlaylistStream(location)) + .expect("Send playlist event"); + + // We need an owned type to pass to `gio::WriteOutputStream` + let playlist = MemoryPlaylistFile { + handler: Arc::clone(&playlist_content), + }; + // Since here a new file will be created, the content is cleared + playlist.clear_content(); + let output = gio::WriteOutputStream::new(playlist); + Some(output.to_value()) + } + }); + + hlssink3.connect("get-fragment-stream", false, { + let hls_events_sender = hls_events_sender.clone(); + move |args| { + let location = args[1].get::().expect("No location given"); + + hls_events_sender + .try_send(HlsSinkEvent::GetFragmentStream(location)) + .expect("Send fragment event"); + + let stream = gio::MemoryOutputStream::new_resizable(); + Some(stream.to_value()) + } + }); + + hlssink3.connect("delete-fragment", false, move |args| { + let location = args[1].get::().expect("No location given"); + hls_events_sender + .try_send(HlsSinkEvent::DeleteFragment(location)) + .expect("Send delete fragment event"); + Some(true.to_value()) + }); + + try_or_pause!(pipeline.add_many(&[&video_src, &x264enc, &h264parse, &hlssink3,])); + try_or_pause!(gst::Element::link_many(&[ + &video_src, &x264enc, &h264parse, &hlssink3 + ])); + + pipeline.set_state(gst::State::Playing).unwrap(); + + gst_info!( + CAT, + "hlssink3_video_pipeline: waiting for {} buffers", + BUFFER_NB + ); + + let mut eos = false; + let bus = pipeline.bus().unwrap(); + while let Some(msg) = bus.timed_pop(gst::ClockTime::NONE) { + use gst::MessageView; + match msg.view() { + MessageView::Eos(..) => { + eos = true; + break; + } + MessageView::Error(..) => unreachable!(), + _ => (), + } + } + + pipeline.set_state(gst::State::Null).unwrap(); + assert!(eos); + + // Collect all events triggered during execution of the pipeline + let mut actual_events = Vec::new(); + while let Ok(event) = hls_events_receiver.recv_timeout(Duration::from_millis(1)) { + actual_events.push(event); + } + let expected_ordering_of_events = { + use self::HlsSinkEvent::*; + vec![ + GetFragmentStream("/www/media/segments/my-own-filename-000.ts".to_string()), + GetPlaylistStream("/www/media/main.m3u8".to_string()), + GetPlaylistStream("/www/media/main.m3u8".to_string()), + ] + }; + assert_eq!(expected_ordering_of_events, actual_events); + + let contents = playlist_content.lock().unwrap(); + assert_eq!( + r###"#EXTM3U +#EXT-X-VERSION:3 +#EXT-X-TARGETDURATION:15 +#EXT-X-MEDIA-SEQUENCE:1 +#EXTINF:1.633, +segments/my-own-filename-000.ts +"###, + contents.to_string() + ); + + Ok(()) +}