Trying to fix appsink not receiving signals

This commit is contained in:
Rafael Caricio 2021-05-13 00:49:40 +02:00
parent 7cf41b8709
commit 4ad2b7f850
Signed by: rafaelcaricio
GPG key ID: 3C86DBCE8E93C947
2 changed files with 106 additions and 158 deletions

View file

@ -99,7 +99,7 @@ impl FlexHlsSink {
}
}
fn on_format_location(&self, fragment_id: u32) -> Result<(), String> {
fn on_format_location(&self, fragment_id: u32) -> Result<String, String> {
let mut state = self.state.lock().unwrap();
let (current_segment_location, current_segment_file) = match &mut *state {
State::Stopped => return Err("Not in Started state".to_string()),
@ -132,7 +132,7 @@ impl FlexHlsSink {
err.to_string()
})?;
*current_segment_location = Some(segment_file_location);
*current_segment_location = Some(segment_file_location.clone());
*current_segment_file = Some(segment_file);
gst_info!(
@ -140,7 +140,7 @@ impl FlexHlsSink {
"New segment location: {}",
current_segment_location.as_ref().unwrap()
);
Ok(())
Ok(segment_file_location)
}
fn start(
@ -193,7 +193,7 @@ impl FlexHlsSink {
element: &super::FlexHlsSink,
fragment_closed_at: gst::ClockTime,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
gst_debug!(CAT, obj: element, "Preparing to write new playlist");
gst_info!(CAT, obj: element, "Preparing to write new playlist");
let mut state = self.state.lock().unwrap();
match &mut *state {
@ -486,8 +486,12 @@ impl ObjectImpl for FlexHlsSink {
let splitmuxsink = gst::ElementFactory::make("splitmuxsink", Some("split_mux_sink"))
.expect("Could not make element splitmuxsink");
let app_sink = gst::ElementFactory::make("appsink", Some("app_sink"))
let app_sink = gst::ElementFactory::make("appsink", Some("giostreamsink_replacement_sink"))
.expect("Could not make element appsink");
app_sink.set_property("sync", &false).unwrap();
app_sink.set_property("async", &false).unwrap();
app_sink.set_property("emit-signals", &true).unwrap();
let mux = gst::ElementFactory::make("mpegtsmux", Some("mpeg-ts_mux"))
.expect("Could not make element mpegtsmux");
@ -516,40 +520,43 @@ impl ObjectImpl for FlexHlsSink {
gst_info!(CAT, "Got fragment-id: {}", fragment_id);
if let Err(err) = this.on_format_location(fragment_id) {
gst_error!(CAT, "on format-location handler: {}", err);
match this.on_format_location(fragment_id) {
Ok(segment_location) => Some(segment_location.to_value()),
Err(err) => {
gst_error!(CAT, "on format-location handler: {}", err);
Some("unknown_segment".to_value())
}
}
None
})
.unwrap();
let sink = app_sink.downcast_ref::<gst_app::AppSink>().unwrap();
let appsink = app_sink.downcast_ref::<gst_app::AppSink>().unwrap();
let this = self.clone();
let element_weak = obj.downgrade();
sink.set_callbacks(
gst_app::AppSinkCallbacks::builder()
.new_sample(move |app_sink| {
let sample = app_sink.pull_sample().map_err(|_| gst::FlowError::Eos)?;
let buffer = sample.buffer().ok_or(gst::FlowError::Error)?;
appsink.connect_new_sample(move |appsink| {
let sample = appsink.pull_sample().map_err(|_| gst::FlowError::Eos)?;
let buffer = sample.buffer().ok_or(gst::FlowError::Error)?;
let mut state = this.state.lock().unwrap();
let (current_segment_file, current_segment_location) = match &mut *state {
State::Stopped => return Err(gst::FlowError::Error),
State::Started {
current_segment_file,
current_segment_location,
..
} => (current_segment_file, current_segment_location),
};
gst_info!(CAT, "Got new sample buffer[{}]", buffer.size());
if let (Some(segment_file), Some(segment_location)) =
(current_segment_file, current_segment_location)
{
let segment_location = segment_location.clone();
let data = buffer.map_readable().unwrap();
segment_file.write(&data).map_err(|err| {
let error_msg = gst::error_msg!(
let mut state = this.state.lock().unwrap();
let (current_segment_file, current_segment_location) = match &mut *state {
State::Stopped => return Err(gst::FlowError::Error),
State::Started {
current_segment_file,
current_segment_location,
..
} => (current_segment_file, current_segment_location),
};
if let (Some(segment_file), Some(segment_location)) =
(current_segment_file, current_segment_location)
{
let segment_location = segment_location.clone();
let data = buffer.map_readable().unwrap();
segment_file.write(&data).map_err(|err| {
let error_msg = gst::error_msg!(
gst::ResourceError::OpenWrite,
[
"Could not write to segment file \"{}\": {}",
@ -557,17 +564,15 @@ impl ObjectImpl for FlexHlsSink {
err.to_string(),
]
);
let element = element_weak.upgrade().unwrap();
element.post_error_message(error_msg);
let element = element_weak.upgrade().unwrap();
element.post_error_message(error_msg);
gst::FlowError::Error
})?;
}
gst::FlowError::Error
})?;
}
Ok(gst::FlowSuccess::Ok)
})
.build(),
);
Ok(gst::FlowSuccess::Ok)
});
settings.splitmuxsink = Some(splitmuxsink);
settings.app_sink = Some(app_sink);
@ -727,8 +732,8 @@ impl ElementImpl for FlexHlsSink {
.as_ref()
.unwrap()
.release_request_pad(pad);
element.remove_pad(pad).unwrap();
pad.set_active(false).unwrap();
element.remove_pad(pad).unwrap();
let ghost_pad = pad.downcast_ref::<gst::GhostPad>().unwrap();
if "audio" == ghost_pad.name() {

View file

@ -4,6 +4,8 @@ use gst::prelude::*;
use gst_base::prelude::*;
use once_cell::sync::Lazy;
use std::sync::mpsc;
use std::thread;
use std::time::Duration;
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
gst::DebugCategory::new(
@ -27,31 +29,39 @@ fn init() {
fn test_basic_element_with_video_content() {
init();
const BUFFER_NB: i32 = 3;
const BUFFER_NB: i32 = 500;
let pipeline = gst::Pipeline::new(Some("video_pipeline"));
let video_src = gst::ElementFactory::make("videotestsrc", Some("videotestsrc")).unwrap();
let video_src = gst::ElementFactory::make("videotestsrc", Some("test_videotestsrc")).unwrap();
video_src.set_property("is-live", &true).unwrap();
video_src.set_property("num-buffers", &BUFFER_NB).unwrap();
let x264enc= gst::ElementFactory::make("x264enc", Some("x264enc")).unwrap();
let h264parse = gst::ElementFactory::make("h264parse", Some("h264parse")).unwrap();
let x264enc= gst::ElementFactory::make("x264enc", Some("test_x264enc")).unwrap();
let h264parse = gst::ElementFactory::make("h264parse", Some("test_h264parse")).unwrap();
let flexhlssink = gst::ElementFactory::make("flexhlssink", Some("flexhlssink")).unwrap();
let tee = gst::ElementFactory::make("tee", Some("test_tee")).unwrap();
let hls_queue = gst::ElementFactory::make("queue", Some("test_hls_queue")).unwrap();
let flexhlssink = gst::ElementFactory::make("flexhlssink", Some("test_flexhlssink")).unwrap();
flexhlssink.set_property("target-duration", &6u32).unwrap();
// 4 - 271
// 6 - 391
// let app_sink = gst::ElementFactory::make("appsink", Some("appsink")).unwrap();
// app_sink.set_property("sync", &false).unwrap();
// app_sink.set_property("async", &false).unwrap();
// app_sink.set_property("emit-signals", &true).unwrap();
let app_queue = gst::ElementFactory::make("queue", Some("test_app_queue")).unwrap();
let app_sink = gst::ElementFactory::make("appsink", Some("test_appsink")).unwrap();
app_sink.set_property("sync", &false).unwrap();
app_sink.set_property("async", &false).unwrap();
app_sink.set_property("emit-signals", &true).unwrap();
pipeline
.add_many(&[
&video_src,
&x264enc,
&h264parse,
//&app_sink
&tee,
&app_queue,
&app_sink,
&hls_queue,
&flexhlssink,
])
.unwrap();
@ -60,34 +70,46 @@ fn test_basic_element_with_video_content() {
&video_src,
&x264enc,
&h264parse,
&flexhlssink,
&tee,
]).unwrap();
// let appsink = app_sink.dynamic_cast::<gst_app::AppSink>().unwrap();
// let (sender, receiver) = mpsc::channel();
// appsink.connect_new_sample(move |appsink| {
// let _sample = appsink
// .emit_by_name("pull-sample", &[])
// .unwrap()
// .unwrap()
// .get::<gst::Sample>()
// .unwrap();
//
// sender.send(()).unwrap();
// Ok(gst::FlowSuccess::Ok)
// });
gst::Element::link_many(&[&app_queue, &app_sink]).unwrap();
gst::Element::link_many(&[&hls_queue, &flexhlssink]).unwrap();
// Link the appsink
let tee_app_pad = tee.request_pad_simple("src_%u").unwrap();
let app_queue_pad = app_queue.static_pad("sink").unwrap();
tee_app_pad.link(&app_queue_pad).unwrap();
// Link the flexhlssink branch
let tee_hls_pad = tee.request_pad_simple("src_%u").unwrap();
let hls_queue_pad = hls_queue.static_pad("sink").unwrap();
tee_hls_pad.link(&hls_queue_pad).unwrap();
let appsink = app_sink.dynamic_cast::<gst_app::AppSink>().unwrap();
let (sender, receiver) = mpsc::channel();
appsink.connect_new_sample(move |appsink| {
let sample = appsink.pull_sample().map_err(|_| gst::FlowError::Eos)?;
let buffer = sample.buffer().ok_or(gst::FlowError::Error)?;
gst_info!(CAT, "TEST sample buffer[{}]", buffer.size());
sender.send(()).unwrap();
Ok(gst::FlowSuccess::Ok)
});
pipeline.set_state(gst::State::Playing).unwrap();
//
// gst_info!(
// CAT,
// "flexhlssink_video_pipeline: waiting for {} buffers",
// BUFFER_NB
// );
// for idx in 0..BUFFER_NB {
// receiver.recv().unwrap();
// gst_info!(CAT, "flexhlssink_video_pipeline: received buffer #{}", idx);
// }
gst_info!(
CAT,
"flexhlssink_video_pipeline: waiting for {} buffers",
BUFFER_NB
);
for idx in 0..BUFFER_NB {
receiver.recv().unwrap();
gst_info!(CAT, "flexhlssink_video_pipeline: received buffer #{}", idx);
}
pipeline.set_state(gst::State::Null).unwrap();
}
@ -99,22 +121,16 @@ fn test_basic_element_properties() {
const BUFFER_NB: i32 = 3;
let pipeline = gst::Pipeline::new(None);
let pipeline = gst::Pipeline::new(Some("audio_pipeline"));
let audio_src = gst::ElementFactory::make("audiotestsrc", Some("audiotestsrc")).unwrap();
audio_src.set_property("is-live", &true).unwrap();
audio_src.set_property("num-buffers", &BUFFER_NB).unwrap();
// let video_src = gst::ElementFactory::make("videotestsrc", Some("videotestsrc")).unwrap();
// video_src.set_property("is-live", &true).unwrap();
// video_src.set_property("num-buffers", &BUFFER_NB).unwrap();
let tee = gst::ElementFactory::make("tee", Some("tee")).unwrap();
let hls_queue = gst::ElementFactory::make("queue", Some("hls_queue")).unwrap();
// let decodebin = gst::ElementFactory::make("decodebin", Some("decodebin_base")).unwrap();
// let audio_convert = gst::ElementFactory::make("audioconvert", Some("audioconvert")).unwrap();
let hls_avenc_aac = gst::ElementFactory::make("avenc_aac", Some("hls_avenc_aac")).unwrap();
let flexhlssink = gst::ElementFactory::make("flexhlssink", Some("flexhlssink")).unwrap();
flexhlssink.set_property("target-duration", &6u32).unwrap();
@ -132,8 +148,6 @@ fn test_basic_element_properties() {
&app_queue,
&app_sink,
&hls_queue,
//&decodebin,
//&audio_convert,
&hls_avenc_aac,
&flexhlssink,
])
@ -142,87 +156,16 @@ fn test_basic_element_properties() {
gst::Element::link_many(&[&audio_src, &tee]).unwrap();
gst::Element::link_many(&[&app_queue, &app_sink]).unwrap();
gst::Element::link_many(&[&hls_queue, &hls_avenc_aac, &flexhlssink]).unwrap();
// gst::Element::link_many(&[&audio_convert, &flexhlssink]).unwrap();
// gst::Element::link_many(&[&hls_queue, &decodebin, &audio_convert, &flexhlssink]).unwrap();
// hls_queue.link_pads(Some("src"), &hls_audio_convert, Some("sink")).unwrap();
// audio_convert.link_pads(Some("src"), , Some("audio")).unwrap();
// Link the appsink
let tee_app_pad = tee.request_pad_simple("src_%u").unwrap();
println!(
"Obtained request pad {} for the app branch",
tee_app_pad.name()
);
let app_queue_pad = app_queue.static_pad("sink").unwrap();
tee_app_pad.link(&app_queue_pad).unwrap();
// Link the flexhlssink branch
let tee_hls_pad = tee.request_pad_simple("src_%u").unwrap();
println!(
"Obtained request pad {} for the HLS branch",
tee_hls_pad.name()
);
let hls_queue_pad = hls_queue.static_pad("sink").unwrap();
tee_hls_pad.link(&hls_queue_pad).unwrap();
//
// let audio_convert_clone = audio_convert.clone();
// let flexhlssink_clone = flexhlssink.clone();
// decodebin.connect_pad_added(move |_, pad| {
// let caps = pad.current_caps().unwrap();
// let s = caps.structure(0).unwrap();
//
// let audio_convert_sink_pad = audio_convert_clone.static_pad("sink").unwrap();
//
// if s.name() == "audio/x-raw" && !audio_convert_sink_pad.is_linked() {
// pad.link(&audio_convert_sink_pad).unwrap();
//
// let audio_convert_src_pad = audio_convert_clone.static_pad("src").unwrap();
// let hls_audio_pad = flexhlssink_clone.request_pad_simple("audio").unwrap();
// println!("Caps for new audio_convert_src_pad: {:?}", audio_convert_src_pad.caps());
// println!("Caps for new hls_audio_pad: {:?}", hls_audio_pad.caps());
// audio_convert_src_pad.link(&hls_audio_pad).unwrap();
// }
// });
// audio_src.connect_pad_added(move |src, src_pad| {
// println!(
// "Received new pad {} from {}",
// src_pad.name(),
// src.name()
// );
// let tee_sink_pad = tee.request_pad_simple("sink").unwrap();
// if tee_sink_pad.is_linked() {
// println!("Already linked!");
// return;
// }
//
// let new_pad_caps = src_pad
// .current_caps()
// .expect("Failed to get caps of new pad.");
// let new_pad_struct = new_pad_caps
// .structure(0)
// .expect("Failed to get first structure of caps.");
// let new_pad_type = new_pad_struct.name();
// let is_audio = new_pad_type.starts_with("audio/x-raw");
// if !is_audio {
// println!(
// "It has type {} which is not audio. Ignoring.",
// new_pad_type
// );
// return;
// }
//
// src_pad.link(&tee_sink_pad).unwrap();
// });
// let audio_convert_pad = audio_convert.static_pad("src").unwrap();
// println!(
// "Obtained request pad {} from the audioconvert",
// audio_convert_pad.name()
// );
// println!("Caps for new hls_audio_pad: {:?}", hls_audio_pad.allowed_caps());
// audio_convert_pad.link(&hls_audio_pad).unwrap();
let appsink = app_sink.dynamic_cast::<gst_app::AppSink>().unwrap();
let (sender, receiver) = mpsc::channel();
@ -242,12 +185,12 @@ fn test_basic_element_properties() {
gst_info!(
CAT,
"flexhlssink_pipeline: waiting for {} buffers",
"audio_pipeline: waiting for {} buffers",
BUFFER_NB
);
for idx in 0..BUFFER_NB {
receiver.recv().unwrap();
gst_info!(CAT, "flexhlssink_pipeline: received buffer #{}", idx);
gst_info!(CAT, "audio_pipeline: received buffer #{}", idx);
}
pipeline.set_state(gst::State::Null).unwrap();