From 5ada3da580d2a0205739570f687f049e2ce77d7c Mon Sep 17 00:00:00 2001 From: Matthew Waters Date: Sun, 16 Feb 2025 15:16:29 +1100 Subject: [PATCH] tests/rtpbin2: improve performance of receive_list_benchmark Instead of receiving each output buffer through GstHarness individually, process the received buffer lists directly by prociding our own pad and chain/chain_list handlers. Reduces test time locally from 30+s to <2s. Part-of: --- net/rtp/tests/rtpbin2.rs | 80 +++++++++++++++++++++++++++++++--------- 1 file changed, 62 insertions(+), 18 deletions(-) diff --git a/net/rtp/tests/rtpbin2.rs b/net/rtp/tests/rtpbin2.rs index a056ffa5d..903bd830b 100644 --- a/net/rtp/tests/rtpbin2.rs +++ b/net/rtp/tests/rtpbin2.rs @@ -274,6 +274,13 @@ fn test_send_list_benchmark() { } fn receive_init() -> Arc> { + receive_init_with_new_srcpad(|h, srcpad| h.add_element_src_pad(srcpad)) +} + +fn receive_init_with_new_srcpad(new_srcpad: F) -> Arc> +where + F: Fn(&mut gst_check::Harness, &gst::Pad) + Send + Sync + 'static, +{ let id = next_element_counter(); let elem = gst::ElementFactory::make("rtprecv") @@ -291,12 +298,9 @@ fn receive_init() -> Arc> { .element() .unwrap() .connect_pad_added(move |_elem, pad| { - weak_h - .upgrade() - .unwrap() - .lock() - .unwrap() - .add_element_src_pad(pad) + let h = weak_h.upgrade().unwrap(); + let mut h = h.lock().unwrap(); + new_srcpad(&mut h, pad); }); inner.play(); let caps = Caps::builder("application/x-rtp") @@ -529,13 +533,41 @@ fn test_receive_benchmark() { ); } +#[derive(Debug)] +enum BufferOrList { + Buffer(gst::Buffer), + List(gst::BufferList), +} + #[test] fn test_receive_list_benchmark() { init(); let clock = gst::SystemClock::obtain(); const N_PACKETS: usize = 32 * 1024; const N_PUSHES: usize = 1024 / 32; - let h = receive_init(); + + let (send, recv) = std::sync::mpsc::channel(); + + let sinkpad = gst::Pad::builder(gst::PadDirection::Sink) + .name("sink") + .chain_list_function({ + let send = send.clone(); + move |_pad, _parent, list| { + send.send(BufferOrList::List(list)).unwrap(); + Ok(gst::FlowSuccess::Ok) + } + }) + .chain_function(move |_pad, _parent, buffer| { + send.send(BufferOrList::Buffer(buffer)).unwrap(); + + Ok(gst::FlowSuccess::Ok) + }) + .build(); + sinkpad.set_active(true).unwrap(); + + let h = receive_init_with_new_srcpad(move |_h, srcpad| { + srcpad.link(&sinkpad).unwrap(); + }); let inner = h.lock().unwrap(); let push_pad = inner @@ -572,21 +604,33 @@ fn test_receive_list_benchmark() { let pushed = clock.time(); - let mut inner = h.lock().unwrap(); - for p in 0..N_PUSHES { - for i in 0..N_PACKETS { - let buffer = inner.pull().unwrap(); - let mapped = buffer.map_readable().unwrap(); - let rtp = rtp_types::RtpPacket::parse(&mapped).unwrap(); - assert_eq!( - rtp.sequence_number(), - ((p * N_PACKETS + i) % u16::MAX as usize) as u16 - ); + let mut idx = 0; + loop { + let list = recv.recv().unwrap(); + println!("{idx} received {list:?}"); + match list { + BufferOrList::List(list) => { + list.foreach(|buffer, _buf_idx| { + let mapped = buffer.map_readable().unwrap(); + let rtp = rtp_types::RtpPacket::parse(&mapped).unwrap(); + assert_eq!(rtp.sequence_number(), (idx % u16::MAX as usize) as u16); + idx += 1; + std::ops::ControlFlow::Continue(()) + }); + } + BufferOrList::Buffer(buffer) => { + let mapped = buffer.map_readable().unwrap(); + let rtp = rtp_types::RtpPacket::parse(&mapped).unwrap(); + assert_eq!(rtp.sequence_number(), (idx % u16::MAX as usize) as u16); + idx += 1; + } + } + if idx >= N_PUSHES * N_PACKETS { + break; } } let end = clock.time(); - drop(inner); let test_time = end.opt_sub(start); let pull_time = end.opt_sub(pushed);