diff --git a/generic/threadshare/examples/benchmark.rs b/generic/threadshare/examples/benchmark.rs index 33cc3a42..0ecff74e 100644 --- a/generic/threadshare/examples/benchmark.rs +++ b/generic/threadshare/examples/benchmark.rs @@ -19,6 +19,8 @@ use gst::glib; use gst::prelude::*; +use gst::{gst_error, gst_info}; +use once_cell::sync::Lazy; use std::env; use std::sync::atomic::{AtomicU64, Ordering}; @@ -28,6 +30,14 @@ use std::time::{Duration, Instant}; const THROUGHPUT_PERIOD: Duration = Duration::from_secs(20); +pub static CAT: Lazy = Lazy::new(|| { + gst::DebugCategory::new( + "ts-benchmark", + gst::DebugColorFlags::empty(), + Some("Thread-sharing benchmarking receiver"), + ) +}); + fn main() { gst::init().unwrap(); @@ -55,32 +65,49 @@ fn main() { } let args = env::args().collect::>(); - assert_eq!(args.len(), 6); + assert!(args.len() > 4); let n_streams: u16 = args[1].parse().unwrap(); let source = &args[2]; let n_groups: u32 = args[3].parse().unwrap(); let wait: u32 = args[4].parse().unwrap(); + // Nb buffers to await before stopping. + let max_buffers: Option = if args.len() > 5 { + args[5].parse().ok() + } else { + None + }; + let is_rtp = args.len() > 6 && (args[6] == "rtp"); + + let rtp_caps = gst::Caps::builder("audio/x-rtp") + .field("media", "audio") + .field("payload", 8i32) + .field("clock-rate", 8000) + .field("encoding-name", "PCMA") + .build(); + let l = glib::MainLoop::new(None, false); let pipeline = gst::Pipeline::new(None); let counter = Arc::new(AtomicU64::new(0)); for i in 0..n_streams { + let build_context = || format!("context-{}", (i as u32) % n_groups); + let sink = gst::ElementFactory::make("fakesink", Some(format!("sink-{}", i).as_str())).unwrap(); sink.set_property("sync", false); sink.set_property("async", false); - - let counter_clone = Arc::clone(&counter); - sink.static_pad("sink").unwrap().add_probe( - gst::PadProbeType::BUFFER, - move |_pad, _probe_info| { - let _ = counter_clone.fetch_add(1, Ordering::SeqCst); - gst::PadProbeReturn::Ok - }, + sink.set_property("signal-handoffs", true); + sink.connect( + "handoff", + true, + glib::clone!(@strong counter => move |_| { + let _ = counter.fetch_add(1, Ordering::SeqCst); + None + }), ); - let source = match source.as_str() { + let (source, context) = match source.as_str() { "udpsrc" => { let source = gst::ElementFactory::make("udpsrc", Some(format!("source-{}", i).as_str())) @@ -88,17 +115,22 @@ fn main() { source.set_property("port", 40000i32 + i as i32); source.set_property("retrieve-sender-address", false); - source + (source, None) } "ts-udpsrc" => { + let context = build_context(); let source = gst::ElementFactory::make("ts-udpsrc", Some(format!("source-{}", i).as_str())) .unwrap(); source.set_property("port", 40000i32 + i as i32); - source.set_property("context", format!("context-{}", (i as u32) % n_groups)); + source.set_property("context", &context); source.set_property("context-wait", wait); - source + if is_rtp { + source.set_property("caps", &rtp_caps); + } + + (source, Some(context)) } "tcpclientsrc" => { let source = gst::ElementFactory::make( @@ -109,9 +141,10 @@ fn main() { source.set_property("host", "127.0.0.1"); source.set_property("port", 40000i32); - source + (source, None) } "ts-tcpclientsrc" => { + let context = build_context(); let source = gst::ElementFactory::make( "ts-tcpclientsrc", Some(format!("source-{}", i).as_str()), @@ -119,10 +152,10 @@ fn main() { .unwrap(); source.set_property("host", "127.0.0.1"); source.set_property("port", 40000i32); - source.set_property("context", format!("context-{}", (i as u32) % n_groups)); + source.set_property("context", &context); source.set_property("context-wait", wait); - source + (source, Some(context)) } "tonegeneratesrc" => { let source = gst::ElementFactory::make( @@ -134,23 +167,51 @@ fn main() { sink.set_property("sync", true); - source + (source, None) } "ts-tonesrc" => { + let context = build_context(); let source = gst::ElementFactory::make("ts-tonesrc", Some(format!("source-{}", i).as_str())) .unwrap(); source.set_property("samples-per-buffer", (wait as u32) * 8000 / 1000); - source.set_property("context", format!("context-{}", (i as u32) % n_groups)); + source.set_property("context", &context); source.set_property("context-wait", wait); - source + (source, Some(context)) } _ => unimplemented!(), }; - pipeline.add_many(&[&source, &sink]).unwrap(); - source.link(&sink).unwrap(); + if is_rtp { + let jb = + gst::ElementFactory::make("ts-jitterbuffer", Some(format!("jb-{}", i).as_str())) + .unwrap(); + if let Some(context) = context { + jb.set_property("context", &context); + } + jb.set_property("context-wait", wait); + jb.set_property("latency", wait); + + let elements = &[&source, &jb, &sink]; + pipeline.add_many(elements).unwrap(); + gst::Element::link_many(elements).unwrap(); + } else { + let queue = if let Some(context) = context { + let queue = + gst::ElementFactory::make("ts-queue", Some(format!("queue-{}", i).as_str())) + .unwrap(); + queue.set_property("context", &context); + queue.set_property("context-wait", wait); + queue + } else { + gst::ElementFactory::make("queue2", Some(format!("queue-{}", i).as_str())).unwrap() + }; + + let elements = &[&source, &queue, &sink]; + pipeline.add_many(elements).unwrap(); + gst::Element::link_many(elements).unwrap(); + } } let bus = pipeline.bus().unwrap(); @@ -161,7 +222,8 @@ fn main() { match msg.view() { MessageView::Eos(..) => l_clone.quit(), MessageView::Error(err) => { - println!( + gst_error!( + CAT, "Error from {:?}: {} ({:?})", err.src().map(|s| s.path_string()), err.error(), @@ -178,8 +240,9 @@ fn main() { pipeline.set_state(gst::State::Playing).unwrap(); - println!("started"); + gst_info!(CAT, "started"); + let l_clone = l.clone(); thread::spawn(move || { let throughput_factor = 1_000f32 / (n_streams as f32); let mut prev_reset_instant: Option = None; @@ -188,10 +251,24 @@ fn main() { loop { count = counter.fetch_and(0, Ordering::SeqCst); + if let Some(max_buffers) = max_buffers { + if count > max_buffers { + gst_info!(CAT, "Stopping"); + let stopping_instant = Instant::now(); + pipeline.set_state(gst::State::Ready).unwrap(); + gst_info!(CAT, "Stopped. Took {:?}", stopping_instant.elapsed()); + pipeline.set_state(gst::State::Null).unwrap(); + gst_info!(CAT, "Unprepared"); + l_clone.quit(); + break; + } + } + reset_instant = Instant::now(); if let Some(prev_reset_instant) = prev_reset_instant { - println!( + gst_info!( + CAT, "{:>6.2} / s / stream", (count as f32) * throughput_factor / ((reset_instant - prev_reset_instant).as_millis() as f32) diff --git a/generic/threadshare/examples/udpsrc_benchmark_sender.rs b/generic/threadshare/examples/udpsrc_benchmark_sender.rs index 2377f698..995bc23f 100644 --- a/generic/threadshare/examples/udpsrc_benchmark_sender.rs +++ b/generic/threadshare/examples/udpsrc_benchmark_sender.rs @@ -23,9 +23,17 @@ use std::{env, thread, time}; fn main() { let args = env::args().collect::>(); - assert_eq!(args.len(), 2); + assert!(args.len() > 1); let n_streams: u16 = args[1].parse().unwrap(); + if args.len() > 2 && args[2] == "rtp" { + send_rtp_buffers(n_streams); + } else { + send_raw_buffers(n_streams); + } +} + +fn send_raw_buffers(n_streams: u16) { let buffer = [0; 160]; let socket = net::UdpSocket::bind("0.0.0.0:0").unwrap(); @@ -51,3 +59,60 @@ fn main() { } } } + +fn send_rtp_buffers(n_streams: u16) { + use gst::glib; + use gst::prelude::*; + + gst::init().unwrap(); + + #[cfg(debug_assertions)] + { + use std::path::Path; + + let mut path = Path::new("target/debug"); + if !path.exists() { + path = Path::new("../../target/debug"); + } + + gst::Registry::get().scan_path(path); + } + #[cfg(not(debug_assertions))] + { + use std::path::Path; + + let mut path = Path::new("target/release"); + if !path.exists() { + path = Path::new("../../target/release"); + } + + gst::Registry::get().scan_path(path); + } + + let l = glib::MainLoop::new(None, false); + let pipeline = gst::Pipeline::new(None); + for i in 0..n_streams { + let src = + gst::ElementFactory::make("audiotestsrc", Some(format!("audiotestsrc-{}", i).as_str())) + .unwrap(); + src.set_property("is-live", true); + + let enc = + gst::ElementFactory::make("alawenc", Some(format!("alawenc-{}", i).as_str())).unwrap(); + let pay = + gst::ElementFactory::make("rtppcmapay", Some(format!("rtppcmapay-{}", i).as_str())) + .unwrap(); + let sink = gst::ElementFactory::make("ts-udpsink", Some(format!("udpsink-{}", i).as_str())) + .unwrap(); + sink.set_property("clients", format!("127.0.0.1:{}", i + 40000)); + sink.set_property("context", "context-udpsink"); + sink.set_property("context-wait", 20u32); + + let elements = &[&src, &enc, &pay, &sink]; + pipeline.add_many(elements).unwrap(); + gst::Element::link_many(elements).unwrap(); + } + + pipeline.set_state(gst::State::Playing).unwrap(); + l.run(); +} diff --git a/generic/threadshare/src/jitterbuffer/imp.rs b/generic/threadshare/src/jitterbuffer/imp.rs index 084d1f5d..f7cfd265 100644 --- a/generic/threadshare/src/jitterbuffer/imp.rs +++ b/generic/threadshare/src/jitterbuffer/imp.rs @@ -181,20 +181,32 @@ impl SinkHandler { ) -> Result { let s = caps.structure(0).ok_or(gst::FlowError::Error)?; - gst_info!(CAT, obj: element, "Parsing {:?}", caps); + gst_debug!(CAT, obj: element, "Parsing {:?}", caps); - let payload = s.get::("payload").map_err(|_| gst::FlowError::Error)?; + let payload = s.get::("payload").map_err(|err| { + gst_debug!(CAT, obj: element, "Caps 'payload': {}", err); + gst::FlowError::Error + })?; if pt != 0 && payload as u8 != pt { + gst_debug!( + CAT, + obj: element, + "Caps 'payload' ({}) doesn't match payload type ({})", + payload, + pt + ); return Err(gst::FlowError::Error); } inner.last_pt = Some(pt); - let clock_rate = s - .get::("clock-rate") - .map_err(|_| gst::FlowError::Error)?; + let clock_rate = s.get::("clock-rate").map_err(|err| { + gst_debug!(CAT, obj: element, "Caps 'clock-rate': {}", err); + gst::FlowError::Error + })?; if clock_rate <= 0 { + gst_debug!(CAT, obj: element, "Caps 'clock-rate' <= 0"); return Err(gst::FlowError::Error); } state.clock_rate = Some(clock_rate as u32); @@ -372,8 +384,14 @@ impl SinkHandler { drop(state); let caps = element .try_emit_by_name::>("request-pt-map", &[&(pt as u32)]) - .map_err(|_| gst::FlowError::Error)? - .ok_or(gst::FlowError::Error)?; + .map_err(|err| { + gst_error!(CAT, obj: pad, "Emitting 'request-pt-map': {}", err); + gst::FlowError::Error + })? + .ok_or_else(|| { + gst_error!(CAT, obj: pad, "Signal 'request-pt-map' retuned None"); + gst::FlowError::Error + })?; let mut state = jb.state.lock().unwrap(); self.parse_caps(inner, &mut state, element, &caps, pt)?; state @@ -1247,7 +1265,7 @@ static CAT: Lazy = Lazy::new(|| { impl JitterBuffer { fn clear_pt_map(&self, element: &super::JitterBuffer) { - gst_info!(CAT, obj: element, "Clearing PT map"); + gst_debug!(CAT, obj: element, "Clearing PT map"); let mut state = self.state.lock().unwrap(); state.clock_rate = None; @@ -1255,7 +1273,7 @@ impl JitterBuffer { } fn prepare(&self, element: &super::JitterBuffer) -> Result<(), gst::ErrorMessage> { - gst_info!(CAT, obj: element, "Preparing"); + gst_debug!(CAT, obj: element, "Preparing"); let context = { let settings = self.settings.lock().unwrap(); @@ -1274,7 +1292,7 @@ impl JitterBuffer { ) })?; - gst_info!(CAT, obj: element, "Prepared"); + gst_debug!(CAT, obj: element, "Prepared"); Ok(()) }