hlssink3/hlscmafsink: Make GioOutputStream a sub-class of OutputStream

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/2439>
This commit is contained in:
Sanchayan Maity 2025-08-11 18:56:09 +05:30
parent 917e1f5885
commit 1f68cf4f44
3 changed files with 113 additions and 129 deletions

View file

@ -10,6 +10,7 @@
use crate::playlist::Playlist;
use chrono::{DateTime, Duration, Utc};
use gio::prelude::*;
use gio::subclass::prelude::OutputStreamImpl;
use gst::glib;
use gst::prelude::*;
use gst::subclass::prelude::*;
@ -17,8 +18,7 @@ use m3u8_rs::MediaSegment;
use std::fs;
use std::io::Write;
use std::path;
use std::sync::LazyLock;
use std::sync::Mutex;
use std::sync::{LazyLock, Mutex};
const DEFAULT_PLAYLIST_LOCATION: &str = "playlist.m3u8";
const DEFAULT_MAX_NUM_SEGMENT_FILES: u32 = 10;
@ -60,20 +60,70 @@ pub enum HlsProgramDateTimeReference {
// We need to keep an OutputStream around for writing to the same file
// to support the single media file use case. OutputStream not being
// thread safe, use this wrapper to keep an OutputStream around in State.
struct GioOutputStream {
stream: gio::OutputStream,
#[derive(Default)]
pub struct HlsBaseSinkGioOutputStream {
stream: Mutex<Option<gio::OutputStream>>,
out_bytes: Mutex<usize>,
}
unsafe impl Send for GioOutputStream {}
unsafe impl Sync for GioOutputStream {}
unsafe impl Send for HlsBaseSinkGioOutputStream {}
unsafe impl Sync for HlsBaseSinkGioOutputStream {}
impl GioOutputStream {
pub fn new(stream: gio::OutputStream) -> Self {
Self { stream }
#[glib::object_subclass]
impl ObjectSubclass for HlsBaseSinkGioOutputStream {
const NAME: &'static str = "GstHlsBaseSinkGioOutputStream";
type Type = super::HlsBaseSinkGioOutputStream;
type ParentType = gio::OutputStream;
}
impl ObjectImpl for HlsBaseSinkGioOutputStream {}
impl OutputStreamImpl for HlsBaseSinkGioOutputStream {
fn write(
&self,
buffer: &[u8],
cancellable: Option<&gio::Cancellable>,
) -> Result<usize, glib::Error> {
let stream = self.stream.lock().unwrap();
if let Some(ref s) = *stream {
s.write_all(buffer, cancellable)?;
let mut out_bytes = self.out_bytes.lock().unwrap();
*out_bytes += buffer.len();
return Ok(buffer.len());
}
Ok(0)
}
pub fn as_output_stream(&self) -> gio::OutputStream {
self.stream.clone()
fn close(&self, cancellable: Option<&gio::Cancellable>) -> Result<(), glib::Error> {
let stream = self.stream.lock().unwrap().take();
if let Some(ref s) = stream {
s.close(cancellable)?;
}
Ok(())
}
fn flush(&self, cancellable: Option<&gio::Cancellable>) -> Result<(), glib::Error> {
let stream = self.stream.lock().unwrap();
if let Some(ref s) = *stream {
s.flush(cancellable)?;
}
Ok(())
}
}
impl super::HlsBaseSinkGioOutputStream {
pub fn new(stream: gio::OutputStream) -> Self {
let obj = glib::Object::new::<Self>();
*obj.imp().stream.lock().unwrap() = Some(stream);
obj
}
pub fn out_bytes(&self) -> usize {
let out_bytes = self.imp().out_bytes.lock().unwrap();
*out_bytes
}
}
@ -118,7 +168,7 @@ pub struct PlaylistContext {
#[derive(Default)]
pub struct State {
context: Option<PlaylistContext>,
stream: Option<GioOutputStream>,
stream: Option<super::HlsBaseSinkGioOutputStream>,
}
#[derive(Default)]
@ -434,21 +484,23 @@ impl HlsBaseSink {
Some((stream, location))
} else {
let location = settings.single_media_file.as_ref().unwrap().clone();
let stream = if let Some(s) = &state.stream {
s.as_output_stream()
} else {
if state.stream.is_none() {
let stream = self.obj().emit_by_name::<Option<gio::OutputStream>>(
SIGNAL_GET_FRAGMENT_STREAM,
&[&location],
)?;
state.stream = Some(GioOutputStream::new(stream.clone()));
let gios = super::HlsBaseSinkGioOutputStream::new(stream);
let stream = gios.upcast_ref::<gio::OutputStream>().clone();
stream
};
state.stream = Some(gios);
Some((stream, location))
Some((stream, location))
} else {
let gios = state.stream.as_ref().unwrap();
let stream = gios.upcast_ref::<gio::OutputStream>().clone();
Some((stream, location))
}
}
}
@ -699,6 +751,11 @@ impl HlsBaseSink {
settings.single_media_file.is_some()
}
pub fn out_bytes(&self) -> usize {
let state = self.state.lock().unwrap();
state.stream.as_ref().unwrap().out_bytes()
}
fn byte_ranges(
&self,
context: &PlaylistContext,

View file

@ -17,8 +17,6 @@ use gst::glib;
use gst::prelude::*;
use gst::subclass::prelude::*;
use m3u8_rs::{MediaPlaylist, MediaPlaylistType, MediaSegment};
use std::io::Write;
use std::sync::Arc;
use std::sync::LazyLock;
use std::sync::Mutex;
@ -38,89 +36,6 @@ macro_rules! base_imp {
};
}
// `splitmuxsink` does not know the size of the fragment written out by
// the muxer. We track this by using a wrapper around the OutputStream.
// Implementing Write trait for this allows passing it to a WriteOutputStream.
#[derive(Clone)]
struct CountingOutputStream {
inner: Arc<Mutex<CountingOutputStreamInner>>,
}
impl CountingOutputStream {
pub fn new(stream: gio::OutputStream) -> Self {
Self {
inner: Arc::new(Mutex::new(CountingOutputStreamInner::new(stream))),
}
}
pub fn out_bytes(&self) -> u64 {
let inner = self.inner.lock().unwrap();
inner.out_bytes()
}
}
impl Write for CountingOutputStream {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
let mut inner = self.inner.lock().unwrap();
inner.write(buf)
}
fn flush(&mut self) -> std::io::Result<()> {
let mut inner = self.inner.lock().unwrap();
inner.flush()
}
}
struct CountingOutputStreamInner {
stream: gio::OutputStream,
data: Vec<u8>,
out_bytes: u64,
}
unsafe impl Send for CountingOutputStreamInner {}
unsafe impl Sync for CountingOutputStreamInner {}
impl CountingOutputStreamInner {
pub fn new(stream: gio::OutputStream) -> Self {
Self {
stream,
data: Vec::new(),
out_bytes: 0,
}
}
pub fn out_bytes(&self) -> u64 {
self.out_bytes
}
fn inner_flush(&mut self) -> std::io::Result<()> {
let data_len = self.data.len() as u64;
if data_len == 0 {
return Ok(());
}
let data: Vec<u8> = self.data.drain(0..).collect();
let mut s = self.stream.clone().into_write();
s.write(&data).unwrap();
self.out_bytes = data_len;
Ok(())
}
}
impl Write for CountingOutputStreamInner {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
self.data.extend_from_slice(buf);
Ok(buf.len())
}
fn flush(&mut self) -> std::io::Result<()> {
self.inner_flush()
}
}
/// Offset between NTP and UNIX epoch in seconds.
/// NTP = UNIX + NTP_UNIX_OFFSET.
const NTP_UNIX_OFFSET: u64 = 2_208_988_800;
@ -246,7 +161,6 @@ struct HlsSink3State {
fragment_running_time: Option<gst::ClockTime>,
current_segment_location: Option<String>,
fragment_start_timestamp: Option<DateTime<Utc>>,
stream: Option<CountingOutputStream>,
offset: u64,
}
@ -630,7 +544,25 @@ impl HlsSink3 {
(false, playlist_type)
};
let version = if i_frames_only || base_imp!(self).is_single_media_file() {
let is_single_media_file = base_imp!(self).is_single_media_file();
if is_single_media_file {
// `splitmuxsink` will stop the sink on every fragment,
// don't do that for single media file case as we need
// to keep the stream around for writing.
let giostreamsink = self.settings.lock().unwrap().giostreamsink.clone();
if giostreamsink.has_property_with_type("close-on-stop", bool::static_type()) {
giostreamsink.set_property("close-on-stop", false);
} else {
gst::element_imp_error!(
self,
gst::ResourceError::Settings,
("Invalid configuration"),
["Single media file support with hlssink3 needs GStreamer 1.24 or later"]
);
}
}
let version = if i_frames_only || is_single_media_file {
Some(4)
} else {
Some(3)
@ -668,29 +600,17 @@ impl HlsSink3 {
state.fragment_running_time = running_time;
let settings = self.settings.lock().unwrap();
let stream = if base_imp!(self).is_single_media_file() {
if state.stream.is_none() {
let gios = CountingOutputStream::new(fragment_stream);
let stream =
gio::WriteOutputStream::new(gios.clone()).upcast::<gio::OutputStream>();
state.stream = Some(gios);
stream
} else {
gio::WriteOutputStream::new(state.stream.as_ref().unwrap().clone())
.upcast::<gio::OutputStream>()
}
} else {
gst::info!(
CAT,
imp = self,
"New segment location: {:?}",
segment_file_location,
);
fragment_stream
};
gst::info!(
CAT,
imp = self,
"New segment location: {:?}",
segment_file_location,
);
settings.giostreamsink.set_property("stream", &stream);
settings
.giostreamsink
.set_property("stream", &fragment_stream);
Ok(segment_file_location)
}
@ -745,8 +665,8 @@ impl HlsSink3 {
let running_time = state.fragment_running_time;
let fragment_start_timestamp = state.fragment_start_timestamp.take();
let byte_range = if base_imp!(self).is_single_media_file() {
let length = state.stream.as_ref().unwrap().out_bytes();
let offset = state.offset;
let length = base_imp!(self).out_bytes() as u64 - offset;
state.offset += length;
Some(m3u8_rs::ByteRange {
length,

View file

@ -19,6 +19,13 @@ pub mod hlscmafsink;
pub mod hlssink3;
mod playlist;
glib::wrapper! {
pub struct HlsBaseSinkGioOutputStream(ObjectSubclass<hlsbasesink::HlsBaseSinkGioOutputStream>) @extends gio::OutputStream;
}
unsafe impl Send for HlsBaseSinkGioOutputStream {}
unsafe impl Sync for HlsBaseSinkGioOutputStream {}
glib::wrapper! {
pub struct HlsBaseSink(ObjectSubclass<hlsbasesink::HlsBaseSink>) @extends gst::Bin, gst::Element, gst::Object;
}