diff --git a/net/hlssink3/src/hlsbasesink.rs b/net/hlssink3/src/hlsbasesink.rs index d72ba4fd9..857860122 100644 --- a/net/hlssink3/src/hlsbasesink.rs +++ b/net/hlssink3/src/hlsbasesink.rs @@ -57,6 +57,26 @@ pub enum HlsProgramDateTimeReference { BufferReferenceTimestamp = 2, } +// We need to keep an OutputStream around for writing to the same file +// to support the single media file use case. OutputStream not being +// thread safe, use this wrapper to keep an OutputStream around in State. +struct GioOutputStream { + stream: gio::OutputStream, +} + +unsafe impl Send for GioOutputStream {} +unsafe impl Sync for GioOutputStream {} + +impl GioOutputStream { + pub fn new(stream: gio::OutputStream) -> Self { + Self { stream } + } + + pub fn as_output_stream(&self) -> gio::OutputStream { + self.stream.clone() + } +} + struct Settings { playlist_location: String, playlist_root: Option, @@ -65,6 +85,7 @@ struct Settings { enable_program_date_time: bool, program_date_time_reference: HlsProgramDateTimeReference, enable_endlist: bool, + single_media_file: Option, } impl Default for Settings { @@ -77,6 +98,7 @@ impl Default for Settings { enable_program_date_time: DEFAULT_PROGRAM_DATE_TIME_TAG, program_date_time_reference: DEFAULT_PROGRAM_DATE_TIME_REFERENCE, enable_endlist: DEFAULT_ENDLIST, + single_media_file: None, } } } @@ -90,11 +112,13 @@ pub struct PlaylistContext { playlist_location: String, max_num_segment_files: usize, playlist_length: u32, + single_media_file: bool, } #[derive(Default)] pub struct State { context: Option, + stream: Option, } #[derive(Default)] @@ -163,6 +187,10 @@ impl ObjectImpl for HlsBaseSink { .blurb("Write \"EXT-X-ENDLIST\" tag to manifest at the end of stream") .default_value(DEFAULT_ENDLIST) .build(), + glib::ParamSpecString::builder("single-media-file") + .nick("Single media file") + .blurb("Location of the single media file to write (media playlist will use byte-range addressing)") + .build(), ] }); @@ -208,6 +236,11 @@ impl ObjectImpl for HlsBaseSink { "enable-endlist" => { settings.enable_endlist = value.get().expect("type checked upstream"); } + "single-media-file" => { + settings.single_media_file = value + .get::>() + .expect("type checked upstream"); + } _ => unimplemented!(), }; } @@ -228,6 +261,7 @@ impl ObjectImpl for HlsBaseSink { .to_value(), "program-date-time-reference" => settings.program_date_time_reference.to_value(), "enable-endlist" => settings.enable_endlist.to_value(), + "single-media-file" => settings.single_media_file.to_value(), _ => unimplemented!(), } } @@ -335,6 +369,7 @@ impl HlsBaseSink { playlist_location: settings.playlist_location.clone(), max_num_segment_files: settings.max_num_segment_files, playlist_length: settings.playlist_length, + single_media_file: settings.single_media_file.is_some(), }); } @@ -351,17 +386,21 @@ impl HlsBaseSink { } pub fn get_location(&self, fragment_id: u32) -> Option { - let mut state = self.state.lock().unwrap(); - let context = match state.context.as_mut() { - Some(context) => context, - None => { - gst::error!(CAT, imp = self, "Playlist is not configured",); + let settings = self.settings.lock().unwrap(); + if settings.single_media_file.is_none() { + let mut state = self.state.lock().unwrap(); + let context = match state.context.as_mut() { + Some(context) => context, + None => { + gst::error!(CAT, imp = self, "Playlist is not configured",); + return None; + } + }; - return None; - } - }; - - sprintf::sprintf!(&context.segment_template, fragment_id).ok() + sprintf::sprintf!(&context.segment_template, fragment_id).ok() + } else { + settings.single_media_file.clone() + } } pub fn get_fragment_stream(&self, fragment_id: u32) -> Option<(gio::OutputStream, String)> { @@ -375,22 +414,42 @@ impl HlsBaseSink { } }; - let location = match sprintf::sprintf!(&context.segment_template, fragment_id) { - Ok(file_name) => file_name, - Err(err) => { - gst::error!(CAT, imp = self, "Couldn't build file name, err: {:?}", err,); + let settings = self.settings.lock().unwrap(); + if settings.single_media_file.is_none() { + let location = match sprintf::sprintf!(&context.segment_template, fragment_id) { + Ok(file_name) => file_name, + Err(err) => { + gst::error!(CAT, imp = self, "Couldn't build file name, err: {:?}", err,); + return None; + } + }; - return None; - } - }; + let stream = self.obj().emit_by_name::>( + SIGNAL_GET_FRAGMENT_STREAM, + &[&location], + )?; - gst::trace!(CAT, imp = self, "Segment location formatted: {}", location); + gst::trace!(CAT, imp = self, "Segment location formatted: {}", location); - let stream = self - .obj() - .emit_by_name::>(SIGNAL_GET_FRAGMENT_STREAM, &[&location])?; + Some((stream, location)) + } else { + let location = settings.single_media_file.as_ref().unwrap().clone(); - Some((stream, location)) + let stream = if let Some(s) = &state.stream { + s.as_output_stream() + } else { + let stream = self.obj().emit_by_name::>( + SIGNAL_GET_FRAGMENT_STREAM, + &[&location], + )?; + + state.stream = Some(GioOutputStream::new(stream.clone())); + + stream + }; + + Some((stream, location)) + } } pub fn get_segment_uri(&self, location: &str, prefix: Option<&str>) -> String { @@ -563,7 +622,10 @@ impl HlsBaseSink { gst::FlowError::Error })?; - if context.playlist.is_type_undefined() && context.max_num_segment_files > 0 { + let delete_fragment = context.playlist.is_type_undefined() + && context.max_num_segment_files > 0 + && !context.single_media_file; + if delete_fragment { // Cleanup old segments from filesystem while context.old_segment_locations.len() > context.max_num_segment_files { let old_segment_location = context.old_segment_locations.remove(0); @@ -623,4 +685,9 @@ impl HlsBaseSink { ); }); } + + pub fn is_single_media_file(&self) -> bool { + let settings = self.settings.lock().unwrap(); + settings.single_media_file.is_some() + } } diff --git a/net/hlssink3/src/hlscmafsink/imp.rs b/net/hlssink3/src/hlscmafsink/imp.rs index 84c398450..14f330f55 100644 --- a/net/hlssink3/src/hlscmafsink/imp.rs +++ b/net/hlssink3/src/hlscmafsink/imp.rs @@ -94,6 +94,7 @@ struct HlsCmafSinkState { segment_idx: u32, init_segment: Option, new_header: bool, + offset: u64, } #[derive(Default)] @@ -414,32 +415,65 @@ impl HlsCmafSink { Playlist::new(playlist, turn_vod, true) } - fn on_init_segment(&self) -> Result, String> { + fn on_init_segment( + &self, + init_segment_size: u64, + ) -> Result, String> { let settings = self.settings.lock().unwrap(); - let mut state = self.state.lock().unwrap(); - let location = match sprintf::sprintf!(&settings.init_location, state.init_idx) { - Ok(location) => location, - Err(err) => { - gst::error!(CAT, imp = self, "Couldn't build file name, err: {:?}", err,); - return Err(String::from("Invalid init segment file pattern")); - } - }; - let stream = self - .obj() - .emit_by_name::>(SIGNAL_GET_INIT_STREAM, &[&location]) - .ok_or_else(|| String::from("Error while getting fragment stream"))? - .into_write(); + let (stream, location, byte_range) = if !base_imp!(self).is_single_media_file() { + let state = self.state.lock().unwrap(); + + match sprintf::sprintf!(&settings.init_location, state.init_idx) { + Ok(location) => { + let stream = self + .obj() + .emit_by_name::>( + SIGNAL_GET_INIT_STREAM, + &[&location], + ) + .ok_or_else(|| String::from("Error while getting init stream"))? + .into_write(); + + (stream, location, None) + } + Err(err) => { + gst::error!(CAT, imp = self, "Couldn't build file name, err: {:?}", err,); + return Err(String::from("Invalid init segment file pattern")); + } + } + } else { + let (stream, location) = self.on_new_fragment().map_err(|err| { + gst::error!( + CAT, + imp = self, + "Couldn't get fragment stream for init segment, {err}", + ); + String::from("Couldn't get fragment stream for init segment") + })?; + + ( + stream, + location, + Some(m3u8_rs::ByteRange { + length: init_segment_size, + offset: Some(0), + }), + ) + }; let uri = base_imp!(self).get_segment_uri(&location, settings.playlist_root_init.as_deref()); + let mut state = self.state.lock().unwrap(); state.init_segment = Some(m3u8_rs::Map { uri, + byte_range, ..Default::default() }); state.new_header = true; state.init_idx += 1; + state.offset = init_segment_size; Ok(stream) } @@ -463,6 +497,7 @@ impl HlsCmafSink { running_time: Option, location: String, timestamp: Option>, + byte_range: Option, ) -> Result { let uri = base_imp!(self).get_segment_uri(&location, None); let mut state = self.state.lock().unwrap(); @@ -483,6 +518,7 @@ impl HlsCmafSink { uri, duration: duration.mseconds() as f32 / 1_000f32, map, + byte_range, ..Default::default() }, ) @@ -496,7 +532,7 @@ impl HlsCmafSink { .flags() .contains(gst::BufferFlags::DISCONT | gst::BufferFlags::HEADER) { - let mut stream = self.on_init_segment().map_err(|err| { + let mut stream = self.on_init_segment(first.size() as u64).map_err(|err| { gst::error!( CAT, imp = self, @@ -561,6 +597,18 @@ impl HlsCmafSink { gst::FlowError::Error })?; - self.add_segment(duration, running_time, location, None) + let byte_range = if base_imp!(self).is_single_media_file() { + let length = buffer_list.calculate_size() as u64; + + let mut state = self.state.lock().unwrap(); + let offset = Some(state.offset); + state.offset += length; + + Some(m3u8_rs::ByteRange { length, offset }) + } else { + None + }; + + self.add_segment(duration, running_time, location, None, byte_range) } } diff --git a/net/hlssink3/src/hlssink3/imp.rs b/net/hlssink3/src/hlssink3/imp.rs index cf61ee95b..e6ef96caa 100644 --- a/net/hlssink3/src/hlssink3/imp.rs +++ b/net/hlssink3/src/hlssink3/imp.rs @@ -17,6 +17,8 @@ use gst::glib; use gst::prelude::*; use gst::subclass::prelude::*; use m3u8_rs::{MediaPlaylist, MediaPlaylistType, MediaSegment}; +use std::io::Write; +use std::sync::Arc; use std::sync::LazyLock; use std::sync::Mutex; @@ -36,6 +38,89 @@ macro_rules! base_imp { }; } +// `splitmuxsink` does not know the size of the fragment written out by +// the muxer. We track this by using a wrapper around the OutputStream. +// Implementing Write trait for this allows passing it to a WriteOutputStream. +#[derive(Clone)] +struct CountingOutputStream { + inner: Arc>, +} + +impl CountingOutputStream { + pub fn new(stream: gio::OutputStream) -> Self { + Self { + inner: Arc::new(Mutex::new(CountingOutputStreamInner::new(stream))), + } + } + + pub fn out_bytes(&self) -> u64 { + let inner = self.inner.lock().unwrap(); + inner.out_bytes() + } +} + +impl Write for CountingOutputStream { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + let mut inner = self.inner.lock().unwrap(); + inner.write(buf) + } + + fn flush(&mut self) -> std::io::Result<()> { + let mut inner = self.inner.lock().unwrap(); + inner.flush() + } +} + +struct CountingOutputStreamInner { + stream: gio::OutputStream, + data: Vec, + out_bytes: u64, +} + +unsafe impl Send for CountingOutputStreamInner {} +unsafe impl Sync for CountingOutputStreamInner {} + +impl CountingOutputStreamInner { + pub fn new(stream: gio::OutputStream) -> Self { + Self { + stream, + data: Vec::new(), + out_bytes: 0, + } + } + + pub fn out_bytes(&self) -> u64 { + self.out_bytes + } + + fn inner_flush(&mut self) -> std::io::Result<()> { + let data_len = self.data.len() as u64; + if data_len == 0 { + return Ok(()); + } + + let data: Vec = self.data.drain(0..).collect(); + + let mut s = self.stream.clone().into_write(); + s.write(&data).unwrap(); + + self.out_bytes = data_len; + + Ok(()) + } +} + +impl Write for CountingOutputStreamInner { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + self.data.extend_from_slice(buf); + Ok(buf.len()) + } + + fn flush(&mut self) -> std::io::Result<()> { + self.inner_flush() + } +} + /// Offset between NTP and UNIX epoch in seconds. /// NTP = UNIX + NTP_UNIX_OFFSET. const NTP_UNIX_OFFSET: u64 = 2_208_988_800; @@ -161,6 +246,8 @@ struct HlsSink3State { fragment_running_time: Option, current_segment_location: Option, fragment_start_timestamp: Option>, + stream: Option, + offset: u64, } #[derive(Default)] @@ -543,8 +630,14 @@ impl HlsSink3 { (false, playlist_type) }; + let version = if i_frames_only || base_imp!(self).is_single_media_file() { + Some(4) + } else { + Some(3) + }; + let playlist = MediaPlaylist { - version: if i_frames_only { Some(4) } else { Some(3) }, + version, target_duration: target_duration as u64, playlist_type, i_frames_only, @@ -575,16 +668,29 @@ impl HlsSink3 { state.fragment_running_time = running_time; let settings = self.settings.lock().unwrap(); - settings - .giostreamsink - .set_property("stream", &fragment_stream); + let stream = if base_imp!(self).is_single_media_file() { + if state.stream.is_none() { + let gios = CountingOutputStream::new(fragment_stream); + let stream = + gio::WriteOutputStream::new(gios.clone()).upcast::(); + state.stream = Some(gios); + stream + } else { + gio::WriteOutputStream::new(state.stream.as_ref().unwrap().clone()) + .upcast::() + } + } else { + gst::info!( + CAT, + imp = self, + "New segment location: {:?}", + segment_file_location, + ); - gst::info!( - CAT, - imp = self, - "New segment location: {:?}", - segment_file_location, - ); + fragment_stream + }; + + settings.giostreamsink.set_property("stream", &stream); Ok(segment_file_location) } @@ -638,6 +744,17 @@ impl HlsSink3 { let running_time = state.fragment_running_time; let fragment_start_timestamp = state.fragment_start_timestamp.take(); + let byte_range = if base_imp!(self).is_single_media_file() { + let length = state.stream.as_ref().unwrap().out_bytes(); + let offset = state.offset; + state.offset += length; + Some(m3u8_rs::ByteRange { + length, + offset: Some(offset), + }) + } else { + None + }; drop(state); @@ -650,6 +767,7 @@ impl HlsSink3 { MediaSegment { uri, duration: duration_msec, + byte_range, ..Default::default() }, );