diff --git a/generic/threadshare/examples/benchmark.rs b/generic/threadshare/examples/benchmark.rs index cdaceeb9a..beed48a10 100644 --- a/generic/threadshare/examples/benchmark.rs +++ b/generic/threadshare/examples/benchmark.rs @@ -19,6 +19,12 @@ use glib::prelude::*; use gst::prelude::*; use std::env; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::Arc; +use std::thread; +use std::time::{Duration, Instant}; + +const THROUGHPUT_PERIOD: Duration = Duration::from_secs(20); fn main() { gst::init().unwrap(); @@ -55,6 +61,7 @@ fn main() { let l = glib::MainLoop::new(None, false); let pipeline = gst::Pipeline::new(None); + let counter = Arc::new(AtomicU64::new(0)); for i in 0..n_streams { let sink = @@ -62,6 +69,15 @@ fn main() { sink.set_property("sync", &false).unwrap(); sink.set_property("async", &false).unwrap(); + let counter_clone = Arc::clone(&counter); + sink.get_static_pad("sink").unwrap().add_probe( + gst::PadProbeType::BUFFER, + move |_pad, _probe_info| { + let _ = counter_clone.fetch_add(1, Ordering::SeqCst); + gst::PadProbeReturn::Ok + }, + ); + let source = match source.as_str() { "udpsrc" => { let source = @@ -90,12 +106,24 @@ fn main() { source } + "tcpclientsrc" => { + let source = gst::ElementFactory::make( + "tcpclientsrc", + Some(format!("source-{}", i).as_str()), + ) + .unwrap(); + source.set_property("host", &"127.0.0.1").unwrap(); + source.set_property("port", &(40000i32)).unwrap(); + + source + } "ts-tcpclientsrc" => { let source = gst::ElementFactory::make( "ts-tcpclientsrc", Some(format!("source-{}", i).as_str()), ) .unwrap(); + source.set_property("address", &"127.0.0.1").unwrap(); source.set_property("port", &(40000u32)).unwrap(); source .set_property("context", &format!("context-{}", (i as u32) % n_groups)) @@ -166,5 +194,31 @@ fn main() { println!("started"); + thread::spawn(move || { + let throughput_factor = 1_000f32 / (n_streams as f32); + let mut prev_reset_instant: Option = None; + let mut count; + let mut reset_instant; + + loop { + count = counter.fetch_and(0, Ordering::SeqCst); + reset_instant = Instant::now(); + + if let Some(prev_reset_instant) = prev_reset_instant { + println!( + "{:>5.1} / s / stream", + (count as f32) * throughput_factor + / ((reset_instant - prev_reset_instant).as_millis() as f32) + ); + } + + if let Some(sleep_duration) = THROUGHPUT_PERIOD.checked_sub(reset_instant.elapsed()) { + thread::sleep(sleep_duration); + } + + prev_reset_instant = Some(reset_instant); + } + }); + l.run(); }