diff --git a/net/rtp/src/rtpbin2/rtpsend.rs b/net/rtp/src/rtpbin2/rtpsend.rs index 04104c0e..e2db2479 100644 --- a/net/rtp/src/rtpbin2/rtpsend.rs +++ b/net/rtp/src/rtpbin2/rtpsend.rs @@ -307,17 +307,14 @@ impl RtpSend { gst::Iterator::from_vec(vec![]) } - fn rtp_send_sink_chain( + fn handle_buffer( &self, - id: usize, + sinkpad: &gst::Pad, + srcpad: &gst::Pad, + internal_session: &SharedSession, buffer: gst::Buffer, + now: Instant, ) -> Result { - let state = self.state.lock().unwrap(); - let Some(session) = state.session_by_id(id) else { - gst::error!(CAT, "No session?"); - return Err(gst::FlowError::Error); - }; - let mapped = buffer.map_readable().map_err(|e| { gst::error!(CAT, imp: self, "Failed to map input buffer {e:?}"); gst::FlowError::Error @@ -330,14 +327,9 @@ impl RtpSend { } }; - let srcpad = session.rtp_send_srcpad.clone().unwrap(); - let sinkpad = session.rtp_send_sinkpad.clone().unwrap(); - let session = session.internal_session.clone(); - let mut session_inner = session.inner.lock().unwrap(); - drop(state); + let mut session_inner = internal_session.inner.lock().unwrap(); - let now = Instant::now(); - let mut ssrc_collision = vec![]; + let mut ssrc_collision: smallvec::SmallVec<[u32; 4]> = smallvec::SmallVec::new(); loop { match session_inner.session.handle_send(&rtp, now) { SendReply::SsrcCollision(ssrc) => { @@ -347,8 +339,10 @@ impl RtpSend { } SendReply::NewSsrc(ssrc, _pt) => { drop(session_inner); - session.config.emit_by_name::<()>("new-ssrc", &[&ssrc]); - session_inner = session.inner.lock().unwrap(); + internal_session + .config + .emit_by_name::<()>("new-ssrc", &[&ssrc]); + session_inner = internal_session.inner.lock().unwrap(); } SendReply::Passthrough => break, SendReply::Drop => return Ok(gst::FlowSuccess::Ok), @@ -374,7 +368,50 @@ impl RtpSend { srcpad.push(buffer) } - fn rtp_send_sink_event(&self, pad: &gst::Pad, event: gst::Event, id: usize) -> bool { + fn rtp_sink_chain_list( + &self, + pad: &gst::Pad, + id: usize, + list: gst::BufferList, + ) -> Result { + let state = self.state.lock().unwrap(); + let Some(session) = state.session_by_id(id) else { + gst::error!(CAT, "No session?"); + return Err(gst::FlowError::Error); + }; + + let srcpad = session.rtp_send_srcpad.clone().unwrap(); + let internal_session = session.internal_session.clone(); + drop(state); + + let now = Instant::now(); + for buffer in list.iter_owned() { + self.handle_buffer(pad, &srcpad, &internal_session, buffer, now)?; + } + Ok(gst::FlowSuccess::Ok) + } + + fn rtp_sink_chain( + &self, + pad: &gst::Pad, + id: usize, + buffer: gst::Buffer, + ) -> Result { + let state = self.state.lock().unwrap(); + let Some(session) = state.session_by_id(id) else { + gst::error!(CAT, "No session?"); + return Err(gst::FlowError::Error); + }; + + let srcpad = session.rtp_send_srcpad.clone().unwrap(); + let internal_session = session.internal_session.clone(); + drop(state); + + let now = Instant::now(); + self.handle_buffer(pad, &srcpad, &internal_session, buffer, now) + } + + fn rtp_sink_event(&self, pad: &gst::Pad, event: gst::Event, id: usize) -> bool { match event.view() { gst::EventView::Caps(caps) => { if let Some((pt, clock_rate)) = pt_clock_rate_from_caps(caps.caps()) { @@ -651,11 +688,18 @@ impl ElementImpl for RtpSend { Vec, )> { let sinkpad = gst::Pad::builder_from_template(templ) - .chain_function(move |_pad, parent, buffer| { + .chain_function(move |pad, parent, buffer| { RtpSend::catch_panic_pad_function( parent, || Err(gst::FlowError::Error), - |this| this.rtp_send_sink_chain(id, buffer), + |this| this.rtp_sink_chain(pad, id, buffer), + ) + }) + .chain_list_function(move |pad, parent, list| { + RtpSend::catch_panic_pad_function( + parent, + || Err(gst::FlowError::Error), + |this| this.rtp_sink_chain_list(pad, id, list), ) }) .iterate_internal_links_function(|pad, parent| { @@ -669,7 +713,7 @@ impl ElementImpl for RtpSend { RtpSend::catch_panic_pad_function( parent, || false, - |this| this.rtp_send_sink_event(pad, event, id), + |this| this.rtp_sink_event(pad, event, id), ) }) .flags(gst::PadFlags::PROXY_CAPS) diff --git a/net/rtp/tests/rtpbin2.rs b/net/rtp/tests/rtpbin2.rs index add575bf..7abddcec 100644 --- a/net/rtp/tests/rtpbin2.rs +++ b/net/rtp/tests/rtpbin2.rs @@ -47,9 +47,35 @@ fn generate_rtp_buffer(seqno: u16, rtpts: u32, payload_len: usize) -> gst::Buffe gst::Buffer::from_mut_slice(data) } -#[test] -fn test_send() { +#[derive(Debug, Copy, Clone)] +struct PacketInfo { + seq_no: u16, + rtp_ts: u32, + payload_len: usize, +} + +impl PacketInfo { + fn generate_buffer(&self) -> gst::Buffer { + generate_rtp_buffer(self.seq_no, self.rtp_ts, self.payload_len) + } +} + +static PACKETS_TEST_1: [PacketInfo; 2] = [ + PacketInfo { + seq_no: 500, + rtp_ts: 20, + payload_len: 7, + }, + PacketInfo { + seq_no: 501, + rtp_ts: 30, + payload_len: 23, + }, +]; + +fn send_init() -> gst_check::Harness { init(); + let id = next_element_counter(); let elem = gst::ElementFactory::make("rtpsend") @@ -67,19 +93,56 @@ fn test_send() { .build(); h.set_src_caps(caps); - h.push(generate_rtp_buffer(500, 20, 9)).unwrap(); - h.push(generate_rtp_buffer(501, 30, 11)).unwrap(); + h +} - let buffer = h.pull().unwrap(); - let mapped = buffer.map_readable().unwrap(); - let rtp = rtp_types::RtpPacket::parse(&mapped).unwrap(); - assert_eq!(rtp.sequence_number(), 500); +fn send_push(h: &mut gst_check::Harness, packets: I, buffer_list: bool) +where + I: IntoIterator, +{ + if buffer_list { + let mut list = gst::BufferList::new(); + let list_mut = list.make_mut(); + for packet in packets { + list_mut.add(packet.generate_buffer()); + } + let push_pad = h + .element() + .unwrap() + .static_pad("rtp_sink_0") + .unwrap() + .peer() + .unwrap(); + push_pad.push_list(list).unwrap(); + } else { + for packet in packets { + h.push(packet.generate_buffer()).unwrap(); + } + } +} - let buffer = h.pull().unwrap(); - let mapped = buffer.map_readable().unwrap(); - let rtp = rtp_types::RtpPacket::parse(&mapped).unwrap(); - assert_eq!(rtp.sequence_number(), 501); +fn send_pull(h: &mut gst_check::Harness, packets: I) +where + I: IntoIterator, +{ + for packet in packets { + let buffer = h.pull().unwrap(); + let mapped = buffer.map_readable().unwrap(); + let rtp = rtp_types::RtpPacket::parse(&mapped).unwrap(); + assert_eq!(rtp.sequence_number(), packet.seq_no); + } +} +fn send_check_stats(h: &mut gst_check::Harness, packets: I) +where + I: IntoIterator, +{ + let mut n_packets = 0; + let mut n_bytes = 0; + for packet in packets { + n_packets += 1; + n_bytes += packet.payload_len; + } let stats = h.element().unwrap().property::("stats"); let session_stats = stats.get::("0").unwrap(); @@ -93,8 +156,131 @@ fn test_send() { ); assert!(source_stats.get::("sender").unwrap()); assert!(source_stats.get::("local").unwrap()); - assert_eq!(source_stats.get::("packets-sent").unwrap(), 2); - assert_eq!(source_stats.get::("octets-sent").unwrap(), 20); + assert_eq!(source_stats.get::("packets-sent").unwrap(), n_packets); + assert_eq!( + source_stats.get::("octets-sent").unwrap(), + n_bytes as u64 + ); +} + +#[test] +fn test_send() { + init(); + + let mut h = send_init(); + + send_push(&mut h, PACKETS_TEST_1, false); + send_pull(&mut h, PACKETS_TEST_1); + send_check_stats(&mut h, PACKETS_TEST_1); +} + +#[test] +fn test_send_list() { + let mut h = send_init(); + + send_push(&mut h, PACKETS_TEST_1, true); + send_pull(&mut h, PACKETS_TEST_1); + send_check_stats(&mut h, PACKETS_TEST_1); +} + +#[test] +fn test_send_benchmark() { + init(); + let clock = gst::SystemClock::obtain(); + const N_PACKETS: usize = 2 * 1024 * 1024; + let mut packets = Vec::with_capacity(N_PACKETS); + for i in 0..N_PACKETS { + packets.push( + PacketInfo { + seq_no: (i % u16::MAX as usize) as u16, + rtp_ts: i as u32, + payload_len: 8, + } + .generate_buffer(), + ) + } + + let mut h = send_init(); + + let start = clock.time(); + + for packet in packets { + h.push(packet).unwrap(); + } + + let pushed = clock.time(); + + for i in 0..N_PACKETS { + let buffer = h.pull().unwrap(); + let mapped = buffer.map_readable().unwrap(); + let rtp = rtp_types::RtpPacket::parse(&mapped).unwrap(); + assert_eq!(rtp.sequence_number(), (i % u16::MAX as usize) as u16); + } + + let end = clock.time(); + + let test_time = end.opt_sub(start); + let pull_time = end.opt_sub(pushed); + let push_time = pushed.opt_sub(start); + println!( + "test took {} (push {}, pull {})", + test_time.display(), + push_time.display(), + pull_time.display() + ); +} + +#[test] +fn test_send_list_benchmark() { + init(); + let clock = gst::SystemClock::obtain(); + const N_PACKETS: usize = 2 * 1024 * 1024; + let mut list = gst::BufferList::new(); + let list_mut = list.make_mut(); + for i in 0..N_PACKETS { + list_mut.add( + PacketInfo { + seq_no: (i % u16::MAX as usize) as u16, + rtp_ts: i as u32, + payload_len: 8, + } + .generate_buffer(), + ); + } + let mut h = send_init(); + + let push_pad = h + .element() + .unwrap() + .static_pad("rtp_sink_0") + .unwrap() + .peer() + .unwrap(); + + let start = clock.time(); + + push_pad.push_list(list).unwrap(); + + let pushed = clock.time(); + + for i in 0..N_PACKETS { + let buffer = h.pull().unwrap(); + let mapped = buffer.map_readable().unwrap(); + let rtp = rtp_types::RtpPacket::parse(&mapped).unwrap(); + assert_eq!(rtp.sequence_number(), (i % u16::MAX as usize) as u16); + } + + let end = clock.time(); + + let test_time = end.opt_sub(start); + let pull_time = end.opt_sub(pushed); + let push_time = pushed.opt_sub(start); + println!( + "test took {} (push {}, pull {})", + test_time.display(), + push_time.display(), + pull_time.display() + ); } #[test]