ts/examples: add rtp mode with jitter-buffer & trace stop duration

This commit is contained in:
François Laignel 2022-03-21 12:25:48 +01:00 committed by Sebastian Dröge
parent ab96219c19
commit 97985d6442
3 changed files with 194 additions and 35 deletions

View file

@ -19,6 +19,7 @@
use gst::glib; use gst::glib;
use gst::prelude::*; use gst::prelude::*;
use once_cell::sync::Lazy;
use std::env; use std::env;
use std::sync::atomic::{AtomicU64, Ordering}; use std::sync::atomic::{AtomicU64, Ordering};
@ -28,6 +29,14 @@ use std::time::{Duration, Instant};
const THROUGHPUT_PERIOD: Duration = Duration::from_secs(20); const THROUGHPUT_PERIOD: Duration = Duration::from_secs(20);
pub static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
gst::DebugCategory::new(
"ts-benchmark",
gst::DebugColorFlags::empty(),
Some("Thread-sharing benchmarking receiver"),
)
});
fn main() { fn main() {
gst::init().unwrap(); gst::init().unwrap();
@ -55,32 +64,49 @@ fn main() {
} }
let args = env::args().collect::<Vec<_>>(); let args = env::args().collect::<Vec<_>>();
assert_eq!(args.len(), 6); assert!(args.len() > 4);
let n_streams: u16 = args[1].parse().unwrap(); let n_streams: u16 = args[1].parse().unwrap();
let source = &args[2]; let source = &args[2];
let n_groups: u32 = args[3].parse().unwrap(); let n_groups: u32 = args[3].parse().unwrap();
let wait: u32 = args[4].parse().unwrap(); let wait: u32 = args[4].parse().unwrap();
// Nb buffers to await before stopping.
let max_buffers: Option<u64> = if args.len() > 5 {
args[5].parse().ok()
} else {
None
};
let is_rtp = args.len() > 6 && (args[6] == "rtp");
let rtp_caps = gst::Caps::builder("audio/x-rtp")
.field("media", "audio")
.field("payload", 8i32)
.field("clock-rate", 8000)
.field("encoding-name", "PCMA")
.build();
let l = glib::MainLoop::new(None, false); let l = glib::MainLoop::new(None, false);
let pipeline = gst::Pipeline::new(None); let pipeline = gst::Pipeline::new(None);
let counter = Arc::new(AtomicU64::new(0)); let counter = Arc::new(AtomicU64::new(0));
for i in 0..n_streams { for i in 0..n_streams {
let build_context = || format!("context-{}", (i as u32) % n_groups);
let sink = let sink =
gst::ElementFactory::make("fakesink", Some(format!("sink-{}", i).as_str())).unwrap(); gst::ElementFactory::make("fakesink", Some(format!("sink-{}", i).as_str())).unwrap();
sink.set_property("sync", false); sink.set_property("sync", false);
sink.set_property("async", false); sink.set_property("async", false);
sink.set_property("signal-handoffs", true);
let counter_clone = Arc::clone(&counter); sink.connect(
sink.static_pad("sink").unwrap().add_probe( "handoff",
gst::PadProbeType::BUFFER, true,
move |_pad, _probe_info| { glib::clone!(@strong counter => move |_| {
let _ = counter_clone.fetch_add(1, Ordering::SeqCst); let _ = counter.fetch_add(1, Ordering::SeqCst);
gst::PadProbeReturn::Ok None
}, }),
); );
let source = match source.as_str() { let (source, context) = match source.as_str() {
"udpsrc" => { "udpsrc" => {
let source = let source =
gst::ElementFactory::make("udpsrc", Some(format!("source-{}", i).as_str())) gst::ElementFactory::make("udpsrc", Some(format!("source-{}", i).as_str()))
@ -88,17 +114,22 @@ fn main() {
source.set_property("port", 40000i32 + i as i32); source.set_property("port", 40000i32 + i as i32);
source.set_property("retrieve-sender-address", false); source.set_property("retrieve-sender-address", false);
source (source, None)
} }
"ts-udpsrc" => { "ts-udpsrc" => {
let context = build_context();
let source = let source =
gst::ElementFactory::make("ts-udpsrc", Some(format!("source-{}", i).as_str())) gst::ElementFactory::make("ts-udpsrc", Some(format!("source-{}", i).as_str()))
.unwrap(); .unwrap();
source.set_property("port", 40000i32 + i as i32); source.set_property("port", 40000i32 + i as i32);
source.set_property("context", format!("context-{}", (i as u32) % n_groups)); source.set_property("context", &context);
source.set_property("context-wait", wait); source.set_property("context-wait", wait);
source if is_rtp {
source.set_property("caps", &rtp_caps);
}
(source, Some(context))
} }
"tcpclientsrc" => { "tcpclientsrc" => {
let source = gst::ElementFactory::make( let source = gst::ElementFactory::make(
@ -109,9 +140,10 @@ fn main() {
source.set_property("host", "127.0.0.1"); source.set_property("host", "127.0.0.1");
source.set_property("port", 40000i32); source.set_property("port", 40000i32);
source (source, None)
} }
"ts-tcpclientsrc" => { "ts-tcpclientsrc" => {
let context = build_context();
let source = gst::ElementFactory::make( let source = gst::ElementFactory::make(
"ts-tcpclientsrc", "ts-tcpclientsrc",
Some(format!("source-{}", i).as_str()), Some(format!("source-{}", i).as_str()),
@ -119,10 +151,10 @@ fn main() {
.unwrap(); .unwrap();
source.set_property("host", "127.0.0.1"); source.set_property("host", "127.0.0.1");
source.set_property("port", 40000i32); source.set_property("port", 40000i32);
source.set_property("context", format!("context-{}", (i as u32) % n_groups)); source.set_property("context", &context);
source.set_property("context-wait", wait); source.set_property("context-wait", wait);
source (source, Some(context))
} }
"tonegeneratesrc" => { "tonegeneratesrc" => {
let source = gst::ElementFactory::make( let source = gst::ElementFactory::make(
@ -134,23 +166,51 @@ fn main() {
sink.set_property("sync", true); sink.set_property("sync", true);
source (source, None)
} }
"ts-tonesrc" => { "ts-tonesrc" => {
let context = build_context();
let source = let source =
gst::ElementFactory::make("ts-tonesrc", Some(format!("source-{}", i).as_str())) gst::ElementFactory::make("ts-tonesrc", Some(format!("source-{}", i).as_str()))
.unwrap(); .unwrap();
source.set_property("samples-per-buffer", (wait as u32) * 8000 / 1000); source.set_property("samples-per-buffer", (wait as u32) * 8000 / 1000);
source.set_property("context", format!("context-{}", (i as u32) % n_groups)); source.set_property("context", &context);
source.set_property("context-wait", wait); source.set_property("context-wait", wait);
source (source, Some(context))
} }
_ => unimplemented!(), _ => unimplemented!(),
}; };
pipeline.add_many(&[&source, &sink]).unwrap(); if is_rtp {
source.link(&sink).unwrap(); let jb =
gst::ElementFactory::make("ts-jitterbuffer", Some(format!("jb-{}", i).as_str()))
.unwrap();
if let Some(context) = context {
jb.set_property("context", &context);
}
jb.set_property("context-wait", wait);
jb.set_property("latency", wait);
let elements = &[&source, &jb, &sink];
pipeline.add_many(elements).unwrap();
gst::Element::link_many(elements).unwrap();
} else {
let queue = if let Some(context) = context {
let queue =
gst::ElementFactory::make("ts-queue", Some(format!("queue-{}", i).as_str()))
.unwrap();
queue.set_property("context", &context);
queue.set_property("context-wait", wait);
queue
} else {
gst::ElementFactory::make("queue2", Some(format!("queue-{}", i).as_str())).unwrap()
};
let elements = &[&source, &queue, &sink];
pipeline.add_many(elements).unwrap();
gst::Element::link_many(elements).unwrap();
}
} }
let bus = pipeline.bus().unwrap(); let bus = pipeline.bus().unwrap();
@ -161,7 +221,8 @@ fn main() {
match msg.view() { match msg.view() {
MessageView::Eos(..) => l_clone.quit(), MessageView::Eos(..) => l_clone.quit(),
MessageView::Error(err) => { MessageView::Error(err) => {
println!( gst::error!(
CAT,
"Error from {:?}: {} ({:?})", "Error from {:?}: {} ({:?})",
err.src().map(|s| s.path_string()), err.src().map(|s| s.path_string()),
err.error(), err.error(),
@ -178,8 +239,9 @@ fn main() {
pipeline.set_state(gst::State::Playing).unwrap(); pipeline.set_state(gst::State::Playing).unwrap();
println!("started"); gst::info!(CAT, "started");
let l_clone = l.clone();
thread::spawn(move || { thread::spawn(move || {
let throughput_factor = 1_000f32 / (n_streams as f32); let throughput_factor = 1_000f32 / (n_streams as f32);
let mut prev_reset_instant: Option<Instant> = None; let mut prev_reset_instant: Option<Instant> = None;
@ -188,10 +250,24 @@ fn main() {
loop { loop {
count = counter.fetch_and(0, Ordering::SeqCst); count = counter.fetch_and(0, Ordering::SeqCst);
if let Some(max_buffers) = max_buffers {
if count > max_buffers {
gst::info!(CAT, "Stopping");
let stopping_instant = Instant::now();
pipeline.set_state(gst::State::Ready).unwrap();
gst::info!(CAT, "Stopped. Took {:?}", stopping_instant.elapsed());
pipeline.set_state(gst::State::Null).unwrap();
gst::info!(CAT, "Unprepared");
l_clone.quit();
break;
}
}
reset_instant = Instant::now(); reset_instant = Instant::now();
if let Some(prev_reset_instant) = prev_reset_instant { if let Some(prev_reset_instant) = prev_reset_instant {
println!( gst::info!(
CAT,
"{:>6.2} / s / stream", "{:>6.2} / s / stream",
(count as f32) * throughput_factor (count as f32) * throughput_factor
/ ((reset_instant - prev_reset_instant).as_millis() as f32) / ((reset_instant - prev_reset_instant).as_millis() as f32)

View file

@ -23,9 +23,17 @@ use std::{env, thread, time};
fn main() { fn main() {
let args = env::args().collect::<Vec<_>>(); let args = env::args().collect::<Vec<_>>();
assert_eq!(args.len(), 2); assert!(args.len() > 1);
let n_streams: u16 = args[1].parse().unwrap(); let n_streams: u16 = args[1].parse().unwrap();
if args.len() > 2 && args[2] == "rtp" {
send_rtp_buffers(n_streams);
} else {
send_raw_buffers(n_streams);
}
}
fn send_raw_buffers(n_streams: u16) {
let buffer = [0; 160]; let buffer = [0; 160];
let socket = net::UdpSocket::bind("0.0.0.0:0").unwrap(); let socket = net::UdpSocket::bind("0.0.0.0:0").unwrap();
@ -51,3 +59,60 @@ fn main() {
} }
} }
} }
fn send_rtp_buffers(n_streams: u16) {
use gst::glib;
use gst::prelude::*;
gst::init().unwrap();
#[cfg(debug_assertions)]
{
use std::path::Path;
let mut path = Path::new("target/debug");
if !path.exists() {
path = Path::new("../../target/debug");
}
gst::Registry::get().scan_path(path);
}
#[cfg(not(debug_assertions))]
{
use std::path::Path;
let mut path = Path::new("target/release");
if !path.exists() {
path = Path::new("../../target/release");
}
gst::Registry::get().scan_path(path);
}
let l = glib::MainLoop::new(None, false);
let pipeline = gst::Pipeline::new(None);
for i in 0..n_streams {
let src =
gst::ElementFactory::make("audiotestsrc", Some(format!("audiotestsrc-{}", i).as_str()))
.unwrap();
src.set_property("is-live", true);
let enc =
gst::ElementFactory::make("alawenc", Some(format!("alawenc-{}", i).as_str())).unwrap();
let pay =
gst::ElementFactory::make("rtppcmapay", Some(format!("rtppcmapay-{}", i).as_str()))
.unwrap();
let sink = gst::ElementFactory::make("ts-udpsink", Some(format!("udpsink-{}", i).as_str()))
.unwrap();
sink.set_property("clients", format!("127.0.0.1:{}", i + 40000));
sink.set_property("context", "context-udpsink");
sink.set_property("context-wait", 20u32);
let elements = &[&src, &enc, &pay, &sink];
pipeline.add_many(elements).unwrap();
gst::Element::link_many(elements).unwrap();
}
pipeline.set_state(gst::State::Playing).unwrap();
l.run();
}

View file

@ -180,20 +180,32 @@ impl SinkHandler {
) -> Result<gst::FlowSuccess, gst::FlowError> { ) -> Result<gst::FlowSuccess, gst::FlowError> {
let s = caps.structure(0).ok_or(gst::FlowError::Error)?; let s = caps.structure(0).ok_or(gst::FlowError::Error)?;
gst::info!(CAT, obj: element, "Parsing {:?}", caps); gst::debug!(CAT, obj: element, "Parsing {:?}", caps);
let payload = s.get::<i32>("payload").map_err(|_| gst::FlowError::Error)?; let payload = s.get::<i32>("payload").map_err(|err| {
gst::debug!(CAT, obj: element, "Caps 'payload': {}", err);
gst::FlowError::Error
})?;
if pt != 0 && payload as u8 != pt { if pt != 0 && payload as u8 != pt {
gst::debug!(
CAT,
obj: element,
"Caps 'payload' ({}) doesn't match payload type ({})",
payload,
pt
);
return Err(gst::FlowError::Error); return Err(gst::FlowError::Error);
} }
inner.last_pt = Some(pt); inner.last_pt = Some(pt);
let clock_rate = s let clock_rate = s.get::<i32>("clock-rate").map_err(|err| {
.get::<i32>("clock-rate") gst::debug!(CAT, obj: element, "Caps 'clock-rate': {}", err);
.map_err(|_| gst::FlowError::Error)?; gst::FlowError::Error
})?;
if clock_rate <= 0 { if clock_rate <= 0 {
gst::debug!(CAT, obj: element, "Caps 'clock-rate' <= 0");
return Err(gst::FlowError::Error); return Err(gst::FlowError::Error);
} }
state.clock_rate = Some(clock_rate as u32); state.clock_rate = Some(clock_rate as u32);
@ -371,8 +383,14 @@ impl SinkHandler {
drop(state); drop(state);
let caps = element let caps = element
.try_emit_by_name::<Option<gst::Caps>>("request-pt-map", &[&(pt as u32)]) .try_emit_by_name::<Option<gst::Caps>>("request-pt-map", &[&(pt as u32)])
.map_err(|_| gst::FlowError::Error)? .map_err(|err| {
.ok_or(gst::FlowError::Error)?; gst::error!(CAT, obj: pad, "Emitting 'request-pt-map': {}", err);
gst::FlowError::Error
})?
.ok_or_else(|| {
gst::error!(CAT, obj: pad, "Signal 'request-pt-map' retuned None");
gst::FlowError::Error
})?;
let mut state = jb.state.lock().unwrap(); let mut state = jb.state.lock().unwrap();
self.parse_caps(inner, &mut state, element, &caps, pt)?; self.parse_caps(inner, &mut state, element, &caps, pt)?;
state state
@ -1248,7 +1266,7 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
impl JitterBuffer { impl JitterBuffer {
fn clear_pt_map(&self, element: &super::JitterBuffer) { fn clear_pt_map(&self, element: &super::JitterBuffer) {
gst::info!(CAT, obj: element, "Clearing PT map"); gst::debug!(CAT, obj: element, "Clearing PT map");
let mut state = self.state.lock().unwrap(); let mut state = self.state.lock().unwrap();
state.clock_rate = None; state.clock_rate = None;
@ -1256,7 +1274,7 @@ impl JitterBuffer {
} }
fn prepare(&self, element: &super::JitterBuffer) -> Result<(), gst::ErrorMessage> { fn prepare(&self, element: &super::JitterBuffer) -> Result<(), gst::ErrorMessage> {
gst::info!(CAT, obj: element, "Preparing"); gst::debug!(CAT, obj: element, "Preparing");
let context = { let context = {
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
@ -1275,7 +1293,7 @@ impl JitterBuffer {
) )
})?; })?;
gst::info!(CAT, obj: element, "Prepared"); gst::debug!(CAT, obj: element, "Prepared");
Ok(()) Ok(())
} }