// Copyright (C) 2021 Rafael Caricio // // This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. // If a copy of the MPL was not distributed with this file, You can obtain one at // . // // SPDX-License-Identifier: MPL-2.0 use gio::prelude::*; use gst::prelude::*; use gsthlssink3::hlssink3::HlsSink3PlaylistType; use std::io::Write; use std::sync::LazyLock; use std::sync::{mpsc, Arc, Mutex}; use std::time::Duration; static CAT: LazyLock = LazyLock::new(|| { gst::DebugCategory::new( "hlssink3-test", gst::DebugColorFlags::empty(), Some("Flex HLS sink test"), ) }); macro_rules! try_or_pause { ($l:expr) => { match $l { Ok(v) => v, Err(err) => { eprintln!("Skipping Test: {:?}", err); return Ok(()); } } }; } macro_rules! try_create_element { ($l:expr, $n:expr) => { match gst::ElementFactory::find($l) { Some(factory) => factory.create().name($n).build().unwrap(), None => { eprintln!("Could not find {} ({}) plugin, skipping test", $l, $n); return Ok(()); } } }; ($l:expr) => {{ let alias: String = format!("test_{}", $l); try_create_element!($l, >::as_ref(&alias)) }}; } fn init() { use std::sync::Once; static INIT: Once = Once::new(); INIT.call_once(|| { gst::init().unwrap(); gsthlssink3::plugin_register_static().expect("hlssink3 test"); }); } #[derive(Debug, Clone, Eq, PartialEq)] enum HlsSinkEvent { GetPlaylistStream(String), GetFragmentStream(String), DeleteFragment(String), SegmentAddedMessage(String), } /// Represents a HLS playlist file that writes to a shared string. /// /// Only used in tests to check the resulting playlist content without the need to write /// to actual filesystem. struct MemoryPlaylistFile { /// Reference to the content handler. handler: Arc>, } impl MemoryPlaylistFile { /// Whenever we know the file will be overridden we call this method to clear the contents. fn clear_content(&self) { let mut string = self.handler.lock().unwrap(); string.clear(); } } impl Write for MemoryPlaylistFile { fn write(&mut self, buf: &[u8]) -> std::io::Result { // We don't need to handle errors here as this is used only in tests let value = std::str::from_utf8(buf).unwrap(); let mut string = self.handler.lock().unwrap(); string.push_str(value); Ok(buf.len()) } fn flush(&mut self) -> std::io::Result<()> { // We are writing to a `String` object, no need to flush content Ok(()) } } #[test] fn test_hlssink3_element_with_video_content() -> Result<(), ()> { init(); const BUFFER_NB: i32 = 250; let pipeline = gst::Pipeline::with_name("video_pipeline"); let video_src = try_create_element!("videotestsrc"); video_src.set_property("is-live", false); video_src.set_property("num-buffers", BUFFER_NB); let x264enc = try_create_element!("x264enc"); let h264parse = try_create_element!("h264parse"); let hlssink3 = gst::ElementFactory::make("hlssink3") .name("test_hlssink3") .property("target-duration", 2u32) .property("playlist-length", 2u32) .property("max-files", 2u32) .build() .expect("Must be able to instantiate hlssink3"); hlssink3.set_property("playlist-type", HlsSink3PlaylistType::Event); let pl_type: HlsSink3PlaylistType = hlssink3.property("playlist-type"); assert_eq!(pl_type, HlsSink3PlaylistType::Event); hlssink3.set_property_from_str("playlist-type", "unspecified"); let (hls_events_sender, hls_events_receiver) = mpsc::sync_channel(20); let (hls_messages_sender, hls_messages_receiver) = mpsc::sync_channel(10); let playlist_content = Arc::new(Mutex::new(String::from(""))); hlssink3.connect("get-playlist-stream", false, { let hls_events_sender = hls_events_sender.clone(); let playlist_content = playlist_content.clone(); move |args| { let location = args[1].get::().expect("No location given"); hls_events_sender .try_send(HlsSinkEvent::GetPlaylistStream(location)) .expect("Send playlist event"); // We need an owned type to pass to `gio::WriteOutputStream` let playlist = MemoryPlaylistFile { handler: Arc::clone(&playlist_content), }; // Since here a new file will be created, the content is cleared playlist.clear_content(); let output = gio::WriteOutputStream::new(playlist); Some(output.to_value()) } }); hlssink3.connect("get-fragment-stream", false, { let hls_events_sender = hls_events_sender.clone(); move |args| { let location = args[1].get::().expect("No location given"); hls_events_sender .try_send(HlsSinkEvent::GetFragmentStream(location)) .expect("Send fragment event"); let stream = gio::MemoryOutputStream::new_resizable(); Some(stream.to_value()) } }); hlssink3.connect("delete-fragment", false, { let hls_events_sender = hls_events_sender.clone(); move |args| { let location = args[1].get::().expect("No location given"); hls_events_sender .try_send(HlsSinkEvent::DeleteFragment(location)) .expect("Send delete fragment event"); Some(true.to_value()) } }); try_or_pause!(pipeline.add_many([&video_src, &x264enc, &h264parse, &hlssink3,])); try_or_pause!(gst::Element::link_many([ &video_src, &x264enc, &h264parse, &hlssink3 ])); pipeline.set_state(gst::State::Playing).unwrap(); gst::info!( CAT, "hlssink3_video_pipeline: waiting for {} buffers", BUFFER_NB ); let mut eos = false; let bus = pipeline.bus().unwrap(); while let Some(msg) = bus.timed_pop(gst::ClockTime::NONE) { use gst::MessageView; match msg.view() { MessageView::Eos(..) => { eos = true; break; } MessageView::Element(msg) => { if let Some(structure) = msg.structure() { if structure.has_name("hls-segment-added") { let location = structure.get::("location").unwrap(); hls_messages_sender .try_send(HlsSinkEvent::SegmentAddedMessage(location)) .expect("Send segment added event"); } } } MessageView::Error(..) => unreachable!(), _ => (), } } pipeline.set_state(gst::State::Null).unwrap(); assert!(eos); // Collect all events triggered during execution of the pipeline let mut actual_events = Vec::new(); while let Ok(event) = hls_events_receiver.recv_timeout(Duration::from_millis(1)) { actual_events.push(event); } let expected_ordering_of_events = { use self::HlsSinkEvent::*; vec![ GetFragmentStream("segment00000.ts".to_string()), GetPlaylistStream("playlist.m3u8".to_string()), GetFragmentStream("segment00001.ts".to_string()), GetPlaylistStream("playlist.m3u8".to_string()), GetFragmentStream("segment00002.ts".to_string()), GetPlaylistStream("playlist.m3u8".to_string()), DeleteFragment("segment00000.ts".to_string()), GetFragmentStream("segment00003.ts".to_string()), GetPlaylistStream("playlist.m3u8".to_string()), DeleteFragment("segment00001.ts".into()), GetFragmentStream("segment00004.ts".to_string()), GetPlaylistStream("playlist.m3u8".to_string()), DeleteFragment("segment00002.ts".to_string()), GetPlaylistStream("playlist.m3u8".to_string()), ] }; assert_eq!(expected_ordering_of_events, actual_events); let mut actual_messages = Vec::new(); while let Ok(event) = hls_messages_receiver.recv_timeout(Duration::from_millis(1)) { actual_messages.push(event); } let expected_messages = { use self::HlsSinkEvent::*; vec![ SegmentAddedMessage("segment00000.ts".to_string()), SegmentAddedMessage("segment00001.ts".to_string()), SegmentAddedMessage("segment00002.ts".to_string()), SegmentAddedMessage("segment00003.ts".to_string()), SegmentAddedMessage("segment00004.ts".to_string()), ] }; assert_eq!(expected_messages, actual_messages); let contents = playlist_content.lock().unwrap(); assert_eq!( r###"#EXTM3U #EXT-X-VERSION:3 #EXT-X-TARGETDURATION:2 #EXT-X-MEDIA-SEQUENCE:4 #EXTINF:1.999, segment00003.ts #EXTINF:0.333, segment00004.ts #EXT-X-ENDLIST "###, contents.to_string() ); Ok(()) } #[test] fn test_hlssink3_element_with_audio_content() -> Result<(), ()> { init(); const BUFFER_NB: i32 = 100; let pipeline = gst::Pipeline::with_name("audio_pipeline"); let audio_src = try_create_element!("audiotestsrc"); audio_src.set_property("is-live", false); audio_src.set_property("num-buffers", BUFFER_NB); let hls_avenc_aac = try_or_pause!(gst::ElementFactory::make("avenc_aac") .name("hls_avenc_aac") .build()); let hlssink3 = gst::ElementFactory::make("hlssink3") .name("hlssink3") .property("target-duration", 6u32) .build() .expect("Must be able to instantiate hlssink3"); hlssink3.connect("get-playlist-stream", false, move |_args| { let stream = gio::MemoryOutputStream::new_resizable(); Some(stream.to_value()) }); hlssink3.connect("get-fragment-stream", false, move |_args| { let stream = gio::MemoryOutputStream::new_resizable(); Some(stream.to_value()) }); hlssink3.connect("delete-fragment", false, move |_| Some(true.to_value())); try_or_pause!(pipeline.add_many([&audio_src, &hls_avenc_aac, &hlssink3,])); try_or_pause!(gst::Element::link_many([ &audio_src, &hls_avenc_aac, &hlssink3 ])); pipeline.set_state(gst::State::Playing).unwrap(); gst::info!(CAT, "audio_pipeline: waiting for {} buffers", BUFFER_NB); let mut eos = false; let bus = pipeline.bus().unwrap(); while let Some(msg) = bus.timed_pop(gst::ClockTime::NONE) { use gst::MessageView; match msg.view() { MessageView::Eos(..) => { eos = true; break; } MessageView::Error(..) => unreachable!(), _ => (), } } pipeline.set_state(gst::State::Null).unwrap(); assert!(eos); Ok(()) } #[test] fn test_hlssink3_write_correct_playlist_content() -> Result<(), ()> { init(); const BUFFER_NB: i32 = 50; let pipeline = gst::Pipeline::with_name("video_pipeline"); let video_src = try_create_element!("videotestsrc"); video_src.set_property("is-live", false); video_src.set_property("num-buffers", BUFFER_NB); let x264enc = try_create_element!("x264enc"); let h264parse = try_create_element!("h264parse"); let hlssink3 = gst::ElementFactory::make("hlssink3") .name("test_hlssink3") .property("location", "/www/media/segments/my-own-filename-%03d.ts") .property("playlist-location", "/www/media/main.m3u8") .property("playlist-root", "segments") .build() .expect("Must be able to instantiate hlssink3"); let (hls_events_sender, hls_events_receiver) = mpsc::sync_channel(20); let (hls_messages_sender, hls_messages_receiver) = mpsc::sync_channel(10); let playlist_content = Arc::new(Mutex::new(String::from(""))); hlssink3.connect("get-playlist-stream", false, { let hls_events_sender = hls_events_sender.clone(); let playlist_content = playlist_content.clone(); move |args| { let location = args[1].get::().expect("No location given"); hls_events_sender .try_send(HlsSinkEvent::GetPlaylistStream(location)) .expect("Send playlist event"); // We need an owned type to pass to `gio::WriteOutputStream` let playlist = MemoryPlaylistFile { handler: Arc::clone(&playlist_content), }; // Since here a new file will be created, the content is cleared playlist.clear_content(); let output = gio::WriteOutputStream::new(playlist); Some(output.to_value()) } }); hlssink3.connect("get-fragment-stream", false, { let hls_events_sender = hls_events_sender.clone(); move |args| { let location = args[1].get::().expect("No location given"); hls_events_sender .try_send(HlsSinkEvent::GetFragmentStream(location)) .expect("Send fragment event"); let stream = gio::MemoryOutputStream::new_resizable(); Some(stream.to_value()) } }); hlssink3.connect("delete-fragment", false, { let hls_events_sender = hls_events_sender.clone(); move |args| { let location = args[1].get::().expect("No location given"); hls_events_sender .try_send(HlsSinkEvent::DeleteFragment(location)) .expect("Send delete fragment event"); Some(true.to_value()) } }); try_or_pause!(pipeline.add_many([&video_src, &x264enc, &h264parse, &hlssink3,])); try_or_pause!(gst::Element::link_many([ &video_src, &x264enc, &h264parse, &hlssink3 ])); pipeline.set_state(gst::State::Playing).unwrap(); gst::info!( CAT, "hlssink3_video_pipeline: waiting for {} buffers", BUFFER_NB ); let mut eos = false; let bus = pipeline.bus().unwrap(); while let Some(msg) = bus.timed_pop(gst::ClockTime::NONE) { use gst::MessageView; match msg.view() { MessageView::Eos(..) => { eos = true; break; } MessageView::Element(msg) => { if let Some(structure) = msg.structure() { if structure.has_name("hls-segment-added") { let location = structure.get::("location").unwrap(); hls_messages_sender .try_send(HlsSinkEvent::SegmentAddedMessage(location)) .expect("Send segment added event"); } } } MessageView::Error(..) => unreachable!(), _ => (), } } pipeline.set_state(gst::State::Null).unwrap(); assert!(eos); // Collect all events triggered during execution of the pipeline let mut actual_events = Vec::new(); while let Ok(event) = hls_events_receiver.recv_timeout(Duration::from_millis(1)) { actual_events.push(event); } let expected_ordering_of_events = { use self::HlsSinkEvent::*; vec![ GetFragmentStream("/www/media/segments/my-own-filename-000.ts".to_string()), GetPlaylistStream("/www/media/main.m3u8".to_string()), GetPlaylistStream("/www/media/main.m3u8".to_string()), ] }; assert_eq!(expected_ordering_of_events, actual_events); let mut actual_messages = Vec::new(); while let Ok(event) = hls_messages_receiver.recv_timeout(Duration::from_millis(1)) { actual_messages.push(event); } let expected_messages = { use self::HlsSinkEvent::*; vec![SegmentAddedMessage( "/www/media/segments/my-own-filename-000.ts".to_string(), )] }; assert_eq!(expected_messages, actual_messages); let contents = playlist_content.lock().unwrap(); assert_eq!( r###"#EXTM3U #EXT-X-VERSION:3 #EXT-X-TARGETDURATION:15 #EXT-X-MEDIA-SEQUENCE:1 #EXTINF:1.666, segments/my-own-filename-000.ts #EXT-X-ENDLIST "###, contents.to_string() ); Ok(()) }