hlssink3/hlscmafsink: Support the use of a single media file

Allow the use of a single file for media. Media playlist will use byte
range tags like below while referencing the same single media file.

```
#EXT-X-PLAYLIST-TYPE:VOD
#EXT-X-MAP:URI="main.mp4",BYTERANGE="768@0"
#EXT-X-BYTERANGE:198120@768
#EXTINF:10,
main.mp4
#EXT-X-BYTERANGE:197426@198888
#EXTINF:10,
main.mp4
```

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/2439>
This commit is contained in:
Sanchayan Maity 2025-08-08 15:17:22 +05:30
parent 8ec5d0995f
commit 2539a59b99
3 changed files with 282 additions and 49 deletions

View file

@ -57,6 +57,26 @@ pub enum HlsProgramDateTimeReference {
BufferReferenceTimestamp = 2,
}
// We need to keep an OutputStream around for writing to the same file
// to support the single media file use case. OutputStream not being
// thread safe, use this wrapper to keep an OutputStream around in State.
struct GioOutputStream {
stream: gio::OutputStream,
}
unsafe impl Send for GioOutputStream {}
unsafe impl Sync for GioOutputStream {}
impl GioOutputStream {
pub fn new(stream: gio::OutputStream) -> Self {
Self { stream }
}
pub fn as_output_stream(&self) -> gio::OutputStream {
self.stream.clone()
}
}
struct Settings {
playlist_location: String,
playlist_root: Option<String>,
@ -65,6 +85,7 @@ struct Settings {
enable_program_date_time: bool,
program_date_time_reference: HlsProgramDateTimeReference,
enable_endlist: bool,
single_media_file: Option<String>,
}
impl Default for Settings {
@ -77,6 +98,7 @@ impl Default for Settings {
enable_program_date_time: DEFAULT_PROGRAM_DATE_TIME_TAG,
program_date_time_reference: DEFAULT_PROGRAM_DATE_TIME_REFERENCE,
enable_endlist: DEFAULT_ENDLIST,
single_media_file: None,
}
}
}
@ -90,11 +112,13 @@ pub struct PlaylistContext {
playlist_location: String,
max_num_segment_files: usize,
playlist_length: u32,
single_media_file: bool,
}
#[derive(Default)]
pub struct State {
context: Option<PlaylistContext>,
stream: Option<GioOutputStream>,
}
#[derive(Default)]
@ -163,6 +187,10 @@ impl ObjectImpl for HlsBaseSink {
.blurb("Write \"EXT-X-ENDLIST\" tag to manifest at the end of stream")
.default_value(DEFAULT_ENDLIST)
.build(),
glib::ParamSpecString::builder("single-media-file")
.nick("Single media file")
.blurb("Location of the single media file to write (media playlist will use byte-range addressing)")
.build(),
]
});
@ -208,6 +236,11 @@ impl ObjectImpl for HlsBaseSink {
"enable-endlist" => {
settings.enable_endlist = value.get().expect("type checked upstream");
}
"single-media-file" => {
settings.single_media_file = value
.get::<Option<String>>()
.expect("type checked upstream");
}
_ => unimplemented!(),
};
}
@ -228,6 +261,7 @@ impl ObjectImpl for HlsBaseSink {
.to_value(),
"program-date-time-reference" => settings.program_date_time_reference.to_value(),
"enable-endlist" => settings.enable_endlist.to_value(),
"single-media-file" => settings.single_media_file.to_value(),
_ => unimplemented!(),
}
}
@ -335,6 +369,7 @@ impl HlsBaseSink {
playlist_location: settings.playlist_location.clone(),
max_num_segment_files: settings.max_num_segment_files,
playlist_length: settings.playlist_length,
single_media_file: settings.single_media_file.is_some(),
});
}
@ -351,17 +386,21 @@ impl HlsBaseSink {
}
pub fn get_location(&self, fragment_id: u32) -> Option<String> {
let mut state = self.state.lock().unwrap();
let context = match state.context.as_mut() {
Some(context) => context,
None => {
gst::error!(CAT, imp = self, "Playlist is not configured",);
let settings = self.settings.lock().unwrap();
if settings.single_media_file.is_none() {
let mut state = self.state.lock().unwrap();
let context = match state.context.as_mut() {
Some(context) => context,
None => {
gst::error!(CAT, imp = self, "Playlist is not configured",);
return None;
}
};
return None;
}
};
sprintf::sprintf!(&context.segment_template, fragment_id).ok()
sprintf::sprintf!(&context.segment_template, fragment_id).ok()
} else {
settings.single_media_file.clone()
}
}
pub fn get_fragment_stream(&self, fragment_id: u32) -> Option<(gio::OutputStream, String)> {
@ -375,22 +414,42 @@ impl HlsBaseSink {
}
};
let location = match sprintf::sprintf!(&context.segment_template, fragment_id) {
Ok(file_name) => file_name,
Err(err) => {
gst::error!(CAT, imp = self, "Couldn't build file name, err: {:?}", err,);
let settings = self.settings.lock().unwrap();
if settings.single_media_file.is_none() {
let location = match sprintf::sprintf!(&context.segment_template, fragment_id) {
Ok(file_name) => file_name,
Err(err) => {
gst::error!(CAT, imp = self, "Couldn't build file name, err: {:?}", err,);
return None;
}
};
return None;
}
};
let stream = self.obj().emit_by_name::<Option<gio::OutputStream>>(
SIGNAL_GET_FRAGMENT_STREAM,
&[&location],
)?;
gst::trace!(CAT, imp = self, "Segment location formatted: {}", location);
gst::trace!(CAT, imp = self, "Segment location formatted: {}", location);
let stream = self
.obj()
.emit_by_name::<Option<gio::OutputStream>>(SIGNAL_GET_FRAGMENT_STREAM, &[&location])?;
Some((stream, location))
} else {
let location = settings.single_media_file.as_ref().unwrap().clone();
Some((stream, location))
let stream = if let Some(s) = &state.stream {
s.as_output_stream()
} else {
let stream = self.obj().emit_by_name::<Option<gio::OutputStream>>(
SIGNAL_GET_FRAGMENT_STREAM,
&[&location],
)?;
state.stream = Some(GioOutputStream::new(stream.clone()));
stream
};
Some((stream, location))
}
}
pub fn get_segment_uri(&self, location: &str, prefix: Option<&str>) -> String {
@ -563,7 +622,10 @@ impl HlsBaseSink {
gst::FlowError::Error
})?;
if context.playlist.is_type_undefined() && context.max_num_segment_files > 0 {
let delete_fragment = context.playlist.is_type_undefined()
&& context.max_num_segment_files > 0
&& !context.single_media_file;
if delete_fragment {
// Cleanup old segments from filesystem
while context.old_segment_locations.len() > context.max_num_segment_files {
let old_segment_location = context.old_segment_locations.remove(0);
@ -623,4 +685,9 @@ impl HlsBaseSink {
);
});
}
pub fn is_single_media_file(&self) -> bool {
let settings = self.settings.lock().unwrap();
settings.single_media_file.is_some()
}
}

View file

@ -94,6 +94,7 @@ struct HlsCmafSinkState {
segment_idx: u32,
init_segment: Option<m3u8_rs::Map>,
new_header: bool,
offset: u64,
}
#[derive(Default)]
@ -414,32 +415,65 @@ impl HlsCmafSink {
Playlist::new(playlist, turn_vod, true)
}
fn on_init_segment(&self) -> Result<gio::OutputStreamWrite<gio::OutputStream>, String> {
fn on_init_segment(
&self,
init_segment_size: u64,
) -> Result<gio::OutputStreamWrite<gio::OutputStream>, String> {
let settings = self.settings.lock().unwrap();
let mut state = self.state.lock().unwrap();
let location = match sprintf::sprintf!(&settings.init_location, state.init_idx) {
Ok(location) => location,
Err(err) => {
gst::error!(CAT, imp = self, "Couldn't build file name, err: {:?}", err,);
return Err(String::from("Invalid init segment file pattern"));
}
};
let stream = self
.obj()
.emit_by_name::<Option<gio::OutputStream>>(SIGNAL_GET_INIT_STREAM, &[&location])
.ok_or_else(|| String::from("Error while getting fragment stream"))?
.into_write();
let (stream, location, byte_range) = if !base_imp!(self).is_single_media_file() {
let state = self.state.lock().unwrap();
match sprintf::sprintf!(&settings.init_location, state.init_idx) {
Ok(location) => {
let stream = self
.obj()
.emit_by_name::<Option<gio::OutputStream>>(
SIGNAL_GET_INIT_STREAM,
&[&location],
)
.ok_or_else(|| String::from("Error while getting init stream"))?
.into_write();
(stream, location, None)
}
Err(err) => {
gst::error!(CAT, imp = self, "Couldn't build file name, err: {:?}", err,);
return Err(String::from("Invalid init segment file pattern"));
}
}
} else {
let (stream, location) = self.on_new_fragment().map_err(|err| {
gst::error!(
CAT,
imp = self,
"Couldn't get fragment stream for init segment, {err}",
);
String::from("Couldn't get fragment stream for init segment")
})?;
(
stream,
location,
Some(m3u8_rs::ByteRange {
length: init_segment_size,
offset: Some(0),
}),
)
};
let uri =
base_imp!(self).get_segment_uri(&location, settings.playlist_root_init.as_deref());
let mut state = self.state.lock().unwrap();
state.init_segment = Some(m3u8_rs::Map {
uri,
byte_range,
..Default::default()
});
state.new_header = true;
state.init_idx += 1;
state.offset = init_segment_size;
Ok(stream)
}
@ -463,6 +497,7 @@ impl HlsCmafSink {
running_time: Option<gst::ClockTime>,
location: String,
timestamp: Option<DateTime<Utc>>,
byte_range: Option<m3u8_rs::ByteRange>,
) -> Result<gst::FlowSuccess, gst::FlowError> {
let uri = base_imp!(self).get_segment_uri(&location, None);
let mut state = self.state.lock().unwrap();
@ -483,6 +518,7 @@ impl HlsCmafSink {
uri,
duration: duration.mseconds() as f32 / 1_000f32,
map,
byte_range,
..Default::default()
},
)
@ -496,7 +532,7 @@ impl HlsCmafSink {
.flags()
.contains(gst::BufferFlags::DISCONT | gst::BufferFlags::HEADER)
{
let mut stream = self.on_init_segment().map_err(|err| {
let mut stream = self.on_init_segment(first.size() as u64).map_err(|err| {
gst::error!(
CAT,
imp = self,
@ -561,6 +597,18 @@ impl HlsCmafSink {
gst::FlowError::Error
})?;
self.add_segment(duration, running_time, location, None)
let byte_range = if base_imp!(self).is_single_media_file() {
let length = buffer_list.calculate_size() as u64;
let mut state = self.state.lock().unwrap();
let offset = Some(state.offset);
state.offset += length;
Some(m3u8_rs::ByteRange { length, offset })
} else {
None
};
self.add_segment(duration, running_time, location, None, byte_range)
}
}

View file

@ -17,6 +17,8 @@ use gst::glib;
use gst::prelude::*;
use gst::subclass::prelude::*;
use m3u8_rs::{MediaPlaylist, MediaPlaylistType, MediaSegment};
use std::io::Write;
use std::sync::Arc;
use std::sync::LazyLock;
use std::sync::Mutex;
@ -36,6 +38,89 @@ macro_rules! base_imp {
};
}
// `splitmuxsink` does not know the size of the fragment written out by
// the muxer. We track this by using a wrapper around the OutputStream.
// Implementing Write trait for this allows passing it to a WriteOutputStream.
#[derive(Clone)]
struct CountingOutputStream {
inner: Arc<Mutex<CountingOutputStreamInner>>,
}
impl CountingOutputStream {
pub fn new(stream: gio::OutputStream) -> Self {
Self {
inner: Arc::new(Mutex::new(CountingOutputStreamInner::new(stream))),
}
}
pub fn out_bytes(&self) -> u64 {
let inner = self.inner.lock().unwrap();
inner.out_bytes()
}
}
impl Write for CountingOutputStream {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
let mut inner = self.inner.lock().unwrap();
inner.write(buf)
}
fn flush(&mut self) -> std::io::Result<()> {
let mut inner = self.inner.lock().unwrap();
inner.flush()
}
}
struct CountingOutputStreamInner {
stream: gio::OutputStream,
data: Vec<u8>,
out_bytes: u64,
}
unsafe impl Send for CountingOutputStreamInner {}
unsafe impl Sync for CountingOutputStreamInner {}
impl CountingOutputStreamInner {
pub fn new(stream: gio::OutputStream) -> Self {
Self {
stream,
data: Vec::new(),
out_bytes: 0,
}
}
pub fn out_bytes(&self) -> u64 {
self.out_bytes
}
fn inner_flush(&mut self) -> std::io::Result<()> {
let data_len = self.data.len() as u64;
if data_len == 0 {
return Ok(());
}
let data: Vec<u8> = self.data.drain(0..).collect();
let mut s = self.stream.clone().into_write();
s.write(&data).unwrap();
self.out_bytes = data_len;
Ok(())
}
}
impl Write for CountingOutputStreamInner {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
self.data.extend_from_slice(buf);
Ok(buf.len())
}
fn flush(&mut self) -> std::io::Result<()> {
self.inner_flush()
}
}
/// Offset between NTP and UNIX epoch in seconds.
/// NTP = UNIX + NTP_UNIX_OFFSET.
const NTP_UNIX_OFFSET: u64 = 2_208_988_800;
@ -161,6 +246,8 @@ struct HlsSink3State {
fragment_running_time: Option<gst::ClockTime>,
current_segment_location: Option<String>,
fragment_start_timestamp: Option<DateTime<Utc>>,
stream: Option<CountingOutputStream>,
offset: u64,
}
#[derive(Default)]
@ -543,8 +630,14 @@ impl HlsSink3 {
(false, playlist_type)
};
let version = if i_frames_only || base_imp!(self).is_single_media_file() {
Some(4)
} else {
Some(3)
};
let playlist = MediaPlaylist {
version: if i_frames_only { Some(4) } else { Some(3) },
version,
target_duration: target_duration as u64,
playlist_type,
i_frames_only,
@ -575,16 +668,29 @@ impl HlsSink3 {
state.fragment_running_time = running_time;
let settings = self.settings.lock().unwrap();
settings
.giostreamsink
.set_property("stream", &fragment_stream);
let stream = if base_imp!(self).is_single_media_file() {
if state.stream.is_none() {
let gios = CountingOutputStream::new(fragment_stream);
let stream =
gio::WriteOutputStream::new(gios.clone()).upcast::<gio::OutputStream>();
state.stream = Some(gios);
stream
} else {
gio::WriteOutputStream::new(state.stream.as_ref().unwrap().clone())
.upcast::<gio::OutputStream>()
}
} else {
gst::info!(
CAT,
imp = self,
"New segment location: {:?}",
segment_file_location,
);
gst::info!(
CAT,
imp = self,
"New segment location: {:?}",
segment_file_location,
);
fragment_stream
};
settings.giostreamsink.set_property("stream", &stream);
Ok(segment_file_location)
}
@ -638,6 +744,17 @@ impl HlsSink3 {
let running_time = state.fragment_running_time;
let fragment_start_timestamp = state.fragment_start_timestamp.take();
let byte_range = if base_imp!(self).is_single_media_file() {
let length = state.stream.as_ref().unwrap().out_bytes();
let offset = state.offset;
state.offset += length;
Some(m3u8_rs::ByteRange {
length,
offset: Some(offset),
})
} else {
None
};
drop(state);
@ -650,6 +767,7 @@ impl HlsSink3 {
MediaSegment {
uri,
duration: duration_msec,
byte_range,
..Default::default()
},
);