// // Copyright (C) 2023 Matthew Waters // // This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. // If a copy of the MPL was not distributed with this file, You can obtain one at // . // // SPDX-License-Identifier: MPL-2.0 use std::sync::{atomic::AtomicUsize, Arc, Mutex}; use gst::{prelude::*, Caps}; use gst_check::Harness; use rtp_types::*; static ELEMENT_COUNTER: AtomicUsize = AtomicUsize::new(0); fn next_element_counter() -> usize { ELEMENT_COUNTER.fetch_add(1, std::sync::atomic::Ordering::SeqCst) } fn init() { use std::sync::Once; static INIT: Once = Once::new(); INIT.call_once(|| { gst::init().unwrap(); gstrsrtp::plugin_register_static().expect("rtpbin2 test"); }); } const TEST_SSRC: u32 = 0x12345678; const TEST_PT: u8 = 96; const TEST_CLOCK_RATE: u32 = 48000; fn generate_rtp_buffer(seqno: u16, rtpts: u32, payload_len: usize) -> gst::Buffer { let payload = vec![4; payload_len]; let packet = RtpPacketBuilder::new() .ssrc(TEST_SSRC) .payload_type(TEST_PT) .sequence_number(seqno) .timestamp(rtpts) .payload(payload.as_slice()); let size = packet.calculate_size().unwrap(); let mut data = vec![0; size]; packet.write_into(&mut data).unwrap(); gst::Buffer::from_mut_slice(data) } #[derive(Debug, Copy, Clone)] struct PacketInfo { seq_no: u16, rtp_ts: u32, payload_len: usize, } impl PacketInfo { fn generate_buffer(&self, dts: Option) -> gst::Buffer { let mut buf = generate_rtp_buffer(self.seq_no, self.rtp_ts, self.payload_len); let buf_mut = buf.make_mut(); buf_mut.set_dts(dts); buf } } fn send_init() -> gst_check::Harness { init(); let id = next_element_counter(); let elem = gst::ElementFactory::make("rtpsend") .property("rtp-id", id.to_string()) .build() .unwrap(); let mut h = Harness::with_element(&elem, Some("rtp_sink_0"), Some("rtp_src_0")); h.play(); let caps = Caps::builder("application/x-rtp") .field("media", "audio") .field("payload", TEST_PT as i32) .field("clock-rate", TEST_CLOCK_RATE as i32) .field("encoding-name", "custom-test") .build(); h.set_src_caps(caps); h } fn send_push(h: &mut gst_check::Harness, packets: I, buffer_list: bool) where I: IntoIterator, { if buffer_list { let mut list = gst::BufferList::new(); let list_mut = list.make_mut(); for packet in packets { list_mut.add(packet.generate_buffer(None)); } let push_pad = h .element() .unwrap() .static_pad("rtp_sink_0") .unwrap() .peer() .unwrap(); push_pad.push_list(list).unwrap(); } else { for packet in packets { h.push(packet.generate_buffer(None)).unwrap(); } } } fn send_pull(h: &mut gst_check::Harness, packets: I) where I: IntoIterator, { for packet in packets { let buffer = h.pull().unwrap(); let mapped = buffer.map_readable().unwrap(); let rtp = rtp_types::RtpPacket::parse(&mapped).unwrap(); assert_eq!(rtp.sequence_number(), packet.seq_no); } } fn send_check_stats(h: &mut gst_check::Harness, packets: I) where I: IntoIterator, { let mut n_packets = 0; let mut n_bytes = 0; for packet in packets { n_packets += 1; n_bytes += packet.payload_len; } let stats = h.element().unwrap().property::("stats"); let session_stats = stats.get::("0").unwrap(); let source_stats = session_stats .get::(TEST_SSRC.to_string()) .unwrap(); assert_eq!(source_stats.get::("ssrc").unwrap(), TEST_SSRC); assert_eq!( source_stats.get::("clock-rate").unwrap(), TEST_CLOCK_RATE ); assert!(source_stats.get::("sender").unwrap()); assert!(source_stats.get::("local").unwrap()); assert_eq!(source_stats.get::("packets-sent").unwrap(), n_packets); assert_eq!( source_stats.get::("octets-sent").unwrap(), n_bytes as u64 ); } #[test] fn test_send() { init(); let mut h = send_init(); send_push(&mut h, PACKETS_TEST_1, false); send_pull(&mut h, PACKETS_TEST_1); send_check_stats(&mut h, PACKETS_TEST_1); } #[test] fn test_send_list() { let mut h = send_init(); send_push(&mut h, PACKETS_TEST_1, true); send_pull(&mut h, PACKETS_TEST_1); send_check_stats(&mut h, PACKETS_TEST_1); } #[test] fn test_send_benchmark() { init(); let clock = gst::SystemClock::obtain(); const N_PACKETS: usize = 2 * 1024 * 1024; let mut packets = Vec::with_capacity(N_PACKETS); for i in 0..N_PACKETS { packets.push( PacketInfo { seq_no: (i % u16::MAX as usize) as u16, rtp_ts: i as u32, payload_len: 8, } .generate_buffer(None), ) } let mut h = send_init(); let start = clock.time(); for packet in packets { h.push(packet).unwrap(); } let pushed = clock.time(); for i in 0..N_PACKETS { let buffer = h.pull().unwrap(); let mapped = buffer.map_readable().unwrap(); let rtp = rtp_types::RtpPacket::parse(&mapped).unwrap(); assert_eq!(rtp.sequence_number(), (i % u16::MAX as usize) as u16); } let end = clock.time(); let test_time = end.opt_sub(start); let pull_time = end.opt_sub(pushed); let push_time = pushed.opt_sub(start); println!( "test took {} (push {}, pull {})", test_time.display(), push_time.display(), pull_time.display() ); } #[test] fn test_send_list_benchmark() { init(); let clock = gst::SystemClock::obtain(); const N_PACKETS: usize = 2 * 1024 * 1024; let mut list = gst::BufferList::new(); let list_mut = list.make_mut(); for i in 0..N_PACKETS { list_mut.add( PacketInfo { seq_no: (i % u16::MAX as usize) as u16, rtp_ts: i as u32, payload_len: 8, } .generate_buffer(None), ); } let mut h = send_init(); let push_pad = h .element() .unwrap() .static_pad("rtp_sink_0") .unwrap() .peer() .unwrap(); let start = clock.time(); push_pad.push_list(list).unwrap(); let pushed = clock.time(); for i in 0..N_PACKETS { let buffer = h.pull().unwrap(); let mapped = buffer.map_readable().unwrap(); let rtp = rtp_types::RtpPacket::parse(&mapped).unwrap(); assert_eq!(rtp.sequence_number(), (i % u16::MAX as usize) as u16); } let end = clock.time(); let test_time = end.opt_sub(start); let pull_time = end.opt_sub(pushed); let push_time = pushed.opt_sub(start); println!( "test took {} (push {}, pull {})", test_time.display(), push_time.display(), pull_time.display() ); } fn receive_init() -> Arc> { let id = next_element_counter(); let elem = gst::ElementFactory::make("rtprecv") .property("rtp-id", id.to_string()) .build() .unwrap(); let h = Arc::new(Mutex::new(Harness::with_element( &elem, Some("rtp_sink_0"), None, ))); let weak_h = Arc::downgrade(&h); let mut inner = h.lock().unwrap(); inner .element() .unwrap() .connect_pad_added(move |_elem, pad| { weak_h .upgrade() .unwrap() .lock() .unwrap() .add_element_src_pad(pad) }); inner.play(); let caps = Caps::builder("application/x-rtp") .field("media", "audio") .field("payload", TEST_PT as i32) .field("clock-rate", TEST_CLOCK_RATE as i32) .field("encoding-name", "custom-test") .build(); inner.set_src_caps(caps); drop(inner); h } fn receive_push(h: Arc>, packets: I, buffer_list: bool) where I: IntoIterator, { let inner = h.lock().unwrap(); // Cannot push with harness lock as the 'pad-added' handler needs to add the newly created pad to // the harness and needs to also take the harness lock. Workaround by pushing from the // internal harness pad directly. let push_pad = inner .element() .unwrap() .static_pad("rtp_sink_0") .unwrap() .peer() .unwrap(); drop(inner); if buffer_list { let mut list = gst::BufferList::new(); let list_mut = list.make_mut(); for packet in packets { list_mut.add(packet.generate_buffer(None)); } push_pad.push_list(list).unwrap(); } else { for packet in packets { push_pad.push(packet.generate_buffer(None)).unwrap(); } } } fn receive_pull(h: Arc>, packets: I) where I: IntoIterator, { let mut inner = h.lock().unwrap(); for packet in packets { let buffer = inner.pull().unwrap(); let mapped = buffer.map_readable().unwrap(); let rtp = rtp_types::RtpPacket::parse(&mapped).unwrap(); assert_eq!(rtp.sequence_number(), packet.seq_no); } } fn receive_check_stats(h: Arc>, packets: I) where I: IntoIterator, { let mut n_packets = 0; let mut n_bytes = 0; for packet in packets { n_packets += 1; n_bytes += packet.payload_len; } let inner = h.lock().unwrap(); let stats = inner.element().unwrap().property::("stats"); drop(inner); let session_stats = stats.get::("0").unwrap(); let source_stats = session_stats .get::(TEST_SSRC.to_string()) .unwrap(); let jitterbuffers_stats = session_stats .get::("jitterbuffer-stats") .unwrap(); assert_eq!(jitterbuffers_stats.len(), 1); let jitterbuffer_stats = jitterbuffers_stats .first() .unwrap() .get::() .unwrap(); assert_eq!(source_stats.get::("ssrc").unwrap(), TEST_SSRC); assert_eq!( source_stats.get::("clock-rate").unwrap(), TEST_CLOCK_RATE ); assert!(source_stats.get::("sender").unwrap()); assert!(!source_stats.get::("local").unwrap()); assert_eq!( source_stats.get::("packets-received").unwrap(), n_packets ); assert_eq!( source_stats.get::("octets-received").unwrap(), n_bytes as u64 ); assert_eq!(jitterbuffer_stats.get::("num-late").unwrap(), 0); assert_eq!(jitterbuffer_stats.get::("num-lost").unwrap(), 0); assert_eq!(jitterbuffer_stats.get::("num-duplicates").unwrap(), 0); assert_eq!( jitterbuffer_stats.get::("num-pushed").unwrap(), n_packets ); assert_eq!(jitterbuffer_stats.get::("pt").unwrap(), TEST_PT as i32); assert_eq!( jitterbuffer_stats.get::("ssrc").unwrap(), TEST_SSRC as i32 ); } static PACKETS_TEST_1: [PacketInfo; 2] = [ PacketInfo { seq_no: 500, rtp_ts: 20, payload_len: 13, }, PacketInfo { seq_no: 501, rtp_ts: 30, payload_len: 7, }, ]; #[test] fn test_receive() { init(); let h = receive_init(); receive_push(h.clone(), PACKETS_TEST_1, false); receive_pull(h.clone(), PACKETS_TEST_1); receive_check_stats(h, PACKETS_TEST_1); } #[test] fn test_receive_list() { init(); let h = receive_init(); receive_push(h.clone(), PACKETS_TEST_1, true); receive_pull(h.clone(), PACKETS_TEST_1); receive_check_stats(h, PACKETS_TEST_1); } #[test] fn test_receive_flush() { init(); let h = receive_init(); receive_push(h.clone(), PACKETS_TEST_1, false); let mut inner = h.lock().unwrap(); let seqnum = gst::Seqnum::next(); inner.push_event(gst::event::FlushStart::builder().seqnum(seqnum).build()); inner.push_event(gst::event::FlushStop::builder(false).seqnum(seqnum).build()); let event = inner.pull_event().unwrap(); let gst::EventView::FlushStart(fs) = event.view() else { unreachable!(); }; assert_eq!(fs.seqnum(), seqnum); let event = inner.pull_event().unwrap(); let gst::EventView::FlushStop(fs) = event.view() else { unreachable!(); }; assert_eq!(fs.seqnum(), seqnum); } #[test] fn test_receive_benchmark() { init(); let clock = gst::SystemClock::obtain(); //const N_PACKETS: usize = 1024 * 1024; const N_PACKETS: usize = 1024; let mut packets = Vec::with_capacity(N_PACKETS); for i in 0..N_PACKETS { packets.push( PacketInfo { seq_no: (i % u16::MAX as usize) as u16, rtp_ts: i as u32, payload_len: 8, } .generate_buffer(None), ) } let h = receive_init(); let inner = h.lock().unwrap(); let push_pad = inner .element() .unwrap() .static_pad("rtp_sink_0") .unwrap() .peer() .unwrap(); drop(inner); let start = clock.time(); for packet in packets { push_pad.push(packet).unwrap(); } let pushed = clock.time(); let mut inner = h.lock().unwrap(); for i in 0..N_PACKETS { let buffer = inner.pull().unwrap(); let mapped = buffer.map_readable().unwrap(); let rtp = rtp_types::RtpPacket::parse(&mapped).unwrap(); assert_eq!(rtp.sequence_number(), (i % u16::MAX as usize) as u16); } let end = clock.time(); drop(inner); let test_time = end.opt_sub(start); let pull_time = end.opt_sub(pushed); let push_time = pushed.opt_sub(start); println!( "test took {} (push {}, pull {})", test_time.display(), push_time.display(), pull_time.display() ); } #[test] fn test_receive_list_benchmark() { init(); let clock = gst::SystemClock::obtain(); const N_PACKETS: usize = 32 * 1024; const N_PUSHES: usize = 1024 / 32; let h = receive_init(); let inner = h.lock().unwrap(); let push_pad = inner .element() .unwrap() .static_pad("rtp_sink_0") .unwrap() .peer() .unwrap(); drop(inner); let mut lists = Vec::with_capacity(N_PUSHES); for p in 0..N_PUSHES { let mut list = gst::BufferList::new(); let list_mut = list.make_mut(); for i in 0..N_PACKETS { list_mut.add( PacketInfo { seq_no: ((p * N_PACKETS + i) % u16::MAX as usize) as u16, rtp_ts: (p * N_PACKETS + i) as u32, payload_len: 8, } .generate_buffer(None), ); } lists.push(list); } let start = clock.time(); for list in lists { push_pad.push_list(list).unwrap(); } let pushed = clock.time(); let mut inner = h.lock().unwrap(); for p in 0..N_PUSHES { for i in 0..N_PACKETS { let buffer = inner.pull().unwrap(); let mapped = buffer.map_readable().unwrap(); let rtp = rtp_types::RtpPacket::parse(&mapped).unwrap(); assert_eq!( rtp.sequence_number(), ((p * N_PACKETS + i) % u16::MAX as usize) as u16 ); } } let end = clock.time(); drop(inner); let test_time = end.opt_sub(start); let pull_time = end.opt_sub(pushed); let push_time = pushed.opt_sub(start); println!( "test took {} (push {}, pull {})", test_time.display(), push_time.display(), pull_time.display() ); } #[test] fn recv_release_sink_pad() { init(); let id = next_element_counter(); let elem = gst::ElementFactory::make("rtprecv") .property("rtp-id", id.to_string()) .build() .unwrap(); elem.set_state(gst::State::Playing).unwrap(); let sinkpad = elem.request_pad_simple("rtp_sink_0").unwrap(); let stream_start = gst::event::StreamStart::new("random"); sinkpad.send_event(stream_start); let caps = Caps::builder("application/x-rtp") .field("media", "audio") .field("payload", TEST_PT as i32) .field("clock-rate", TEST_CLOCK_RATE as i32) .field("encoding-name", "custom-test") .build(); sinkpad.send_event(gst::event::Caps::new(&caps)); let segment = gst::FormattedSegment::::new(); sinkpad.send_event(gst::event::Segment::new(&segment)); let (sender, recv) = std::sync::mpsc::sync_channel(1); elem.connect_pad_added({ let sender = sender.clone(); move |_elem, pad| { let other_pad = gst::Pad::builder(gst::PadDirection::Sink) .chain_function(|_pad, _parent, _buffer| Ok(gst::FlowSuccess::Ok)) .build(); other_pad.set_active(true).unwrap(); pad.link(&other_pad).unwrap(); sender.send(other_pad).unwrap(); } }); // push two buffers to get past the rtpsource validation sinkpad .chain( PacketInfo { seq_no: 30, rtp_ts: 10, payload_len: 4, } .generate_buffer(Some(gst::ClockTime::from_mseconds(50))), ) .unwrap(); sinkpad .chain( PacketInfo { seq_no: 31, rtp_ts: 10, payload_len: 4, } .generate_buffer(Some(gst::ClockTime::from_mseconds(100))), ) .unwrap(); let _other_pad = recv.recv().unwrap(); elem.release_request_pad(&sinkpad); elem.set_state(gst::State::Null).unwrap(); }