mirror of
https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git
synced 2025-01-09 18:55:27 +00:00
1600d3b055
There is now two elements, rtpsend and rtprecv that represent the two halves of a rtpsession. This avoids the potential pipeline loop if two peers are sending/receiving data towards each other. The two halves can be connected by setting the rtp-id property on each element to the same value and they will behave like a combined rtpbin-like element. Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1426>
262 lines
8.4 KiB
Rust
262 lines
8.4 KiB
Rust
//
|
|
// Copyright (C) 2023 Matthew Waters <matthew@centricular.com>
|
|
//
|
|
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
|
|
// If a copy of the MPL was not distributed with this file, You can obtain one at
|
|
// <https://mozilla.org/MPL/2.0/>.
|
|
//
|
|
// SPDX-License-Identifier: MPL-2.0
|
|
|
|
use std::sync::{atomic::AtomicUsize, Arc, Mutex};
|
|
|
|
use gst::{prelude::*, Caps};
|
|
use gst_check::Harness;
|
|
use rtp_types::*;
|
|
|
|
static ELEMENT_COUNTER: AtomicUsize = AtomicUsize::new(0);
|
|
|
|
fn next_element_counter() -> usize {
|
|
ELEMENT_COUNTER.fetch_add(1, std::sync::atomic::Ordering::SeqCst)
|
|
}
|
|
|
|
fn init() {
|
|
use std::sync::Once;
|
|
static INIT: Once = Once::new();
|
|
|
|
INIT.call_once(|| {
|
|
gst::init().unwrap();
|
|
gstrsrtp::plugin_register_static().expect("rtpbin2 test");
|
|
});
|
|
}
|
|
|
|
const TEST_SSRC: u32 = 0x12345678;
|
|
const TEST_PT: u8 = 96;
|
|
const TEST_CLOCK_RATE: u32 = 48000;
|
|
|
|
fn generate_rtp_buffer(seqno: u16, rtpts: u32, payload_len: usize) -> gst::Buffer {
|
|
let payload = vec![4; payload_len];
|
|
let packet = RtpPacketBuilder::new()
|
|
.ssrc(TEST_SSRC)
|
|
.payload_type(TEST_PT)
|
|
.sequence_number(seqno)
|
|
.timestamp(rtpts)
|
|
.payload(payload.as_slice());
|
|
let size = packet.calculate_size().unwrap();
|
|
let mut data = vec![0; size];
|
|
packet.write_into(&mut data).unwrap();
|
|
gst::Buffer::from_mut_slice(data)
|
|
}
|
|
|
|
#[test]
|
|
fn test_send() {
|
|
init();
|
|
let id = next_element_counter();
|
|
|
|
let elem = gst::ElementFactory::make("rtpsend")
|
|
.property("rtp-id", id.to_string())
|
|
.build()
|
|
.unwrap();
|
|
let mut h = Harness::with_element(&elem, Some("rtp_sink_0"), Some("rtp_src_0"));
|
|
h.play();
|
|
|
|
let caps = Caps::builder("application/x-rtp")
|
|
.field("media", "audio")
|
|
.field("payload", TEST_PT as i32)
|
|
.field("clock-rate", TEST_CLOCK_RATE as i32)
|
|
.field("encoding-name", "custom-test")
|
|
.build();
|
|
h.set_src_caps(caps);
|
|
|
|
h.push(generate_rtp_buffer(500, 20, 9)).unwrap();
|
|
h.push(generate_rtp_buffer(501, 30, 11)).unwrap();
|
|
|
|
let buffer = h.pull().unwrap();
|
|
let mapped = buffer.map_readable().unwrap();
|
|
let rtp = rtp_types::RtpPacket::parse(&mapped).unwrap();
|
|
assert_eq!(rtp.sequence_number(), 500);
|
|
|
|
let buffer = h.pull().unwrap();
|
|
let mapped = buffer.map_readable().unwrap();
|
|
let rtp = rtp_types::RtpPacket::parse(&mapped).unwrap();
|
|
assert_eq!(rtp.sequence_number(), 501);
|
|
|
|
let stats = h.element().unwrap().property::<gst::Structure>("stats");
|
|
|
|
let session_stats = stats.get::<gst::Structure>("0").unwrap();
|
|
let source_stats = session_stats
|
|
.get::<gst::Structure>(TEST_SSRC.to_string())
|
|
.unwrap();
|
|
assert_eq!(source_stats.get::<u32>("ssrc").unwrap(), TEST_SSRC);
|
|
assert_eq!(
|
|
source_stats.get::<u32>("clock-rate").unwrap(),
|
|
TEST_CLOCK_RATE
|
|
);
|
|
assert!(source_stats.get::<bool>("sender").unwrap());
|
|
assert!(source_stats.get::<bool>("local").unwrap());
|
|
assert_eq!(source_stats.get::<u64>("packets-sent").unwrap(), 2);
|
|
assert_eq!(source_stats.get::<u64>("octets-sent").unwrap(), 20);
|
|
}
|
|
|
|
#[test]
|
|
fn test_receive() {
|
|
init();
|
|
let id = next_element_counter();
|
|
|
|
let elem = gst::ElementFactory::make("rtprecv")
|
|
.property("rtp-id", id.to_string())
|
|
.build()
|
|
.unwrap();
|
|
let h = Arc::new(Mutex::new(Harness::with_element(
|
|
&elem,
|
|
Some("rtp_sink_0"),
|
|
None,
|
|
)));
|
|
let weak_h = Arc::downgrade(&h);
|
|
let mut inner = h.lock().unwrap();
|
|
inner
|
|
.element()
|
|
.unwrap()
|
|
.connect_pad_added(move |_elem, pad| {
|
|
weak_h
|
|
.upgrade()
|
|
.unwrap()
|
|
.lock()
|
|
.unwrap()
|
|
.add_element_src_pad(pad)
|
|
});
|
|
inner.play();
|
|
|
|
let caps = Caps::builder("application/x-rtp")
|
|
.field("media", "audio")
|
|
.field("payload", TEST_PT as i32)
|
|
.field("clock-rate", TEST_CLOCK_RATE as i32)
|
|
.field("encoding-name", "custom-test")
|
|
.build();
|
|
inner.set_src_caps(caps);
|
|
|
|
// Cannot push with harness lock as the 'pad-added' handler needs to add the newly created pad to
|
|
// the harness and needs to also take the harness lock. Workaround by pushing from the
|
|
// internal harness pad directly.
|
|
let push_pad = inner
|
|
.element()
|
|
.unwrap()
|
|
.static_pad("rtp_sink_0")
|
|
.unwrap()
|
|
.peer()
|
|
.unwrap();
|
|
drop(inner);
|
|
push_pad.push(generate_rtp_buffer(500, 20, 9)).unwrap();
|
|
push_pad.push(generate_rtp_buffer(501, 30, 11)).unwrap();
|
|
let mut inner = h.lock().unwrap();
|
|
|
|
let buffer = inner.pull().unwrap();
|
|
let mapped = buffer.map_readable().unwrap();
|
|
let rtp = rtp_types::RtpPacket::parse(&mapped).unwrap();
|
|
assert_eq!(rtp.sequence_number(), 500);
|
|
|
|
let buffer = inner.pull().unwrap();
|
|
let mapped = buffer.map_readable().unwrap();
|
|
let rtp = rtp_types::RtpPacket::parse(&mapped).unwrap();
|
|
assert_eq!(rtp.sequence_number(), 501);
|
|
|
|
let stats = inner.element().unwrap().property::<gst::Structure>("stats");
|
|
|
|
let session_stats = stats.get::<gst::Structure>("0").unwrap();
|
|
let source_stats = session_stats
|
|
.get::<gst::Structure>(TEST_SSRC.to_string())
|
|
.unwrap();
|
|
let jitterbuffers_stats = session_stats
|
|
.get::<gst::List>("jitterbuffer-stats")
|
|
.unwrap();
|
|
assert_eq!(jitterbuffers_stats.len(), 1);
|
|
let jitterbuffer_stats = jitterbuffers_stats
|
|
.first()
|
|
.unwrap()
|
|
.get::<gst::Structure>()
|
|
.unwrap();
|
|
assert_eq!(source_stats.get::<u32>("ssrc").unwrap(), TEST_SSRC);
|
|
assert_eq!(
|
|
source_stats.get::<u32>("clock-rate").unwrap(),
|
|
TEST_CLOCK_RATE
|
|
);
|
|
assert!(source_stats.get::<bool>("sender").unwrap());
|
|
assert!(!source_stats.get::<bool>("local").unwrap());
|
|
assert_eq!(source_stats.get::<u64>("packets-received").unwrap(), 2);
|
|
assert_eq!(source_stats.get::<u64>("octets-received").unwrap(), 20);
|
|
assert_eq!(jitterbuffer_stats.get::<u64>("num-late").unwrap(), 0);
|
|
assert_eq!(jitterbuffer_stats.get::<u64>("num-lost").unwrap(), 0);
|
|
assert_eq!(jitterbuffer_stats.get::<u64>("num-duplicates").unwrap(), 0);
|
|
assert_eq!(jitterbuffer_stats.get::<u64>("num-pushed").unwrap(), 2);
|
|
assert_eq!(jitterbuffer_stats.get::<i32>("pt").unwrap(), TEST_PT as i32);
|
|
assert_eq!(
|
|
jitterbuffer_stats.get::<i32>("ssrc").unwrap(),
|
|
TEST_SSRC as i32
|
|
);
|
|
}
|
|
|
|
#[test]
|
|
fn test_receive_flush() {
|
|
init();
|
|
let id = next_element_counter();
|
|
|
|
let elem = gst::ElementFactory::make("rtprecv")
|
|
.property("rtp-id", id.to_string())
|
|
.build()
|
|
.unwrap();
|
|
let h = Arc::new(Mutex::new(Harness::with_element(
|
|
&elem,
|
|
Some("rtp_sink_0"),
|
|
None,
|
|
)));
|
|
let weak_h = Arc::downgrade(&h);
|
|
let mut inner = h.lock().unwrap();
|
|
inner
|
|
.element()
|
|
.unwrap()
|
|
.connect_pad_added(move |_elem, pad| {
|
|
weak_h
|
|
.upgrade()
|
|
.unwrap()
|
|
.lock()
|
|
.unwrap()
|
|
.add_element_src_pad(pad)
|
|
});
|
|
inner.play();
|
|
|
|
let caps = Caps::builder("application/x-rtp")
|
|
.field("media", "audio")
|
|
.field("payload", TEST_PT as i32)
|
|
.field("clock-rate", TEST_CLOCK_RATE as i32)
|
|
.field("encoding-name", "custom-test")
|
|
.build();
|
|
inner.set_src_caps(caps);
|
|
|
|
// Cannot push with harness lock as the 'pad-added' handler needs to add the newly created pad to
|
|
// the harness and needs to also take the harness lock. Workaround by pushing from the
|
|
// internal harness pad directly.
|
|
let push_pad = inner
|
|
.element()
|
|
.unwrap()
|
|
.static_pad("rtp_sink_0")
|
|
.unwrap()
|
|
.peer()
|
|
.unwrap();
|
|
drop(inner);
|
|
push_pad.push(generate_rtp_buffer(500, 20, 9)).unwrap();
|
|
push_pad.push(generate_rtp_buffer(501, 30, 11)).unwrap();
|
|
let mut inner = h.lock().unwrap();
|
|
let seqnum = gst::Seqnum::next();
|
|
inner.push_event(gst::event::FlushStart::builder().seqnum(seqnum).build());
|
|
inner.push_event(gst::event::FlushStop::builder(false).seqnum(seqnum).build());
|
|
|
|
let event = inner.pull_event().unwrap();
|
|
let gst::EventView::FlushStart(fs) = event.view() else {
|
|
unreachable!();
|
|
};
|
|
assert_eq!(fs.seqnum(), seqnum);
|
|
let event = inner.pull_event().unwrap();
|
|
let gst::EventView::FlushStop(fs) = event.view() else {
|
|
unreachable!();
|
|
};
|
|
assert_eq!(fs.seqnum(), seqnum);
|
|
}
|