Use gio::OutputStream to enable flexibility

This commit is contained in:
Rafael Caricio 2021-05-13 19:54:23 +02:00
parent 8ffc3b65e6
commit b160278c6b
Signed by: rafaelcaricio
GPG key ID: 3C86DBCE8E93C947
4 changed files with 50 additions and 79 deletions

2
.gitignore vendored
View file

@ -1,2 +1,4 @@
/target /target
Cargo.lock Cargo.lock
playlist.m3u8
segment*

View file

@ -12,8 +12,9 @@ crate-type = ["cdylib", "rlib", "staticlib"]
path = "src/lib.rs" path = "src/lib.rs"
[dependencies] [dependencies]
glib = { git = "https://github.com/gtk-rs/gtk-rs" } glib = { git = "https://github.com/gtk-rs/gtk-rs-core" }
gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_14"] } gio = { git = "https://github.com/gtk-rs/gtk-rs-core" }
gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_16"] }
gst-base = { package = "gstreamer-base", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_16"] } gst-base = { package = "gstreamer-base", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_16"] }
gst-app = { package = "gstreamer-app", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_14"]} gst-app = { package = "gstreamer-app", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_14"]}
once_cell = "1.7.2" once_cell = "1.7.2"

View file

@ -1,3 +1,4 @@
use gio::prelude::*;
use glib::subclass::prelude::*; use glib::subclass::prelude::*;
use gst::prelude::*; use gst::prelude::*;
use gst::subclass::prelude::*; use gst::subclass::prelude::*;
@ -10,6 +11,7 @@ use std::fs::{File, OpenOptions};
use std::io::Write; use std::io::Write;
use std::path; use std::path;
use std::sync::{Arc, Mutex, MutexGuard}; use std::sync::{Arc, Mutex, MutexGuard};
use gio::glib::WeakRef;
const DEFAULT_LOCATION: &str = "segment%05d.ts"; const DEFAULT_LOCATION: &str = "segment%05d.ts";
const DEFAULT_PLAYLIST_LOCATION: &str = "playlist.m3u8"; const DEFAULT_PLAYLIST_LOCATION: &str = "playlist.m3u8";
@ -40,7 +42,7 @@ struct Settings {
// TODO: old_locations ? Maybe just use another thread and send msgs with files to delete ? // TODO: old_locations ? Maybe just use another thread and send msgs with files to delete ?
splitmuxsink: Option<gst::Element>, splitmuxsink: Option<gst::Element>,
app_sink: Option<gst::Element>, giostreamsink: Option<gst::Element>,
muxer: Option<gst::Element>, muxer: Option<gst::Element>,
video_sink: bool, video_sink: bool,
audio_sink: bool, audio_sink: bool,
@ -58,7 +60,7 @@ impl Default for Settings {
send_keyframe_requests: DEFAULT_SEND_KEYFRAME_REQUESTS, send_keyframe_requests: DEFAULT_SEND_KEYFRAME_REQUESTS,
splitmuxsink: None, splitmuxsink: None,
app_sink: None, giostreamsink: None,
muxer: None, muxer: None,
video_sink: false, video_sink: false,
audio_sink: false, audio_sink: false,
@ -110,17 +112,13 @@ impl FlexHlsSink {
} => (current_segment_location, current_segment_file), } => (current_segment_location, current_segment_file),
}; };
let settings = self.settings.lock().unwrap(); let mut settings = self.settings.lock().unwrap();
let seq_num = format!("{:0>5}", fragment_id); let seq_num = format!("{:0>5}", fragment_id);
let segment_file_location = settings let segment_file_location = settings
.location .location
.replace(BACKWARDS_COMPATIBLE_PLACEHOLDER, &seq_num); .replace(BACKWARDS_COMPATIBLE_PLACEHOLDER, &seq_num);
gst_trace!( gst_trace!(CAT, "Segment location formatted: {}", segment_file_location);
CAT,
"Segment location formatted: {}",
segment_file_location
);
let segment_file_location_clone = segment_file_location.clone(); let segment_file_location_clone = segment_file_location.clone();
let segment_file = File::create(&segment_file_location).map_err(move |err| { let segment_file = File::create(&segment_file_location).map_err(move |err| {
@ -135,6 +133,12 @@ impl FlexHlsSink {
*current_segment_location = Some(segment_file_location.clone()); *current_segment_location = Some(segment_file_location.clone());
*current_segment_file = Some(segment_file); *current_segment_file = Some(segment_file);
let giostreamsink = settings.giostreamsink.as_ref().unwrap();
let stream = self
.get_fragment_stream(segment_file_location.clone())
.map_err(|err| err.to_string())?;
giostreamsink.set_property("stream", &stream).unwrap();
gst_info!( gst_info!(
CAT, CAT,
"New segment location: {}", "New segment location: {}",
@ -143,6 +147,21 @@ impl FlexHlsSink {
Ok(segment_file_location) Ok(segment_file_location)
} }
fn get_fragment_stream(&self, location: String) -> Result<gio::WriteOutputStream, glib::Error> {
let file_stream = File::create(&location).map_err(|err| {
glib::Error::new(
gst::URIError::BadReference,
format!(
"Could create segment file {} for writing: {}",
&location,
err.to_string()
)
.as_str(),
)
})?;
Ok(gio::WriteOutputStream::new(file_stream))
}
fn start( fn start(
&self, &self,
element: &super::FlexHlsSink, element: &super::FlexHlsSink,
@ -486,11 +505,9 @@ impl ObjectImpl for FlexHlsSink {
let splitmuxsink = gst::ElementFactory::make("splitmuxsink", Some("split_mux_sink")) let splitmuxsink = gst::ElementFactory::make("splitmuxsink", Some("split_mux_sink"))
.expect("Could not make element splitmuxsink"); .expect("Could not make element splitmuxsink");
let app_sink = gst::ElementFactory::make("appsink", Some("giostreamsink_replacement_sink")) let giostreamsink = gst::ElementFactory::make("giostreamsink", Some("giostream_sink"))
.expect("Could not make element appsink"); .expect("Could not make element giostreamsink");
// app_sink.set_property("sync", &false).unwrap(); giostreamsink.set_property("async", &false).unwrap();
// app_sink.set_property("async", &false).unwrap();
// app_sink.set_property("emit-signals", &true).unwrap();
let mux = gst::ElementFactory::make("mpegtsmux", Some("mpeg-ts_mux")) let mux = gst::ElementFactory::make("mpegtsmux", Some("mpeg-ts_mux"))
.expect("Could not make element mpegtsmux"); .expect("Could not make element mpegtsmux");
@ -505,9 +522,8 @@ impl ObjectImpl for FlexHlsSink {
), ),
("send-keyframe-requests", &true), ("send-keyframe-requests", &true),
("muxer", &mux), ("muxer", &mux),
("sink", &app_sink), ("sink", &giostreamsink),
("reset-muxer", &false), ("reset-muxer", &false),
("async-finalize", &false),
]) ])
.unwrap(); .unwrap();
@ -531,59 +547,11 @@ impl ObjectImpl for FlexHlsSink {
}) })
.unwrap(); .unwrap();
let appsink = app_sink.downcast_ref::<gst_app::AppSink>().unwrap(); let temp_stream = gio::MemoryOutputStream::new_resizable();
appsink.set_emit_signals(true); giostreamsink.set_property("stream", &temp_stream).unwrap();
appsink.connect_eos(|appsink| {
gst_info!(CAT, "Got EOS from giostreamsink_replacement_sink");
});
let this = self.clone();
let element_weak = obj.downgrade();
appsink.connect_new_sample(move |appsink| {
gst_info!(CAT, "Got new sample from giostreamsink_replacement_sink");
let sample = appsink.pull_sample().map_err(|_| gst::FlowError::Eos)?;
let buffer = sample.buffer().ok_or(gst::FlowError::Error)?;
gst_info!(CAT, "Got new sample buffer[{}]", buffer.size());
let mut state = this.state.lock().unwrap();
let (current_segment_file, current_segment_location) = match &mut *state {
State::Stopped => return Err(gst::FlowError::Error),
State::Started {
current_segment_file,
current_segment_location,
..
} => (current_segment_file, current_segment_location),
};
if let (Some(segment_file), Some(segment_location)) =
(current_segment_file, current_segment_location)
{
let segment_location = segment_location.clone();
let data = buffer.map_readable().unwrap();
segment_file.write(&data).map_err(|err| {
let error_msg = gst::error_msg!(
gst::ResourceError::OpenWrite,
[
"Could not write to segment file \"{}\": {}",
segment_location,
err.to_string(),
]
);
let element = element_weak.upgrade().unwrap();
element.post_error_message(error_msg);
gst::FlowError::Error
})?;
}
Ok(gst::FlowSuccess::Ok)
});
settings.splitmuxsink = Some(splitmuxsink); settings.splitmuxsink = Some(splitmuxsink);
settings.app_sink = Some(app_sink); settings.giostreamsink = Some(giostreamsink);
settings.muxer = Some(mux); settings.muxer = Some(mux);
} }
} }
@ -679,7 +647,9 @@ impl ElementImpl for FlexHlsSink {
Some(sms) => sms, Some(sms) => sms,
}; };
let peer_pad = splitmuxsink.request_pad_simple("audio_0").unwrap(); let peer_pad = splitmuxsink.request_pad_simple("audio_0").unwrap();
let sink_pad = gst::GhostPad::from_template_with_target(&templ, Some("audio"), &peer_pad).unwrap(); let sink_pad =
gst::GhostPad::from_template_with_target(&templ, Some("audio"), &peer_pad)
.unwrap();
element.add_pad(&sink_pad).unwrap(); element.add_pad(&sink_pad).unwrap();
sink_pad.set_active(true).unwrap(); sink_pad.set_active(true).unwrap();
settings.audio_sink = true; settings.audio_sink = true;
@ -701,7 +671,9 @@ impl ElementImpl for FlexHlsSink {
}; };
let peer_pad = splitmuxsink.request_pad_simple("video").unwrap(); let peer_pad = splitmuxsink.request_pad_simple("video").unwrap();
let sink_pad = gst::GhostPad::from_template_with_target(&templ, Some("video"), &peer_pad).unwrap(); let sink_pad =
gst::GhostPad::from_template_with_target(&templ, Some("video"), &peer_pad)
.unwrap();
element.add_pad(&sink_pad).unwrap(); element.add_pad(&sink_pad).unwrap();
sink_pad.set_active(true).unwrap(); sink_pad.set_active(true).unwrap();
settings.video_sink = true; settings.video_sink = true;
@ -709,13 +681,9 @@ impl ElementImpl for FlexHlsSink {
Some(sink_pad.upcast()) Some(sink_pad.upcast())
} }
None => { None => {
gst_debug!( gst_debug!(CAT, obj: element, "template name returned `None`",);
CAT,
obj: element,
"template name returned `None`",
);
None None
}, }
Some(other_name) => { Some(other_name) => {
gst_debug!( gst_debug!(
CAT, CAT,

View file

@ -29,7 +29,7 @@ fn init() {
fn test_basic_element_with_video_content() { fn test_basic_element_with_video_content() {
init(); init();
const BUFFER_NB: i32 = 200; const BUFFER_NB: i32 = 100;
let pipeline = gst::Pipeline::new(Some("video_pipeline")); let pipeline = gst::Pipeline::new(Some("video_pipeline"));
@ -93,7 +93,7 @@ fn test_basic_element_with_video_content() {
let sample = appsink.pull_sample().map_err(|_| gst::FlowError::Eos)?; let sample = appsink.pull_sample().map_err(|_| gst::FlowError::Eos)?;
let buffer = sample.buffer().ok_or(gst::FlowError::Error)?; let buffer = sample.buffer().ok_or(gst::FlowError::Error)?;
gst_info!(CAT, "TEST sample buffer[{}]", buffer.size()); //gst_info!(CAT, "TEST sample buffer[{}]", buffer.size());
sender.send(()).unwrap(); sender.send(()).unwrap();
Ok(gst::FlowSuccess::Ok) Ok(gst::FlowSuccess::Ok)
@ -108,7 +108,7 @@ fn test_basic_element_with_video_content() {
); );
for idx in 0..BUFFER_NB { for idx in 0..BUFFER_NB {
receiver.recv().unwrap(); receiver.recv().unwrap();
gst_info!(CAT, "flexhlssink_video_pipeline: received buffer #{}", idx); //gst_info!(CAT, "flexhlssink_video_pipeline: received buffer #{}", idx);
} }
pipeline.set_state(gst::State::Null).unwrap(); pipeline.set_state(gst::State::Null).unwrap();