rtp/send: support receiving buffer lists

Can reduce processing overhead if many buffers are pushed concurrently.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1618>
This commit is contained in:
Matthew Waters 2024-06-12 17:51:45 +10:00 committed by GStreamer Marge Bot
parent 2d1f556794
commit df4a4fb2ef
2 changed files with 265 additions and 35 deletions

View file

@ -307,17 +307,14 @@ impl RtpSend {
gst::Iterator::from_vec(vec![])
}
fn rtp_send_sink_chain(
fn handle_buffer(
&self,
id: usize,
sinkpad: &gst::Pad,
srcpad: &gst::Pad,
internal_session: &SharedSession,
buffer: gst::Buffer,
now: Instant,
) -> Result<gst::FlowSuccess, gst::FlowError> {
let state = self.state.lock().unwrap();
let Some(session) = state.session_by_id(id) else {
gst::error!(CAT, "No session?");
return Err(gst::FlowError::Error);
};
let mapped = buffer.map_readable().map_err(|e| {
gst::error!(CAT, imp: self, "Failed to map input buffer {e:?}");
gst::FlowError::Error
@ -330,14 +327,9 @@ impl RtpSend {
}
};
let srcpad = session.rtp_send_srcpad.clone().unwrap();
let sinkpad = session.rtp_send_sinkpad.clone().unwrap();
let session = session.internal_session.clone();
let mut session_inner = session.inner.lock().unwrap();
drop(state);
let mut session_inner = internal_session.inner.lock().unwrap();
let now = Instant::now();
let mut ssrc_collision = vec![];
let mut ssrc_collision: smallvec::SmallVec<[u32; 4]> = smallvec::SmallVec::new();
loop {
match session_inner.session.handle_send(&rtp, now) {
SendReply::SsrcCollision(ssrc) => {
@ -347,8 +339,10 @@ impl RtpSend {
}
SendReply::NewSsrc(ssrc, _pt) => {
drop(session_inner);
session.config.emit_by_name::<()>("new-ssrc", &[&ssrc]);
session_inner = session.inner.lock().unwrap();
internal_session
.config
.emit_by_name::<()>("new-ssrc", &[&ssrc]);
session_inner = internal_session.inner.lock().unwrap();
}
SendReply::Passthrough => break,
SendReply::Drop => return Ok(gst::FlowSuccess::Ok),
@ -374,7 +368,50 @@ impl RtpSend {
srcpad.push(buffer)
}
fn rtp_send_sink_event(&self, pad: &gst::Pad, event: gst::Event, id: usize) -> bool {
fn rtp_sink_chain_list(
&self,
pad: &gst::Pad,
id: usize,
list: gst::BufferList,
) -> Result<gst::FlowSuccess, gst::FlowError> {
let state = self.state.lock().unwrap();
let Some(session) = state.session_by_id(id) else {
gst::error!(CAT, "No session?");
return Err(gst::FlowError::Error);
};
let srcpad = session.rtp_send_srcpad.clone().unwrap();
let internal_session = session.internal_session.clone();
drop(state);
let now = Instant::now();
for buffer in list.iter_owned() {
self.handle_buffer(pad, &srcpad, &internal_session, buffer, now)?;
}
Ok(gst::FlowSuccess::Ok)
}
fn rtp_sink_chain(
&self,
pad: &gst::Pad,
id: usize,
buffer: gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError> {
let state = self.state.lock().unwrap();
let Some(session) = state.session_by_id(id) else {
gst::error!(CAT, "No session?");
return Err(gst::FlowError::Error);
};
let srcpad = session.rtp_send_srcpad.clone().unwrap();
let internal_session = session.internal_session.clone();
drop(state);
let now = Instant::now();
self.handle_buffer(pad, &srcpad, &internal_session, buffer, now)
}
fn rtp_sink_event(&self, pad: &gst::Pad, event: gst::Event, id: usize) -> bool {
match event.view() {
gst::EventView::Caps(caps) => {
if let Some((pt, clock_rate)) = pt_clock_rate_from_caps(caps.caps()) {
@ -651,11 +688,18 @@ impl ElementImpl for RtpSend {
Vec<gst::Event>,
)> {
let sinkpad = gst::Pad::builder_from_template(templ)
.chain_function(move |_pad, parent, buffer| {
.chain_function(move |pad, parent, buffer| {
RtpSend::catch_panic_pad_function(
parent,
|| Err(gst::FlowError::Error),
|this| this.rtp_send_sink_chain(id, buffer),
|this| this.rtp_sink_chain(pad, id, buffer),
)
})
.chain_list_function(move |pad, parent, list| {
RtpSend::catch_panic_pad_function(
parent,
|| Err(gst::FlowError::Error),
|this| this.rtp_sink_chain_list(pad, id, list),
)
})
.iterate_internal_links_function(|pad, parent| {
@ -669,7 +713,7 @@ impl ElementImpl for RtpSend {
RtpSend::catch_panic_pad_function(
parent,
|| false,
|this| this.rtp_send_sink_event(pad, event, id),
|this| this.rtp_sink_event(pad, event, id),
)
})
.flags(gst::PadFlags::PROXY_CAPS)

View file

@ -47,9 +47,35 @@ fn generate_rtp_buffer(seqno: u16, rtpts: u32, payload_len: usize) -> gst::Buffe
gst::Buffer::from_mut_slice(data)
}
#[test]
fn test_send() {
#[derive(Debug, Copy, Clone)]
struct PacketInfo {
seq_no: u16,
rtp_ts: u32,
payload_len: usize,
}
impl PacketInfo {
fn generate_buffer(&self) -> gst::Buffer {
generate_rtp_buffer(self.seq_no, self.rtp_ts, self.payload_len)
}
}
static PACKETS_TEST_1: [PacketInfo; 2] = [
PacketInfo {
seq_no: 500,
rtp_ts: 20,
payload_len: 7,
},
PacketInfo {
seq_no: 501,
rtp_ts: 30,
payload_len: 23,
},
];
fn send_init() -> gst_check::Harness {
init();
let id = next_element_counter();
let elem = gst::ElementFactory::make("rtpsend")
@ -67,19 +93,56 @@ fn test_send() {
.build();
h.set_src_caps(caps);
h.push(generate_rtp_buffer(500, 20, 9)).unwrap();
h.push(generate_rtp_buffer(501, 30, 11)).unwrap();
h
}
fn send_push<I>(h: &mut gst_check::Harness, packets: I, buffer_list: bool)
where
I: IntoIterator<Item = PacketInfo>,
{
if buffer_list {
let mut list = gst::BufferList::new();
let list_mut = list.make_mut();
for packet in packets {
list_mut.add(packet.generate_buffer());
}
let push_pad = h
.element()
.unwrap()
.static_pad("rtp_sink_0")
.unwrap()
.peer()
.unwrap();
push_pad.push_list(list).unwrap();
} else {
for packet in packets {
h.push(packet.generate_buffer()).unwrap();
}
}
}
fn send_pull<I>(h: &mut gst_check::Harness, packets: I)
where
I: IntoIterator<Item = PacketInfo>,
{
for packet in packets {
let buffer = h.pull().unwrap();
let mapped = buffer.map_readable().unwrap();
let rtp = rtp_types::RtpPacket::parse(&mapped).unwrap();
assert_eq!(rtp.sequence_number(), 500);
let buffer = h.pull().unwrap();
let mapped = buffer.map_readable().unwrap();
let rtp = rtp_types::RtpPacket::parse(&mapped).unwrap();
assert_eq!(rtp.sequence_number(), 501);
assert_eq!(rtp.sequence_number(), packet.seq_no);
}
}
fn send_check_stats<I>(h: &mut gst_check::Harness, packets: I)
where
I: IntoIterator<Item = PacketInfo>,
{
let mut n_packets = 0;
let mut n_bytes = 0;
for packet in packets {
n_packets += 1;
n_bytes += packet.payload_len;
}
let stats = h.element().unwrap().property::<gst::Structure>("stats");
let session_stats = stats.get::<gst::Structure>("0").unwrap();
@ -93,8 +156,131 @@ fn test_send() {
);
assert!(source_stats.get::<bool>("sender").unwrap());
assert!(source_stats.get::<bool>("local").unwrap());
assert_eq!(source_stats.get::<u64>("packets-sent").unwrap(), 2);
assert_eq!(source_stats.get::<u64>("octets-sent").unwrap(), 20);
assert_eq!(source_stats.get::<u64>("packets-sent").unwrap(), n_packets);
assert_eq!(
source_stats.get::<u64>("octets-sent").unwrap(),
n_bytes as u64
);
}
#[test]
fn test_send() {
init();
let mut h = send_init();
send_push(&mut h, PACKETS_TEST_1, false);
send_pull(&mut h, PACKETS_TEST_1);
send_check_stats(&mut h, PACKETS_TEST_1);
}
#[test]
fn test_send_list() {
let mut h = send_init();
send_push(&mut h, PACKETS_TEST_1, true);
send_pull(&mut h, PACKETS_TEST_1);
send_check_stats(&mut h, PACKETS_TEST_1);
}
#[test]
fn test_send_benchmark() {
init();
let clock = gst::SystemClock::obtain();
const N_PACKETS: usize = 2 * 1024 * 1024;
let mut packets = Vec::with_capacity(N_PACKETS);
for i in 0..N_PACKETS {
packets.push(
PacketInfo {
seq_no: (i % u16::MAX as usize) as u16,
rtp_ts: i as u32,
payload_len: 8,
}
.generate_buffer(),
)
}
let mut h = send_init();
let start = clock.time();
for packet in packets {
h.push(packet).unwrap();
}
let pushed = clock.time();
for i in 0..N_PACKETS {
let buffer = h.pull().unwrap();
let mapped = buffer.map_readable().unwrap();
let rtp = rtp_types::RtpPacket::parse(&mapped).unwrap();
assert_eq!(rtp.sequence_number(), (i % u16::MAX as usize) as u16);
}
let end = clock.time();
let test_time = end.opt_sub(start);
let pull_time = end.opt_sub(pushed);
let push_time = pushed.opt_sub(start);
println!(
"test took {} (push {}, pull {})",
test_time.display(),
push_time.display(),
pull_time.display()
);
}
#[test]
fn test_send_list_benchmark() {
init();
let clock = gst::SystemClock::obtain();
const N_PACKETS: usize = 2 * 1024 * 1024;
let mut list = gst::BufferList::new();
let list_mut = list.make_mut();
for i in 0..N_PACKETS {
list_mut.add(
PacketInfo {
seq_no: (i % u16::MAX as usize) as u16,
rtp_ts: i as u32,
payload_len: 8,
}
.generate_buffer(),
);
}
let mut h = send_init();
let push_pad = h
.element()
.unwrap()
.static_pad("rtp_sink_0")
.unwrap()
.peer()
.unwrap();
let start = clock.time();
push_pad.push_list(list).unwrap();
let pushed = clock.time();
for i in 0..N_PACKETS {
let buffer = h.pull().unwrap();
let mapped = buffer.map_readable().unwrap();
let rtp = rtp_types::RtpPacket::parse(&mapped).unwrap();
assert_eq!(rtp.sequence_number(), (i % u16::MAX as usize) as u16);
}
let end = clock.time();
let test_time = end.opt_sub(start);
let pull_time = end.opt_sub(pushed);
let push_time = pushed.opt_sub(start);
println!(
"test took {} (push {}, pull {})",
test_time.display(),
push_time.display(),
pull_time.display()
);
}
#[test]