Add RaptorQ RTP FEC plugins

This commit is contained in:
Tomasz Andrzejak 2022-02-27 11:30:09 +01:00
parent 02990f8fcc
commit 14160d1d31
13 changed files with 2871 additions and 0 deletions

View file

@ -12,6 +12,7 @@ members = [
"generic/file", "generic/file",
"generic/sodium", "generic/sodium",
"generic/threadshare", "generic/threadshare",
"net/raptorq",
"net/hlssink3", "net/hlssink3",
"net/onvif", "net/onvif",
"net/reqwest", "net/reqwest",
@ -49,6 +50,7 @@ default-members = [
"generic/fmp4", "generic/fmp4",
"generic/threadshare", "generic/threadshare",
"net/onvif", "net/onvif",
"net/raptorq",
"net/reqwest", "net/reqwest",
"net/aws", "net/aws",
"utils/fallbackswitch", "utils/fallbackswitch",

View file

@ -42,6 +42,7 @@ plugins = {
'gst-plugin-flavors': 'libgstrsflv', 'gst-plugin-flavors': 'libgstrsflv',
'gst-plugin-gif': 'libgstgif', 'gst-plugin-gif': 'libgstgif',
'gst-plugin-lewton': 'libgstlewton', 'gst-plugin-lewton': 'libgstlewton',
'gst-plugin-raptorq': 'libgstraptorq',
'gst-plugin-rav1e': 'libgstrav1e', 'gst-plugin-rav1e': 'libgstrav1e',
'gst-plugin-reqwest': 'libgstreqwest', 'gst-plugin-reqwest': 'libgstreqwest',
'gst-plugin-hlssink3': 'libgsthlssink3', 'gst-plugin-hlssink3': 'libgsthlssink3',

45
net/raptorq/Cargo.toml Normal file
View file

@ -0,0 +1,45 @@
[package]
name = "gst-plugin-raptorq"
version = "0.9.0"
authors = ["Tomasz Andrzejak <andreiltd@gmail.com>"]
repository = "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs"
description = "Rust Raptorq FEC Plugin"
license = "MPL-2.0"
edition = "2021"
rust-version = "1.56"
[dependencies]
gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
gst-base = { package = "gstreamer-base", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
gst-rtp = { package = "gstreamer-rtp", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
once_cell = "1.0"
raptorq = "1.7"
[dev-dependencies]
gst-check = { package = "gstreamer-check", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_18"] }
rand = "0.8"
[lib]
name = "gstraptorq"
crate-type = ["cdylib", "rlib"]
path = "src/lib.rs"
[build-dependencies]
gst-plugin-version-helper = { path="../../version-helper" }
[features]
static = []
capi = []
[package.metadata.capi]
min_version = "0.8.0"
[package.metadata.capi.header]
enabled = false
[package.metadata.capi.library]
install_subdir = "gstreamer-1.0"
versioning = false
[package.metadata.capi.pkg_config]
requires_private = "gstreamer-1.0, gstreamer-base-1.0, gstreamer-rtp-1.0, gobject-2.0, glib-2.0, gmodule-2.0"

1
net/raptorq/LICENSE-MPL-2.0 Symbolic link
View file

@ -0,0 +1 @@
../../LICENSE-MPL-2.0

118
net/raptorq/README.md Normal file
View file

@ -0,0 +1,118 @@
## Introduction
This is GStreamer implementation of RaptorQ FEC for RTP streams.
The sender element produces requested number `X` of repair packets from `K` RTP
packets. The receiver only needs:
- `K` of any repair or RTP packets to recover all the data with 99% probability
- `K + 1` of any repair or RTP packets to recover all the data with 99.99%
probability,
- `K + 2` of any repair or RTP packets to recover all the data with 99.9999%
probability etc.
Relevant documents:
- [RFC6363 - Forward Error Correction (FEC) Framework](https://datatracker.ietf.org/doc/html/rfc6363)
- [RFC6681 - Raptor Forward Error Correction (FEC) Schemes for FECFRAME](https://datatracker.ietf.org/doc/html/rfc6681)
- [RFC6682 - RTP Payload Format for Raptor Forward Error Correction (FEC)](https://datatracker.ietf.org/doc/html/rfc6682)
## Sender/Receiver Example
```shell
gst-launch-1.0 \
rtpbin name=rtp fec-encoders='fec,0="raptorqenc\ mtu=1356\ symbol-size=192";' \
uridecodebin uri=file:///path/to/video/file ! x264enc key-int-max=60 tune=zerolatency ! \
queue ! mpegtsmux ! rtpmp2tpay ssrc=0 ! \
rtp.send_rtp_sink_0 rtp.send_rtp_src_0 ! udpsink host=127.0.0.1 port=5000 \
rtp.send_fec_src_0_0 ! udpsink host=127.0.0.1 port=5002 async=false
gst-launch-1.0 \
rtpbin latency=200 fec-decoders='fec,0="raptorqdec";' name=rtp \
udpsrc address=127.0.0.1 port=5002 \
caps="application/x-rtp, payload=96, raptor-scheme-id=(string)6, repair-window=(string)1000000, t=(string)192" ! \
queue ! rtp.recv_fec_sink_0_0 \
udpsrc address=127.0.0.1 port=5000 \
caps="application/x-rtp, media=video, clock-rate=90000, encoding-name=mp2t, payload=33" ! \
queue ! netsim drop-probability=0.05 ! rtp.recv_rtp_sink_0 \
rtp. ! decodebin ! videoconvert ! queue ! autovideosink
```
## Implementation Details
### Encoder Element
The encoder element stores the copy of original RTP packets internally until it
receives the number of packets that are requested to be protected together. At
this point it creates a Source Block that is passed to RaptorQ Encoder. Source
Block is constructed by concatenating ADUIs (Application Data Unit Information)
sometimes also called SPI (Source Packet Information). Each ADUI contains:
- Header with Flow ID - `F(I)` and Length Indication for the packet - `L(I)`,
- UDP payload, this a complete RTP packet with header,
- Padding bytes if required,
```text
T T T T
<----------------><--------------><---------------><---------------->
+----+--------+-----------------------+-----------------------------+
|F[0]| L[0] | ADU[0] | Pad[0] |
+----+--------+----------+------------+-----------------------------+
|F[1]| L[1] | ADU[1] | Pad[1] |
+----+--------+----------+------------------------------------------+
|F[2]| L[2] | ADU[2] |
+----+--------+------+----------------------------------------------+
|F[3]| L[3] |ADU[3]| Pad[3] |
+----+--------+------+----------------------------------------------+
\_________________________________ ________________________________/
\/
RaptorQ FEC encoding
+-------------------------------------------------------------------+
| Repair 4 |
+-------------------------------------------------------------------+
. .
. .
+-------------------------------------------------------------------+
| Repair 7 |
+-------------------------------------------------------------------+
T - Symbol Size
F - Flow ID
L - Length Indication
ADU - Application Data Unit (RTP packet)
```
Encoder element creates requested number of packets for a given Source Block.
The repair packets are send during `repair-window` which is configurable
parameter. E.g. if encoder element produces 5 repair packets and `repair-window`
is set to 500ms, a first repair packet is send 100ms after the last protected
packet, second at 200ms and the last at `repair-window`.
Each repair packet except the symbols that are required to recover missing
source packets, contains also the information about the Source Block:
- `I` - Initial sequence number of the Source Block,
- `Lp` - ADUI length in symbols,
- `Lb` - Source Block Length in symbols,
### Decoder Element
Decoder element stores the copy of received RTP packets, and push original
packet downstream immediately. If all the RTP packets have been received, the
buffered media packets are dropped. If any packets are missing, the receiver
checks if it has enough buffered media and repair packets to perform decoding.
If that's the case it tries to recover missing packets by building the Source
Block following the same rules as sender, except it skips missing packets and
append repair packets to the block instead.
Because the receiver element does not introduce latency, the recovered packets
are send out of sequence, and it requires a `rtpjitterbuffer` to be chained
downstream. The `rtpjitterbuffer` needs to be configured with high enough
latency.
The receiver to determine which media packets belongs to Source Blocks uses the
information that can be retrieved from any of the repair packets. Then media
packets with Sequence Numbers: `I + Lb/Lp - 1` inclusive, are considered during
building a Source Block.
The receiver uses `repair-window` that is signaled by the sender, and its own
`repair-window-tolerance` parameter to decide for how long it should wait for
the corresponding repair packets before giving up. The wait time is
`repair-window + repair-window-tolerance`.

3
net/raptorq/build.rs Normal file
View file

@ -0,0 +1,3 @@
fn main() {
gst_plugin_version_helper::info()
}

View file

@ -0,0 +1,96 @@
// Copyright (C) 2022 Tomasz Andrzejak <andreiltd@gmail.com>
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
pub const MAX_SOURCE_BLOCK_LEN: usize = 56403;
pub const MAX_ENCODING_SYMBOL_SIZE: usize = 65536;
// RFC6681, section 8.1.1.1
pub const FEC_SCHEME_ID: u32 = 6;
#[derive(Clone, Debug, PartialEq)]
pub struct DataUnitHeader {
pub flow_indication: u8,
pub len_indication: u16,
}
// RFC6881, section 5
impl DataUnitHeader {
pub fn encode(&self) -> [u8; 3] {
let mut bytes: [u8; 3] = [0; 3];
bytes[0] = self.flow_indication;
bytes[1..3].copy_from_slice(&self.len_indication.to_be_bytes());
bytes
}
pub fn decode(bytes: [u8; 3]) -> Self {
Self {
flow_indication: bytes[0],
len_indication: u16::from_be_bytes([bytes[1], bytes[2]]),
}
}
}
// RFC6881, section 8.1.3
#[derive(Clone, Debug, PartialEq)]
pub struct RepairPayloadId {
pub initial_sequence_num: u16,
pub source_block_len: u16,
pub encoding_symbol_id: u32, // 24 bits
}
impl RepairPayloadId {
pub fn encode(&self) -> [u8; 7] {
let mut bytes: [u8; 7] = [0; 7];
bytes[0..2].copy_from_slice(&self.initial_sequence_num.to_be_bytes());
bytes[2..4].copy_from_slice(&self.source_block_len.to_be_bytes());
bytes[4..7].copy_from_slice(&self.encoding_symbol_id.to_be_bytes()[1..4]);
bytes
}
pub fn decode(bytes: [u8; 7]) -> Self {
Self {
initial_sequence_num: u16::from_be_bytes([bytes[0], bytes[1]]),
source_block_len: u16::from_be_bytes([bytes[2], bytes[3]]),
encoding_symbol_id: u32::from_be_bytes([0, bytes[4], bytes[5], bytes[6]]),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_repair_payload_encode() {
let payload_id = RepairPayloadId {
initial_sequence_num: 42,
source_block_len: 43,
encoding_symbol_id: 44,
};
let encoded = payload_id.encode();
assert_eq!(encoded.len(), 7);
let decoded = RepairPayloadId::decode(encoded);
assert_eq!(payload_id, decoded);
}
#[test]
fn test_unit_data_header_encode() {
let header = DataUnitHeader {
flow_indication: 42,
len_indication: 43,
};
let encoded = header.encode();
assert_eq!(encoded.len(), 3);
let decoded = DataUnitHeader::decode(encoded);
assert_eq!(header, decoded);
}
}

33
net/raptorq/src/lib.rs Normal file
View file

@ -0,0 +1,33 @@
// Copyright (C) 2022 Tomasz Andrzejak <andreiltd@gmail.com>
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
#![allow(clippy::non_send_fields_in_send_ty)]
#![doc = include_str!("../README.md")]
use gst::glib;
mod fecscheme;
mod raptorqdec;
mod raptorqenc;
fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
raptorqdec::register(plugin)?;
raptorqenc::register(plugin)?;
Ok(())
}
gst::plugin_define!(
raptorq,
env!("CARGO_PKG_DESCRIPTION"),
plugin_init,
concat!(env!("CARGO_PKG_VERSION"), "-", env!("COMMIT_ID")),
"MPL",
env!("CARGO_PKG_NAME"),
env!("CARGO_PKG_NAME"),
env!("CARGO_PKG_REPOSITORY"),
env!("BUILD_REL_DATE")
);

View file

@ -0,0 +1,939 @@
// Copyright (C) 2022 Tomasz Andrzejak <andreiltd@gmail.com>
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
use gst::{error_msg, glib};
use gst::prelude::*;
use gst::subclass::prelude::*;
use gst_rtp::RTPBuffer;
use once_cell::sync::Lazy;
use std::collections::BTreeMap;
use std::iter;
use std::ops::Range;
use std::sync::Mutex;
use raptorq::{EncodingPacket, ObjectTransmissionInformation, PayloadId, SourceBlockDecoder};
use crate::fecscheme::{self, DataUnitHeader, RepairPayloadId};
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
gst::DebugCategory::new(
"raptorqdec",
gst::DebugColorFlags::empty(),
Some("RTP RaptorQ Decoder"),
)
});
const DEFAULT_REPAIR_WINDOW_TOLERANCE: u32 = 500;
const DEFAULT_MEDIA_PACKETS_RESET_THRESHOLD: u32 = 5000;
#[derive(Debug, Clone, Copy)]
struct Settings {
repair_window_tolerance: u32,
media_packets_reset_threshold: u32,
}
impl Default for Settings {
fn default() -> Self {
Self {
repair_window_tolerance: DEFAULT_REPAIR_WINDOW_TOLERANCE,
media_packets_reset_threshold: DEFAULT_MEDIA_PACKETS_RESET_THRESHOLD,
}
}
}
#[derive(Debug, Default, Clone, Copy)]
struct Stats {
recv: u64,
lost: u64,
recovered: u64,
}
#[derive(Debug, Clone, Copy)]
struct SourceBlockInfo {
initial_seq: u64,
symbols_per_block: u64,
symbols_per_packet: u64,
}
impl SourceBlockInfo {
fn seq_range(&self) -> Range<u64> {
// RFC 6881, section 8.2.2
let i = self.initial_seq;
let lp = self.symbols_per_packet;
let lb = self.symbols_per_block;
i..i + lb / lp
}
fn packets_num(&self) -> usize {
(self.symbols_per_block / self.symbols_per_packet) as usize
}
}
#[derive(Debug, Clone)]
struct RepairPacketItem {
payload_id: RepairPayloadId,
payload: Vec<u8>,
}
#[derive(Debug, Clone)]
struct MediaPacketItem {
header: DataUnitHeader,
payload: Vec<u8>,
}
#[derive(Default)]
struct State {
media_packets: BTreeMap<u64, MediaPacketItem>,
repair_packets: BTreeMap<u64, Vec<RepairPacketItem>>,
expirations: BTreeMap<u64, Option<gst::ClockTime>>,
source_block_info: BTreeMap<u64, SourceBlockInfo>,
extended_media_seq: Option<u64>,
extended_repair_seq: Option<u64>,
symbol_size: usize,
media_packets_reset_threshold: usize,
repair_window: Option<gst::ClockTime>,
max_arrival_time: Option<gst::ClockTime>,
stats: Stats,
}
impl State {
fn drop_source_block(&mut self, seq: u64) {
if let Some(info) = self.source_block_info.get(&seq) {
let (seq_lo, seq_hi) = (info.seq_range().start, info.seq_range().end);
self.media_packets.retain(|&k, _| k >= seq_hi);
self.repair_packets.remove(&seq_lo);
self.source_block_info.remove(&seq_lo);
self.expirations.remove(&seq_lo);
}
}
fn expire_packets(&mut self) -> Vec<u64> {
let expired = self
.expirations
.iter()
.filter_map(|(&seq, &expiration)| {
if self.max_arrival_time.opt_gt(expiration) == Some(true) {
Some(seq)
} else {
None
}
})
.collect::<Vec<_>>();
for seq in &expired {
self.drop_source_block(*seq);
}
expired
}
}
pub struct RaptorqDec {
sinkpad: gst::Pad,
srcpad: gst::Pad,
sinkpad_fec: Mutex<Option<gst::Pad>>,
settings: Mutex<Settings>,
state: Mutex<State>,
}
impl RaptorqDec {
fn process_source_block(
&self,
element: &super::RaptorqDec,
state: &mut State,
) -> Result<gst::FlowSuccess, gst::FlowError> {
// Pull the information about the current Source Block from sequence.
// Data packets for current Source Block are in range: info.seq_range(),
// Repair Packets on the other hand, for a given block share the same key
// in the sequence, which is the lower seq bound of Data Packets.
let source_block_info = state
.source_block_info
.values()
.cloned()
.collect::<Vec<_>>();
for info in source_block_info {
let (seq_lo, seq_hi) = (info.seq_range().start, info.seq_range().end);
let data_packets_num = state.media_packets.range(seq_lo..seq_hi).count();
let n = info.packets_num();
if data_packets_num == n {
gst::trace!(
CAT,
obj: element,
"All packets ({}) received, dropping Source Block ({})",
data_packets_num,
seq_lo
);
state.drop_source_block(seq_lo);
continue;
}
let repair_packets_num = state.repair_packets.entry(seq_lo).or_default().len();
// Wait until we have enough Symbols to start decoding a Block
if data_packets_num + repair_packets_num < n {
continue;
}
// Build Source Block from received Data Packets and append
// Repair Packets that have the same initial sequnce number
let mut source_block = Vec::with_capacity(
(data_packets_num + repair_packets_num)
.checked_mul(state.symbol_size)
.ok_or(gst::FlowError::NotSupported)?
.checked_mul(info.symbols_per_packet as usize)
.ok_or(gst::FlowError::NotSupported)?,
);
source_block.extend(
Iterator::chain(
state
.media_packets
.range(seq_lo..seq_hi)
.map(|(_, packet)| {
let si = info.symbols_per_packet as usize;
let mut data = vec![0; si * state.symbol_size];
assert!(data.len() >= packet.payload.len() + 3);
data[0..3].copy_from_slice(&packet.header.encode());
data[3..3 + packet.payload.len()].copy_from_slice(&packet.payload);
data
}),
state
.repair_packets
.entry(seq_lo)
.or_default()
.iter()
.map(|packet| packet.payload.to_owned()),
)
.flatten(),
);
// RFC 6881, section 8.2.2
let esi = Iterator::chain(
state
.media_packets
.range(seq_lo..seq_hi)
.flat_map(|(seq, _)| {
let i = (seq - seq_lo) * info.symbols_per_packet;
(i..i + info.symbols_per_packet).collect::<Vec<_>>()
}),
state
.repair_packets
.entry(seq_lo)
.or_default()
.iter()
.flat_map(|packet| {
let i = packet.payload_id.encoding_symbol_id as u64;
(i..i + info.symbols_per_packet).collect::<Vec<_>>()
}),
)
.collect::<Vec<_>>();
let symbolsz = state.symbol_size as u64;
let blocksz = info.symbols_per_block * symbolsz;
let config = ObjectTransmissionInformation::new(0, symbolsz as u16, 1, 1, 8);
let mut decoder = SourceBlockDecoder::new2(0, &config, blocksz);
let mut result = None;
for (esi, symbol) in
Iterator::zip(esi.iter(), source_block.chunks_exact(state.symbol_size))
{
let payload_id = PayloadId::new(0, *esi as u32);
let encoding_packet = EncodingPacket::new(payload_id, symbol.to_vec());
result = decoder.decode(iter::once(encoding_packet));
if result.is_some() {
break;
}
}
if let Some(data) = result {
// Find missing packets in the Source Block
let missing_indices = (seq_lo..seq_hi)
.filter_map(|seq| match state.media_packets.contains_key(&seq) {
false => Some((seq - seq_lo) as usize),
true => None,
})
.collect::<Vec<_>>();
let pktsz = (info.symbols_per_packet * symbolsz) as usize;
let recovered_packets = missing_indices
.iter()
.filter_map(|i| {
let packet = &data[i * pktsz..];
let header = packet[0..3].try_into().ok()?;
let len = DataUnitHeader::decode(header).len_indication as usize;
// Length indication does not account for Unit Header and RTP header
if packet.len() >= len + 3 + 12 {
let data_unit = packet[3..len + 12 + 3].to_owned();
let mut buf = gst::Buffer::from_slice(data_unit);
let buf_mut = buf.get_mut().unwrap();
buf_mut.set_dts(state.max_arrival_time);
return Some(buf);
}
None
})
.collect::<Vec<_>>();
state.drop_source_block(seq_lo);
state.stats.lost += missing_indices.len() as u64;
for packet in recovered_packets {
{
let rtpbuf = RTPBuffer::from_buffer_readable(&packet).unwrap();
gst::debug!(
CAT,
obj: element,
"Succesfully recovered packet: seqnum: {}, len: {}, ts: {}",
rtpbuf.seq(),
rtpbuf.payload_size(),
rtpbuf.timestamp(),
);
}
state.stats.recovered += 1;
self.srcpad.push(packet)?;
}
}
}
Ok(gst::FlowSuccess::Ok)
}
fn store_media_packet(
&self,
element: &super::RaptorqDec,
state: &mut State,
buffer: &gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError> {
let this_seq = {
let rtpbuf = RTPBuffer::from_buffer_readable(buffer).map_err(|err| {
gst::error!(CAT, obj: element, "Failed to map rtp buffer : {}", err);
gst::FlowError::Error
})?;
gst::trace!(
CAT,
obj: element,
"New data packet, seq {}, ts {}",
rtpbuf.seq(),
rtpbuf.timestamp()
);
// Expand cyclic sequence numbers to u64, start from u16::MAX so we
// never overflow substraction.
let seq = rtpbuf.seq();
let prev_seq = state.extended_media_seq.unwrap_or(65_535 + seq as u64);
let delta = gst_rtp::compare_seqnum(prev_seq as u16, seq);
match delta.is_negative() {
true => prev_seq - delta.unsigned_abs() as u64,
false => prev_seq + delta.unsigned_abs() as u64,
}
};
let map = buffer.map_readable().map_err(|_| gst::FlowError::Error)?;
// As defined in RFC6881, section 8.2.4, length indication
// should be equal to UDP packet length without RTP header.
let header = DataUnitHeader {
flow_indication: 0,
len_indication: buffer.size() as u16 - 12,
};
state.media_packets.insert(
this_seq,
MediaPacketItem {
header,
payload: map.to_vec(),
},
);
state.stats.recv += 1;
state.extended_media_seq = Some(this_seq);
let now = buffer.dts_or_pts();
state.max_arrival_time = state.max_arrival_time.opt_max(now).or(now);
Ok(gst::FlowSuccess::Ok)
}
fn sink_chain(
&self,
_pad: &gst::Pad,
element: &super::RaptorqDec,
buffer: gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError> {
let mut state = self.state.lock().unwrap();
self.store_media_packet(element, &mut state, &buffer)?;
// Retire the packets that have been around for too long
let expired = state.expire_packets();
for seq in expired {
gst::trace!(
CAT,
obj: element,
"Source Block ({}) dropped, because max wait time has been exceeded",
seq as u16
);
}
// This is the fuse to make sure we are not growing RTP storage indefinitely.
let thresh = state.media_packets_reset_threshold;
if thresh > 0 && state.media_packets.len() >= thresh {
gst::warning!(
CAT,
obj: element,
"Too many buffered media packets, resetting decoder. This might \
be because we haven't received a repair packet for too long, or \
repair packets have no valid timestamps.",
);
self.reset(element);
}
self.process_source_block(element, &mut state)?;
drop(state);
self.srcpad.push(buffer)
}
fn fec_sink_chain(
&self,
_pad: &gst::Pad,
element: &super::RaptorqDec,
buffer: gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError> {
let rtpbuf = RTPBuffer::from_buffer_readable(&buffer).map_err(|err| {
gst::error!(CAT, obj: element, "Failed to map rtp buffer : {}", err);
gst::FlowError::Error
})?;
let payload = rtpbuf.payload().unwrap();
let payload_id = payload[0..7].try_into().map_err(|err| {
gst::error!(CAT, obj: element, "Unexpected rtp fec payload : {}", err);
gst::FlowError::Error
})?;
let mut state = self.state.lock().unwrap();
let id = RepairPayloadId::decode(payload_id);
let i = id.initial_sequence_num;
let lb = id.source_block_len as u64;
let lp = ((payload.len() - payload_id.len()) / state.symbol_size) as u64;
gst::trace!(
CAT,
obj: element,
"New repair packet, I: {}, LP: {}, LB: {}",
i,
lp,
lb,
);
// Expand cyclic sequence numbers to u64, start from u16::MAX so we
// never overflow substraction.
let prev_seq = state.extended_repair_seq.unwrap_or(65_535 + i as u64);
let delta = gst_rtp::compare_seqnum(prev_seq as u16, i);
let this_seq = match delta.is_negative() {
true => prev_seq - delta.unsigned_abs() as u64,
false => prev_seq + delta.unsigned_abs() as u64,
};
state.extended_repair_seq = Some(this_seq);
let expire_at = state.max_arrival_time.opt_add(state.repair_window);
let scheduled = state.expirations.entry(this_seq).or_insert(expire_at);
// Update already scheduled expiration if a new value happens to be earlier
*scheduled = scheduled.opt_min(expire_at);
state
.source_block_info
.entry(this_seq)
.or_insert(SourceBlockInfo {
initial_seq: this_seq,
symbols_per_block: lb,
symbols_per_packet: lp,
});
state
.repair_packets
.entry(this_seq)
.or_insert_with(Vec::new)
.push(RepairPacketItem {
payload_id: id,
payload: payload[7..].to_vec(), // without PayloadId
});
assert_eq!(state.repair_packets.len(), state.source_block_info.len());
assert_eq!(state.repair_packets.len(), state.expirations.len());
Ok(gst::FlowSuccess::Ok)
}
fn sink_event(&self, pad: &gst::Pad, element: &super::RaptorqDec, event: gst::Event) -> bool {
gst::debug!(CAT, "Handling event {:?}", event);
use gst::EventView;
if let EventView::FlushStop(_) = event.view() {
self.reset(element);
}
pad.event_default(Some(element), event)
}
fn fec_sink_event(
&self,
pad: &gst::Pad,
element: &super::RaptorqDec,
event: gst::Event,
) -> bool {
gst::debug!(CAT, "Handling event {:?}", event);
use gst::EventView;
if let EventView::Caps(c) = event.view() {
if let Err(err) = self.start(element, c.caps()) {
gst::element_error!(
element,
gst::CoreError::Event,
["Failed to start raptorqdec {:?}", err]
);
return false;
}
}
pad.event_default(Some(element), event)
}
fn iterate_internal_links(
&self,
pad: &gst::Pad,
_element: &super::RaptorqDec,
) -> gst::Iterator<gst::Pad> {
if pad == &self.srcpad {
gst::Iterator::from_vec(vec![self.sinkpad.clone()])
} else if pad == &self.sinkpad {
gst::Iterator::from_vec(vec![self.srcpad.clone()])
} else {
gst::Iterator::from_vec(vec![])
}
}
fn start(
&self,
element: &super::RaptorqDec,
incaps: &gst::CapsRef,
) -> Result<(), gst::ErrorMessage> {
let symbol_size = fmtp_param_from_caps::<usize>("t", incaps)?;
if symbol_size > fecscheme::MAX_ENCODING_SYMBOL_SIZE {
let details = format!(
"Symbol size exceeds Maximum Encoding Symbol Size: {}",
fecscheme::MAX_ENCODING_SYMBOL_SIZE
);
gst::element_error!(element, gst::CoreError::Failed, [&details]);
return Err(error_msg!(gst::CoreError::Failed, [&details]));
}
let settings = self.settings.lock().unwrap();
let tolerance = settings.repair_window_tolerance as u64;
let repair_window = fmtp_param_from_caps::<u64>("repair-window", incaps)?;
let tolerance = gst::ClockTime::from_mseconds(tolerance);
let repair_window = gst::ClockTime::from_useconds(repair_window);
let repair_window = Some(repair_window + tolerance);
let media_packets_reset_threshold = settings.media_packets_reset_threshold as usize;
gst::debug!(CAT, obj: element, "Configured for caps {}", incaps);
let mut state = self.state.lock().unwrap();
state.symbol_size = symbol_size;
state.repair_window = repair_window;
state.media_packets_reset_threshold = media_packets_reset_threshold;
Ok(())
}
fn stop(&self, element: &super::RaptorqDec) {
self.reset(element);
}
fn reset(&self, _element: &super::RaptorqDec) {
let mut state = self.state.lock().unwrap();
state.media_packets.clear();
state.repair_packets.clear();
state.source_block_info.clear();
state.expirations.clear();
state.extended_media_seq = None;
state.extended_repair_seq = None;
state.max_arrival_time = gst::ClockTime::NONE;
state.stats = Default::default();
}
}
#[glib::object_subclass]
impl ObjectSubclass for RaptorqDec {
const NAME: &'static str = "GstRaptorqDec";
type Type = super::RaptorqDec;
type ParentType = gst::Element;
fn with_class(klass: &Self::Class) -> Self {
let templ = klass.pad_template("sink").unwrap();
let sinkpad = gst::Pad::builder_with_template(&templ, Some("sink"))
.chain_function(|pad, parent, buffer| {
Self::catch_panic_pad_function(
parent,
|| Err(gst::FlowError::Error),
|this, element| this.sink_chain(pad, element, buffer),
)
})
.event_function(|pad, parent, event| {
Self::catch_panic_pad_function(
parent,
|| false,
|this, element| this.sink_event(pad, element, event),
)
})
.iterate_internal_links_function(|pad, parent| {
Self::catch_panic_pad_function(
parent,
|| gst::Iterator::from_vec(vec![]),
|this, element| this.iterate_internal_links(pad, element),
)
})
.flags(gst::PadFlags::PROXY_CAPS)
.build();
let templ = klass.pad_template("src").unwrap();
let srcpad = gst::Pad::builder_with_template(&templ, Some("src"))
.iterate_internal_links_function(|pad, parent| {
Self::catch_panic_pad_function(
parent,
|| gst::Iterator::from_vec(vec![]),
|this, element| this.iterate_internal_links(pad, element),
)
})
.flags(gst::PadFlags::PROXY_CAPS)
.build();
Self {
srcpad,
sinkpad,
sinkpad_fec: Mutex::new(None),
settings: Mutex::new(Default::default()),
state: Mutex::new(Default::default()),
}
}
}
impl ObjectImpl for RaptorqDec {
fn properties() -> &'static [glib::ParamSpec] {
static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
vec![
glib::ParamSpecUInt::new(
"repair-window-tolerance",
"Repair Window Tolerance (ms)",
"The amount of time to add to repair-window reported by RaptorQ encoder (in ms)",
0,
u32::MAX - 1,
DEFAULT_REPAIR_WINDOW_TOLERANCE,
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
),
glib::ParamSpecUInt::new(
"media-packets-reset-threshold",
"Media Packets Reset Threshold",
"This is the maximum allowed number of buffered packets, before we reset the decoder. \
It can only be triggered if we don't receive repair packets for too long, or packets \
have no valid timestamps, (0 - disable)",
0,
u32::MAX - 1,
DEFAULT_MEDIA_PACKETS_RESET_THRESHOLD,
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
),
glib::ParamSpecBoxed::new(
"stats",
"Statistics",
"Various statistics",
gst::Structure::static_type(),
glib::ParamFlags::READABLE,
),
]
});
PROPERTIES.as_ref()
}
fn set_property(
&self,
_obj: &Self::Type,
_id: usize,
value: &glib::Value,
pspec: &glib::ParamSpec,
) {
match pspec.name() {
"repair-window-tolerance" => {
let mut settings = self.settings.lock().unwrap();
let val = value.get().expect("type checked upstream");
settings.repair_window_tolerance = val;
}
"media-packets-reset-threshold" => {
let mut settings = self.settings.lock().unwrap();
let val = value.get().expect("type checked upstream");
settings.media_packets_reset_threshold = val;
}
_ => unimplemented!(),
}
}
fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
match pspec.name() {
"repair-window-tolerance" => {
let settings = self.settings.lock().unwrap();
settings.repair_window_tolerance.to_value()
}
"media-packets-reset-threshold" => {
let settings = self.settings.lock().unwrap();
settings.media_packets_reset_threshold.to_value()
}
"stats" => {
let state = self.state.lock().unwrap();
let stats = state.stats;
let (media_packets, repair_packets) = (
state.media_packets.len() as u64,
state
.repair_packets
.values()
.fold(0, |acc, x| acc + x.len() as u64),
);
let s = gst::Structure::builder("application/x-rtp-raptorqdec-stats")
.field("received-packets", stats.recv)
.field("lost-packets", stats.lost)
.field("recovered-packets", stats.recovered)
.field("buffered-media-packets", media_packets)
.field("buffered-repair-packets", repair_packets)
.build();
s.to_value()
}
_ => unimplemented!(),
}
}
fn constructed(&self, obj: &Self::Type) {
self.parent_constructed(obj);
obj.add_pad(&self.sinkpad).unwrap();
obj.add_pad(&self.srcpad).unwrap();
}
}
impl GstObjectImpl for RaptorqDec {}
impl ElementImpl for RaptorqDec {
fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
gst::subclass::ElementMetadata::new(
"RTP RaptorQ FEC Decoder",
"RTP RaptorQ FEC Decoding",
"Performs FEC using RaptorQ (RFC6681, RFC6682)",
"Tomasz Andrzejak <andreiltd@gmail.com>",
)
});
Some(&*ELEMENT_METADATA)
}
fn pad_templates() -> &'static [gst::PadTemplate] {
static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
let caps = gst::Caps::builder("application/x-rtp").build();
let srcpad_template = gst::PadTemplate::new(
"src",
gst::PadDirection::Src,
gst::PadPresence::Always,
&caps,
)
.unwrap();
let sinkpad_template = gst::PadTemplate::new(
"sink",
gst::PadDirection::Sink,
gst::PadPresence::Always,
&caps,
)
.unwrap();
let sink_fec_caps = gst::Caps::builder("application/x-rtp")
.field("raptor-scheme-id", fecscheme::FEC_SCHEME_ID.to_string())
// All fmtp paramters from SDP are string in caps, those are
// required parameters that cannot be expressed as string:
// .field("kmax", (string) [1, MAX_SOURCE_BLOCK_LEN])
// .field("t", (string) [1, MAX_ENCODING_SYMBOL_SIZE])
// .field("repair-window", (string) ANY)
.build();
let sinkpad_fec_template = gst::PadTemplate::new(
"fec_%u",
gst::PadDirection::Sink,
gst::PadPresence::Request,
&sink_fec_caps,
)
.unwrap();
vec![srcpad_template, sinkpad_template, sinkpad_fec_template]
});
PAD_TEMPLATES.as_ref()
}
fn change_state(
&self,
element: &Self::Type,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
gst::trace!(CAT, obj: element, "Changing state {:?}", transition);
match transition {
gst::StateChange::ReadyToPaused => {
self.reset(element);
}
gst::StateChange::PausedToReady => {
self.stop(element);
}
_ => (),
}
self.parent_change_state(element, transition)
}
fn request_new_pad(
&self,
element: &Self::Type,
templ: &gst::PadTemplate,
name: Option<String>,
_caps: Option<&gst::Caps>,
) -> Option<gst::Pad> {
let mut sinkpad_fec_guard = self.sinkpad_fec.lock().unwrap();
if sinkpad_fec_guard.is_some() {
gst::element_error!(
element,
gst::CoreError::Pad,
["Not accepting more than one FEC stream"]
);
return None;
}
let sinkpad_fec = gst::Pad::builder_with_template(templ, name.as_deref())
.chain_function(|pad, parent, buffer| {
Self::catch_panic_pad_function(
parent,
|| Err(gst::FlowError::Error),
|this, element| this.fec_sink_chain(pad, element, buffer),
)
})
.event_function(|pad, parent, event| {
Self::catch_panic_pad_function(
parent,
|| false,
|this, element| this.fec_sink_event(pad, element, event),
)
})
.iterate_internal_links_function(|pad, parent| {
Self::catch_panic_pad_function(
parent,
|| gst::Iterator::from_vec(vec![]),
|this, element| this.iterate_internal_links(pad, element),
)
})
.build();
sinkpad_fec.set_active(true).unwrap();
*sinkpad_fec_guard = Some(sinkpad_fec.clone());
drop(sinkpad_fec_guard);
element.add_pad(&sinkpad_fec).unwrap();
Some(sinkpad_fec)
}
fn release_pad(&self, element: &Self::Type, _pad: &gst::Pad) {
let mut pad_guard = self.sinkpad_fec.lock().unwrap();
if let Some(pad) = pad_guard.take() {
drop(pad_guard);
pad.set_active(false).unwrap();
element.remove_pad(&pad).unwrap();
}
}
}
fn fmtp_param_from_caps<T: std::str::FromStr>(
name: &str,
caps: &gst::CapsRef,
) -> Result<T, gst::ErrorMessage>
where
<T as std::str::FromStr>::Err: std::fmt::Debug,
{
caps.structure(0)
.unwrap()
.get::<String>(name)
.map_err(|err| {
error_msg!(
gst::CoreError::Caps,
[
"Could not get \"{}\" param from caps {:?}, err: {:?}",
name,
caps,
err
]
)
})?
.parse::<T>()
.map_err(|err| {
error_msg!(
gst::CoreError::Caps,
[
"Could not parse \"{}\" param from caps {:?}, err: {:?}",
name,
caps,
err
]
)
})
}

View file

@ -0,0 +1,23 @@
// Copyright (C) 2022 Tomasz Andrzejak <andreiltd@gmail.com>
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
use gst::glib;
use gst::prelude::*;
mod imp;
glib::wrapper! {
pub struct RaptorqDec(ObjectSubclass<imp::RaptorqDec>) @extends gst::Element, gst::Object;
}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::Element::register(
Some(plugin),
"raptorqdec",
gst::Rank::Marginal,
RaptorqDec::static_type(),
)
}

View file

@ -0,0 +1,969 @@
// Copyright (C) 2022 Tomasz Andrzejak <andreiltd@gmail.com>
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
use gst::{element_error, error_msg, glib, loggable_error};
use gst::prelude::*;
use gst::subclass::prelude::*;
use gst_rtp::rtp_buffer::*;
use gst_rtp::RTPBuffer;
use once_cell::sync::Lazy;
use std::collections::HashSet;
use std::sync::{mpsc, Mutex};
use raptorq::{
extended_source_block_symbols, ObjectTransmissionInformation, SourceBlockEncoder,
SourceBlockEncodingPlan,
};
use crate::fecscheme::{self, DataUnitHeader, RepairPayloadId};
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
gst::DebugCategory::new(
"raptorqenc",
gst::DebugColorFlags::empty(),
Some("RTP RaptorQ Encoder"),
)
});
const DEFAULT_PROTECTED_PACKETS: u32 = 25;
const DEFAULT_REPAIR_PACKETS: u32 = 5;
const DEFAULT_REPAIR_WINDOW: u32 = 50;
const DEFAULT_SYMBOL_SIZE: u32 = 1408;
const DEFAULT_MTU: u32 = 1400;
const DEFAULT_PT: u32 = 97;
const SYMBOL_ALIGNMENT: usize = 8;
#[derive(Debug, Clone, Copy)]
struct Settings {
protected_packets: u32,
repair_packets: u32,
repair_window: u32,
symbol_size: u32,
mtu: u32,
pt: u32,
}
impl Default for Settings {
fn default() -> Self {
Self {
protected_packets: DEFAULT_PROTECTED_PACKETS,
repair_packets: DEFAULT_REPAIR_PACKETS,
repair_window: DEFAULT_REPAIR_WINDOW,
symbol_size: DEFAULT_SYMBOL_SIZE,
mtu: DEFAULT_MTU,
pt: DEFAULT_PT,
}
}
}
type BufferTarget = (Option<gst::ClockTime>, gst::Buffer);
type BufferTrigger = (gst::ClockId, gst::Buffer);
enum SrcTaskMsg {
Schedule(BufferTarget),
Timeout(BufferTrigger),
Eos,
}
#[derive(Debug, Clone)]
struct State {
packets: Vec<gst::Buffer>,
seqnums: Vec<u16>,
sender: Option<mpsc::Sender<SrcTaskMsg>>,
segment: gst::FormattedSegment<gst::ClockTime>,
repair_packets_num: usize,
protected_packets_num: usize,
repair_window: usize,
symbol_size: usize,
symbols_per_packet: usize,
symbols_per_block: usize,
mtu: usize,
pt: u8,
seq: u16,
ssrc: u32,
clock_rate: Option<u32>,
info: ObjectTransmissionInformation,
plan: SourceBlockEncodingPlan,
}
pub struct RaptorqEnc {
sinkpad: gst::Pad,
srcpad: gst::Pad,
srcpad_fec: gst::Pad,
settings: Mutex<Settings>,
state: Mutex<Option<State>>,
pending_timers: Mutex<HashSet<gst::ClockId>>,
}
impl RaptorqEnc {
fn process_source_block(
element: &super::RaptorqEnc,
state: &mut State,
now_pts: Option<gst::ClockTime>,
now_dts: Option<gst::ClockTime>,
now_rtpts: u32,
) -> Result<gst::FlowSuccess, gst::FlowError> {
let sender = match &state.sender {
Some(sender) => sender,
None => return Ok(gst::FlowSuccess::Ok),
};
// Build Source Block, RFC6881, section 8.
let mut source_block = Vec::with_capacity(
state
.symbol_size
.checked_mul(state.symbols_per_block)
.ok_or(gst::FlowError::NotSupported)?,
);
source_block.extend(state.packets.iter().flat_map(|packet| {
// As defined in RFC6881, section 8.2.4, length indication
// should be equal to UDP packet length without RTP header.
let li = packet.size() - 12;
// Value of s[i] should be equal to number of repair symbols
// placed in each repair packet.
let si = state.symbols_per_packet;
gst::trace!(
CAT,
obj: element,
"Source Block add ADU: si {}, li {}",
si,
li
);
let mut data = vec![0; si * state.symbol_size];
data[0..3].copy_from_slice(
&DataUnitHeader {
flow_indication: 0,
len_indication: li as u16,
}
.encode(),
);
let packet_map = packet.map_readable().unwrap();
let packet_data = packet_map.as_slice();
data[3..3 + packet.size()].copy_from_slice(packet_data);
data
}));
assert_eq!(
state.symbol_size * state.symbols_per_block,
source_block.len()
);
let encoder =
SourceBlockEncoder::with_encoding_plan2(0, &state.info, &source_block, &state.plan);
let sbl = state.symbols_per_block;
// Initial sequnce number in Repair Payload ID is a sequence number of
// the first packet in the Source Block.
let seq = state.seqnums.first().cloned().unwrap();
// Build FEC packets as defined in RFC6881, section 8.1.3
let repair_symbols = state.repair_packets_num * state.symbols_per_packet;
// Delay step is used to create linearly spaced vector of delays for
// repair packets. All the repair packets are send within repair_window
// span from the fec srcpad thread.
let delay_step = state
.repair_window
.checked_div(state.repair_packets_num)
.unwrap_or(0);
let delays = (1..=state.repair_packets_num)
.map(|n| gst::ClockTime::from_mseconds((n * delay_step) as u64))
.collect::<Vec<_>>();
let base_time = element.base_time();
let running_time = state.segment.to_running_time(now_pts);
for (target_time, repair_packet) in Iterator::zip(
delays
.iter()
.map(|delay| base_time.opt_add(running_time).opt_add(delay)),
encoder
.repair_packets(0, repair_symbols as u32)
.chunks_exact(state.symbols_per_packet)
.enumerate()
.zip(&delays)
.map(|((n, packets), delay)| {
let esi = packets[0].payload_id().encoding_symbol_id();
let payload_id = RepairPayloadId {
initial_sequence_num: seq,
source_block_len: sbl as u16,
encoding_symbol_id: esi,
}
.encode();
let fecsz = payload_id.len() + state.symbol_size * state.symbols_per_packet;
let mut buf = gst::Buffer::new_rtp_with_sizes(fecsz as u32, 0, 0).unwrap();
{
let buf_mut = buf.get_mut().unwrap();
buf_mut.set_pts(now_pts.opt_add(delay));
buf_mut.set_dts(now_dts.opt_add(delay));
let mut rtpbuf = RTPBuffer::from_buffer_writable(buf_mut).unwrap();
rtpbuf.set_payload_type(state.pt);
rtpbuf.set_seq(state.seq);
rtpbuf.set_marker(n == state.repair_packets_num - 1);
if let Some(clock_rate) = state.clock_rate {
let rtpdelay = delay
.mul_div_round(*gst::ClockTime::SECOND, clock_rate as u64)
.unwrap()
.nseconds() as u32;
rtpbuf.set_timestamp(now_rtpts.overflowing_add(rtpdelay).0);
}
state.seq = state.seq.overflowing_add(1).0;
let payload = rtpbuf.payload_mut().unwrap();
payload[0..payload_id.len()].copy_from_slice(&payload_id);
for (n, packet) in packets.iter().enumerate() {
let data = packet.data();
let start = payload_id.len() + n * data.len();
payload[start..start + data.len()].copy_from_slice(data);
}
}
buf
}),
) {
if sender
.send(SrcTaskMsg::Schedule((target_time, repair_packet)))
.is_err()
{
break;
}
}
state.packets.clear();
state.seqnums.clear();
Ok(gst::FlowSuccess::Ok)
}
fn start_task(&self, element: &super::RaptorqEnc) -> Result<(), gst::LoggableError> {
let (sender, receiver) = mpsc::channel();
let mut state_guard = self.state.lock().unwrap();
let state = state_guard.as_mut().unwrap();
state.sender = Some(sender);
drop(state_guard);
let element_weak = element.downgrade();
let pad_weak = self.srcpad_fec.downgrade();
let mut eos = false;
self.srcpad_fec
.start_task(move || {
while let Ok(msg) = receiver.recv() {
let pad = match pad_weak.upgrade() {
Some(pad) => pad,
None => break,
};
let element = match element_weak.upgrade() {
Some(element) => element,
None => break,
};
match msg {
SrcTaskMsg::Timeout((id, buf)) => {
let mut timers = element.imp().pending_timers.lock().unwrap();
let _ = timers.remove(&id);
let push_eos = eos && timers.is_empty();
drop(timers);
if let Err(err) = pad.push(buf) {
gst::element_error!(
element,
gst::CoreError::Pad,
["Failed to push on src FEC pad {:?}", err]
);
break;
}
if push_eos {
pad.push_event(gst::event::Eos::new());
break;
}
}
SrcTaskMsg::Schedule((target, buf)) => {
let target = match target {
Some(target) => target,
None => {
// No target, push buffer immediately
if let Err(err) = pad.push(buf) {
gst::element_error!(
element,
gst::CoreError::Pad,
["Failed to push on src FEC pad {:?}", err]
);
break;
}
continue;
}
};
let clock = match element.clock() {
Some(clock) => clock,
None => {
// No clock provided, push buffer immediately
if let Err(err) = pad.push(buf) {
gst::element_error!(
element,
gst::CoreError::Pad,
["Failed to push on src FEC pad {:?}", err]
);
break;
}
continue;
}
};
let timeout_sender = {
let state_guard = element.imp().state.lock().unwrap();
let state = match state_guard.as_ref() {
Some(state) => state,
None => break,
};
state.sender.as_ref().unwrap().clone()
};
let timeout = clock.new_single_shot_id(target);
let mut timers = element.imp().pending_timers.lock().unwrap();
timers.insert(timeout.clone().into());
timeout
.wait_async(move |_clock, _time, id| {
let id = id.clone();
let _ = timeout_sender.send(SrcTaskMsg::Timeout((id, buf)));
})
.expect("Failed to wait async");
}
SrcTaskMsg::Eos => {
if element.imp().pending_timers.lock().unwrap().is_empty() {
pad.push_event(gst::event::Eos::new());
break;
}
eos = true;
}
}
}
// All senders dropped or error
let pad = match pad_weak.upgrade() {
Some(pad) => pad,
None => return,
};
let _ = pad.pause_task();
})
.map_err(|_| loggable_error!(CAT, "Failed to start pad task"))?;
Ok(())
}
fn src_activatemode(
&self,
_pad: &gst::Pad,
element: &super::RaptorqEnc,
_mode: gst::PadMode,
active: bool,
) -> Result<(), gst::LoggableError> {
if active {
self.start_task(element)?;
} else {
// element stop should be called at this point so that all mpsc
// senders used in task are dropped, otherwise channel can deadlock
self.srcpad_fec.stop_task()?;
}
Ok(())
}
fn sink_chain(
&self,
_pad: &gst::Pad,
element: &super::RaptorqEnc,
buffer: gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError> {
let mut state_guard = self.state.lock().unwrap();
let state = state_guard.as_mut().ok_or(gst::FlowError::NotNegotiated)?;
if buffer.size() > state.mtu {
gst::error!(CAT, obj: element, "Packet length exceeds configured MTU");
return Err(gst::FlowError::NotSupported);
}
let (curr_seq, now_rtpts) = match RTPBuffer::from_buffer_readable(&buffer) {
Ok(rtpbuf) => (rtpbuf.seq(), rtpbuf.timestamp()),
Err(_) => {
gst::error!(CAT, obj: element, "Mapping to RTP packet failed");
return Err(gst::FlowError::NotSupported);
}
};
if let Some(last_seq) = state.seqnums.last() {
if last_seq.overflowing_add(1).0 != curr_seq {
gst::error!(CAT, obj: element, "Got out of sequence packets");
return Err(gst::FlowError::NotSupported);
}
}
state.packets.push(buffer.clone());
state.seqnums.push(curr_seq);
assert_eq!(state.packets.len(), state.seqnums.len());
if state.packets.len() == state.protected_packets_num as usize {
// We use current buffer timing as a base for repair packets timestamps
let now_pts = buffer.pts();
let now_dts = buffer.dts_or_pts();
Self::process_source_block(element, state, now_pts, now_dts, now_rtpts)?;
}
drop(state_guard);
self.srcpad.push(buffer)
}
fn sink_event(&self, pad: &gst::Pad, element: &super::RaptorqEnc, event: gst::Event) -> bool {
gst::debug!(CAT, "Handling event {:?}", event);
use gst::EventView;
match event.view() {
EventView::FlushStart(_) => {
if let Err(err) = self.stop(element) {
element_error!(
element,
gst::CoreError::Event,
["Failed to stop encoder after flush start {:?}", err]
);
return false;
}
let _ = self.srcpad_fec.set_active(false);
}
EventView::FlushStop(_) => {
if let Err(err) = self.start(element) {
element_error!(
element,
gst::CoreError::Event,
["Failed to start encoder after flush stop {:?}", err]
);
return false;
}
let _ = self.srcpad_fec.set_active(true);
}
EventView::Caps(ev) => {
let caps = ev.caps();
gst::info!(CAT, obj: pad, "Got caps {:?}", caps);
let mut state_guard = self.state.lock().unwrap();
if let Some(state) = state_guard.as_mut() {
let s = caps.structure(0).unwrap();
// We need clock rate to calculate RTP timestamps of
// delayed repair packets.
if let Ok(clock_rate) = s.get::<i32>("clock-rate") {
if clock_rate <= 0 {
element_error!(element, gst::CoreError::Event, ["Invalid clock rate"]);
return false;
}
state.clock_rate = Some(clock_rate as u32);
}
}
}
EventView::Segment(ev) => {
let mut state_guard = self.state.lock().unwrap();
if let Some(state) = state_guard.as_mut() {
let segment = ev.segment().clone();
let segment = match segment.downcast::<gst::ClockTime>() {
Ok(segment) => segment,
Err(_) => {
element_error!(
element,
gst::CoreError::Event,
["Only time segments are supported"]
);
return false;
}
};
state.segment = segment.clone();
// Push stream events on FEC srcpad as well
let pad = &self.srcpad_fec;
let stream_id = pad.create_stream_id(element, Some("fec")).to_string();
let kmax = extended_source_block_symbols(state.symbols_per_block as u32);
let scheme_id = fecscheme::FEC_SCHEME_ID;
// RFC 6682, section 6.1.1
let caps = gst::Caps::builder("application/x-rtp")
.field("payload", state.pt as i32)
.field("ssrc", state.ssrc as i32)
.field("clock-rate", state.clock_rate.unwrap_or(0) as i32)
.field("encoding-name", "RAPTORFEC")
.field("raptor-scheme-id", scheme_id.to_string())
.field("kmax", kmax.to_string())
.field("repair-window", (state.repair_window * 1000).to_string()) // ms -> us
.field("t", state.symbol_size.to_string())
.field("p", "B")
.build();
drop(state_guard);
pad.push_event(gst::event::StreamStart::new(&stream_id));
pad.push_event(gst::event::Caps::new(&caps));
pad.push_event(gst::event::Segment::new(&segment));
}
}
EventView::Eos(_) => {
let mut state_guard = self.state.lock().unwrap();
if let Some(state) = state_guard.as_mut() {
let _ = state.sender.as_ref().unwrap().send(SrcTaskMsg::Eos);
}
}
_ => (),
}
pad.event_default(Some(element), event)
}
fn iterate_internal_links(
&self,
pad: &gst::Pad,
_element: &super::RaptorqEnc,
) -> gst::Iterator<gst::Pad> {
if pad == &self.sinkpad {
gst::Iterator::from_vec(vec![self.srcpad.clone()])
} else if pad == &self.srcpad {
gst::Iterator::from_vec(vec![self.sinkpad.clone()])
} else {
gst::Iterator::from_vec(vec![])
}
}
fn start(&self, element: &super::RaptorqEnc) -> Result<(), gst::ErrorMessage> {
let settings = self.settings.lock().unwrap();
let protected_packets_num = settings.protected_packets as usize;
let repair_packets_num = settings.repair_packets as usize;
let repair_window = settings.repair_window as usize;
let symbol_size = settings.symbol_size as usize;
let mtu = settings.mtu as usize;
let pt = settings.pt as u8;
// this is the number of repair symbols placed in each repair packet,
// it SHALL be the same for all repair packets in a block. This include
// 1 byte of flow indication and 2 bytes of lenght indication as defined
// in RFC6881, section 8.2.4.
let symbols_per_packet = (mtu + 3 + symbol_size - 1) / symbol_size;
let symbols_per_block = symbols_per_packet * protected_packets_num;
if symbol_size.rem_euclid(SYMBOL_ALIGNMENT) != 0 {
let details = format!(
"Symbol size is not multiple of Symbol Alignment {}",
SYMBOL_ALIGNMENT
);
gst::element_error!(element, gst::CoreError::Failed, [&details]);
return Err(error_msg!(gst::CoreError::Failed, [&details]));
}
if symbol_size > fecscheme::MAX_ENCODING_SYMBOL_SIZE {
let details = format!(
"Symbol size exceeds Maximum Encoding Symbol Size: {}",
fecscheme::MAX_ENCODING_SYMBOL_SIZE
);
gst::element_error!(element, gst::CoreError::Failed, [&details]);
return Err(error_msg!(gst::CoreError::Failed, [&details]));
}
if symbols_per_block > fecscheme::MAX_SOURCE_BLOCK_LEN {
let details = format!(
"Source block length exceeds Maximum Source Block Length: {}",
fecscheme::MAX_SOURCE_BLOCK_LEN
);
gst::element_error!(element, gst::CoreError::Failed, [&details]);
return Err(error_msg!(gst::CoreError::Failed, [&details]));
}
gst::info!(
CAT,
obj: element,
"Starting RaptorQ Encoder, Symbols per Block: {}, Symbol Size: {}",
symbols_per_block,
symbol_size
);
let plan = SourceBlockEncodingPlan::generate(symbols_per_block as u16);
let info = ObjectTransmissionInformation::new(0, symbol_size as u16, 1, 1, 8);
let segment = gst::FormattedSegment::<gst::ClockTime>::default();
*self.state.lock().unwrap() = Some(State {
info,
plan,
repair_packets_num,
protected_packets_num,
repair_window,
symbol_size,
symbols_per_packet,
symbols_per_block,
mtu,
pt,
segment,
seq: 0,
ssrc: 0,
packets: Vec::new(),
seqnums: Vec::new(),
clock_rate: None,
sender: None,
});
Ok(())
}
fn stop(&self, _element: &super::RaptorqEnc) -> Result<(), gst::ErrorMessage> {
let mut timers = self.pending_timers.lock().unwrap();
for timer in timers.drain() {
timer.unschedule();
}
// Drop state
let _ = self.state.lock().unwrap().take();
Ok(())
}
}
#[glib::object_subclass]
impl ObjectSubclass for RaptorqEnc {
const NAME: &'static str = "GstRaptorqEnc";
type Type = super::RaptorqEnc;
type ParentType = gst::Element;
fn with_class(klass: &Self::Class) -> Self {
let templ = klass.pad_template("sink").unwrap();
let sinkpad = gst::Pad::builder_with_template(&templ, Some("sink"))
.chain_function(|pad, parent, buffer| {
Self::catch_panic_pad_function(
parent,
|| Err(gst::FlowError::Error),
|this, element| this.sink_chain(pad, element, buffer),
)
})
.event_function(|pad, parent, event| {
Self::catch_panic_pad_function(
parent,
|| false,
|this, element| this.sink_event(pad, element, event),
)
})
.iterate_internal_links_function(|pad, parent| {
Self::catch_panic_pad_function(
parent,
|| gst::Iterator::from_vec(vec![]),
|this, element| this.iterate_internal_links(pad, element),
)
})
.flags(gst::PadFlags::PROXY_CAPS)
.build();
let templ = klass.pad_template("src").unwrap();
let srcpad = gst::Pad::builder_with_template(&templ, Some("src"))
.iterate_internal_links_function(|pad, parent| {
Self::catch_panic_pad_function(
parent,
|| gst::Iterator::from_vec(vec![]),
|this, element| this.iterate_internal_links(pad, element),
)
})
.flags(gst::PadFlags::PROXY_CAPS)
.build();
let templ = klass.pad_template("fec_0").unwrap();
let srcpad_fec = gst::Pad::builder_with_template(&templ, Some("fec_0"))
.activatemode_function(move |pad, parent, mode, active| {
Self::catch_panic_pad_function(
parent,
|| Err(loggable_error!(CAT, "Panic activating src pad with mode")),
|this, element| this.src_activatemode(pad, element, mode, active),
)
})
.iterate_internal_links_function(|pad, parent| {
Self::catch_panic_pad_function(
parent,
|| gst::Iterator::from_vec(vec![]),
|this, element| this.iterate_internal_links(pad, element),
)
})
.build();
Self {
sinkpad,
srcpad,
srcpad_fec,
settings: Mutex::new(Default::default()),
state: Mutex::new(None),
pending_timers: Mutex::new(HashSet::new()),
}
}
}
impl ObjectImpl for RaptorqEnc {
fn properties() -> &'static [glib::ParamSpec] {
static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
vec![
glib::ParamSpecUInt::new(
"protected-packets",
"Protected Packets",
"Number of packets to protect together",
1,
u32::MAX - 1,
DEFAULT_PROTECTED_PACKETS,
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
),
glib::ParamSpecUInt::new(
"repair-packets",
"Repair Packets",
"Number of repair packets per block to send",
1,
u32::MAX - 1,
DEFAULT_REPAIR_PACKETS,
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
),
glib::ParamSpecUInt::new(
"repair-window",
"Repair Window",
"A time span in milliseconds in which repair packets are send",
0,
u32::MAX - 1,
DEFAULT_REPAIR_PACKETS,
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
),
glib::ParamSpecUInt::new(
"symbol-size",
"Symbol Size",
"Size of RaptorQ data unit",
1,
u32::MAX - 1,
DEFAULT_SYMBOL_SIZE,
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
),
glib::ParamSpecUInt::new(
// TODO: maybe change this to max-rtp-packet-size or max-media-packet-size
"mtu",
"MTU",
"Maximum expected packet size",
0,
i32::MAX as u32,
DEFAULT_MTU,
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
),
glib::ParamSpecUInt::new(
"pt",
"Payload Type",
"The payload type of FEC packets",
96,
255,
DEFAULT_PT,
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
),
]
});
PROPERTIES.as_ref()
}
fn set_property(
&self,
_obj: &Self::Type,
_id: usize,
value: &glib::Value,
pspec: &glib::ParamSpec,
) {
match pspec.name() {
"protected-packets" => {
let mut settings = self.settings.lock().unwrap();
let protected_packets = value.get().expect("type checked upstream");
settings.protected_packets = protected_packets;
}
"repair-packets" => {
let mut settings = self.settings.lock().unwrap();
let repair_packets = value.get().expect("type checked upstream");
settings.repair_packets = repair_packets;
}
"repair-window" => {
let mut settings = self.settings.lock().unwrap();
let repair_window = value.get().expect("type checked upstream");
settings.repair_window = repair_window;
}
"symbol-size" => {
let mut settings = self.settings.lock().unwrap();
let symbol_size = value.get().expect("type checked upstream");
settings.symbol_size = symbol_size;
}
"mtu" => {
let mut settings = self.settings.lock().unwrap();
let mtu = value.get().expect("type checked upstream");
settings.mtu = mtu;
}
"pt" => {
let mut settings = self.settings.lock().unwrap();
let pt = value.get().expect("type checked upstream");
settings.pt = pt;
}
_ => unimplemented!(),
}
}
fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
match pspec.name() {
"protected-packets" => {
let settings = self.settings.lock().unwrap();
settings.protected_packets.to_value()
}
"repair-packets" => {
let settings = self.settings.lock().unwrap();
settings.repair_packets.to_value()
}
"repair-window" => {
let settings = self.settings.lock().unwrap();
settings.repair_window.to_value()
}
"symbol-size" => {
let settings = self.settings.lock().unwrap();
settings.symbol_size.to_value()
}
"mtu" => {
let settings = self.settings.lock().unwrap();
settings.mtu.to_value()
}
"pt" => {
let settings = self.settings.lock().unwrap();
settings.pt.to_value()
}
_ => unimplemented!(),
}
}
fn constructed(&self, obj: &Self::Type) {
self.parent_constructed(obj);
obj.add_pad(&self.sinkpad).unwrap();
obj.add_pad(&self.srcpad).unwrap();
obj.add_pad(&self.srcpad_fec).unwrap();
}
}
impl GstObjectImpl for RaptorqEnc {}
impl ElementImpl for RaptorqEnc {
fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
gst::subclass::ElementMetadata::new(
"RTP RaptorQ FEC Encoder",
"RTP RaptorQ FEC Encoding",
"Performs FEC using RaptorQ (RFC6681, RFC6682)",
"Tomasz Andrzejak <andreiltd@gmail.com>",
)
});
Some(&*ELEMENT_METADATA)
}
fn pad_templates() -> &'static [gst::PadTemplate] {
static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
let caps = gst::Caps::builder("application/x-rtp")
.field("clock-rate", gst::IntRange::new(0, std::i32::MAX))
.build();
let srcpad_template = gst::PadTemplate::new(
"src",
gst::PadDirection::Src,
gst::PadPresence::Always,
&caps,
)
.unwrap();
let sinkpad_template = gst::PadTemplate::new(
"sink",
gst::PadDirection::Sink,
gst::PadPresence::Always,
&caps,
)
.unwrap();
let srcpad_fec_template = gst::PadTemplate::new(
"fec_0",
gst::PadDirection::Src,
gst::PadPresence::Always,
&caps,
)
.unwrap();
vec![srcpad_template, sinkpad_template, srcpad_fec_template]
});
PAD_TEMPLATES.as_ref()
}
fn change_state(
&self,
element: &Self::Type,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
gst::trace!(CAT, obj: element, "Changing state {:?}", transition);
match transition {
gst::StateChange::ReadyToPaused => {
self.start(element).map_err(|_| gst::StateChangeError)?;
}
gst::StateChange::PausedToReady => {
self.stop(element).map_err(|_| gst::StateChangeError)?;
}
_ => (),
}
self.parent_change_state(element, transition)
}
}

View file

@ -0,0 +1,23 @@
// Copyright (C) 2022 Tomasz Andrzejak <andreiltd@gmail.com>
//
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
use gst::glib;
use gst::prelude::*;
mod imp;
glib::wrapper! {
pub struct RaptorqEnc(ObjectSubclass<imp::RaptorqEnc>) @extends gst::Element, gst::Object;
}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::Element::register(
Some(plugin),
"raptorqenc",
gst::Rank::Marginal,
RaptorqEnc::static_type(),
)
}

View file

@ -0,0 +1,618 @@
// Copyright (C) 2022 Tomasz Andrzejak <andreiltd@gmail.com>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
use gst::prelude::*;
use gst_rtp::rtp_buffer::*;
use gst_rtp::RTPBuffer;
use rand::Rng;
#[must_use]
struct RaptorqTest {
protected_packets: usize,
repair_packets: usize,
repair_window: usize,
symbol_size: usize,
mtu: usize,
initial_seq: u16,
lost_buffers: Vec<usize>,
swapped_buffers: Vec<usize>,
input_buffers: usize,
expect_output_buffers: usize,
}
fn init() {
use std::sync::Once;
static INIT: Once = Once::new();
INIT.call_once(|| {
gst::init().unwrap();
gstraptorq::plugin_register_static().expect("Failed to register raptorqenc plugin");
});
}
impl RaptorqTest {
fn new() -> Self {
init();
let enc = gst::ElementFactory::make("raptorqenc", None).unwrap();
let protected_packets = enc.property::<u32>("protected-packets") as usize;
let repair_packets = enc.property::<u32>("repair-packets") as usize;
let repair_window = enc.property::<u32>("repair-window") as usize;
let symbol_size = enc.property::<u32>("symbol-size") as usize;
let mtu = enc.property::<u32>("mtu") as usize;
Self {
protected_packets,
repair_packets,
repair_window,
symbol_size,
mtu,
initial_seq: 42,
lost_buffers: vec![0],
swapped_buffers: vec![],
input_buffers: protected_packets,
expect_output_buffers: protected_packets,
}
}
fn protected_packets(mut self, protected_packets: usize) -> Self {
self.protected_packets = protected_packets;
self
}
fn repair_packets(mut self, repair_packets: usize) -> Self {
self.repair_packets = repair_packets;
self
}
fn repair_window(mut self, repair_window: usize) -> Self {
self.repair_window = repair_window;
self
}
fn symbol_size(mut self, symbol_size: usize) -> Self {
self.symbol_size = symbol_size;
self
}
fn initial_seq(mut self, initial_seq: u16) -> Self {
self.initial_seq = initial_seq;
self
}
fn mtu(mut self, mtu: usize) -> Self {
self.mtu = mtu;
self
}
fn lost_buffers(mut self, lost_buffers: Vec<usize>) -> Self {
self.lost_buffers = lost_buffers;
self
}
fn swapped_buffers(mut self, swapped_buffers: Vec<usize>) -> Self {
self.swapped_buffers = swapped_buffers;
self
}
fn input_buffers(mut self, input_buffers: usize) -> Self {
self.input_buffers = input_buffers;
self
}
fn expect_output_buffers(mut self, expect_output_buffers: usize) -> Self {
self.expect_output_buffers = expect_output_buffers;
self
}
fn run(self) {
assert!(self.input_buffers >= self.protected_packets);
// 1. Decoder Setup:
let enc = gst::ElementFactory::make("raptorqenc", None).unwrap();
enc.set_property("protected-packets", self.protected_packets as u32);
enc.set_property("repair-packets", self.repair_packets as u32);
enc.set_property("repair-window", self.repair_window as u32);
enc.set_property("symbol-size", self.symbol_size as u32);
enc.set_property("mtu", self.mtu as u32);
let mut h_enc = gst_check::Harness::with_element(&enc, Some("sink"), Some("src"));
let mut h_enc_fec = gst_check::Harness::with_element(&enc, None, Some("fec_0"));
h_enc.set_src_caps_str("application/x-rtp,clock-rate=8000");
// 2. Decoder Setup:
let dec = gst::ElementFactory::make("raptorqdec", None).unwrap();
let mut h_dec = gst_check::Harness::with_element(&dec, Some("sink"), Some("src"));
let mut h_dec_fec = gst_check::Harness::with_element(&dec, Some("fec_0"), None);
let caps = gst::Caps::builder("application/x-rtp")
.field("raptor-scheme-id", "6")
.field("repair-window", "1000000")
.field("t", self.symbol_size.to_string())
.build();
h_dec.set_src_caps_str("application/x-rtp");
h_dec_fec.set_src_caps(caps);
let mut rng = rand::thread_rng();
let input_buffers = (0..self.input_buffers)
.map(|i| {
// payload size without RTP Header and ADUI Header
let size = rng.gen_range(1..self.mtu - 12 - 3);
let data = (0..size).map(|_| rng.gen()).collect::<Vec<u8>>();
let mut buf = gst::Buffer::new_rtp_with_sizes(size as u32, 0, 0).unwrap();
{
let buf_mut = buf.get_mut().unwrap();
buf_mut.set_pts(gst::ClockTime::ZERO);
buf_mut.set_dts(gst::ClockTime::ZERO);
let mut rtpbuf = RTPBuffer::from_buffer_writable(buf_mut).unwrap();
let payload = rtpbuf.payload_mut().unwrap();
payload.copy_from_slice(data.as_slice());
rtpbuf.set_seq(self.initial_seq.wrapping_add(i as u16));
rtpbuf.set_timestamp(0);
}
buf
})
.collect::<Vec<_>>();
// 3. Encoder Operations:
// Do not consume buffers here so we can compare it with the output
for buf in &input_buffers {
let result = h_enc.push(buf.clone());
assert!(result.is_ok());
}
assert_eq!(h_enc.buffers_in_queue(), self.input_buffers as u32);
let mut media_packets = (0..self.input_buffers)
.map(|_| {
let result = h_enc.pull();
assert!(result.is_ok());
result.unwrap()
})
.collect::<Vec<_>>();
// Simulate out of order packets
for x in self.swapped_buffers.chunks_exact(2) {
media_packets.swap(x[0], x[1])
}
// Check if repair packets pushed from encoder are delayed properly
let delay_step =
gst::ClockTime::from_mseconds((self.repair_window / self.repair_packets) as u64);
let mut delay = delay_step;
let repair_packets = (0..self.repair_packets)
.map(|_| {
// Set time just before the timer to push the buffer fires up,
// we shouldn't see the buffer just yet.
h_enc_fec.set_time(delay - gst::ClockTime::NSECOND).unwrap();
assert_eq!(h_enc_fec.buffers_in_queue(), 0);
// Advance time to the delay and crank clock id, we should
// get a buffer with adjusted timestamps now. All input buffers
// have zero timestamp, so the pts/dts/rtp-timestamp should be
// equal to delay.
h_enc_fec.set_time(delay).unwrap();
h_enc_fec.crank_single_clock_wait().unwrap();
let result = h_enc_fec.pull();
assert!(result.is_ok());
let buf = result.unwrap();
assert_eq!(buf.pts().unwrap(), delay);
assert_eq!(buf.dts().unwrap(), delay);
let ts = RTPBuffer::from_buffer_readable(&buf).unwrap().timestamp();
let expected_ts =
*delay.mul_div_round(*gst::ClockTime::SECOND, 8000).unwrap() as u32;
assert_eq!(ts, expected_ts);
delay += delay_step;
buf
})
.collect::<Vec<_>>();
// 4. Decoder Operations:
// remove media packets to simulate packet loss
let media_packets = media_packets
.iter()
.cloned()
.enumerate()
.filter(|(i, _)| !self.lost_buffers.contains(i))
.map(|(_, x)| x)
.collect::<Vec<_>>();
// Push media packets to decoder
for buf in media_packets {
assert!(h_dec.push(buf).is_ok());
}
// Push repair packets to decoder
for buf in repair_packets {
assert!(h_dec_fec.push(buf).is_ok());
}
// At this point decoder has all the information it needs to
// recover packets, we just need an input buffer to run sink
// chain operations.
let result = h_dec.push(input_buffers.iter().last().unwrap().clone());
assert!(result.is_ok());
let mut output_buffers = (0..self.expect_output_buffers)
.map(|_| {
let result = h_dec.pull();
assert!(result.is_ok());
result.unwrap()
})
.collect::<Vec<_>>();
// Output buffers are out of sequence, we should sort it by
// seqnum so we can compare them with input buffers.
output_buffers.sort_unstable_by(|a, b| {
let aa = RTPBuffer::from_buffer_readable(a).unwrap();
let bb = RTPBuffer::from_buffer_readable(b).unwrap();
match gst_rtp::compare_seqnum(bb.seq(), aa.seq()) {
x if x > 0 => std::cmp::Ordering::Greater,
x if x < 0 => std::cmp::Ordering::Less,
_ => std::cmp::Ordering::Equal,
}
});
assert_eq!(output_buffers.len(), self.expect_output_buffers);
if self.input_buffers == self.expect_output_buffers {
for (inbuf, outbuf) in Iterator::zip(input_buffers.iter(), output_buffers.iter()) {
let rtp1 = RTPBuffer::from_buffer_readable(inbuf).unwrap();
let rtp2 = RTPBuffer::from_buffer_readable(outbuf).unwrap();
assert_eq!(rtp1.seq(), rtp2.seq());
assert_eq!(rtp1.payload().unwrap(), rtp2.payload().unwrap());
}
}
}
}
#[test]
fn test_raptorq_all_default() {
RaptorqTest::new().run();
}
#[test]
fn test_raptorq_decoder_media_packets_out_of_sequence() {
RaptorqTest::new()
.swapped_buffers(vec![5, 10, 12, 15])
.run();
}
#[test]
fn test_raptorq_10_percent_overhead() {
RaptorqTest::new()
.protected_packets(100)
.repair_packets(10)
.lost_buffers(vec![4, 42, 43, 44, 45])
.input_buffers(100)
.expect_output_buffers(100)
.run();
}
#[test]
fn test_raptorq_5_percent_overhead() {
RaptorqTest::new()
.protected_packets(100)
.repair_packets(5)
.input_buffers(100)
.lost_buffers(vec![8, 11])
.expect_output_buffers(100)
.run();
}
#[test]
fn test_raptorq_symbol_size_128() {
RaptorqTest::new()
.protected_packets(20)
.repair_packets(4)
.symbol_size(128)
.mtu(400)
.input_buffers(20)
.lost_buffers(vec![9])
.expect_output_buffers(20)
.run();
}
#[test]
fn test_raptorq_symbol_size_192() {
RaptorqTest::new()
.protected_packets(20)
.repair_packets(4)
.symbol_size(192)
.mtu(999)
.input_buffers(20)
.lost_buffers(vec![16, 19])
.expect_output_buffers(20)
.run();
}
#[test]
fn test_raptorq_symbol_size_1024() {
RaptorqTest::new()
.protected_packets(20)
.repair_packets(8)
.symbol_size(192)
.mtu(100)
.input_buffers(20)
.lost_buffers(vec![0, 1, 2, 3, 4, 5])
.expect_output_buffers(20)
.run();
}
#[test]
fn test_raptorq_mtu_lt_symbol_size() {
RaptorqTest::new()
.protected_packets(20)
.repair_packets(8)
.symbol_size(1400)
.mtu(100)
.input_buffers(20)
.lost_buffers(vec![14, 15, 16, 17, 18, 19])
.expect_output_buffers(20)
.run();
}
#[test]
fn test_raptorq_heavy_loss() {
RaptorqTest::new()
.protected_packets(40)
.repair_packets(8)
.input_buffers(40)
.lost_buffers(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9])
.expect_output_buffers(30)
.run();
}
#[test]
fn test_raptorq_repair_window_100ms() {
RaptorqTest::new()
.protected_packets(10)
.repair_packets(10)
.repair_window(100)
.input_buffers(10)
.lost_buffers(vec![2, 6])
.expect_output_buffers(10)
.run();
}
#[test]
fn test_raptorq_repair_window_500ms() {
RaptorqTest::new()
.protected_packets(8)
.repair_packets(2)
.repair_window(500)
.input_buffers(8)
.lost_buffers(vec![])
.expect_output_buffers(8)
.run();
}
#[test]
fn test_raptorq_wrapping_sequence_number_1() {
RaptorqTest::new().initial_seq(u16::MAX - 5).run();
}
#[test]
fn test_raptorq_wrapping_sequence_number_2() {
RaptorqTest::new()
.initial_seq(u16::MAX - 5)
.swapped_buffers(vec![4, 5])
.run();
}
#[test]
fn test_raptorq_wrapping_sequence_number_3() {
RaptorqTest::new()
.initial_seq(u16::MAX - 3)
.lost_buffers(vec![0, 1, 2, 8])
.run();
}
#[test]
fn test_raptorq_encoder_flush_cancels_pending_timers() {
init();
let enc = gst::ElementFactory::make("raptorqenc", None).unwrap();
// Set delay to 5s, this way each buffer should be delayed by 1s
enc.set_property("repair-window", 5000u32);
enc.set_property("protected-packets", 5u32);
enc.set_property("repair-packets", 5u32);
let mut h_enc = gst_check::Harness::with_element(&enc, Some("sink"), Some("src"));
let mut h_enc_fec = gst_check::Harness::with_element(&enc, None, Some("fec_0"));
h_enc.set_src_caps_str("application/x-rtp,clock-rate=8000");
for i in 0u64..5 {
let mut buf = gst::Buffer::new_rtp_with_sizes(42, 0, 0).unwrap();
let buf_mut = buf.get_mut().unwrap();
buf_mut.set_pts(gst::ClockTime::SECOND * i);
let mut rtpbuf = RTPBuffer::from_buffer_writable(buf_mut).unwrap();
rtpbuf.set_seq(i as u16);
drop(rtpbuf);
let result = h_enc.push(buf);
assert!(result.is_ok());
}
// We want to check if flush cancels pending timers, last buffer of source
// block is at 5s, at 6s we should have 1 buffer qeued already, then we flush
// and move time to 10s. Flush should cancel pending timers and we should
// have no buffers at the output
h_enc_fec.set_time(gst::ClockTime::SECOND * 6).unwrap();
h_enc_fec.crank_single_clock_wait().unwrap();
let result = h_enc_fec.pull();
assert!(result.is_ok());
h_enc.push_event(gst::event::FlushStart::new());
h_enc.push_event(gst::event::FlushStop::new(true));
h_enc_fec.set_time(gst::ClockTime::SECOND * 10).unwrap();
loop {
let event = h_enc.pull_event();
if let Ok(event) = event {
match event.view() {
gst::EventView::FlushStart(_) => {
continue;
}
gst::EventView::FlushStop(_) => {
break;
}
_ => (),
}
}
}
assert_eq!(h_enc_fec.buffers_in_queue(), 0);
assert_eq!(h_enc_fec.testclock().unwrap().peek_id_count(), 0);
}
#[test]
fn test_raptorq_repair_window_tolerance() {
init();
let enc = gst::ElementFactory::make("raptorqenc", None).unwrap();
// Set delay to 5s, this way each buffer should be delayed by 1s
enc.set_property("repair-window", 1000u32);
enc.set_property("protected-packets", 5u32);
enc.set_property("repair-packets", 5u32);
let mut h_enc = gst_check::Harness::with_element(&enc, Some("sink"), Some("src"));
let mut h_enc_fec = gst_check::Harness::with_element(&enc, None, Some("fec_0"));
h_enc.set_src_caps_str("application/x-rtp,clock-rate=8000");
for i in 0u64..5 {
let mut buf = gst::Buffer::new_rtp_with_sizes(42, 0, 0).unwrap();
let buf_mut = buf.get_mut().unwrap();
buf_mut.set_pts(gst::ClockTime::SECOND * i);
let mut rtpbuf = RTPBuffer::from_buffer_writable(buf_mut).unwrap();
rtpbuf.set_seq(i as u16);
drop(rtpbuf);
let result = h_enc.push(buf);
assert!(result.is_ok());
}
let dec = gst::ElementFactory::make("raptorqdec", None).unwrap();
dec.set_property("repair-window-tolerance", 1000u32);
let mut h_dec = gst_check::Harness::with_element(&dec, Some("sink"), Some("src"));
let mut h_dec_fec = gst_check::Harness::with_element(&dec, Some("fec_0"), None);
let caps = loop {
let event = h_enc_fec.pull_event();
if let Ok(event) = event {
#[allow(clippy::single_match)]
match event.view() {
gst::EventView::Caps(c) => {
break c.caps_owned();
}
_ => (),
}
}
};
h_dec.set_src_caps_str("application/x-rtp");
h_dec_fec.set_src_caps(caps);
h_enc_fec.set_time(gst::ClockTime::from_seconds(1)).unwrap();
let result = h_enc.pull();
assert!(result.is_ok());
let buf = result.unwrap();
let result = h_dec.push(buf);
assert!(result.is_ok());
// Push some of repair packets to decoder, just not enough to recover
// media packets
for _ in 0..2 {
h_enc_fec.crank_single_clock_wait().unwrap();
let result = h_enc_fec.pull();
assert!(result.is_ok());
let buf = result.unwrap();
let result = h_dec_fec.push(buf);
assert!(result.is_ok());
}
let stats = h_dec.element().unwrap().property::<gst::Structure>("stats");
assert_eq!(
stats
.get::<u64>("buffered-media-packets")
.expect("type error"),
1
);
assert_eq!(
stats
.get::<u64>("buffered-repair-packets")
.expect("type error"),
2
);
// Media buffer is way beyond repair window which is 2 seconds,
// (repair_window (1s) + repair_window_tolerance (1s)),
// the decoder should drop buffered packets as they were kept for too long.
let mut buf = gst::Buffer::new_rtp_with_sizes(42, 0, 0).unwrap();
let buf_mut = buf.get_mut().unwrap();
buf_mut.set_pts(gst::ClockTime::SECOND * 10);
let result = h_dec.push(buf);
assert!(result.is_ok());
let stats = h_dec.element().unwrap().property::<gst::Structure>("stats");
assert_eq!(
stats
.get::<u64>("buffered-media-packets")
.expect("type error"),
0
);
assert_eq!(
stats
.get::<u64>("buffered-repair-packets")
.expect("type error"),
0
);
}