tests/rtpbin2: improve performance of receive_list_benchmark

Instead of receiving each output buffer through GstHarness individually,
process the received buffer lists directly by prociding our own pad and
chain/chain_list handlers.  Reduces test time locally from 30+s to <2s.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/2081>
This commit is contained in:
Matthew Waters 2025-02-16 15:16:29 +11:00 committed by GStreamer Marge Bot
parent bb5e0c9917
commit 5ada3da580

View file

@ -274,6 +274,13 @@ fn test_send_list_benchmark() {
}
fn receive_init() -> Arc<Mutex<gst_check::Harness>> {
receive_init_with_new_srcpad(|h, srcpad| h.add_element_src_pad(srcpad))
}
fn receive_init_with_new_srcpad<F>(new_srcpad: F) -> Arc<Mutex<gst_check::Harness>>
where
F: Fn(&mut gst_check::Harness, &gst::Pad) + Send + Sync + 'static,
{
let id = next_element_counter();
let elem = gst::ElementFactory::make("rtprecv")
@ -291,12 +298,9 @@ fn receive_init() -> Arc<Mutex<gst_check::Harness>> {
.element()
.unwrap()
.connect_pad_added(move |_elem, pad| {
weak_h
.upgrade()
.unwrap()
.lock()
.unwrap()
.add_element_src_pad(pad)
let h = weak_h.upgrade().unwrap();
let mut h = h.lock().unwrap();
new_srcpad(&mut h, pad);
});
inner.play();
let caps = Caps::builder("application/x-rtp")
@ -529,13 +533,41 @@ fn test_receive_benchmark() {
);
}
#[derive(Debug)]
enum BufferOrList {
Buffer(gst::Buffer),
List(gst::BufferList),
}
#[test]
fn test_receive_list_benchmark() {
init();
let clock = gst::SystemClock::obtain();
const N_PACKETS: usize = 32 * 1024;
const N_PUSHES: usize = 1024 / 32;
let h = receive_init();
let (send, recv) = std::sync::mpsc::channel();
let sinkpad = gst::Pad::builder(gst::PadDirection::Sink)
.name("sink")
.chain_list_function({
let send = send.clone();
move |_pad, _parent, list| {
send.send(BufferOrList::List(list)).unwrap();
Ok(gst::FlowSuccess::Ok)
}
})
.chain_function(move |_pad, _parent, buffer| {
send.send(BufferOrList::Buffer(buffer)).unwrap();
Ok(gst::FlowSuccess::Ok)
})
.build();
sinkpad.set_active(true).unwrap();
let h = receive_init_with_new_srcpad(move |_h, srcpad| {
srcpad.link(&sinkpad).unwrap();
});
let inner = h.lock().unwrap();
let push_pad = inner
@ -572,21 +604,33 @@ fn test_receive_list_benchmark() {
let pushed = clock.time();
let mut inner = h.lock().unwrap();
for p in 0..N_PUSHES {
for i in 0..N_PACKETS {
let buffer = inner.pull().unwrap();
let mapped = buffer.map_readable().unwrap();
let rtp = rtp_types::RtpPacket::parse(&mapped).unwrap();
assert_eq!(
rtp.sequence_number(),
((p * N_PACKETS + i) % u16::MAX as usize) as u16
);
let mut idx = 0;
loop {
let list = recv.recv().unwrap();
println!("{idx} received {list:?}");
match list {
BufferOrList::List(list) => {
list.foreach(|buffer, _buf_idx| {
let mapped = buffer.map_readable().unwrap();
let rtp = rtp_types::RtpPacket::parse(&mapped).unwrap();
assert_eq!(rtp.sequence_number(), (idx % u16::MAX as usize) as u16);
idx += 1;
std::ops::ControlFlow::Continue(())
});
}
BufferOrList::Buffer(buffer) => {
let mapped = buffer.map_readable().unwrap();
let rtp = rtp_types::RtpPacket::parse(&mapped).unwrap();
assert_eq!(rtp.sequence_number(), (idx % u16::MAX as usize) as u16);
idx += 1;
}
}
if idx >= N_PUSHES * N_PACKETS {
break;
}
}
let end = clock.time();
drop(inner);
let test_time = end.opt_sub(start);
let pull_time = end.opt_sub(pushed);