mirror of
https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git
synced 2025-09-01 09:13:48 +00:00
threadshare: update examples
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/2484>
This commit is contained in:
parent
9dcb2afaf7
commit
db8dabf195
7 changed files with 135 additions and 124 deletions
|
@ -6,11 +6,11 @@ fn main() {
|
|||
gst::init().unwrap();
|
||||
|
||||
let pipe_up = gst::parse::launch(
|
||||
"
|
||||
audiotestsrc is-live=true num-buffers=2000 volume=0.02
|
||||
"pipeline. (name=upstream
|
||||
ts-audiotestsrc is-live=true num-buffers=2000 volume=0.02 context=ts-group-1 context-wait=20
|
||||
! opusenc
|
||||
! ts-intersink inter-context=my-inter-ctx
|
||||
",
|
||||
)",
|
||||
)
|
||||
.unwrap()
|
||||
.downcast::<gst::Pipeline>()
|
||||
|
@ -19,14 +19,15 @@ fn main() {
|
|||
// A downstream pipeline which will receive the Opus encoded audio stream
|
||||
// and render it locally.
|
||||
let pipe_down = gst::parse::launch(
|
||||
"
|
||||
ts-intersrc inter-context=my-inter-ctx context=ts-group-01 context-wait=20
|
||||
"pipeline. (name=downstream
|
||||
ts-intersrc inter-context=my-inter-ctx context=ts-group-1 context-wait=20
|
||||
! opusdec
|
||||
! audioconvert
|
||||
! audioresample
|
||||
! ts-queue context=ts-group-01 context-wait=20 max-size-buffers=1 max-size-bytes=0 max-size-time=0
|
||||
! ts-queue context=ts-group-1 context-wait=20 max-size-buffers=1 max-size-bytes=0 max-size-time=0
|
||||
! ts-blocking-adapter
|
||||
! autoaudiosink
|
||||
",
|
||||
)",
|
||||
)
|
||||
.unwrap()
|
||||
.downcast::<gst::Pipeline>()
|
||||
|
@ -46,10 +47,11 @@ fn main() {
|
|||
// started at the same time. However, an application that dynamically
|
||||
// generates pipelines must ensure that all the pipelines that will be
|
||||
// connected together share the same base time.
|
||||
pipe_up.set_base_time(gst::ClockTime::ZERO);
|
||||
let base_time = clock.time();
|
||||
pipe_up.set_start_time(gst::ClockTime::NONE);
|
||||
pipe_down.set_base_time(gst::ClockTime::ZERO);
|
||||
pipe_up.set_base_time(base_time);
|
||||
pipe_down.set_start_time(gst::ClockTime::NONE);
|
||||
pipe_down.set_base_time(base_time);
|
||||
|
||||
pipe_up.set_state(gst::State::Playing).unwrap();
|
||||
pipe_down.set_state(gst::State::Playing).unwrap();
|
||||
|
|
|
@ -50,6 +50,9 @@ fn main() {
|
|||
gstthreadshare::plugin_register_static().unwrap();
|
||||
self::plugin_register_static().unwrap();
|
||||
|
||||
let main_context = glib::MainContext::default();
|
||||
let _guard = main_context.acquire().unwrap();
|
||||
|
||||
#[cfg(debug_assertions)]
|
||||
gst::warning!(CAT, "RUNNING DEBUG BUILD");
|
||||
|
||||
|
@ -115,67 +118,72 @@ fn main() {
|
|||
|
||||
let l = glib::MainLoop::new(None, false);
|
||||
|
||||
let _bus_watch = pipeline
|
||||
.bus()
|
||||
.unwrap()
|
||||
.add_watch({
|
||||
let terminated_count = Arc::new(AtomicU32::new(0));
|
||||
let l = l.clone();
|
||||
let pipeline = pipeline.clone();
|
||||
move |_, msg| {
|
||||
use gst::MessageView::*;
|
||||
match msg.view() {
|
||||
Eos(_) => {
|
||||
// Actually, we don't post EOS (see sinks impl).
|
||||
gst::info!(CAT, "Received eos");
|
||||
l.quit();
|
||||
let bus = pipeline.bus().unwrap();
|
||||
let mut bus_stream = bus.stream();
|
||||
let pipeline_weak = pipeline.downgrade();
|
||||
let l_clone = l.clone();
|
||||
main_context.spawn_local(async move {
|
||||
use futures::prelude::*;
|
||||
|
||||
return glib::ControlFlow::Break;
|
||||
}
|
||||
Error(msg) => {
|
||||
if let gst::MessageView::Error(msg) = msg.message().view() {
|
||||
if msg.error().matches(gst::LibraryError::Shutdown)
|
||||
&& terminated_count.fetch_add(1, Ordering::SeqCst)
|
||||
== args.streams - 1
|
||||
{
|
||||
let terminated_count = Arc::new(AtomicU32::new(0));
|
||||
|
||||
while let Some(msg) = bus_stream.next().await {
|
||||
use gst::MessageView::*;
|
||||
|
||||
let Some(pipeline) = pipeline_weak.upgrade() else {
|
||||
break;
|
||||
};
|
||||
|
||||
match msg.view() {
|
||||
Eos(_) => {
|
||||
// Actually, we don't post EOS (see sinks impl).
|
||||
gst::info!(CAT, "Received eos");
|
||||
l_clone.quit();
|
||||
|
||||
break;
|
||||
}
|
||||
Error(msg) => {
|
||||
if let gst::MessageView::Error(msg) = msg.message().view() {
|
||||
if msg.error().matches(gst::LibraryError::Shutdown) {
|
||||
if terminated_count.fetch_add(1, Ordering::SeqCst) == args.streams - 1 {
|
||||
gst::info!(CAT, "Received all shutdown requests");
|
||||
l.quit();
|
||||
l_clone.quit();
|
||||
|
||||
return glib::ControlFlow::Break;
|
||||
break;
|
||||
} else {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
gst::error!(
|
||||
CAT,
|
||||
"Error from {:?}: {} ({:?})",
|
||||
msg.src().map(|s| s.path_string()),
|
||||
msg.error(),
|
||||
msg.debug()
|
||||
);
|
||||
l.quit();
|
||||
|
||||
return glib::ControlFlow::Break;
|
||||
}
|
||||
Latency(msg) => {
|
||||
gst::info!(
|
||||
CAT,
|
||||
"Latency requirements have changed for element {}",
|
||||
msg.src()
|
||||
.map(|src| src.name())
|
||||
.as_deref()
|
||||
.unwrap_or("UNKNOWN"),
|
||||
);
|
||||
if let Err(err) = pipeline.recalculate_latency() {
|
||||
gst::error!(CAT, "Error recalculating latency: {err}");
|
||||
}
|
||||
}
|
||||
_ => (),
|
||||
|
||||
gst::error!(
|
||||
CAT,
|
||||
"Error from {:?}: {} ({:?})",
|
||||
msg.src().map(|s| s.name()),
|
||||
msg.error(),
|
||||
msg.debug()
|
||||
);
|
||||
l_clone.quit();
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
glib::ControlFlow::Continue
|
||||
Latency(msg) => {
|
||||
gst::info!(
|
||||
CAT,
|
||||
"Latency requirements have changed for element {}",
|
||||
msg.src()
|
||||
.map(|src| src.name())
|
||||
.as_deref()
|
||||
.unwrap_or("UNKNOWN"),
|
||||
);
|
||||
if let Err(err) = pipeline.recalculate_latency() {
|
||||
gst::error!(CAT, "Error recalculating latency: {err}");
|
||||
}
|
||||
}
|
||||
_ => (),
|
||||
}
|
||||
})
|
||||
.expect("Failed to add bus watch");
|
||||
}
|
||||
});
|
||||
|
||||
gst::info!(CAT, "Switching to Ready");
|
||||
let start = Instant::now();
|
||||
|
|
|
@ -57,19 +57,19 @@ impl PadSinkHandlerInner {
|
|||
.expect("ts before Segment start");
|
||||
|
||||
if let Some(last_ts) = self.last_ts {
|
||||
let cur_ts = elem.current_running_time().unwrap();
|
||||
let latency: Duration = (cur_ts - ts).into();
|
||||
let rt = elem.current_running_time().unwrap();
|
||||
let lateness: Duration = (rt - ts).into();
|
||||
let interval: Duration = (ts - last_ts).into();
|
||||
|
||||
if let Some(stats) = self.stats.as_mut() {
|
||||
stats.add_buffer(latency, interval);
|
||||
stats.add_buffer(lateness, interval);
|
||||
}
|
||||
|
||||
debug_or_trace!(
|
||||
CAT,
|
||||
self.is_main_elem,
|
||||
obj = elem,
|
||||
"o latency {latency:.2?}"
|
||||
"o lateness {lateness:.2?}"
|
||||
);
|
||||
debug_or_trace!(
|
||||
CAT,
|
||||
|
|
|
@ -16,14 +16,14 @@ pub struct Stats {
|
|||
max_buffers: Option<f32>,
|
||||
buffer_count: f32,
|
||||
buffer_count_delta: f32,
|
||||
latency_sum: f32,
|
||||
latency_square_sum: f32,
|
||||
latency_sum_delta: f32,
|
||||
latency_square_sum_delta: f32,
|
||||
latency_min: Duration,
|
||||
latency_min_delta: Duration,
|
||||
latency_max: Duration,
|
||||
latency_max_delta: Duration,
|
||||
lateness_sum: f32,
|
||||
lateness_square_sum: f32,
|
||||
lateness_sum_delta: f32,
|
||||
lateness_square_sum_delta: f32,
|
||||
lateness_min: Duration,
|
||||
lateness_min_delta: Duration,
|
||||
lateness_max: Duration,
|
||||
lateness_max_delta: Duration,
|
||||
interval_sum: f32,
|
||||
interval_square_sum: f32,
|
||||
interval_sum_delta: f32,
|
||||
|
@ -51,14 +51,14 @@ impl Stats {
|
|||
pub fn start(&mut self) {
|
||||
self.buffer_count = 0.0;
|
||||
self.buffer_count_delta = 0.0;
|
||||
self.latency_sum = 0.0;
|
||||
self.latency_square_sum = 0.0;
|
||||
self.latency_sum_delta = 0.0;
|
||||
self.latency_square_sum_delta = 0.0;
|
||||
self.latency_min = Duration::MAX;
|
||||
self.latency_min_delta = Duration::MAX;
|
||||
self.latency_max = Duration::ZERO;
|
||||
self.latency_max_delta = Duration::ZERO;
|
||||
self.lateness_sum = 0.0;
|
||||
self.lateness_square_sum = 0.0;
|
||||
self.lateness_sum_delta = 0.0;
|
||||
self.lateness_square_sum_delta = 0.0;
|
||||
self.lateness_min = Duration::MAX;
|
||||
self.lateness_min_delta = Duration::MAX;
|
||||
self.lateness_max = Duration::ZERO;
|
||||
self.lateness_max_delta = Duration::ZERO;
|
||||
self.interval_sum = 0.0;
|
||||
self.interval_square_sum = 0.0;
|
||||
self.interval_sum_delta = 0.0;
|
||||
|
@ -105,7 +105,7 @@ impl Stats {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn add_buffer(&mut self, latency: Duration, interval: Duration) {
|
||||
pub fn add_buffer(&mut self, lateness: Duration, interval: Duration) {
|
||||
if !self.is_active() {
|
||||
return;
|
||||
}
|
||||
|
@ -113,19 +113,19 @@ impl Stats {
|
|||
self.buffer_count += 1.0;
|
||||
self.buffer_count_delta += 1.0;
|
||||
|
||||
// Latency
|
||||
let latency_f32 = latency.as_nanos() as f32;
|
||||
let latency_square = latency_f32.powi(2);
|
||||
// Lateness
|
||||
let lateness_f32 = lateness.as_nanos() as f32;
|
||||
let lateness_square = lateness_f32.powi(2);
|
||||
|
||||
self.latency_sum += latency_f32;
|
||||
self.latency_square_sum += latency_square;
|
||||
self.latency_min = self.latency_min.min(latency);
|
||||
self.latency_max = self.latency_max.max(latency);
|
||||
self.lateness_sum += lateness_f32;
|
||||
self.lateness_square_sum += lateness_square;
|
||||
self.lateness_min = self.lateness_min.min(lateness);
|
||||
self.lateness_max = self.lateness_max.max(lateness);
|
||||
|
||||
self.latency_sum_delta += latency_f32;
|
||||
self.latency_square_sum_delta += latency_square;
|
||||
self.latency_min_delta = self.latency_min_delta.min(latency);
|
||||
self.latency_max_delta = self.latency_max_delta.max(latency);
|
||||
self.lateness_sum_delta += lateness_f32;
|
||||
self.lateness_square_sum_delta += lateness_square;
|
||||
self.lateness_min_delta = self.lateness_min_delta.min(lateness);
|
||||
self.lateness_max_delta = self.lateness_max_delta.max(lateness);
|
||||
|
||||
// Interval
|
||||
let interval_f32 = interval.as_nanos() as f32;
|
||||
|
@ -186,24 +186,24 @@ impl Stats {
|
|||
self.interval_max_delta = Duration::ZERO;
|
||||
self.interval_late_count_delta = 0.0;
|
||||
|
||||
let latency_mean = self.latency_sum_delta / self.buffer_count_delta;
|
||||
let latency_std_dev = f32::sqrt(
|
||||
self.latency_square_sum_delta / self.buffer_count_delta - latency_mean.powi(2),
|
||||
let lateness_mean = self.lateness_sum_delta / self.buffer_count_delta;
|
||||
let lateness_std_dev = f32::sqrt(
|
||||
self.lateness_square_sum_delta / self.buffer_count_delta - lateness_mean.powi(2),
|
||||
);
|
||||
|
||||
gst::info!(
|
||||
CAT,
|
||||
"o latency: mean {:4.2?} σ {:4.1?} [{:4.1?}, {:4.1?}]",
|
||||
Duration::from_nanos(latency_mean as u64),
|
||||
Duration::from_nanos(latency_std_dev as u64),
|
||||
self.latency_min_delta,
|
||||
self.latency_max_delta,
|
||||
"o lateness: mean {:4.2?} σ {:4.1?} [{:4.1?}, {:4.1?}]",
|
||||
Duration::from_nanos(lateness_mean as u64),
|
||||
Duration::from_nanos(lateness_std_dev as u64),
|
||||
self.lateness_min_delta,
|
||||
self.lateness_max_delta,
|
||||
);
|
||||
|
||||
self.latency_sum_delta = 0.0;
|
||||
self.latency_square_sum_delta = 0.0;
|
||||
self.latency_min_delta = Duration::MAX;
|
||||
self.latency_max_delta = Duration::ZERO;
|
||||
self.lateness_sum_delta = 0.0;
|
||||
self.lateness_square_sum_delta = 0.0;
|
||||
self.lateness_min_delta = Duration::MAX;
|
||||
self.lateness_max_delta = Duration::ZERO;
|
||||
|
||||
self.buffer_count_delta = 0.0;
|
||||
}
|
||||
|
@ -254,17 +254,17 @@ impl Stats {
|
|||
);
|
||||
}
|
||||
|
||||
let latency_mean = self.latency_sum / self.buffer_count;
|
||||
let latency_std_dev =
|
||||
f32::sqrt(self.latency_square_sum / self.buffer_count - latency_mean.powi(2));
|
||||
let lateness_mean = self.lateness_sum / self.buffer_count;
|
||||
let lateness_std_dev =
|
||||
f32::sqrt(self.lateness_square_sum / self.buffer_count - lateness_mean.powi(2));
|
||||
|
||||
gst::info!(
|
||||
CAT,
|
||||
"o latency: mean {:4.2?} σ {:4.1?} [{:4.1?}, {:4.1?}]",
|
||||
Duration::from_nanos(latency_mean as u64),
|
||||
Duration::from_nanos(latency_std_dev as u64),
|
||||
self.latency_min,
|
||||
self.latency_max,
|
||||
"o lateness: mean {:4.2?} σ {:4.1?} [{:4.1?}, {:4.1?}]",
|
||||
Duration::from_nanos(lateness_mean as u64),
|
||||
Duration::from_nanos(lateness_std_dev as u64),
|
||||
self.lateness_min,
|
||||
self.lateness_max,
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -56,19 +56,19 @@ impl PadSinkHandlerInner {
|
|||
.expect("ts before Segment start");
|
||||
|
||||
if let Some(last_ts) = self.last_ts {
|
||||
let cur_ts = elem.current_running_time().unwrap();
|
||||
let latency: Duration = (cur_ts - ts).into();
|
||||
let rt = elem.current_running_time().unwrap();
|
||||
let lateness: Duration = (rt - ts).into();
|
||||
let interval: Duration = (ts - last_ts).into();
|
||||
|
||||
if let Some(stats) = self.stats.as_mut() {
|
||||
stats.add_buffer(latency, interval);
|
||||
stats.add_buffer(lateness, interval);
|
||||
}
|
||||
|
||||
debug_or_trace!(
|
||||
CAT,
|
||||
self.is_main_elem,
|
||||
obj = elem,
|
||||
"o latency {latency:.2?}"
|
||||
"o lateness {lateness:.2?}"
|
||||
);
|
||||
debug_or_trace!(
|
||||
CAT,
|
||||
|
|
|
@ -169,19 +169,19 @@ impl TaskImpl for TaskSinkTask {
|
|||
.expect("dts before Segment start");
|
||||
|
||||
if let Some(last_ts) = self.last_ts {
|
||||
let cur_ts = self.elem.current_running_time().unwrap();
|
||||
let latency: Duration = (cur_ts - ts).into();
|
||||
let rt = self.elem.current_running_time().unwrap();
|
||||
let lateness: Duration = (rt - ts).into();
|
||||
let interval: Duration = (ts - last_ts).into();
|
||||
|
||||
if let Some(stats) = self.stats.as_mut() {
|
||||
stats.add_buffer(latency, interval);
|
||||
stats.add_buffer(lateness, interval);
|
||||
}
|
||||
|
||||
debug_or_trace!(
|
||||
CAT,
|
||||
self.is_main_elem,
|
||||
obj = self.elem,
|
||||
"o latency {latency:.2?}",
|
||||
"o lateness {lateness:.2?}",
|
||||
);
|
||||
debug_or_trace!(
|
||||
CAT,
|
||||
|
|
|
@ -74,10 +74,11 @@
|
|||
* // started at the same time. However, an application that dynamically
|
||||
* // generates pipelines must ensure that all the pipelines that will be
|
||||
* // connected together share the same base time.
|
||||
* pipe_up.set_base_time(gst::ClockTime::ZERO);
|
||||
* let base_time = clock.time();
|
||||
* pipe_up.set_start_time(gst::ClockTime::NONE);
|
||||
* pipe_down.set_base_time(gst::ClockTime::ZERO);
|
||||
* pipe_up.set_base_time(base_time);
|
||||
* pipe_down.set_start_time(gst::ClockTime::NONE);
|
||||
* pipe_down.set_base_time(base_time);
|
||||
*
|
||||
* pipe_up.set_state(gst::State::Playing).unwrap();
|
||||
* pipe_down.set_state(gst::State::Playing).unwrap();
|
||||
|
|
Loading…
Reference in a new issue