diff --git a/net/hlssink3/src/hlsbasesink.rs b/net/hlssink3/src/hlsbasesink.rs index ef2abb49b..76745dc70 100644 --- a/net/hlssink3/src/hlsbasesink.rs +++ b/net/hlssink3/src/hlsbasesink.rs @@ -10,6 +10,7 @@ use crate::playlist::Playlist; use chrono::{DateTime, Duration, Utc}; use gio::prelude::*; +use gio::subclass::prelude::OutputStreamImpl; use gst::glib; use gst::prelude::*; use gst::subclass::prelude::*; @@ -17,8 +18,7 @@ use m3u8_rs::MediaSegment; use std::fs; use std::io::Write; use std::path; -use std::sync::LazyLock; -use std::sync::Mutex; +use std::sync::{LazyLock, Mutex}; const DEFAULT_PLAYLIST_LOCATION: &str = "playlist.m3u8"; const DEFAULT_MAX_NUM_SEGMENT_FILES: u32 = 10; @@ -60,20 +60,70 @@ pub enum HlsProgramDateTimeReference { // We need to keep an OutputStream around for writing to the same file // to support the single media file use case. OutputStream not being // thread safe, use this wrapper to keep an OutputStream around in State. -struct GioOutputStream { - stream: gio::OutputStream, +#[derive(Default)] +pub struct HlsBaseSinkGioOutputStream { + stream: Mutex>, + out_bytes: Mutex, } -unsafe impl Send for GioOutputStream {} -unsafe impl Sync for GioOutputStream {} +unsafe impl Send for HlsBaseSinkGioOutputStream {} +unsafe impl Sync for HlsBaseSinkGioOutputStream {} -impl GioOutputStream { - pub fn new(stream: gio::OutputStream) -> Self { - Self { stream } +#[glib::object_subclass] +impl ObjectSubclass for HlsBaseSinkGioOutputStream { + const NAME: &'static str = "GstHlsBaseSinkGioOutputStream"; + type Type = super::HlsBaseSinkGioOutputStream; + type ParentType = gio::OutputStream; +} + +impl ObjectImpl for HlsBaseSinkGioOutputStream {} + +impl OutputStreamImpl for HlsBaseSinkGioOutputStream { + fn write( + &self, + buffer: &[u8], + cancellable: Option<&gio::Cancellable>, + ) -> Result { + let stream = self.stream.lock().unwrap(); + if let Some(ref s) = *stream { + s.write_all(buffer, cancellable)?; + let mut out_bytes = self.out_bytes.lock().unwrap(); + *out_bytes += buffer.len(); + return Ok(buffer.len()); + } + + Ok(0) } - pub fn as_output_stream(&self) -> gio::OutputStream { - self.stream.clone() + fn close(&self, cancellable: Option<&gio::Cancellable>) -> Result<(), glib::Error> { + let stream = self.stream.lock().unwrap().take(); + if let Some(ref s) = stream { + s.close(cancellable)?; + } + + Ok(()) + } + + fn flush(&self, cancellable: Option<&gio::Cancellable>) -> Result<(), glib::Error> { + let stream = self.stream.lock().unwrap(); + if let Some(ref s) = *stream { + s.flush(cancellable)?; + } + + Ok(()) + } +} + +impl super::HlsBaseSinkGioOutputStream { + pub fn new(stream: gio::OutputStream) -> Self { + let obj = glib::Object::new::(); + *obj.imp().stream.lock().unwrap() = Some(stream); + obj + } + + pub fn out_bytes(&self) -> usize { + let out_bytes = self.imp().out_bytes.lock().unwrap(); + *out_bytes } } @@ -118,7 +168,7 @@ pub struct PlaylistContext { #[derive(Default)] pub struct State { context: Option, - stream: Option, + stream: Option, } #[derive(Default)] @@ -434,21 +484,23 @@ impl HlsBaseSink { Some((stream, location)) } else { let location = settings.single_media_file.as_ref().unwrap().clone(); - - let stream = if let Some(s) = &state.stream { - s.as_output_stream() - } else { + if state.stream.is_none() { let stream = self.obj().emit_by_name::>( SIGNAL_GET_FRAGMENT_STREAM, &[&location], )?; - state.stream = Some(GioOutputStream::new(stream.clone())); + let gios = super::HlsBaseSinkGioOutputStream::new(stream); + let stream = gios.upcast_ref::().clone(); - stream - }; + state.stream = Some(gios); - Some((stream, location)) + Some((stream, location)) + } else { + let gios = state.stream.as_ref().unwrap(); + let stream = gios.upcast_ref::().clone(); + Some((stream, location)) + } } } @@ -699,6 +751,11 @@ impl HlsBaseSink { settings.single_media_file.is_some() } + pub fn out_bytes(&self) -> usize { + let state = self.state.lock().unwrap(); + state.stream.as_ref().unwrap().out_bytes() + } + fn byte_ranges( &self, context: &PlaylistContext, diff --git a/net/hlssink3/src/hlssink3/imp.rs b/net/hlssink3/src/hlssink3/imp.rs index e6ef96caa..093f44db9 100644 --- a/net/hlssink3/src/hlssink3/imp.rs +++ b/net/hlssink3/src/hlssink3/imp.rs @@ -17,8 +17,6 @@ use gst::glib; use gst::prelude::*; use gst::subclass::prelude::*; use m3u8_rs::{MediaPlaylist, MediaPlaylistType, MediaSegment}; -use std::io::Write; -use std::sync::Arc; use std::sync::LazyLock; use std::sync::Mutex; @@ -38,89 +36,6 @@ macro_rules! base_imp { }; } -// `splitmuxsink` does not know the size of the fragment written out by -// the muxer. We track this by using a wrapper around the OutputStream. -// Implementing Write trait for this allows passing it to a WriteOutputStream. -#[derive(Clone)] -struct CountingOutputStream { - inner: Arc>, -} - -impl CountingOutputStream { - pub fn new(stream: gio::OutputStream) -> Self { - Self { - inner: Arc::new(Mutex::new(CountingOutputStreamInner::new(stream))), - } - } - - pub fn out_bytes(&self) -> u64 { - let inner = self.inner.lock().unwrap(); - inner.out_bytes() - } -} - -impl Write for CountingOutputStream { - fn write(&mut self, buf: &[u8]) -> std::io::Result { - let mut inner = self.inner.lock().unwrap(); - inner.write(buf) - } - - fn flush(&mut self) -> std::io::Result<()> { - let mut inner = self.inner.lock().unwrap(); - inner.flush() - } -} - -struct CountingOutputStreamInner { - stream: gio::OutputStream, - data: Vec, - out_bytes: u64, -} - -unsafe impl Send for CountingOutputStreamInner {} -unsafe impl Sync for CountingOutputStreamInner {} - -impl CountingOutputStreamInner { - pub fn new(stream: gio::OutputStream) -> Self { - Self { - stream, - data: Vec::new(), - out_bytes: 0, - } - } - - pub fn out_bytes(&self) -> u64 { - self.out_bytes - } - - fn inner_flush(&mut self) -> std::io::Result<()> { - let data_len = self.data.len() as u64; - if data_len == 0 { - return Ok(()); - } - - let data: Vec = self.data.drain(0..).collect(); - - let mut s = self.stream.clone().into_write(); - s.write(&data).unwrap(); - - self.out_bytes = data_len; - - Ok(()) - } -} - -impl Write for CountingOutputStreamInner { - fn write(&mut self, buf: &[u8]) -> std::io::Result { - self.data.extend_from_slice(buf); - Ok(buf.len()) - } - - fn flush(&mut self) -> std::io::Result<()> { - self.inner_flush() - } -} - /// Offset between NTP and UNIX epoch in seconds. /// NTP = UNIX + NTP_UNIX_OFFSET. const NTP_UNIX_OFFSET: u64 = 2_208_988_800; @@ -246,7 +161,6 @@ struct HlsSink3State { fragment_running_time: Option, current_segment_location: Option, fragment_start_timestamp: Option>, - stream: Option, offset: u64, } @@ -630,7 +544,25 @@ impl HlsSink3 { (false, playlist_type) }; - let version = if i_frames_only || base_imp!(self).is_single_media_file() { + let is_single_media_file = base_imp!(self).is_single_media_file(); + if is_single_media_file { + // `splitmuxsink` will stop the sink on every fragment, + // don't do that for single media file case as we need + // to keep the stream around for writing. + let giostreamsink = self.settings.lock().unwrap().giostreamsink.clone(); + if giostreamsink.has_property_with_type("close-on-stop", bool::static_type()) { + giostreamsink.set_property("close-on-stop", false); + } else { + gst::element_imp_error!( + self, + gst::ResourceError::Settings, + ("Invalid configuration"), + ["Single media file support with hlssink3 needs GStreamer 1.24 or later"] + ); + } + } + + let version = if i_frames_only || is_single_media_file { Some(4) } else { Some(3) @@ -668,29 +600,17 @@ impl HlsSink3 { state.fragment_running_time = running_time; let settings = self.settings.lock().unwrap(); - let stream = if base_imp!(self).is_single_media_file() { - if state.stream.is_none() { - let gios = CountingOutputStream::new(fragment_stream); - let stream = - gio::WriteOutputStream::new(gios.clone()).upcast::(); - state.stream = Some(gios); - stream - } else { - gio::WriteOutputStream::new(state.stream.as_ref().unwrap().clone()) - .upcast::() - } - } else { - gst::info!( - CAT, - imp = self, - "New segment location: {:?}", - segment_file_location, - ); - fragment_stream - }; + gst::info!( + CAT, + imp = self, + "New segment location: {:?}", + segment_file_location, + ); - settings.giostreamsink.set_property("stream", &stream); + settings + .giostreamsink + .set_property("stream", &fragment_stream); Ok(segment_file_location) } @@ -745,8 +665,8 @@ impl HlsSink3 { let running_time = state.fragment_running_time; let fragment_start_timestamp = state.fragment_start_timestamp.take(); let byte_range = if base_imp!(self).is_single_media_file() { - let length = state.stream.as_ref().unwrap().out_bytes(); let offset = state.offset; + let length = base_imp!(self).out_bytes() as u64 - offset; state.offset += length; Some(m3u8_rs::ByteRange { length, diff --git a/net/hlssink3/src/lib.rs b/net/hlssink3/src/lib.rs index 24e75654c..4f29420f5 100644 --- a/net/hlssink3/src/lib.rs +++ b/net/hlssink3/src/lib.rs @@ -19,6 +19,13 @@ pub mod hlscmafsink; pub mod hlssink3; mod playlist; +glib::wrapper! { + pub struct HlsBaseSinkGioOutputStream(ObjectSubclass) @extends gio::OutputStream; +} + +unsafe impl Send for HlsBaseSinkGioOutputStream {} +unsafe impl Sync for HlsBaseSinkGioOutputStream {} + glib::wrapper! { pub struct HlsBaseSink(ObjectSubclass) @extends gst::Bin, gst::Element, gst::Object; }