mirror of
https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git
synced 2025-01-10 11:15:33 +00:00
12400b6b87
And set properties as part of object construction wherever it makes sense.
621 lines
18 KiB
Rust
621 lines
18 KiB
Rust
// Copyright (C) 2022 Tomasz Andrzejak <andreiltd@gmail.com>
|
|
//
|
|
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
|
|
// If a copy of the MPL was not distributed with this file, You can obtain one at
|
|
// <https://mozilla.org/MPL/2.0/>.
|
|
//
|
|
// SPDX-License-Identifier: MPL-2.0
|
|
|
|
use gst::prelude::*;
|
|
|
|
use gst_rtp::rtp_buffer::*;
|
|
use gst_rtp::RTPBuffer;
|
|
|
|
use rand::Rng;
|
|
|
|
#[must_use]
|
|
struct RaptorqTest {
|
|
protected_packets: usize,
|
|
repair_packets: usize,
|
|
repair_window: usize,
|
|
symbol_size: usize,
|
|
mtu: usize,
|
|
initial_seq: u16,
|
|
lost_buffers: Vec<usize>,
|
|
swapped_buffers: Vec<usize>,
|
|
input_buffers: usize,
|
|
expect_output_buffers: usize,
|
|
}
|
|
|
|
fn init() {
|
|
use std::sync::Once;
|
|
static INIT: Once = Once::new();
|
|
|
|
INIT.call_once(|| {
|
|
gst::init().unwrap();
|
|
gstraptorq::plugin_register_static().expect("Failed to register raptorqenc plugin");
|
|
});
|
|
}
|
|
|
|
impl RaptorqTest {
|
|
fn new() -> Self {
|
|
init();
|
|
|
|
let enc = gst::ElementFactory::make("raptorqenc").build().unwrap();
|
|
|
|
let protected_packets = enc.property::<u32>("protected-packets") as usize;
|
|
let repair_packets = enc.property::<u32>("repair-packets") as usize;
|
|
let repair_window = enc.property::<u32>("repair-window") as usize;
|
|
let symbol_size = enc.property::<u32>("symbol-size") as usize;
|
|
let mtu = enc.property::<u32>("mtu") as usize;
|
|
|
|
Self {
|
|
protected_packets,
|
|
repair_packets,
|
|
repair_window,
|
|
symbol_size,
|
|
mtu,
|
|
initial_seq: 42,
|
|
lost_buffers: vec![0],
|
|
swapped_buffers: vec![],
|
|
input_buffers: protected_packets,
|
|
expect_output_buffers: protected_packets,
|
|
}
|
|
}
|
|
|
|
fn protected_packets(mut self, protected_packets: usize) -> Self {
|
|
self.protected_packets = protected_packets;
|
|
self
|
|
}
|
|
|
|
fn repair_packets(mut self, repair_packets: usize) -> Self {
|
|
self.repair_packets = repair_packets;
|
|
self
|
|
}
|
|
|
|
fn repair_window(mut self, repair_window: usize) -> Self {
|
|
self.repair_window = repair_window;
|
|
self
|
|
}
|
|
|
|
fn symbol_size(mut self, symbol_size: usize) -> Self {
|
|
self.symbol_size = symbol_size;
|
|
self
|
|
}
|
|
|
|
fn initial_seq(mut self, initial_seq: u16) -> Self {
|
|
self.initial_seq = initial_seq;
|
|
self
|
|
}
|
|
|
|
fn mtu(mut self, mtu: usize) -> Self {
|
|
self.mtu = mtu;
|
|
self
|
|
}
|
|
|
|
fn lost_buffers(mut self, lost_buffers: Vec<usize>) -> Self {
|
|
self.lost_buffers = lost_buffers;
|
|
self
|
|
}
|
|
|
|
fn swapped_buffers(mut self, swapped_buffers: Vec<usize>) -> Self {
|
|
self.swapped_buffers = swapped_buffers;
|
|
self
|
|
}
|
|
|
|
fn input_buffers(mut self, input_buffers: usize) -> Self {
|
|
self.input_buffers = input_buffers;
|
|
self
|
|
}
|
|
|
|
fn expect_output_buffers(mut self, expect_output_buffers: usize) -> Self {
|
|
self.expect_output_buffers = expect_output_buffers;
|
|
self
|
|
}
|
|
|
|
fn run(self) {
|
|
assert!(self.input_buffers >= self.protected_packets);
|
|
|
|
// 1. Decoder Setup:
|
|
let enc = gst::ElementFactory::make("raptorqenc")
|
|
.property("protected-packets", self.protected_packets as u32)
|
|
.property("repair-packets", self.repair_packets as u32)
|
|
.property("repair-window", self.repair_window as u32)
|
|
.property("symbol-size", self.symbol_size as u32)
|
|
.property("mtu", self.mtu as u32)
|
|
.build()
|
|
.unwrap();
|
|
|
|
let mut h_enc = gst_check::Harness::with_element(&enc, Some("sink"), Some("src"));
|
|
let mut h_enc_fec = gst_check::Harness::with_element(&enc, None, Some("fec_0"));
|
|
|
|
h_enc.set_src_caps_str("application/x-rtp,clock-rate=8000");
|
|
|
|
// 2. Decoder Setup:
|
|
let dec = gst::ElementFactory::make("raptorqdec").build().unwrap();
|
|
|
|
let mut h_dec = gst_check::Harness::with_element(&dec, Some("sink"), Some("src"));
|
|
let mut h_dec_fec = gst_check::Harness::with_element(&dec, Some("fec_0"), None);
|
|
|
|
let caps = gst::Caps::builder("application/x-rtp")
|
|
.field("raptor-scheme-id", "6")
|
|
.field("repair-window", "1000000")
|
|
.field("t", self.symbol_size.to_string())
|
|
.build();
|
|
|
|
h_dec.set_src_caps_str("application/x-rtp");
|
|
h_dec_fec.set_src_caps(caps);
|
|
|
|
let mut rng = rand::thread_rng();
|
|
|
|
let input_buffers = (0..self.input_buffers)
|
|
.map(|i| {
|
|
// payload size without RTP Header and ADUI Header
|
|
let size = rng.gen_range(1..self.mtu - 12 - 3);
|
|
let data = (0..size).map(|_| rng.gen()).collect::<Vec<u8>>();
|
|
|
|
let mut buf = gst::Buffer::new_rtp_with_sizes(size as u32, 0, 0).unwrap();
|
|
{
|
|
let buf_mut = buf.get_mut().unwrap();
|
|
buf_mut.set_pts(gst::ClockTime::ZERO);
|
|
buf_mut.set_dts(gst::ClockTime::ZERO);
|
|
|
|
let mut rtpbuf = RTPBuffer::from_buffer_writable(buf_mut).unwrap();
|
|
let payload = rtpbuf.payload_mut().unwrap();
|
|
|
|
payload.copy_from_slice(data.as_slice());
|
|
rtpbuf.set_seq(self.initial_seq.wrapping_add(i as u16));
|
|
rtpbuf.set_timestamp(0);
|
|
}
|
|
|
|
buf
|
|
})
|
|
.collect::<Vec<_>>();
|
|
|
|
// 3. Encoder Operations:
|
|
|
|
// Do not consume buffers here so we can compare it with the output
|
|
for buf in &input_buffers {
|
|
let result = h_enc.push(buf.clone());
|
|
assert!(result.is_ok());
|
|
}
|
|
|
|
assert_eq!(h_enc.buffers_in_queue(), self.input_buffers as u32);
|
|
|
|
let mut media_packets = (0..self.input_buffers)
|
|
.map(|_| {
|
|
let result = h_enc.pull();
|
|
assert!(result.is_ok());
|
|
result.unwrap()
|
|
})
|
|
.collect::<Vec<_>>();
|
|
|
|
// Simulate out of order packets
|
|
for x in self.swapped_buffers.chunks_exact(2) {
|
|
media_packets.swap(x[0], x[1])
|
|
}
|
|
|
|
// Check if repair packets pushed from encoder are delayed properly
|
|
let delay_step = ((self.repair_window / self.repair_packets) as u64).mseconds();
|
|
let mut delay = delay_step;
|
|
|
|
let repair_packets = (0..self.repair_packets)
|
|
.map(|_| {
|
|
// Set time just before the timer to push the buffer fires up,
|
|
// we shouldn't see the buffer just yet.
|
|
h_enc_fec.set_time(delay - gst::ClockTime::NSECOND).unwrap();
|
|
assert_eq!(h_enc_fec.buffers_in_queue(), 0);
|
|
|
|
// Advance time to the delay and crank clock id, we should
|
|
// get a buffer with adjusted timestamps now. All input buffers
|
|
// have zero timestamp, so the pts/dts/rtp-timestamp should be
|
|
// equal to delay.
|
|
h_enc_fec.set_time(delay).unwrap();
|
|
h_enc_fec.crank_single_clock_wait().unwrap();
|
|
|
|
let result = h_enc_fec.pull();
|
|
assert!(result.is_ok());
|
|
|
|
let buf = result.unwrap();
|
|
assert_eq!(buf.pts().unwrap(), delay);
|
|
assert_eq!(buf.dts().unwrap(), delay);
|
|
|
|
let ts = RTPBuffer::from_buffer_readable(&buf).unwrap().timestamp();
|
|
let expected_ts =
|
|
*delay.mul_div_round(*gst::ClockTime::SECOND, 8000).unwrap() as u32;
|
|
|
|
assert_eq!(ts, expected_ts);
|
|
|
|
delay += delay_step;
|
|
buf
|
|
})
|
|
.collect::<Vec<_>>();
|
|
|
|
// 4. Decoder Operations:
|
|
|
|
// remove media packets to simulate packet loss
|
|
let media_packets = media_packets
|
|
.iter()
|
|
.cloned()
|
|
.enumerate()
|
|
.filter(|(i, _)| !self.lost_buffers.contains(i))
|
|
.map(|(_, x)| x)
|
|
.collect::<Vec<_>>();
|
|
|
|
// Push media packets to decoder
|
|
for buf in media_packets {
|
|
assert!(h_dec.push(buf).is_ok());
|
|
}
|
|
|
|
// Push repair packets to decoder
|
|
for buf in repair_packets {
|
|
assert!(h_dec_fec.push(buf).is_ok());
|
|
}
|
|
|
|
// At this point decoder has all the information it needs to
|
|
// recover packets, we just need an input buffer to run sink
|
|
// chain operations.
|
|
let result = h_dec.push(input_buffers.iter().last().unwrap().clone());
|
|
assert!(result.is_ok());
|
|
|
|
let mut output_buffers = (0..self.expect_output_buffers)
|
|
.map(|_| {
|
|
let result = h_dec.pull();
|
|
assert!(result.is_ok());
|
|
result.unwrap()
|
|
})
|
|
.collect::<Vec<_>>();
|
|
|
|
// Output buffers are out of sequence, we should sort it by
|
|
// seqnum so we can compare them with input buffers.
|
|
output_buffers.sort_unstable_by(|a, b| {
|
|
let aa = RTPBuffer::from_buffer_readable(a).unwrap();
|
|
let bb = RTPBuffer::from_buffer_readable(b).unwrap();
|
|
|
|
match gst_rtp::compare_seqnum(bb.seq(), aa.seq()) {
|
|
x if x > 0 => std::cmp::Ordering::Greater,
|
|
x if x < 0 => std::cmp::Ordering::Less,
|
|
_ => std::cmp::Ordering::Equal,
|
|
}
|
|
});
|
|
|
|
assert_eq!(output_buffers.len(), self.expect_output_buffers);
|
|
|
|
if self.input_buffers == self.expect_output_buffers {
|
|
for (inbuf, outbuf) in Iterator::zip(input_buffers.iter(), output_buffers.iter()) {
|
|
let rtp1 = RTPBuffer::from_buffer_readable(inbuf).unwrap();
|
|
let rtp2 = RTPBuffer::from_buffer_readable(outbuf).unwrap();
|
|
|
|
assert_eq!(rtp1.seq(), rtp2.seq());
|
|
assert_eq!(rtp1.payload().unwrap(), rtp2.payload().unwrap());
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
#[test]
|
|
fn test_raptorq_all_default() {
|
|
RaptorqTest::new().run();
|
|
}
|
|
|
|
#[test]
|
|
fn test_raptorq_decoder_media_packets_out_of_sequence() {
|
|
RaptorqTest::new()
|
|
.swapped_buffers(vec![5, 10, 12, 15])
|
|
.run();
|
|
}
|
|
|
|
#[test]
|
|
fn test_raptorq_10_percent_overhead() {
|
|
RaptorqTest::new()
|
|
.protected_packets(100)
|
|
.repair_packets(10)
|
|
.lost_buffers(vec![4, 42, 43, 44, 45])
|
|
.input_buffers(100)
|
|
.expect_output_buffers(100)
|
|
.run();
|
|
}
|
|
|
|
#[test]
|
|
fn test_raptorq_5_percent_overhead() {
|
|
RaptorqTest::new()
|
|
.protected_packets(100)
|
|
.repair_packets(5)
|
|
.input_buffers(100)
|
|
.lost_buffers(vec![8, 11])
|
|
.expect_output_buffers(100)
|
|
.run();
|
|
}
|
|
|
|
#[test]
|
|
fn test_raptorq_symbol_size_128() {
|
|
RaptorqTest::new()
|
|
.protected_packets(20)
|
|
.repair_packets(4)
|
|
.symbol_size(128)
|
|
.mtu(400)
|
|
.input_buffers(20)
|
|
.lost_buffers(vec![9])
|
|
.expect_output_buffers(20)
|
|
.run();
|
|
}
|
|
|
|
#[test]
|
|
fn test_raptorq_symbol_size_192() {
|
|
RaptorqTest::new()
|
|
.protected_packets(20)
|
|
.repair_packets(4)
|
|
.symbol_size(192)
|
|
.mtu(999)
|
|
.input_buffers(20)
|
|
.lost_buffers(vec![16, 19])
|
|
.expect_output_buffers(20)
|
|
.run();
|
|
}
|
|
|
|
#[test]
|
|
fn test_raptorq_symbol_size_1024() {
|
|
RaptorqTest::new()
|
|
.protected_packets(20)
|
|
.repair_packets(8)
|
|
.symbol_size(192)
|
|
.mtu(100)
|
|
.input_buffers(20)
|
|
.lost_buffers(vec![0, 1, 2, 3, 4, 5])
|
|
.expect_output_buffers(20)
|
|
.run();
|
|
}
|
|
|
|
#[test]
|
|
fn test_raptorq_mtu_lt_symbol_size() {
|
|
RaptorqTest::new()
|
|
.protected_packets(20)
|
|
.repair_packets(8)
|
|
.symbol_size(1400)
|
|
.mtu(100)
|
|
.input_buffers(20)
|
|
.lost_buffers(vec![14, 15, 16, 17, 18, 19])
|
|
.expect_output_buffers(20)
|
|
.run();
|
|
}
|
|
|
|
#[test]
|
|
fn test_raptorq_heavy_loss() {
|
|
RaptorqTest::new()
|
|
.protected_packets(40)
|
|
.repair_packets(8)
|
|
.input_buffers(40)
|
|
.lost_buffers(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
|
|
.expect_output_buffers(30)
|
|
.run();
|
|
}
|
|
|
|
#[test]
|
|
fn test_raptorq_repair_window_100ms() {
|
|
RaptorqTest::new()
|
|
.protected_packets(10)
|
|
.repair_packets(10)
|
|
.repair_window(100)
|
|
.input_buffers(10)
|
|
.lost_buffers(vec![2, 6])
|
|
.expect_output_buffers(10)
|
|
.run();
|
|
}
|
|
|
|
#[test]
|
|
fn test_raptorq_repair_window_500ms() {
|
|
RaptorqTest::new()
|
|
.protected_packets(8)
|
|
.repair_packets(2)
|
|
.repair_window(500)
|
|
.input_buffers(8)
|
|
.lost_buffers(vec![])
|
|
.expect_output_buffers(8)
|
|
.run();
|
|
}
|
|
|
|
#[test]
|
|
fn test_raptorq_wrapping_sequence_number_1() {
|
|
RaptorqTest::new().initial_seq(u16::MAX - 5).run();
|
|
}
|
|
|
|
#[test]
|
|
fn test_raptorq_wrapping_sequence_number_2() {
|
|
RaptorqTest::new()
|
|
.initial_seq(u16::MAX - 5)
|
|
.swapped_buffers(vec![4, 5])
|
|
.run();
|
|
}
|
|
|
|
#[test]
|
|
fn test_raptorq_wrapping_sequence_number_3() {
|
|
RaptorqTest::new()
|
|
.initial_seq(u16::MAX - 3)
|
|
.lost_buffers(vec![0, 1, 2, 8])
|
|
.run();
|
|
}
|
|
|
|
#[test]
|
|
fn test_raptorq_encoder_flush_cancels_pending_timers() {
|
|
init();
|
|
|
|
let enc = gst::ElementFactory::make("raptorqenc")
|
|
// Set delay to 5s, this way each buffer should be delayed by 1s
|
|
.property("repair-window", 5000u32)
|
|
.property("protected-packets", 5u32)
|
|
.property("repair-packets", 5u32)
|
|
.build()
|
|
.unwrap();
|
|
|
|
let mut h_enc = gst_check::Harness::with_element(&enc, Some("sink"), Some("src"));
|
|
let mut h_enc_fec = gst_check::Harness::with_element(&enc, None, Some("fec_0"));
|
|
|
|
h_enc.set_src_caps_str("application/x-rtp,clock-rate=8000");
|
|
|
|
for i in 0u64..5 {
|
|
let mut buf = gst::Buffer::new_rtp_with_sizes(42, 0, 0).unwrap();
|
|
|
|
let buf_mut = buf.get_mut().unwrap();
|
|
buf_mut.set_pts(gst::ClockTime::SECOND * i);
|
|
|
|
let mut rtpbuf = RTPBuffer::from_buffer_writable(buf_mut).unwrap();
|
|
rtpbuf.set_seq(i as u16);
|
|
|
|
drop(rtpbuf);
|
|
|
|
let result = h_enc.push(buf);
|
|
assert!(result.is_ok());
|
|
}
|
|
|
|
// We want to check if flush cancels pending timers, last buffer of source
|
|
// block is at 5s, at 6s we should have 1 buffer qeued already, then we flush
|
|
// and move time to 10s. Flush should cancel pending timers and we should
|
|
// have no buffers at the output
|
|
h_enc_fec.set_time(gst::ClockTime::SECOND * 6).unwrap();
|
|
h_enc_fec.crank_single_clock_wait().unwrap();
|
|
|
|
let result = h_enc_fec.pull();
|
|
assert!(result.is_ok());
|
|
|
|
h_enc.push_event(gst::event::FlushStart::new());
|
|
h_enc.push_event(gst::event::FlushStop::new(true));
|
|
|
|
h_enc_fec.set_time(gst::ClockTime::SECOND * 10).unwrap();
|
|
|
|
loop {
|
|
let event = h_enc.pull_event();
|
|
|
|
if let Ok(event) = event {
|
|
match event.view() {
|
|
gst::EventView::FlushStart(_) => {
|
|
continue;
|
|
}
|
|
gst::EventView::FlushStop(_) => {
|
|
break;
|
|
}
|
|
_ => (),
|
|
}
|
|
}
|
|
}
|
|
|
|
assert_eq!(h_enc_fec.buffers_in_queue(), 0);
|
|
assert_eq!(h_enc_fec.testclock().unwrap().peek_id_count(), 0);
|
|
}
|
|
|
|
#[test]
|
|
fn test_raptorq_repair_window_tolerance() {
|
|
init();
|
|
|
|
let enc = gst::ElementFactory::make("raptorqenc")
|
|
// Set delay to 5s, this way each buffer should be delayed by 1s
|
|
.property("repair-window", 1000u32)
|
|
.property("protected-packets", 5u32)
|
|
.property("repair-packets", 5u32)
|
|
.build()
|
|
.unwrap();
|
|
|
|
let mut h_enc = gst_check::Harness::with_element(&enc, Some("sink"), Some("src"));
|
|
let mut h_enc_fec = gst_check::Harness::with_element(&enc, None, Some("fec_0"));
|
|
|
|
h_enc.set_src_caps_str("application/x-rtp,clock-rate=8000");
|
|
|
|
for i in 0u64..5 {
|
|
let mut buf = gst::Buffer::new_rtp_with_sizes(42, 0, 0).unwrap();
|
|
|
|
let buf_mut = buf.get_mut().unwrap();
|
|
buf_mut.set_pts(gst::ClockTime::SECOND * i);
|
|
|
|
let mut rtpbuf = RTPBuffer::from_buffer_writable(buf_mut).unwrap();
|
|
rtpbuf.set_seq(i as u16);
|
|
|
|
drop(rtpbuf);
|
|
|
|
let result = h_enc.push(buf);
|
|
assert!(result.is_ok());
|
|
}
|
|
|
|
let dec = gst::ElementFactory::make("raptorqdec")
|
|
.property("repair-window-tolerance", 1000u32)
|
|
.build()
|
|
.unwrap();
|
|
|
|
let mut h_dec = gst_check::Harness::with_element(&dec, Some("sink"), Some("src"));
|
|
let mut h_dec_fec = gst_check::Harness::with_element(&dec, Some("fec_0"), None);
|
|
|
|
let caps = loop {
|
|
let event = h_enc_fec.pull_event();
|
|
|
|
if let Ok(event) = event {
|
|
#[allow(clippy::single_match)]
|
|
match event.view() {
|
|
gst::EventView::Caps(c) => {
|
|
break c.caps_owned();
|
|
}
|
|
_ => (),
|
|
}
|
|
}
|
|
};
|
|
|
|
h_dec.set_src_caps_str("application/x-rtp");
|
|
h_dec_fec.set_src_caps(caps);
|
|
|
|
h_enc_fec.set_time(1.seconds()).unwrap();
|
|
|
|
let result = h_enc.pull();
|
|
assert!(result.is_ok());
|
|
|
|
let buf = result.unwrap();
|
|
let result = h_dec.push(buf);
|
|
assert!(result.is_ok());
|
|
|
|
// Push some of repair packets to decoder, just not enough to recover
|
|
// media packets
|
|
for _ in 0..2 {
|
|
h_enc_fec.crank_single_clock_wait().unwrap();
|
|
|
|
let result = h_enc_fec.pull();
|
|
assert!(result.is_ok());
|
|
|
|
let buf = result.unwrap();
|
|
let result = h_dec_fec.push(buf);
|
|
assert!(result.is_ok());
|
|
}
|
|
|
|
let stats = h_dec.element().unwrap().property::<gst::Structure>("stats");
|
|
assert_eq!(
|
|
stats
|
|
.get::<u64>("buffered-media-packets")
|
|
.expect("type error"),
|
|
1
|
|
);
|
|
assert_eq!(
|
|
stats
|
|
.get::<u64>("buffered-repair-packets")
|
|
.expect("type error"),
|
|
2
|
|
);
|
|
|
|
// Media buffer is way beyond repair window which is 2 seconds,
|
|
// (repair_window (1s) + repair_window_tolerance (1s)),
|
|
// the decoder should drop buffered packets as they were kept for too long.
|
|
let mut buf = gst::Buffer::new_rtp_with_sizes(42, 0, 0).unwrap();
|
|
let buf_mut = buf.get_mut().unwrap();
|
|
buf_mut.set_pts(gst::ClockTime::SECOND * 10);
|
|
|
|
let result = h_dec.push(buf);
|
|
assert!(result.is_ok());
|
|
|
|
let stats = h_dec.element().unwrap().property::<gst::Structure>("stats");
|
|
assert_eq!(
|
|
stats
|
|
.get::<u64>("buffered-media-packets")
|
|
.expect("type error"),
|
|
0
|
|
);
|
|
assert_eq!(
|
|
stats
|
|
.get::<u64>("buffered-repair-packets")
|
|
.expect("type error"),
|
|
0
|
|
);
|
|
}
|