diff --git a/Cargo.toml b/Cargo.toml index c9bd276..a8c3e7c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,7 @@ bytes = "1.0.1" [dev-dependencies] gst-audio = { package = "gstreamer-audio", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_16"] } +gst-check = { package = "gstreamer-check", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_16"] } [build-dependencies] gst-plugin-version-helper = { git = "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs" } diff --git a/README.md b/README.md index b40d6f8..2088f4c 100644 --- a/README.md +++ b/README.md @@ -10,9 +10,9 @@ Progress: - [x] Write TS content to fragment stream (defaults to filesystem); - [x] Write HLS playlist m3u8 file; - [x] Delete old fragments; -- [ ] Signal to acquire fragment stream; -- [ ] Signal to acquire HLS playlist stream; -- [ ] Signal to delete a fragment file; +- [x] Signal to acquire fragment stream; +- [x] Signal to acquire HLS playlist stream; +- [x] Signal to delete a fragment file; ## Example Usage diff --git a/simple_http.py b/simple_http.py index 25e9d57..037a95a 100644 --- a/simple_http.py +++ b/simple_http.py @@ -5,6 +5,8 @@ import sys class CORSRequestHandler(SimpleHTTPRequestHandler): def end_headers(self): self.send_header('Access-Control-Allow-Origin', '*') + self.send_header('Access-Control-Allow-Methods', 'GET') + self.send_header('Access-Control-Allow-Headers', '*') SimpleHTTPRequestHandler.end_headers(self) if __name__ == '__main__': diff --git a/src/imp.rs b/src/imp.rs index 523d2a0..e68d92b 100644 --- a/src/imp.rs +++ b/src/imp.rs @@ -5,9 +5,10 @@ use gst::subclass::prelude::*; use gst::{gst_debug, gst_error, gst_info, gst_trace, gst_warning}; use crate::playlist::PlaylistRenderState; -use m3u8_rs::playlist::{MediaPlaylist, MediaPlaylistType, MediaSegment}; +use m3u8_rs::playlist::{MediaPlaylist, MediaSegment}; use once_cell::sync::Lazy; use std::fs; +use std::io::Write; use std::path; use std::sync::{Arc, Mutex}; @@ -21,6 +22,10 @@ const DEFAULT_SEND_KEYFRAME_REQUESTS: bool = true; const GST_M3U8_PLAYLIST_VERSION: usize = 3; const BACKWARDS_COMPATIBLE_PLACEHOLDER: &str = "%05d"; +const SIGNAL_GET_PLAYLIST_STREAM: &str = "get-playlist-stream"; +const SIGNAL_GET_FRAGMENT_STREAM: &str = "get-fragment-stream"; +const SIGNAL_DELETE_FRAGMENT: &str = "delete-fragment"; + static CAT: Lazy = Lazy::new(|| { gst::DebugCategory::new( "flexhlssink", @@ -116,7 +121,7 @@ impl FlexHlsSink { segments: vec![], discontinuity_sequence: 0, end_list: false, - playlist_type: Some(MediaPlaylistType::Vod), + playlist_type: None, i_frames_only: false, start: None, independent_segments: false, @@ -163,12 +168,17 @@ impl FlexHlsSink { *current_segment_location = Some(segment_file_location.clone()); - // TODO: this should be a call to the signal exposed by this plugin - let stream = self - .new_file_stream(element, &segment_file_location) + let fragment_stream = element + .emit_by_name(SIGNAL_GET_FRAGMENT_STREAM, &[&segment_file_location]) + .expect("Error while getting fragment stream") + .unwrap() + .get::() .map_err(|err| err.to_string())?; + let giostreamsink = settings.giostreamsink.as_ref().unwrap(); - giostreamsink.set_property("stream", &stream).unwrap(); + giostreamsink + .set_property("stream", &fragment_stream) + .unwrap(); gst_info!( CAT, @@ -207,10 +217,19 @@ impl FlexHlsSink { Ok(gio::WriteOutputStream::new(file).upcast()) } + fn delete_fragment

(&self, location: &P) + where + P: AsRef, + { + let _ = fs::remove_file(location).map_err(|err| { + gst_warning!(CAT, "Could not delete segment file: {}", err.to_string()); + }); + } + fn write_playlist( &self, element: &super::FlexHlsSink, - fragment_closed_at: gst::ClockTime, + fragment_closed_at: Option, ) -> Result { gst_info!(CAT, obj: element, "Preparing to write new playlist"); @@ -227,30 +246,35 @@ impl FlexHlsSink { .. } => { gst_info!(CAT, "COUNT {}", playlist.segments.len()); - // TODO: Add new entry to the playlist - let segment_location = current_segment_location - .take() - .ok_or_else(|| gst::StateChangeError)?; + // Only add fragment if it's complete. + if let Some(fragment_closed_at) = fragment_closed_at { + let segment_location = current_segment_location + .take() + .ok_or(gst::StateChangeError)?; - playlist.segments.push(MediaSegment { - uri: segment_location.clone(), - duration: { - let fragment_opened_at = - fragment_opened_at.as_ref().ok_or(gst::StateChangeError)?; + playlist.segments.push(MediaSegment { + uri: segment_location.clone(), + duration: { + let fragment_opened_at = + fragment_opened_at.as_ref().ok_or(gst::StateChangeError)?; - let segment_duration = fragment_closed_at - fragment_opened_at; + let segment_duration = fragment_closed_at - fragment_opened_at; - segment_duration.seconds().ok_or(gst::StateChangeError)? as f32 - }, - title: None, - byte_range: None, - discontinuity: false, - key: None, - map: None, - program_date_time: None, - daterange: None, - }); + segment_duration.mseconds().ok_or(gst::StateChangeError)? as f32 + / 1_000f32 + }, + title: None, + byte_range: None, + discontinuity: false, + key: None, + map: None, + program_date_time: None, + daterange: None, + }); + + old_segment_locations.push(segment_location); + } let (playlist_location, max_num_segments, max_playlist_length) = { let settings = self.settings.lock().unwrap(); @@ -271,31 +295,35 @@ impl FlexHlsSink { *playlist_index += 1; playlist.media_sequence = *playlist_index as i32 - playlist.segments.len() as i32; - // TODO: this should be a call to the signal exposed by this plugin - let mut playlist_file = self - .new_file_stream(&element, &playlist_location) - .map_err(|_| gst::StateChangeError)? + let mut playlist_stream = element + .emit_by_name(SIGNAL_GET_PLAYLIST_STREAM, &[&playlist_location]) + .expect("Error while getting playlist stream") + .ok_or(gst::StateChangeError)? + .get::() + .map_err(|err| { + gst_error!( + CAT, + "Could not get stream to write playlist content: {}", + err.to_string() + ); + gst::StateChangeError + })? .into_write(); - playlist.write_to(&mut playlist_file).map_err(|err| { - gst_error!( - CAT, - "Could not write new playlist file: {}", - err.to_string() - ); + playlist.write_to(&mut playlist_stream).map_err(|err| { + gst_error!(CAT, "Could not write new playlist: {}", err.to_string()); gst::StateChangeError })?; + let _ = playlist_stream.flush().unwrap(); *playlist_render_state = PlaylistRenderState::Started; - old_segment_locations.push(segment_location); if old_segment_locations.len() > max_num_segments { for _ in 0..old_segment_locations.len() - max_num_segments { let old_segment_location = old_segment_locations.remove(0); - // TODO: trigger event to delete segment location - let _ = fs::remove_file(&old_segment_location).map_err(|err| { - gst_warning!(CAT, "Could not delete segment file: {}", err.to_string()); - }); + let _ = element + .emit_by_name(SIGNAL_DELETE_FRAGMENT, &[&old_segment_location]) + .expect("Error while processing signal handler"); } } } @@ -310,7 +338,7 @@ impl FlexHlsSink { element: &super::FlexHlsSink, ) -> Result { gst_debug!(CAT, obj: element, "Preparing to write final playlist"); - Ok(self.write_playlist(element, element.current_running_time())?) + self.write_playlist(element, None) } fn stop(&self, element: &super::FlexHlsSink) { @@ -371,7 +399,8 @@ impl BinImpl for FlexHlsSink { let s = msg.structure().unwrap(); if let Ok(fragment_closed_at) = s.get::("running-time") { - self.write_playlist(element, fragment_closed_at).unwrap(); + self.write_playlist(element, Some(fragment_closed_at)) + .unwrap(); } } _ => {} @@ -448,6 +477,74 @@ impl ObjectImpl for FlexHlsSink { PROPERTIES.as_ref() } + fn signals() -> &'static [glib::subclass::Signal] { + static SIGNALS: Lazy> = Lazy::new(|| { + vec![ + glib::subclass::Signal::builder( + SIGNAL_GET_PLAYLIST_STREAM, + &[String::static_type().into()], + gio::OutputStream::static_type().into(), + ) + .action() + .class_handler(|_, args| { + let element = args[0] + .get::() + .expect("playlist-stream signal arg"); + let playlist_location = + args[1].get::().expect("playlist-stream signal arg"); + let flexhlssink = FlexHlsSink::from_instance(&element); + + Some( + flexhlssink + .new_file_stream(&element, &playlist_location) + .ok()? + .to_value(), + ) + }) + .build(), + glib::subclass::Signal::builder( + SIGNAL_GET_FRAGMENT_STREAM, + &[String::static_type().into()], + gio::OutputStream::static_type().into(), + ) + .action() + .class_handler(|_, args| { + let element = args[0] + .get::() + .expect("fragment-stream signal arg"); + let fragment_location = + args[1].get::().expect("fragment-stream signal arg"); + let flexhlssink = FlexHlsSink::from_instance(&element); + + Some( + flexhlssink + .new_file_stream(&element, &fragment_location) + .ok()? + .to_value(), + ) + }) + .build(), + glib::subclass::Signal::builder( + SIGNAL_DELETE_FRAGMENT, + &[String::static_type().into()], + glib::types::Type::UNIT.into(), + ) + .action() + .class_handler(|_, args| { + let element = args[0].get::().expect("signal arg"); + let fragment_location = args[1].get::().expect("signal arg"); + let flexhlssink = FlexHlsSink::from_instance(&element); + + flexhlssink.delete_fragment(&fragment_location); + None + }) + .build(), + ] + }); + + SIGNALS.as_ref() + } + fn set_property( &self, _obj: &Self::Type, @@ -630,11 +727,8 @@ impl ElementImpl for FlexHlsSink { element: &Self::Type, transition: gst::StateChange, ) -> Result { - match transition { - gst::StateChange::NullToReady => { - self.start(element)?; - } - _ => (), + if let gst::StateChange::NullToReady = transition { + self.start(element)?; } let ret = self.parent_change_state(element, transition)?; diff --git a/src/playlist.rs b/src/playlist.rs index 7f3c22e..88a368c 100644 --- a/src/playlist.rs +++ b/src/playlist.rs @@ -2,19 +2,10 @@ use m3u8_rs::playlist; pub struct MediaPlaylist(playlist::MediaPlaylist); -impl MediaPlaylist { - fn inner_mut(&mut self) -> &mut playlist::MediaPlaylist { - &mut self.0 - } - - fn inner(&self) -> &playlist::MediaPlaylist { - &self.0 - } -} +impl MediaPlaylist {} #[derive(Copy, Clone, PartialEq)] pub enum PlaylistRenderState { Init, Started, - Ended, } diff --git a/tests/flexhlssink.rs b/tests/flexhlssink.rs index edc7418..d34a644 100644 --- a/tests/flexhlssink.rs +++ b/tests/flexhlssink.rs @@ -1,3 +1,4 @@ +use gio::prelude::*; use glib::prelude::*; use gst::gst_info; use gst::prelude::*; @@ -25,6 +26,35 @@ fn init() { }); } +#[test] +fn test_events() { + init(); + + let mut h = gst_check::Harness::new_parse( + "videotestsrc is-live=true ! x264enc ! h264parse ! flexhlssink", + ); + let flexhlssink = h.find_element("flexhlssink").unwrap(); + + let fragment_location = "test.ts".to_string(); + + let fragment_stream = flexhlssink + .emit_by_name("get-fragment-stream", &[&fragment_location]) + .expect("successful result") + .expect("return some value") + .get::() + .expect("it's a gio::OutputStream value"); + + // write something, just to make sure it is valid! + fragment_stream + .write("TEST".as_bytes(), gio::NONE_CANCELLABLE) + .unwrap(); + + assert!(flexhlssink + .emit_by_name("delete-fragment", &[&fragment_location]) + .expect("successful result") + .is_none()); +} + #[test] fn test_basic_element_with_video_content() { init(); @@ -98,7 +128,7 @@ fn test_basic_element_with_video_content() { "flexhlssink_video_pipeline: waiting for {} buffers", BUFFER_NB ); - for idx in 0..BUFFER_NB { + for _idx in 0..BUFFER_NB { receiver.recv().unwrap(); //gst_info!(CAT, "flexhlssink_video_pipeline: received buffer #{}", idx); }