From 14160d1d3128b1fb8da50fb474d23e77e55d657e Mon Sep 17 00:00:00 2001 From: Tomasz Andrzejak Date: Sun, 27 Feb 2022 11:30:09 +0100 Subject: [PATCH] Add RaptorQ RTP FEC plugins --- Cargo.toml | 2 + meson.build | 1 + net/raptorq/Cargo.toml | 45 ++ net/raptorq/LICENSE-MPL-2.0 | 1 + net/raptorq/README.md | 118 ++++ net/raptorq/build.rs | 3 + net/raptorq/src/fecscheme.rs | 96 +++ net/raptorq/src/lib.rs | 33 + net/raptorq/src/raptorqdec/imp.rs | 939 +++++++++++++++++++++++++++++ net/raptorq/src/raptorqdec/mod.rs | 23 + net/raptorq/src/raptorqenc/imp.rs | 969 ++++++++++++++++++++++++++++++ net/raptorq/src/raptorqenc/mod.rs | 23 + net/raptorq/tests/raptorq.rs | 618 +++++++++++++++++++ 13 files changed, 2871 insertions(+) create mode 100644 net/raptorq/Cargo.toml create mode 120000 net/raptorq/LICENSE-MPL-2.0 create mode 100644 net/raptorq/README.md create mode 100644 net/raptorq/build.rs create mode 100644 net/raptorq/src/fecscheme.rs create mode 100644 net/raptorq/src/lib.rs create mode 100644 net/raptorq/src/raptorqdec/imp.rs create mode 100644 net/raptorq/src/raptorqdec/mod.rs create mode 100644 net/raptorq/src/raptorqenc/imp.rs create mode 100644 net/raptorq/src/raptorqenc/mod.rs create mode 100644 net/raptorq/tests/raptorq.rs diff --git a/Cargo.toml b/Cargo.toml index c23bb8abe..3c4f46ac3 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -12,6 +12,7 @@ members = [ "generic/file", "generic/sodium", "generic/threadshare", + "net/raptorq", "net/hlssink3", "net/onvif", "net/reqwest", @@ -49,6 +50,7 @@ default-members = [ "generic/fmp4", "generic/threadshare", "net/onvif", + "net/raptorq", "net/reqwest", "net/aws", "utils/fallbackswitch", diff --git a/meson.build b/meson.build index 702273e33..0e47ef522 100644 --- a/meson.build +++ b/meson.build @@ -42,6 +42,7 @@ plugins = { 'gst-plugin-flavors': 'libgstrsflv', 'gst-plugin-gif': 'libgstgif', 'gst-plugin-lewton': 'libgstlewton', + 'gst-plugin-raptorq': 'libgstraptorq', 'gst-plugin-rav1e': 'libgstrav1e', 'gst-plugin-reqwest': 'libgstreqwest', 'gst-plugin-hlssink3': 'libgsthlssink3', diff --git a/net/raptorq/Cargo.toml b/net/raptorq/Cargo.toml new file mode 100644 index 000000000..f849b9982 --- /dev/null +++ b/net/raptorq/Cargo.toml @@ -0,0 +1,45 @@ +[package] +name = "gst-plugin-raptorq" +version = "0.9.0" +authors = ["Tomasz Andrzejak "] +repository = "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs" +description = "Rust Raptorq FEC Plugin" +license = "MPL-2.0" +edition = "2021" +rust-version = "1.56" + +[dependencies] +gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } +gst-base = { package = "gstreamer-base", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } +gst-rtp = { package = "gstreamer-rtp", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" } +once_cell = "1.0" +raptorq = "1.7" + +[dev-dependencies] +gst-check = { package = "gstreamer-check", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_18"] } +rand = "0.8" + +[lib] +name = "gstraptorq" +crate-type = ["cdylib", "rlib"] +path = "src/lib.rs" + +[build-dependencies] +gst-plugin-version-helper = { path="../../version-helper" } + +[features] +static = [] +capi = [] + +[package.metadata.capi] +min_version = "0.8.0" + +[package.metadata.capi.header] +enabled = false + +[package.metadata.capi.library] +install_subdir = "gstreamer-1.0" +versioning = false + +[package.metadata.capi.pkg_config] +requires_private = "gstreamer-1.0, gstreamer-base-1.0, gstreamer-rtp-1.0, gobject-2.0, glib-2.0, gmodule-2.0" diff --git a/net/raptorq/LICENSE-MPL-2.0 b/net/raptorq/LICENSE-MPL-2.0 new file mode 120000 index 000000000..eb5d24fe9 --- /dev/null +++ b/net/raptorq/LICENSE-MPL-2.0 @@ -0,0 +1 @@ +../../LICENSE-MPL-2.0 \ No newline at end of file diff --git a/net/raptorq/README.md b/net/raptorq/README.md new file mode 100644 index 000000000..0948ddecb --- /dev/null +++ b/net/raptorq/README.md @@ -0,0 +1,118 @@ +## Introduction +This is GStreamer implementation of RaptorQ FEC for RTP streams. + +The sender element produces requested number `X` of repair packets from `K` RTP +packets. The receiver only needs: + +- `K` of any repair or RTP packets to recover all the data with 99% probability +- `K + 1` of any repair or RTP packets to recover all the data with 99.99% + probability, +- `K + 2` of any repair or RTP packets to recover all the data with 99.9999% + probability etc. + +Relevant documents: +- [RFC6363 - Forward Error Correction (FEC) Framework](https://datatracker.ietf.org/doc/html/rfc6363) +- [RFC6681 - Raptor Forward Error Correction (FEC) Schemes for FECFRAME](https://datatracker.ietf.org/doc/html/rfc6681) +- [RFC6682 - RTP Payload Format for Raptor Forward Error Correction (FEC)](https://datatracker.ietf.org/doc/html/rfc6682) + + +## Sender/Receiver Example +```shell + gst-launch-1.0 \ + rtpbin name=rtp fec-encoders='fec,0="raptorqenc\ mtu=1356\ symbol-size=192";' \ + uridecodebin uri=file:///path/to/video/file ! x264enc key-int-max=60 tune=zerolatency ! \ + queue ! mpegtsmux ! rtpmp2tpay ssrc=0 ! \ + rtp.send_rtp_sink_0 rtp.send_rtp_src_0 ! udpsink host=127.0.0.1 port=5000 \ + rtp.send_fec_src_0_0 ! udpsink host=127.0.0.1 port=5002 async=false + + gst-launch-1.0 \ + rtpbin latency=200 fec-decoders='fec,0="raptorqdec";' name=rtp \ + udpsrc address=127.0.0.1 port=5002 \ + caps="application/x-rtp, payload=96, raptor-scheme-id=(string)6, repair-window=(string)1000000, t=(string)192" ! \ + queue ! rtp.recv_fec_sink_0_0 \ + udpsrc address=127.0.0.1 port=5000 \ + caps="application/x-rtp, media=video, clock-rate=90000, encoding-name=mp2t, payload=33" ! \ + queue ! netsim drop-probability=0.05 ! rtp.recv_rtp_sink_0 \ + rtp. ! decodebin ! videoconvert ! queue ! autovideosink +``` + +## Implementation Details + +### Encoder Element +The encoder element stores the copy of original RTP packets internally until it +receives the number of packets that are requested to be protected together. At +this point it creates a Source Block that is passed to RaptorQ Encoder. Source +Block is constructed by concatenating ADUIs (Application Data Unit Information) +sometimes also called SPI (Source Packet Information). Each ADUI contains: + +- Header with Flow ID - `F(I)` and Length Indication for the packet - `L(I)`, +- UDP payload, this a complete RTP packet with header, +- Padding bytes if required, + +```text + T T T T + <----------------><--------------><---------------><----------------> + +----+--------+-----------------------+-----------------------------+ + |F[0]| L[0] | ADU[0] | Pad[0] | + +----+--------+----------+------------+-----------------------------+ + |F[1]| L[1] | ADU[1] | Pad[1] | + +----+--------+----------+------------------------------------------+ + |F[2]| L[2] | ADU[2] | + +----+--------+------+----------------------------------------------+ + |F[3]| L[3] |ADU[3]| Pad[3] | + +----+--------+------+----------------------------------------------+ + \_________________________________ ________________________________/ + \/ + RaptorQ FEC encoding + + +-------------------------------------------------------------------+ + | Repair 4 | + +-------------------------------------------------------------------+ + . . + . . + +-------------------------------------------------------------------+ + | Repair 7 | + +-------------------------------------------------------------------+ + + T - Symbol Size + F - Flow ID + L - Length Indication + ADU - Application Data Unit (RTP packet) +``` + +Encoder element creates requested number of packets for a given Source Block. +The repair packets are send during `repair-window` which is configurable +parameter. E.g. if encoder element produces 5 repair packets and `repair-window` +is set to 500ms, a first repair packet is send 100ms after the last protected +packet, second at 200ms and the last at `repair-window`. + +Each repair packet except the symbols that are required to recover missing +source packets, contains also the information about the Source Block: + +- `I` - Initial sequence number of the Source Block, +- `Lp` - ADUI length in symbols, +- `Lb` - Source Block Length in symbols, + +### Decoder Element +Decoder element stores the copy of received RTP packets, and push original +packet downstream immediately. If all the RTP packets have been received, the +buffered media packets are dropped. If any packets are missing, the receiver +checks if it has enough buffered media and repair packets to perform decoding. +If that's the case it tries to recover missing packets by building the Source +Block following the same rules as sender, except it skips missing packets and +append repair packets to the block instead. + +Because the receiver element does not introduce latency, the recovered packets +are send out of sequence, and it requires a `rtpjitterbuffer` to be chained +downstream. The `rtpjitterbuffer` needs to be configured with high enough +latency. + +The receiver to determine which media packets belongs to Source Blocks uses the +information that can be retrieved from any of the repair packets. Then media +packets with Sequence Numbers: `I + Lb/Lp - 1` inclusive, are considered during +building a Source Block. + +The receiver uses `repair-window` that is signaled by the sender, and its own +`repair-window-tolerance` parameter to decide for how long it should wait for +the corresponding repair packets before giving up. The wait time is +`repair-window + repair-window-tolerance`. diff --git a/net/raptorq/build.rs b/net/raptorq/build.rs new file mode 100644 index 000000000..cda12e57e --- /dev/null +++ b/net/raptorq/build.rs @@ -0,0 +1,3 @@ +fn main() { + gst_plugin_version_helper::info() +} diff --git a/net/raptorq/src/fecscheme.rs b/net/raptorq/src/fecscheme.rs new file mode 100644 index 000000000..1f62eddf2 --- /dev/null +++ b/net/raptorq/src/fecscheme.rs @@ -0,0 +1,96 @@ +// Copyright (C) 2022 Tomasz Andrzejak +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +pub const MAX_SOURCE_BLOCK_LEN: usize = 56403; +pub const MAX_ENCODING_SYMBOL_SIZE: usize = 65536; + +// RFC6681, section 8.1.1.1 +pub const FEC_SCHEME_ID: u32 = 6; + +#[derive(Clone, Debug, PartialEq)] +pub struct DataUnitHeader { + pub flow_indication: u8, + pub len_indication: u16, +} + +// RFC6881, section 5 +impl DataUnitHeader { + pub fn encode(&self) -> [u8; 3] { + let mut bytes: [u8; 3] = [0; 3]; + + bytes[0] = self.flow_indication; + bytes[1..3].copy_from_slice(&self.len_indication.to_be_bytes()); + bytes + } + + pub fn decode(bytes: [u8; 3]) -> Self { + Self { + flow_indication: bytes[0], + len_indication: u16::from_be_bytes([bytes[1], bytes[2]]), + } + } +} + +// RFC6881, section 8.1.3 +#[derive(Clone, Debug, PartialEq)] +pub struct RepairPayloadId { + pub initial_sequence_num: u16, + pub source_block_len: u16, + pub encoding_symbol_id: u32, // 24 bits +} + +impl RepairPayloadId { + pub fn encode(&self) -> [u8; 7] { + let mut bytes: [u8; 7] = [0; 7]; + + bytes[0..2].copy_from_slice(&self.initial_sequence_num.to_be_bytes()); + bytes[2..4].copy_from_slice(&self.source_block_len.to_be_bytes()); + bytes[4..7].copy_from_slice(&self.encoding_symbol_id.to_be_bytes()[1..4]); + bytes + } + + pub fn decode(bytes: [u8; 7]) -> Self { + Self { + initial_sequence_num: u16::from_be_bytes([bytes[0], bytes[1]]), + source_block_len: u16::from_be_bytes([bytes[2], bytes[3]]), + encoding_symbol_id: u32::from_be_bytes([0, bytes[4], bytes[5], bytes[6]]), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_repair_payload_encode() { + let payload_id = RepairPayloadId { + initial_sequence_num: 42, + source_block_len: 43, + encoding_symbol_id: 44, + }; + + let encoded = payload_id.encode(); + assert_eq!(encoded.len(), 7); + + let decoded = RepairPayloadId::decode(encoded); + assert_eq!(payload_id, decoded); + } + + #[test] + fn test_unit_data_header_encode() { + let header = DataUnitHeader { + flow_indication: 42, + len_indication: 43, + }; + + let encoded = header.encode(); + assert_eq!(encoded.len(), 3); + + let decoded = DataUnitHeader::decode(encoded); + assert_eq!(header, decoded); + } +} diff --git a/net/raptorq/src/lib.rs b/net/raptorq/src/lib.rs new file mode 100644 index 000000000..0fd3b1258 --- /dev/null +++ b/net/raptorq/src/lib.rs @@ -0,0 +1,33 @@ +// Copyright (C) 2022 Tomasz Andrzejak +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +#![allow(clippy::non_send_fields_in_send_ty)] +#![doc = include_str!("../README.md")] + +use gst::glib; + +mod fecscheme; +mod raptorqdec; +mod raptorqenc; + +fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + raptorqdec::register(plugin)?; + raptorqenc::register(plugin)?; + + Ok(()) +} + +gst::plugin_define!( + raptorq, + env!("CARGO_PKG_DESCRIPTION"), + plugin_init, + concat!(env!("CARGO_PKG_VERSION"), "-", env!("COMMIT_ID")), + "MPL", + env!("CARGO_PKG_NAME"), + env!("CARGO_PKG_NAME"), + env!("CARGO_PKG_REPOSITORY"), + env!("BUILD_REL_DATE") +); diff --git a/net/raptorq/src/raptorqdec/imp.rs b/net/raptorq/src/raptorqdec/imp.rs new file mode 100644 index 000000000..6ba2d6b80 --- /dev/null +++ b/net/raptorq/src/raptorqdec/imp.rs @@ -0,0 +1,939 @@ +// Copyright (C) 2022 Tomasz Andrzejak +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +use gst::{error_msg, glib}; + +use gst::prelude::*; +use gst::subclass::prelude::*; + +use gst_rtp::RTPBuffer; + +use once_cell::sync::Lazy; + +use std::collections::BTreeMap; +use std::iter; +use std::ops::Range; +use std::sync::Mutex; + +use raptorq::{EncodingPacket, ObjectTransmissionInformation, PayloadId, SourceBlockDecoder}; + +use crate::fecscheme::{self, DataUnitHeader, RepairPayloadId}; + +static CAT: Lazy = Lazy::new(|| { + gst::DebugCategory::new( + "raptorqdec", + gst::DebugColorFlags::empty(), + Some("RTP RaptorQ Decoder"), + ) +}); + +const DEFAULT_REPAIR_WINDOW_TOLERANCE: u32 = 500; +const DEFAULT_MEDIA_PACKETS_RESET_THRESHOLD: u32 = 5000; + +#[derive(Debug, Clone, Copy)] +struct Settings { + repair_window_tolerance: u32, + media_packets_reset_threshold: u32, +} + +impl Default for Settings { + fn default() -> Self { + Self { + repair_window_tolerance: DEFAULT_REPAIR_WINDOW_TOLERANCE, + media_packets_reset_threshold: DEFAULT_MEDIA_PACKETS_RESET_THRESHOLD, + } + } +} + +#[derive(Debug, Default, Clone, Copy)] +struct Stats { + recv: u64, + lost: u64, + recovered: u64, +} + +#[derive(Debug, Clone, Copy)] +struct SourceBlockInfo { + initial_seq: u64, + symbols_per_block: u64, + symbols_per_packet: u64, +} + +impl SourceBlockInfo { + fn seq_range(&self) -> Range { + // RFC 6881, section 8.2.2 + let i = self.initial_seq; + let lp = self.symbols_per_packet; + let lb = self.symbols_per_block; + + i..i + lb / lp + } + + fn packets_num(&self) -> usize { + (self.symbols_per_block / self.symbols_per_packet) as usize + } +} + +#[derive(Debug, Clone)] +struct RepairPacketItem { + payload_id: RepairPayloadId, + payload: Vec, +} + +#[derive(Debug, Clone)] +struct MediaPacketItem { + header: DataUnitHeader, + payload: Vec, +} + +#[derive(Default)] +struct State { + media_packets: BTreeMap, + repair_packets: BTreeMap>, + expirations: BTreeMap>, + source_block_info: BTreeMap, + extended_media_seq: Option, + extended_repair_seq: Option, + symbol_size: usize, + media_packets_reset_threshold: usize, + repair_window: Option, + max_arrival_time: Option, + stats: Stats, +} + +impl State { + fn drop_source_block(&mut self, seq: u64) { + if let Some(info) = self.source_block_info.get(&seq) { + let (seq_lo, seq_hi) = (info.seq_range().start, info.seq_range().end); + + self.media_packets.retain(|&k, _| k >= seq_hi); + self.repair_packets.remove(&seq_lo); + self.source_block_info.remove(&seq_lo); + self.expirations.remove(&seq_lo); + } + } + + fn expire_packets(&mut self) -> Vec { + let expired = self + .expirations + .iter() + .filter_map(|(&seq, &expiration)| { + if self.max_arrival_time.opt_gt(expiration) == Some(true) { + Some(seq) + } else { + None + } + }) + .collect::>(); + + for seq in &expired { + self.drop_source_block(*seq); + } + + expired + } +} + +pub struct RaptorqDec { + sinkpad: gst::Pad, + srcpad: gst::Pad, + sinkpad_fec: Mutex>, + settings: Mutex, + state: Mutex, +} + +impl RaptorqDec { + fn process_source_block( + &self, + element: &super::RaptorqDec, + state: &mut State, + ) -> Result { + // Pull the information about the current Source Block from sequence. + // Data packets for current Source Block are in range: info.seq_range(), + // Repair Packets on the other hand, for a given block share the same key + // in the sequence, which is the lower seq bound of Data Packets. + let source_block_info = state + .source_block_info + .values() + .cloned() + .collect::>(); + + for info in source_block_info { + let (seq_lo, seq_hi) = (info.seq_range().start, info.seq_range().end); + let data_packets_num = state.media_packets.range(seq_lo..seq_hi).count(); + let n = info.packets_num(); + + if data_packets_num == n { + gst::trace!( + CAT, + obj: element, + "All packets ({}) received, dropping Source Block ({})", + data_packets_num, + seq_lo + ); + + state.drop_source_block(seq_lo); + continue; + } + + let repair_packets_num = state.repair_packets.entry(seq_lo).or_default().len(); + + // Wait until we have enough Symbols to start decoding a Block + if data_packets_num + repair_packets_num < n { + continue; + } + + // Build Source Block from received Data Packets and append + // Repair Packets that have the same initial sequnce number + let mut source_block = Vec::with_capacity( + (data_packets_num + repair_packets_num) + .checked_mul(state.symbol_size) + .ok_or(gst::FlowError::NotSupported)? + .checked_mul(info.symbols_per_packet as usize) + .ok_or(gst::FlowError::NotSupported)?, + ); + + source_block.extend( + Iterator::chain( + state + .media_packets + .range(seq_lo..seq_hi) + .map(|(_, packet)| { + let si = info.symbols_per_packet as usize; + let mut data = vec![0; si * state.symbol_size]; + + assert!(data.len() >= packet.payload.len() + 3); + + data[0..3].copy_from_slice(&packet.header.encode()); + data[3..3 + packet.payload.len()].copy_from_slice(&packet.payload); + data + }), + state + .repair_packets + .entry(seq_lo) + .or_default() + .iter() + .map(|packet| packet.payload.to_owned()), + ) + .flatten(), + ); + + // RFC 6881, section 8.2.2 + let esi = Iterator::chain( + state + .media_packets + .range(seq_lo..seq_hi) + .flat_map(|(seq, _)| { + let i = (seq - seq_lo) * info.symbols_per_packet; + (i..i + info.symbols_per_packet).collect::>() + }), + state + .repair_packets + .entry(seq_lo) + .or_default() + .iter() + .flat_map(|packet| { + let i = packet.payload_id.encoding_symbol_id as u64; + (i..i + info.symbols_per_packet).collect::>() + }), + ) + .collect::>(); + + let symbolsz = state.symbol_size as u64; + let blocksz = info.symbols_per_block * symbolsz; + + let config = ObjectTransmissionInformation::new(0, symbolsz as u16, 1, 1, 8); + let mut decoder = SourceBlockDecoder::new2(0, &config, blocksz); + let mut result = None; + + for (esi, symbol) in + Iterator::zip(esi.iter(), source_block.chunks_exact(state.symbol_size)) + { + let payload_id = PayloadId::new(0, *esi as u32); + let encoding_packet = EncodingPacket::new(payload_id, symbol.to_vec()); + + result = decoder.decode(iter::once(encoding_packet)); + if result.is_some() { + break; + } + } + + if let Some(data) = result { + // Find missing packets in the Source Block + let missing_indices = (seq_lo..seq_hi) + .filter_map(|seq| match state.media_packets.contains_key(&seq) { + false => Some((seq - seq_lo) as usize), + true => None, + }) + .collect::>(); + + let pktsz = (info.symbols_per_packet * symbolsz) as usize; + + let recovered_packets = missing_indices + .iter() + .filter_map(|i| { + let packet = &data[i * pktsz..]; + let header = packet[0..3].try_into().ok()?; + + let len = DataUnitHeader::decode(header).len_indication as usize; + + // Length indication does not account for Unit Header and RTP header + if packet.len() >= len + 3 + 12 { + let data_unit = packet[3..len + 12 + 3].to_owned(); + let mut buf = gst::Buffer::from_slice(data_unit); + + let buf_mut = buf.get_mut().unwrap(); + buf_mut.set_dts(state.max_arrival_time); + + return Some(buf); + } + + None + }) + .collect::>(); + + state.drop_source_block(seq_lo); + state.stats.lost += missing_indices.len() as u64; + + for packet in recovered_packets { + { + let rtpbuf = RTPBuffer::from_buffer_readable(&packet).unwrap(); + + gst::debug!( + CAT, + obj: element, + "Succesfully recovered packet: seqnum: {}, len: {}, ts: {}", + rtpbuf.seq(), + rtpbuf.payload_size(), + rtpbuf.timestamp(), + ); + } + + state.stats.recovered += 1; + self.srcpad.push(packet)?; + } + } + } + + Ok(gst::FlowSuccess::Ok) + } + + fn store_media_packet( + &self, + element: &super::RaptorqDec, + state: &mut State, + buffer: &gst::Buffer, + ) -> Result { + let this_seq = { + let rtpbuf = RTPBuffer::from_buffer_readable(buffer).map_err(|err| { + gst::error!(CAT, obj: element, "Failed to map rtp buffer : {}", err); + gst::FlowError::Error + })?; + + gst::trace!( + CAT, + obj: element, + "New data packet, seq {}, ts {}", + rtpbuf.seq(), + rtpbuf.timestamp() + ); + + // Expand cyclic sequence numbers to u64, start from u16::MAX so we + // never overflow substraction. + let seq = rtpbuf.seq(); + let prev_seq = state.extended_media_seq.unwrap_or(65_535 + seq as u64); + + let delta = gst_rtp::compare_seqnum(prev_seq as u16, seq); + + match delta.is_negative() { + true => prev_seq - delta.unsigned_abs() as u64, + false => prev_seq + delta.unsigned_abs() as u64, + } + }; + + let map = buffer.map_readable().map_err(|_| gst::FlowError::Error)?; + + // As defined in RFC6881, section 8.2.4, length indication + // should be equal to UDP packet length without RTP header. + let header = DataUnitHeader { + flow_indication: 0, + len_indication: buffer.size() as u16 - 12, + }; + + state.media_packets.insert( + this_seq, + MediaPacketItem { + header, + payload: map.to_vec(), + }, + ); + + state.stats.recv += 1; + state.extended_media_seq = Some(this_seq); + + let now = buffer.dts_or_pts(); + state.max_arrival_time = state.max_arrival_time.opt_max(now).or(now); + + Ok(gst::FlowSuccess::Ok) + } + + fn sink_chain( + &self, + _pad: &gst::Pad, + element: &super::RaptorqDec, + buffer: gst::Buffer, + ) -> Result { + let mut state = self.state.lock().unwrap(); + self.store_media_packet(element, &mut state, &buffer)?; + + // Retire the packets that have been around for too long + let expired = state.expire_packets(); + for seq in expired { + gst::trace!( + CAT, + obj: element, + "Source Block ({}) dropped, because max wait time has been exceeded", + seq as u16 + ); + } + + // This is the fuse to make sure we are not growing RTP storage indefinitely. + let thresh = state.media_packets_reset_threshold; + if thresh > 0 && state.media_packets.len() >= thresh { + gst::warning!( + CAT, + obj: element, + "Too many buffered media packets, resetting decoder. This might \ + be because we haven't received a repair packet for too long, or \ + repair packets have no valid timestamps.", + ); + + self.reset(element); + } + + self.process_source_block(element, &mut state)?; + drop(state); + + self.srcpad.push(buffer) + } + + fn fec_sink_chain( + &self, + _pad: &gst::Pad, + element: &super::RaptorqDec, + buffer: gst::Buffer, + ) -> Result { + let rtpbuf = RTPBuffer::from_buffer_readable(&buffer).map_err(|err| { + gst::error!(CAT, obj: element, "Failed to map rtp buffer : {}", err); + gst::FlowError::Error + })?; + + let payload = rtpbuf.payload().unwrap(); + let payload_id = payload[0..7].try_into().map_err(|err| { + gst::error!(CAT, obj: element, "Unexpected rtp fec payload : {}", err); + gst::FlowError::Error + })?; + + let mut state = self.state.lock().unwrap(); + + let id = RepairPayloadId::decode(payload_id); + + let i = id.initial_sequence_num; + let lb = id.source_block_len as u64; + let lp = ((payload.len() - payload_id.len()) / state.symbol_size) as u64; + + gst::trace!( + CAT, + obj: element, + "New repair packet, I: {}, LP: {}, LB: {}", + i, + lp, + lb, + ); + + // Expand cyclic sequence numbers to u64, start from u16::MAX so we + // never overflow substraction. + let prev_seq = state.extended_repair_seq.unwrap_or(65_535 + i as u64); + let delta = gst_rtp::compare_seqnum(prev_seq as u16, i); + + let this_seq = match delta.is_negative() { + true => prev_seq - delta.unsigned_abs() as u64, + false => prev_seq + delta.unsigned_abs() as u64, + }; + + state.extended_repair_seq = Some(this_seq); + + let expire_at = state.max_arrival_time.opt_add(state.repair_window); + let scheduled = state.expirations.entry(this_seq).or_insert(expire_at); + + // Update already scheduled expiration if a new value happens to be earlier + *scheduled = scheduled.opt_min(expire_at); + + state + .source_block_info + .entry(this_seq) + .or_insert(SourceBlockInfo { + initial_seq: this_seq, + symbols_per_block: lb, + symbols_per_packet: lp, + }); + + state + .repair_packets + .entry(this_seq) + .or_insert_with(Vec::new) + .push(RepairPacketItem { + payload_id: id, + payload: payload[7..].to_vec(), // without PayloadId + }); + + assert_eq!(state.repair_packets.len(), state.source_block_info.len()); + assert_eq!(state.repair_packets.len(), state.expirations.len()); + + Ok(gst::FlowSuccess::Ok) + } + + fn sink_event(&self, pad: &gst::Pad, element: &super::RaptorqDec, event: gst::Event) -> bool { + gst::debug!(CAT, "Handling event {:?}", event); + use gst::EventView; + + if let EventView::FlushStop(_) = event.view() { + self.reset(element); + } + + pad.event_default(Some(element), event) + } + + fn fec_sink_event( + &self, + pad: &gst::Pad, + element: &super::RaptorqDec, + event: gst::Event, + ) -> bool { + gst::debug!(CAT, "Handling event {:?}", event); + use gst::EventView; + + if let EventView::Caps(c) = event.view() { + if let Err(err) = self.start(element, c.caps()) { + gst::element_error!( + element, + gst::CoreError::Event, + ["Failed to start raptorqdec {:?}", err] + ); + + return false; + } + } + + pad.event_default(Some(element), event) + } + + fn iterate_internal_links( + &self, + pad: &gst::Pad, + _element: &super::RaptorqDec, + ) -> gst::Iterator { + if pad == &self.srcpad { + gst::Iterator::from_vec(vec![self.sinkpad.clone()]) + } else if pad == &self.sinkpad { + gst::Iterator::from_vec(vec![self.srcpad.clone()]) + } else { + gst::Iterator::from_vec(vec![]) + } + } + + fn start( + &self, + element: &super::RaptorqDec, + incaps: &gst::CapsRef, + ) -> Result<(), gst::ErrorMessage> { + let symbol_size = fmtp_param_from_caps::("t", incaps)?; + + if symbol_size > fecscheme::MAX_ENCODING_SYMBOL_SIZE { + let details = format!( + "Symbol size exceeds Maximum Encoding Symbol Size: {}", + fecscheme::MAX_ENCODING_SYMBOL_SIZE + ); + + gst::element_error!(element, gst::CoreError::Failed, [&details]); + return Err(error_msg!(gst::CoreError::Failed, [&details])); + } + + let settings = self.settings.lock().unwrap(); + + let tolerance = settings.repair_window_tolerance as u64; + let repair_window = fmtp_param_from_caps::("repair-window", incaps)?; + + let tolerance = gst::ClockTime::from_mseconds(tolerance); + let repair_window = gst::ClockTime::from_useconds(repair_window); + let repair_window = Some(repair_window + tolerance); + + let media_packets_reset_threshold = settings.media_packets_reset_threshold as usize; + + gst::debug!(CAT, obj: element, "Configured for caps {}", incaps); + + let mut state = self.state.lock().unwrap(); + + state.symbol_size = symbol_size; + state.repair_window = repair_window; + state.media_packets_reset_threshold = media_packets_reset_threshold; + + Ok(()) + } + + fn stop(&self, element: &super::RaptorqDec) { + self.reset(element); + } + + fn reset(&self, _element: &super::RaptorqDec) { + let mut state = self.state.lock().unwrap(); + + state.media_packets.clear(); + state.repair_packets.clear(); + state.source_block_info.clear(); + state.expirations.clear(); + state.extended_media_seq = None; + state.extended_repair_seq = None; + state.max_arrival_time = gst::ClockTime::NONE; + state.stats = Default::default(); + } +} + +#[glib::object_subclass] +impl ObjectSubclass for RaptorqDec { + const NAME: &'static str = "GstRaptorqDec"; + type Type = super::RaptorqDec; + type ParentType = gst::Element; + + fn with_class(klass: &Self::Class) -> Self { + let templ = klass.pad_template("sink").unwrap(); + let sinkpad = gst::Pad::builder_with_template(&templ, Some("sink")) + .chain_function(|pad, parent, buffer| { + Self::catch_panic_pad_function( + parent, + || Err(gst::FlowError::Error), + |this, element| this.sink_chain(pad, element, buffer), + ) + }) + .event_function(|pad, parent, event| { + Self::catch_panic_pad_function( + parent, + || false, + |this, element| this.sink_event(pad, element, event), + ) + }) + .iterate_internal_links_function(|pad, parent| { + Self::catch_panic_pad_function( + parent, + || gst::Iterator::from_vec(vec![]), + |this, element| this.iterate_internal_links(pad, element), + ) + }) + .flags(gst::PadFlags::PROXY_CAPS) + .build(); + + let templ = klass.pad_template("src").unwrap(); + let srcpad = gst::Pad::builder_with_template(&templ, Some("src")) + .iterate_internal_links_function(|pad, parent| { + Self::catch_panic_pad_function( + parent, + || gst::Iterator::from_vec(vec![]), + |this, element| this.iterate_internal_links(pad, element), + ) + }) + .flags(gst::PadFlags::PROXY_CAPS) + .build(); + + Self { + srcpad, + sinkpad, + sinkpad_fec: Mutex::new(None), + settings: Mutex::new(Default::default()), + state: Mutex::new(Default::default()), + } + } +} + +impl ObjectImpl for RaptorqDec { + fn properties() -> &'static [glib::ParamSpec] { + static PROPERTIES: Lazy> = Lazy::new(|| { + vec![ + glib::ParamSpecUInt::new( + "repair-window-tolerance", + "Repair Window Tolerance (ms)", + "The amount of time to add to repair-window reported by RaptorQ encoder (in ms)", + 0, + u32::MAX - 1, + DEFAULT_REPAIR_WINDOW_TOLERANCE, + glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY, + ), + glib::ParamSpecUInt::new( + "media-packets-reset-threshold", + "Media Packets Reset Threshold", + "This is the maximum allowed number of buffered packets, before we reset the decoder. \ + It can only be triggered if we don't receive repair packets for too long, or packets \ + have no valid timestamps, (0 - disable)", + 0, + u32::MAX - 1, + DEFAULT_MEDIA_PACKETS_RESET_THRESHOLD, + glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY, + ), + glib::ParamSpecBoxed::new( + "stats", + "Statistics", + "Various statistics", + gst::Structure::static_type(), + glib::ParamFlags::READABLE, + ), + ] + }); + + PROPERTIES.as_ref() + } + + fn set_property( + &self, + _obj: &Self::Type, + _id: usize, + value: &glib::Value, + pspec: &glib::ParamSpec, + ) { + match pspec.name() { + "repair-window-tolerance" => { + let mut settings = self.settings.lock().unwrap(); + let val = value.get().expect("type checked upstream"); + settings.repair_window_tolerance = val; + } + + "media-packets-reset-threshold" => { + let mut settings = self.settings.lock().unwrap(); + let val = value.get().expect("type checked upstream"); + settings.media_packets_reset_threshold = val; + } + _ => unimplemented!(), + } + } + + fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { + match pspec.name() { + "repair-window-tolerance" => { + let settings = self.settings.lock().unwrap(); + settings.repair_window_tolerance.to_value() + } + "media-packets-reset-threshold" => { + let settings = self.settings.lock().unwrap(); + settings.media_packets_reset_threshold.to_value() + } + "stats" => { + let state = self.state.lock().unwrap(); + let stats = state.stats; + + let (media_packets, repair_packets) = ( + state.media_packets.len() as u64, + state + .repair_packets + .values() + .fold(0, |acc, x| acc + x.len() as u64), + ); + + let s = gst::Structure::builder("application/x-rtp-raptorqdec-stats") + .field("received-packets", stats.recv) + .field("lost-packets", stats.lost) + .field("recovered-packets", stats.recovered) + .field("buffered-media-packets", media_packets) + .field("buffered-repair-packets", repair_packets) + .build(); + + s.to_value() + } + _ => unimplemented!(), + } + } + fn constructed(&self, obj: &Self::Type) { + self.parent_constructed(obj); + + obj.add_pad(&self.sinkpad).unwrap(); + obj.add_pad(&self.srcpad).unwrap(); + } +} + +impl GstObjectImpl for RaptorqDec {} + +impl ElementImpl for RaptorqDec { + fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { + static ELEMENT_METADATA: Lazy = Lazy::new(|| { + gst::subclass::ElementMetadata::new( + "RTP RaptorQ FEC Decoder", + "RTP RaptorQ FEC Decoding", + "Performs FEC using RaptorQ (RFC6681, RFC6682)", + "Tomasz Andrzejak ", + ) + }); + + Some(&*ELEMENT_METADATA) + } + + fn pad_templates() -> &'static [gst::PadTemplate] { + static PAD_TEMPLATES: Lazy> = Lazy::new(|| { + let caps = gst::Caps::builder("application/x-rtp").build(); + + let srcpad_template = gst::PadTemplate::new( + "src", + gst::PadDirection::Src, + gst::PadPresence::Always, + &caps, + ) + .unwrap(); + + let sinkpad_template = gst::PadTemplate::new( + "sink", + gst::PadDirection::Sink, + gst::PadPresence::Always, + &caps, + ) + .unwrap(); + + let sink_fec_caps = gst::Caps::builder("application/x-rtp") + .field("raptor-scheme-id", fecscheme::FEC_SCHEME_ID.to_string()) + // All fmtp paramters from SDP are string in caps, those are + // required parameters that cannot be expressed as string: + // .field("kmax", (string) [1, MAX_SOURCE_BLOCK_LEN]) + // .field("t", (string) [1, MAX_ENCODING_SYMBOL_SIZE]) + // .field("repair-window", (string) ANY) + .build(); + + let sinkpad_fec_template = gst::PadTemplate::new( + "fec_%u", + gst::PadDirection::Sink, + gst::PadPresence::Request, + &sink_fec_caps, + ) + .unwrap(); + + vec![srcpad_template, sinkpad_template, sinkpad_fec_template] + }); + + PAD_TEMPLATES.as_ref() + } + + fn change_state( + &self, + element: &Self::Type, + transition: gst::StateChange, + ) -> Result { + gst::trace!(CAT, obj: element, "Changing state {:?}", transition); + + match transition { + gst::StateChange::ReadyToPaused => { + self.reset(element); + } + gst::StateChange::PausedToReady => { + self.stop(element); + } + _ => (), + } + + self.parent_change_state(element, transition) + } + + fn request_new_pad( + &self, + element: &Self::Type, + templ: &gst::PadTemplate, + name: Option, + _caps: Option<&gst::Caps>, + ) -> Option { + let mut sinkpad_fec_guard = self.sinkpad_fec.lock().unwrap(); + + if sinkpad_fec_guard.is_some() { + gst::element_error!( + element, + gst::CoreError::Pad, + ["Not accepting more than one FEC stream"] + ); + + return None; + } + + let sinkpad_fec = gst::Pad::builder_with_template(templ, name.as_deref()) + .chain_function(|pad, parent, buffer| { + Self::catch_panic_pad_function( + parent, + || Err(gst::FlowError::Error), + |this, element| this.fec_sink_chain(pad, element, buffer), + ) + }) + .event_function(|pad, parent, event| { + Self::catch_panic_pad_function( + parent, + || false, + |this, element| this.fec_sink_event(pad, element, event), + ) + }) + .iterate_internal_links_function(|pad, parent| { + Self::catch_panic_pad_function( + parent, + || gst::Iterator::from_vec(vec![]), + |this, element| this.iterate_internal_links(pad, element), + ) + }) + .build(); + + sinkpad_fec.set_active(true).unwrap(); + *sinkpad_fec_guard = Some(sinkpad_fec.clone()); + + drop(sinkpad_fec_guard); + + element.add_pad(&sinkpad_fec).unwrap(); + + Some(sinkpad_fec) + } + + fn release_pad(&self, element: &Self::Type, _pad: &gst::Pad) { + let mut pad_guard = self.sinkpad_fec.lock().unwrap(); + + if let Some(pad) = pad_guard.take() { + drop(pad_guard); + pad.set_active(false).unwrap(); + element.remove_pad(&pad).unwrap(); + } + } +} + +fn fmtp_param_from_caps( + name: &str, + caps: &gst::CapsRef, +) -> Result +where + ::Err: std::fmt::Debug, +{ + caps.structure(0) + .unwrap() + .get::(name) + .map_err(|err| { + error_msg!( + gst::CoreError::Caps, + [ + "Could not get \"{}\" param from caps {:?}, err: {:?}", + name, + caps, + err + ] + ) + })? + .parse::() + .map_err(|err| { + error_msg!( + gst::CoreError::Caps, + [ + "Could not parse \"{}\" param from caps {:?}, err: {:?}", + name, + caps, + err + ] + ) + }) +} diff --git a/net/raptorq/src/raptorqdec/mod.rs b/net/raptorq/src/raptorqdec/mod.rs new file mode 100644 index 000000000..73fa33af1 --- /dev/null +++ b/net/raptorq/src/raptorqdec/mod.rs @@ -0,0 +1,23 @@ +// Copyright (C) 2022 Tomasz Andrzejak +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +use gst::glib; +use gst::prelude::*; + +mod imp; + +glib::wrapper! { + pub struct RaptorqDec(ObjectSubclass) @extends gst::Element, gst::Object; +} + +pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + gst::Element::register( + Some(plugin), + "raptorqdec", + gst::Rank::Marginal, + RaptorqDec::static_type(), + ) +} diff --git a/net/raptorq/src/raptorqenc/imp.rs b/net/raptorq/src/raptorqenc/imp.rs new file mode 100644 index 000000000..42825ab1c --- /dev/null +++ b/net/raptorq/src/raptorqenc/imp.rs @@ -0,0 +1,969 @@ +// Copyright (C) 2022 Tomasz Andrzejak +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +use gst::{element_error, error_msg, glib, loggable_error}; + +use gst::prelude::*; +use gst::subclass::prelude::*; + +use gst_rtp::rtp_buffer::*; +use gst_rtp::RTPBuffer; + +use once_cell::sync::Lazy; + +use std::collections::HashSet; +use std::sync::{mpsc, Mutex}; + +use raptorq::{ + extended_source_block_symbols, ObjectTransmissionInformation, SourceBlockEncoder, + SourceBlockEncodingPlan, +}; + +use crate::fecscheme::{self, DataUnitHeader, RepairPayloadId}; + +static CAT: Lazy = Lazy::new(|| { + gst::DebugCategory::new( + "raptorqenc", + gst::DebugColorFlags::empty(), + Some("RTP RaptorQ Encoder"), + ) +}); + +const DEFAULT_PROTECTED_PACKETS: u32 = 25; +const DEFAULT_REPAIR_PACKETS: u32 = 5; +const DEFAULT_REPAIR_WINDOW: u32 = 50; +const DEFAULT_SYMBOL_SIZE: u32 = 1408; +const DEFAULT_MTU: u32 = 1400; +const DEFAULT_PT: u32 = 97; + +const SYMBOL_ALIGNMENT: usize = 8; + +#[derive(Debug, Clone, Copy)] +struct Settings { + protected_packets: u32, + repair_packets: u32, + repair_window: u32, + symbol_size: u32, + mtu: u32, + pt: u32, +} + +impl Default for Settings { + fn default() -> Self { + Self { + protected_packets: DEFAULT_PROTECTED_PACKETS, + repair_packets: DEFAULT_REPAIR_PACKETS, + repair_window: DEFAULT_REPAIR_WINDOW, + symbol_size: DEFAULT_SYMBOL_SIZE, + mtu: DEFAULT_MTU, + pt: DEFAULT_PT, + } + } +} + +type BufferTarget = (Option, gst::Buffer); +type BufferTrigger = (gst::ClockId, gst::Buffer); + +enum SrcTaskMsg { + Schedule(BufferTarget), + Timeout(BufferTrigger), + Eos, +} + +#[derive(Debug, Clone)] +struct State { + packets: Vec, + seqnums: Vec, + sender: Option>, + segment: gst::FormattedSegment, + repair_packets_num: usize, + protected_packets_num: usize, + repair_window: usize, + symbol_size: usize, + symbols_per_packet: usize, + symbols_per_block: usize, + mtu: usize, + pt: u8, + seq: u16, + ssrc: u32, + clock_rate: Option, + info: ObjectTransmissionInformation, + plan: SourceBlockEncodingPlan, +} + +pub struct RaptorqEnc { + sinkpad: gst::Pad, + srcpad: gst::Pad, + srcpad_fec: gst::Pad, + settings: Mutex, + state: Mutex>, + pending_timers: Mutex>, +} + +impl RaptorqEnc { + fn process_source_block( + element: &super::RaptorqEnc, + state: &mut State, + now_pts: Option, + now_dts: Option, + now_rtpts: u32, + ) -> Result { + let sender = match &state.sender { + Some(sender) => sender, + None => return Ok(gst::FlowSuccess::Ok), + }; + + // Build Source Block, RFC6881, section 8. + let mut source_block = Vec::with_capacity( + state + .symbol_size + .checked_mul(state.symbols_per_block) + .ok_or(gst::FlowError::NotSupported)?, + ); + + source_block.extend(state.packets.iter().flat_map(|packet| { + // As defined in RFC6881, section 8.2.4, length indication + // should be equal to UDP packet length without RTP header. + let li = packet.size() - 12; + // Value of s[i] should be equal to number of repair symbols + // placed in each repair packet. + let si = state.symbols_per_packet; + + gst::trace!( + CAT, + obj: element, + "Source Block add ADU: si {}, li {}", + si, + li + ); + + let mut data = vec![0; si * state.symbol_size]; + + data[0..3].copy_from_slice( + &DataUnitHeader { + flow_indication: 0, + len_indication: li as u16, + } + .encode(), + ); + + let packet_map = packet.map_readable().unwrap(); + let packet_data = packet_map.as_slice(); + + data[3..3 + packet.size()].copy_from_slice(packet_data); + data + })); + + assert_eq!( + state.symbol_size * state.symbols_per_block, + source_block.len() + ); + + let encoder = + SourceBlockEncoder::with_encoding_plan2(0, &state.info, &source_block, &state.plan); + + let sbl = state.symbols_per_block; + + // Initial sequnce number in Repair Payload ID is a sequence number of + // the first packet in the Source Block. + let seq = state.seqnums.first().cloned().unwrap(); + + // Build FEC packets as defined in RFC6881, section 8.1.3 + let repair_symbols = state.repair_packets_num * state.symbols_per_packet; + + // Delay step is used to create linearly spaced vector of delays for + // repair packets. All the repair packets are send within repair_window + // span from the fec srcpad thread. + let delay_step = state + .repair_window + .checked_div(state.repair_packets_num) + .unwrap_or(0); + + let delays = (1..=state.repair_packets_num) + .map(|n| gst::ClockTime::from_mseconds((n * delay_step) as u64)) + .collect::>(); + + let base_time = element.base_time(); + let running_time = state.segment.to_running_time(now_pts); + + for (target_time, repair_packet) in Iterator::zip( + delays + .iter() + .map(|delay| base_time.opt_add(running_time).opt_add(delay)), + encoder + .repair_packets(0, repair_symbols as u32) + .chunks_exact(state.symbols_per_packet) + .enumerate() + .zip(&delays) + .map(|((n, packets), delay)| { + let esi = packets[0].payload_id().encoding_symbol_id(); + + let payload_id = RepairPayloadId { + initial_sequence_num: seq, + source_block_len: sbl as u16, + encoding_symbol_id: esi, + } + .encode(); + + let fecsz = payload_id.len() + state.symbol_size * state.symbols_per_packet; + let mut buf = gst::Buffer::new_rtp_with_sizes(fecsz as u32, 0, 0).unwrap(); + + { + let buf_mut = buf.get_mut().unwrap(); + buf_mut.set_pts(now_pts.opt_add(delay)); + buf_mut.set_dts(now_dts.opt_add(delay)); + + let mut rtpbuf = RTPBuffer::from_buffer_writable(buf_mut).unwrap(); + + rtpbuf.set_payload_type(state.pt); + rtpbuf.set_seq(state.seq); + rtpbuf.set_marker(n == state.repair_packets_num - 1); + + if let Some(clock_rate) = state.clock_rate { + let rtpdelay = delay + .mul_div_round(*gst::ClockTime::SECOND, clock_rate as u64) + .unwrap() + .nseconds() as u32; + + rtpbuf.set_timestamp(now_rtpts.overflowing_add(rtpdelay).0); + } + + state.seq = state.seq.overflowing_add(1).0; + + let payload = rtpbuf.payload_mut().unwrap(); + payload[0..payload_id.len()].copy_from_slice(&payload_id); + + for (n, packet) in packets.iter().enumerate() { + let data = packet.data(); + let start = payload_id.len() + n * data.len(); + + payload[start..start + data.len()].copy_from_slice(data); + } + } + + buf + }), + ) { + if sender + .send(SrcTaskMsg::Schedule((target_time, repair_packet))) + .is_err() + { + break; + } + } + + state.packets.clear(); + state.seqnums.clear(); + + Ok(gst::FlowSuccess::Ok) + } + + fn start_task(&self, element: &super::RaptorqEnc) -> Result<(), gst::LoggableError> { + let (sender, receiver) = mpsc::channel(); + + let mut state_guard = self.state.lock().unwrap(); + let state = state_guard.as_mut().unwrap(); + + state.sender = Some(sender); + drop(state_guard); + + let element_weak = element.downgrade(); + let pad_weak = self.srcpad_fec.downgrade(); + + let mut eos = false; + + self.srcpad_fec + .start_task(move || { + while let Ok(msg) = receiver.recv() { + let pad = match pad_weak.upgrade() { + Some(pad) => pad, + None => break, + }; + + let element = match element_weak.upgrade() { + Some(element) => element, + None => break, + }; + + match msg { + SrcTaskMsg::Timeout((id, buf)) => { + let mut timers = element.imp().pending_timers.lock().unwrap(); + let _ = timers.remove(&id); + + let push_eos = eos && timers.is_empty(); + + drop(timers); + + if let Err(err) = pad.push(buf) { + gst::element_error!( + element, + gst::CoreError::Pad, + ["Failed to push on src FEC pad {:?}", err] + ); + + break; + } + + if push_eos { + pad.push_event(gst::event::Eos::new()); + break; + } + } + SrcTaskMsg::Schedule((target, buf)) => { + let target = match target { + Some(target) => target, + None => { + // No target, push buffer immediately + if let Err(err) = pad.push(buf) { + gst::element_error!( + element, + gst::CoreError::Pad, + ["Failed to push on src FEC pad {:?}", err] + ); + break; + } + + continue; + } + }; + + let clock = match element.clock() { + Some(clock) => clock, + None => { + // No clock provided, push buffer immediately + if let Err(err) = pad.push(buf) { + gst::element_error!( + element, + gst::CoreError::Pad, + ["Failed to push on src FEC pad {:?}", err] + ); + break; + } + + continue; + } + }; + + let timeout_sender = { + let state_guard = element.imp().state.lock().unwrap(); + let state = match state_guard.as_ref() { + Some(state) => state, + None => break, + }; + + state.sender.as_ref().unwrap().clone() + }; + + let timeout = clock.new_single_shot_id(target); + + let mut timers = element.imp().pending_timers.lock().unwrap(); + timers.insert(timeout.clone().into()); + + timeout + .wait_async(move |_clock, _time, id| { + let id = id.clone(); + let _ = timeout_sender.send(SrcTaskMsg::Timeout((id, buf))); + }) + .expect("Failed to wait async"); + } + SrcTaskMsg::Eos => { + if element.imp().pending_timers.lock().unwrap().is_empty() { + pad.push_event(gst::event::Eos::new()); + break; + } + + eos = true; + } + } + } + + // All senders dropped or error + let pad = match pad_weak.upgrade() { + Some(pad) => pad, + None => return, + }; + + let _ = pad.pause_task(); + }) + .map_err(|_| loggable_error!(CAT, "Failed to start pad task"))?; + + Ok(()) + } + + fn src_activatemode( + &self, + _pad: &gst::Pad, + element: &super::RaptorqEnc, + _mode: gst::PadMode, + active: bool, + ) -> Result<(), gst::LoggableError> { + if active { + self.start_task(element)?; + } else { + // element stop should be called at this point so that all mpsc + // senders used in task are dropped, otherwise channel can deadlock + self.srcpad_fec.stop_task()?; + } + + Ok(()) + } + + fn sink_chain( + &self, + _pad: &gst::Pad, + element: &super::RaptorqEnc, + buffer: gst::Buffer, + ) -> Result { + let mut state_guard = self.state.lock().unwrap(); + let state = state_guard.as_mut().ok_or(gst::FlowError::NotNegotiated)?; + + if buffer.size() > state.mtu { + gst::error!(CAT, obj: element, "Packet length exceeds configured MTU"); + return Err(gst::FlowError::NotSupported); + } + + let (curr_seq, now_rtpts) = match RTPBuffer::from_buffer_readable(&buffer) { + Ok(rtpbuf) => (rtpbuf.seq(), rtpbuf.timestamp()), + Err(_) => { + gst::error!(CAT, obj: element, "Mapping to RTP packet failed"); + return Err(gst::FlowError::NotSupported); + } + }; + + if let Some(last_seq) = state.seqnums.last() { + if last_seq.overflowing_add(1).0 != curr_seq { + gst::error!(CAT, obj: element, "Got out of sequence packets"); + return Err(gst::FlowError::NotSupported); + } + } + + state.packets.push(buffer.clone()); + state.seqnums.push(curr_seq); + + assert_eq!(state.packets.len(), state.seqnums.len()); + + if state.packets.len() == state.protected_packets_num as usize { + // We use current buffer timing as a base for repair packets timestamps + let now_pts = buffer.pts(); + let now_dts = buffer.dts_or_pts(); + + Self::process_source_block(element, state, now_pts, now_dts, now_rtpts)?; + } + + drop(state_guard); + self.srcpad.push(buffer) + } + + fn sink_event(&self, pad: &gst::Pad, element: &super::RaptorqEnc, event: gst::Event) -> bool { + gst::debug!(CAT, "Handling event {:?}", event); + use gst::EventView; + + match event.view() { + EventView::FlushStart(_) => { + if let Err(err) = self.stop(element) { + element_error!( + element, + gst::CoreError::Event, + ["Failed to stop encoder after flush start {:?}", err] + ); + return false; + } + + let _ = self.srcpad_fec.set_active(false); + } + EventView::FlushStop(_) => { + if let Err(err) = self.start(element) { + element_error!( + element, + gst::CoreError::Event, + ["Failed to start encoder after flush stop {:?}", err] + ); + return false; + } + + let _ = self.srcpad_fec.set_active(true); + } + EventView::Caps(ev) => { + let caps = ev.caps(); + gst::info!(CAT, obj: pad, "Got caps {:?}", caps); + + let mut state_guard = self.state.lock().unwrap(); + + if let Some(state) = state_guard.as_mut() { + let s = caps.structure(0).unwrap(); + + // We need clock rate to calculate RTP timestamps of + // delayed repair packets. + if let Ok(clock_rate) = s.get::("clock-rate") { + if clock_rate <= 0 { + element_error!(element, gst::CoreError::Event, ["Invalid clock rate"]); + return false; + } + + state.clock_rate = Some(clock_rate as u32); + } + } + } + EventView::Segment(ev) => { + let mut state_guard = self.state.lock().unwrap(); + + if let Some(state) = state_guard.as_mut() { + let segment = ev.segment().clone(); + let segment = match segment.downcast::() { + Ok(segment) => segment, + Err(_) => { + element_error!( + element, + gst::CoreError::Event, + ["Only time segments are supported"] + ); + return false; + } + }; + + state.segment = segment.clone(); + + // Push stream events on FEC srcpad as well + let pad = &self.srcpad_fec; + let stream_id = pad.create_stream_id(element, Some("fec")).to_string(); + + let kmax = extended_source_block_symbols(state.symbols_per_block as u32); + let scheme_id = fecscheme::FEC_SCHEME_ID; + + // RFC 6682, section 6.1.1 + let caps = gst::Caps::builder("application/x-rtp") + .field("payload", state.pt as i32) + .field("ssrc", state.ssrc as i32) + .field("clock-rate", state.clock_rate.unwrap_or(0) as i32) + .field("encoding-name", "RAPTORFEC") + .field("raptor-scheme-id", scheme_id.to_string()) + .field("kmax", kmax.to_string()) + .field("repair-window", (state.repair_window * 1000).to_string()) // ms -> us + .field("t", state.symbol_size.to_string()) + .field("p", "B") + .build(); + + drop(state_guard); + + pad.push_event(gst::event::StreamStart::new(&stream_id)); + pad.push_event(gst::event::Caps::new(&caps)); + pad.push_event(gst::event::Segment::new(&segment)); + } + } + EventView::Eos(_) => { + let mut state_guard = self.state.lock().unwrap(); + + if let Some(state) = state_guard.as_mut() { + let _ = state.sender.as_ref().unwrap().send(SrcTaskMsg::Eos); + } + } + _ => (), + } + + pad.event_default(Some(element), event) + } + + fn iterate_internal_links( + &self, + pad: &gst::Pad, + _element: &super::RaptorqEnc, + ) -> gst::Iterator { + if pad == &self.sinkpad { + gst::Iterator::from_vec(vec![self.srcpad.clone()]) + } else if pad == &self.srcpad { + gst::Iterator::from_vec(vec![self.sinkpad.clone()]) + } else { + gst::Iterator::from_vec(vec![]) + } + } + + fn start(&self, element: &super::RaptorqEnc) -> Result<(), gst::ErrorMessage> { + let settings = self.settings.lock().unwrap(); + + let protected_packets_num = settings.protected_packets as usize; + let repair_packets_num = settings.repair_packets as usize; + let repair_window = settings.repair_window as usize; + let symbol_size = settings.symbol_size as usize; + let mtu = settings.mtu as usize; + let pt = settings.pt as u8; + + // this is the number of repair symbols placed in each repair packet, + // it SHALL be the same for all repair packets in a block. This include + // 1 byte of flow indication and 2 bytes of lenght indication as defined + // in RFC6881, section 8.2.4. + let symbols_per_packet = (mtu + 3 + symbol_size - 1) / symbol_size; + let symbols_per_block = symbols_per_packet * protected_packets_num; + + if symbol_size.rem_euclid(SYMBOL_ALIGNMENT) != 0 { + let details = format!( + "Symbol size is not multiple of Symbol Alignment {}", + SYMBOL_ALIGNMENT + ); + + gst::element_error!(element, gst::CoreError::Failed, [&details]); + return Err(error_msg!(gst::CoreError::Failed, [&details])); + } + + if symbol_size > fecscheme::MAX_ENCODING_SYMBOL_SIZE { + let details = format!( + "Symbol size exceeds Maximum Encoding Symbol Size: {}", + fecscheme::MAX_ENCODING_SYMBOL_SIZE + ); + + gst::element_error!(element, gst::CoreError::Failed, [&details]); + return Err(error_msg!(gst::CoreError::Failed, [&details])); + } + + if symbols_per_block > fecscheme::MAX_SOURCE_BLOCK_LEN { + let details = format!( + "Source block length exceeds Maximum Source Block Length: {}", + fecscheme::MAX_SOURCE_BLOCK_LEN + ); + + gst::element_error!(element, gst::CoreError::Failed, [&details]); + return Err(error_msg!(gst::CoreError::Failed, [&details])); + } + + gst::info!( + CAT, + obj: element, + "Starting RaptorQ Encoder, Symbols per Block: {}, Symbol Size: {}", + symbols_per_block, + symbol_size + ); + + let plan = SourceBlockEncodingPlan::generate(symbols_per_block as u16); + let info = ObjectTransmissionInformation::new(0, symbol_size as u16, 1, 1, 8); + + let segment = gst::FormattedSegment::::default(); + + *self.state.lock().unwrap() = Some(State { + info, + plan, + repair_packets_num, + protected_packets_num, + repair_window, + symbol_size, + symbols_per_packet, + symbols_per_block, + mtu, + pt, + segment, + seq: 0, + ssrc: 0, + packets: Vec::new(), + seqnums: Vec::new(), + clock_rate: None, + sender: None, + }); + + Ok(()) + } + + fn stop(&self, _element: &super::RaptorqEnc) -> Result<(), gst::ErrorMessage> { + let mut timers = self.pending_timers.lock().unwrap(); + for timer in timers.drain() { + timer.unschedule(); + } + + // Drop state + let _ = self.state.lock().unwrap().take(); + Ok(()) + } +} + +#[glib::object_subclass] +impl ObjectSubclass for RaptorqEnc { + const NAME: &'static str = "GstRaptorqEnc"; + type Type = super::RaptorqEnc; + type ParentType = gst::Element; + + fn with_class(klass: &Self::Class) -> Self { + let templ = klass.pad_template("sink").unwrap(); + let sinkpad = gst::Pad::builder_with_template(&templ, Some("sink")) + .chain_function(|pad, parent, buffer| { + Self::catch_panic_pad_function( + parent, + || Err(gst::FlowError::Error), + |this, element| this.sink_chain(pad, element, buffer), + ) + }) + .event_function(|pad, parent, event| { + Self::catch_panic_pad_function( + parent, + || false, + |this, element| this.sink_event(pad, element, event), + ) + }) + .iterate_internal_links_function(|pad, parent| { + Self::catch_panic_pad_function( + parent, + || gst::Iterator::from_vec(vec![]), + |this, element| this.iterate_internal_links(pad, element), + ) + }) + .flags(gst::PadFlags::PROXY_CAPS) + .build(); + + let templ = klass.pad_template("src").unwrap(); + let srcpad = gst::Pad::builder_with_template(&templ, Some("src")) + .iterate_internal_links_function(|pad, parent| { + Self::catch_panic_pad_function( + parent, + || gst::Iterator::from_vec(vec![]), + |this, element| this.iterate_internal_links(pad, element), + ) + }) + .flags(gst::PadFlags::PROXY_CAPS) + .build(); + + let templ = klass.pad_template("fec_0").unwrap(); + let srcpad_fec = gst::Pad::builder_with_template(&templ, Some("fec_0")) + .activatemode_function(move |pad, parent, mode, active| { + Self::catch_panic_pad_function( + parent, + || Err(loggable_error!(CAT, "Panic activating src pad with mode")), + |this, element| this.src_activatemode(pad, element, mode, active), + ) + }) + .iterate_internal_links_function(|pad, parent| { + Self::catch_panic_pad_function( + parent, + || gst::Iterator::from_vec(vec![]), + |this, element| this.iterate_internal_links(pad, element), + ) + }) + .build(); + + Self { + sinkpad, + srcpad, + srcpad_fec, + settings: Mutex::new(Default::default()), + state: Mutex::new(None), + pending_timers: Mutex::new(HashSet::new()), + } + } +} + +impl ObjectImpl for RaptorqEnc { + fn properties() -> &'static [glib::ParamSpec] { + static PROPERTIES: Lazy> = Lazy::new(|| { + vec![ + glib::ParamSpecUInt::new( + "protected-packets", + "Protected Packets", + "Number of packets to protect together", + 1, + u32::MAX - 1, + DEFAULT_PROTECTED_PACKETS, + glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY, + ), + glib::ParamSpecUInt::new( + "repair-packets", + "Repair Packets", + "Number of repair packets per block to send", + 1, + u32::MAX - 1, + DEFAULT_REPAIR_PACKETS, + glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY, + ), + glib::ParamSpecUInt::new( + "repair-window", + "Repair Window", + "A time span in milliseconds in which repair packets are send", + 0, + u32::MAX - 1, + DEFAULT_REPAIR_PACKETS, + glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY, + ), + glib::ParamSpecUInt::new( + "symbol-size", + "Symbol Size", + "Size of RaptorQ data unit", + 1, + u32::MAX - 1, + DEFAULT_SYMBOL_SIZE, + glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY, + ), + glib::ParamSpecUInt::new( + // TODO: maybe change this to max-rtp-packet-size or max-media-packet-size + "mtu", + "MTU", + "Maximum expected packet size", + 0, + i32::MAX as u32, + DEFAULT_MTU, + glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY, + ), + glib::ParamSpecUInt::new( + "pt", + "Payload Type", + "The payload type of FEC packets", + 96, + 255, + DEFAULT_PT, + glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY, + ), + ] + }); + + PROPERTIES.as_ref() + } + + fn set_property( + &self, + _obj: &Self::Type, + _id: usize, + value: &glib::Value, + pspec: &glib::ParamSpec, + ) { + match pspec.name() { + "protected-packets" => { + let mut settings = self.settings.lock().unwrap(); + let protected_packets = value.get().expect("type checked upstream"); + settings.protected_packets = protected_packets; + } + "repair-packets" => { + let mut settings = self.settings.lock().unwrap(); + let repair_packets = value.get().expect("type checked upstream"); + settings.repair_packets = repair_packets; + } + "repair-window" => { + let mut settings = self.settings.lock().unwrap(); + let repair_window = value.get().expect("type checked upstream"); + settings.repair_window = repair_window; + } + "symbol-size" => { + let mut settings = self.settings.lock().unwrap(); + let symbol_size = value.get().expect("type checked upstream"); + settings.symbol_size = symbol_size; + } + "mtu" => { + let mut settings = self.settings.lock().unwrap(); + let mtu = value.get().expect("type checked upstream"); + settings.mtu = mtu; + } + "pt" => { + let mut settings = self.settings.lock().unwrap(); + let pt = value.get().expect("type checked upstream"); + settings.pt = pt; + } + _ => unimplemented!(), + } + } + + fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { + match pspec.name() { + "protected-packets" => { + let settings = self.settings.lock().unwrap(); + settings.protected_packets.to_value() + } + "repair-packets" => { + let settings = self.settings.lock().unwrap(); + settings.repair_packets.to_value() + } + "repair-window" => { + let settings = self.settings.lock().unwrap(); + settings.repair_window.to_value() + } + "symbol-size" => { + let settings = self.settings.lock().unwrap(); + settings.symbol_size.to_value() + } + "mtu" => { + let settings = self.settings.lock().unwrap(); + settings.mtu.to_value() + } + "pt" => { + let settings = self.settings.lock().unwrap(); + settings.pt.to_value() + } + _ => unimplemented!(), + } + } + + fn constructed(&self, obj: &Self::Type) { + self.parent_constructed(obj); + + obj.add_pad(&self.sinkpad).unwrap(); + obj.add_pad(&self.srcpad).unwrap(); + obj.add_pad(&self.srcpad_fec).unwrap(); + } +} + +impl GstObjectImpl for RaptorqEnc {} + +impl ElementImpl for RaptorqEnc { + fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { + static ELEMENT_METADATA: Lazy = Lazy::new(|| { + gst::subclass::ElementMetadata::new( + "RTP RaptorQ FEC Encoder", + "RTP RaptorQ FEC Encoding", + "Performs FEC using RaptorQ (RFC6681, RFC6682)", + "Tomasz Andrzejak ", + ) + }); + + Some(&*ELEMENT_METADATA) + } + + fn pad_templates() -> &'static [gst::PadTemplate] { + static PAD_TEMPLATES: Lazy> = Lazy::new(|| { + let caps = gst::Caps::builder("application/x-rtp") + .field("clock-rate", gst::IntRange::new(0, std::i32::MAX)) + .build(); + + let srcpad_template = gst::PadTemplate::new( + "src", + gst::PadDirection::Src, + gst::PadPresence::Always, + &caps, + ) + .unwrap(); + + let sinkpad_template = gst::PadTemplate::new( + "sink", + gst::PadDirection::Sink, + gst::PadPresence::Always, + &caps, + ) + .unwrap(); + + let srcpad_fec_template = gst::PadTemplate::new( + "fec_0", + gst::PadDirection::Src, + gst::PadPresence::Always, + &caps, + ) + .unwrap(); + + vec![srcpad_template, sinkpad_template, srcpad_fec_template] + }); + + PAD_TEMPLATES.as_ref() + } + + fn change_state( + &self, + element: &Self::Type, + transition: gst::StateChange, + ) -> Result { + gst::trace!(CAT, obj: element, "Changing state {:?}", transition); + + match transition { + gst::StateChange::ReadyToPaused => { + self.start(element).map_err(|_| gst::StateChangeError)?; + } + gst::StateChange::PausedToReady => { + self.stop(element).map_err(|_| gst::StateChangeError)?; + } + _ => (), + } + + self.parent_change_state(element, transition) + } +} diff --git a/net/raptorq/src/raptorqenc/mod.rs b/net/raptorq/src/raptorqenc/mod.rs new file mode 100644 index 000000000..7214d6ddd --- /dev/null +++ b/net/raptorq/src/raptorqenc/mod.rs @@ -0,0 +1,23 @@ +// Copyright (C) 2022 Tomasz Andrzejak +// +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at http://mozilla.org/MPL/2.0/. + +use gst::glib; +use gst::prelude::*; + +mod imp; + +glib::wrapper! { + pub struct RaptorqEnc(ObjectSubclass) @extends gst::Element, gst::Object; +} + +pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + gst::Element::register( + Some(plugin), + "raptorqenc", + gst::Rank::Marginal, + RaptorqEnc::static_type(), + ) +} diff --git a/net/raptorq/tests/raptorq.rs b/net/raptorq/tests/raptorq.rs new file mode 100644 index 000000000..8595c3071 --- /dev/null +++ b/net/raptorq/tests/raptorq.rs @@ -0,0 +1,618 @@ +// Copyright (C) 2022 Tomasz Andrzejak +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +use gst::prelude::*; + +use gst_rtp::rtp_buffer::*; +use gst_rtp::RTPBuffer; + +use rand::Rng; + +#[must_use] +struct RaptorqTest { + protected_packets: usize, + repair_packets: usize, + repair_window: usize, + symbol_size: usize, + mtu: usize, + initial_seq: u16, + lost_buffers: Vec, + swapped_buffers: Vec, + input_buffers: usize, + expect_output_buffers: usize, +} + +fn init() { + use std::sync::Once; + static INIT: Once = Once::new(); + + INIT.call_once(|| { + gst::init().unwrap(); + gstraptorq::plugin_register_static().expect("Failed to register raptorqenc plugin"); + }); +} + +impl RaptorqTest { + fn new() -> Self { + init(); + + let enc = gst::ElementFactory::make("raptorqenc", None).unwrap(); + + let protected_packets = enc.property::("protected-packets") as usize; + let repair_packets = enc.property::("repair-packets") as usize; + let repair_window = enc.property::("repair-window") as usize; + let symbol_size = enc.property::("symbol-size") as usize; + let mtu = enc.property::("mtu") as usize; + + Self { + protected_packets, + repair_packets, + repair_window, + symbol_size, + mtu, + initial_seq: 42, + lost_buffers: vec![0], + swapped_buffers: vec![], + input_buffers: protected_packets, + expect_output_buffers: protected_packets, + } + } + + fn protected_packets(mut self, protected_packets: usize) -> Self { + self.protected_packets = protected_packets; + self + } + + fn repair_packets(mut self, repair_packets: usize) -> Self { + self.repair_packets = repair_packets; + self + } + + fn repair_window(mut self, repair_window: usize) -> Self { + self.repair_window = repair_window; + self + } + + fn symbol_size(mut self, symbol_size: usize) -> Self { + self.symbol_size = symbol_size; + self + } + + fn initial_seq(mut self, initial_seq: u16) -> Self { + self.initial_seq = initial_seq; + self + } + + fn mtu(mut self, mtu: usize) -> Self { + self.mtu = mtu; + self + } + + fn lost_buffers(mut self, lost_buffers: Vec) -> Self { + self.lost_buffers = lost_buffers; + self + } + + fn swapped_buffers(mut self, swapped_buffers: Vec) -> Self { + self.swapped_buffers = swapped_buffers; + self + } + + fn input_buffers(mut self, input_buffers: usize) -> Self { + self.input_buffers = input_buffers; + self + } + + fn expect_output_buffers(mut self, expect_output_buffers: usize) -> Self { + self.expect_output_buffers = expect_output_buffers; + self + } + + fn run(self) { + assert!(self.input_buffers >= self.protected_packets); + + // 1. Decoder Setup: + let enc = gst::ElementFactory::make("raptorqenc", None).unwrap(); + + enc.set_property("protected-packets", self.protected_packets as u32); + enc.set_property("repair-packets", self.repair_packets as u32); + enc.set_property("repair-window", self.repair_window as u32); + enc.set_property("symbol-size", self.symbol_size as u32); + enc.set_property("mtu", self.mtu as u32); + + let mut h_enc = gst_check::Harness::with_element(&enc, Some("sink"), Some("src")); + let mut h_enc_fec = gst_check::Harness::with_element(&enc, None, Some("fec_0")); + + h_enc.set_src_caps_str("application/x-rtp,clock-rate=8000"); + + // 2. Decoder Setup: + let dec = gst::ElementFactory::make("raptorqdec", None).unwrap(); + + let mut h_dec = gst_check::Harness::with_element(&dec, Some("sink"), Some("src")); + let mut h_dec_fec = gst_check::Harness::with_element(&dec, Some("fec_0"), None); + + let caps = gst::Caps::builder("application/x-rtp") + .field("raptor-scheme-id", "6") + .field("repair-window", "1000000") + .field("t", self.symbol_size.to_string()) + .build(); + + h_dec.set_src_caps_str("application/x-rtp"); + h_dec_fec.set_src_caps(caps); + + let mut rng = rand::thread_rng(); + + let input_buffers = (0..self.input_buffers) + .map(|i| { + // payload size without RTP Header and ADUI Header + let size = rng.gen_range(1..self.mtu - 12 - 3); + let data = (0..size).map(|_| rng.gen()).collect::>(); + + let mut buf = gst::Buffer::new_rtp_with_sizes(size as u32, 0, 0).unwrap(); + { + let buf_mut = buf.get_mut().unwrap(); + buf_mut.set_pts(gst::ClockTime::ZERO); + buf_mut.set_dts(gst::ClockTime::ZERO); + + let mut rtpbuf = RTPBuffer::from_buffer_writable(buf_mut).unwrap(); + let payload = rtpbuf.payload_mut().unwrap(); + + payload.copy_from_slice(data.as_slice()); + rtpbuf.set_seq(self.initial_seq.wrapping_add(i as u16)); + rtpbuf.set_timestamp(0); + } + + buf + }) + .collect::>(); + + // 3. Encoder Operations: + + // Do not consume buffers here so we can compare it with the output + for buf in &input_buffers { + let result = h_enc.push(buf.clone()); + assert!(result.is_ok()); + } + + assert_eq!(h_enc.buffers_in_queue(), self.input_buffers as u32); + + let mut media_packets = (0..self.input_buffers) + .map(|_| { + let result = h_enc.pull(); + assert!(result.is_ok()); + result.unwrap() + }) + .collect::>(); + + // Simulate out of order packets + for x in self.swapped_buffers.chunks_exact(2) { + media_packets.swap(x[0], x[1]) + } + + // Check if repair packets pushed from encoder are delayed properly + let delay_step = + gst::ClockTime::from_mseconds((self.repair_window / self.repair_packets) as u64); + let mut delay = delay_step; + + let repair_packets = (0..self.repair_packets) + .map(|_| { + // Set time just before the timer to push the buffer fires up, + // we shouldn't see the buffer just yet. + h_enc_fec.set_time(delay - gst::ClockTime::NSECOND).unwrap(); + assert_eq!(h_enc_fec.buffers_in_queue(), 0); + + // Advance time to the delay and crank clock id, we should + // get a buffer with adjusted timestamps now. All input buffers + // have zero timestamp, so the pts/dts/rtp-timestamp should be + // equal to delay. + h_enc_fec.set_time(delay).unwrap(); + h_enc_fec.crank_single_clock_wait().unwrap(); + + let result = h_enc_fec.pull(); + assert!(result.is_ok()); + + let buf = result.unwrap(); + assert_eq!(buf.pts().unwrap(), delay); + assert_eq!(buf.dts().unwrap(), delay); + + let ts = RTPBuffer::from_buffer_readable(&buf).unwrap().timestamp(); + let expected_ts = + *delay.mul_div_round(*gst::ClockTime::SECOND, 8000).unwrap() as u32; + + assert_eq!(ts, expected_ts); + + delay += delay_step; + buf + }) + .collect::>(); + + // 4. Decoder Operations: + + // remove media packets to simulate packet loss + let media_packets = media_packets + .iter() + .cloned() + .enumerate() + .filter(|(i, _)| !self.lost_buffers.contains(i)) + .map(|(_, x)| x) + .collect::>(); + + // Push media packets to decoder + for buf in media_packets { + assert!(h_dec.push(buf).is_ok()); + } + + // Push repair packets to decoder + for buf in repair_packets { + assert!(h_dec_fec.push(buf).is_ok()); + } + + // At this point decoder has all the information it needs to + // recover packets, we just need an input buffer to run sink + // chain operations. + let result = h_dec.push(input_buffers.iter().last().unwrap().clone()); + assert!(result.is_ok()); + + let mut output_buffers = (0..self.expect_output_buffers) + .map(|_| { + let result = h_dec.pull(); + assert!(result.is_ok()); + result.unwrap() + }) + .collect::>(); + + // Output buffers are out of sequence, we should sort it by + // seqnum so we can compare them with input buffers. + output_buffers.sort_unstable_by(|a, b| { + let aa = RTPBuffer::from_buffer_readable(a).unwrap(); + let bb = RTPBuffer::from_buffer_readable(b).unwrap(); + + match gst_rtp::compare_seqnum(bb.seq(), aa.seq()) { + x if x > 0 => std::cmp::Ordering::Greater, + x if x < 0 => std::cmp::Ordering::Less, + _ => std::cmp::Ordering::Equal, + } + }); + + assert_eq!(output_buffers.len(), self.expect_output_buffers); + + if self.input_buffers == self.expect_output_buffers { + for (inbuf, outbuf) in Iterator::zip(input_buffers.iter(), output_buffers.iter()) { + let rtp1 = RTPBuffer::from_buffer_readable(inbuf).unwrap(); + let rtp2 = RTPBuffer::from_buffer_readable(outbuf).unwrap(); + + assert_eq!(rtp1.seq(), rtp2.seq()); + assert_eq!(rtp1.payload().unwrap(), rtp2.payload().unwrap()); + } + } + } +} + +#[test] +fn test_raptorq_all_default() { + RaptorqTest::new().run(); +} + +#[test] +fn test_raptorq_decoder_media_packets_out_of_sequence() { + RaptorqTest::new() + .swapped_buffers(vec![5, 10, 12, 15]) + .run(); +} + +#[test] +fn test_raptorq_10_percent_overhead() { + RaptorqTest::new() + .protected_packets(100) + .repair_packets(10) + .lost_buffers(vec![4, 42, 43, 44, 45]) + .input_buffers(100) + .expect_output_buffers(100) + .run(); +} + +#[test] +fn test_raptorq_5_percent_overhead() { + RaptorqTest::new() + .protected_packets(100) + .repair_packets(5) + .input_buffers(100) + .lost_buffers(vec![8, 11]) + .expect_output_buffers(100) + .run(); +} + +#[test] +fn test_raptorq_symbol_size_128() { + RaptorqTest::new() + .protected_packets(20) + .repair_packets(4) + .symbol_size(128) + .mtu(400) + .input_buffers(20) + .lost_buffers(vec![9]) + .expect_output_buffers(20) + .run(); +} + +#[test] +fn test_raptorq_symbol_size_192() { + RaptorqTest::new() + .protected_packets(20) + .repair_packets(4) + .symbol_size(192) + .mtu(999) + .input_buffers(20) + .lost_buffers(vec![16, 19]) + .expect_output_buffers(20) + .run(); +} + +#[test] +fn test_raptorq_symbol_size_1024() { + RaptorqTest::new() + .protected_packets(20) + .repair_packets(8) + .symbol_size(192) + .mtu(100) + .input_buffers(20) + .lost_buffers(vec![0, 1, 2, 3, 4, 5]) + .expect_output_buffers(20) + .run(); +} + +#[test] +fn test_raptorq_mtu_lt_symbol_size() { + RaptorqTest::new() + .protected_packets(20) + .repair_packets(8) + .symbol_size(1400) + .mtu(100) + .input_buffers(20) + .lost_buffers(vec![14, 15, 16, 17, 18, 19]) + .expect_output_buffers(20) + .run(); +} + +#[test] +fn test_raptorq_heavy_loss() { + RaptorqTest::new() + .protected_packets(40) + .repair_packets(8) + .input_buffers(40) + .lost_buffers(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9]) + .expect_output_buffers(30) + .run(); +} + +#[test] +fn test_raptorq_repair_window_100ms() { + RaptorqTest::new() + .protected_packets(10) + .repair_packets(10) + .repair_window(100) + .input_buffers(10) + .lost_buffers(vec![2, 6]) + .expect_output_buffers(10) + .run(); +} + +#[test] +fn test_raptorq_repair_window_500ms() { + RaptorqTest::new() + .protected_packets(8) + .repair_packets(2) + .repair_window(500) + .input_buffers(8) + .lost_buffers(vec![]) + .expect_output_buffers(8) + .run(); +} + +#[test] +fn test_raptorq_wrapping_sequence_number_1() { + RaptorqTest::new().initial_seq(u16::MAX - 5).run(); +} + +#[test] +fn test_raptorq_wrapping_sequence_number_2() { + RaptorqTest::new() + .initial_seq(u16::MAX - 5) + .swapped_buffers(vec![4, 5]) + .run(); +} + +#[test] +fn test_raptorq_wrapping_sequence_number_3() { + RaptorqTest::new() + .initial_seq(u16::MAX - 3) + .lost_buffers(vec![0, 1, 2, 8]) + .run(); +} + +#[test] +fn test_raptorq_encoder_flush_cancels_pending_timers() { + init(); + + let enc = gst::ElementFactory::make("raptorqenc", None).unwrap(); + + // Set delay to 5s, this way each buffer should be delayed by 1s + enc.set_property("repair-window", 5000u32); + enc.set_property("protected-packets", 5u32); + enc.set_property("repair-packets", 5u32); + + let mut h_enc = gst_check::Harness::with_element(&enc, Some("sink"), Some("src")); + let mut h_enc_fec = gst_check::Harness::with_element(&enc, None, Some("fec_0")); + + h_enc.set_src_caps_str("application/x-rtp,clock-rate=8000"); + + for i in 0u64..5 { + let mut buf = gst::Buffer::new_rtp_with_sizes(42, 0, 0).unwrap(); + + let buf_mut = buf.get_mut().unwrap(); + buf_mut.set_pts(gst::ClockTime::SECOND * i); + + let mut rtpbuf = RTPBuffer::from_buffer_writable(buf_mut).unwrap(); + rtpbuf.set_seq(i as u16); + + drop(rtpbuf); + + let result = h_enc.push(buf); + assert!(result.is_ok()); + } + + // We want to check if flush cancels pending timers, last buffer of source + // block is at 5s, at 6s we should have 1 buffer qeued already, then we flush + // and move time to 10s. Flush should cancel pending timers and we should + // have no buffers at the output + h_enc_fec.set_time(gst::ClockTime::SECOND * 6).unwrap(); + h_enc_fec.crank_single_clock_wait().unwrap(); + + let result = h_enc_fec.pull(); + assert!(result.is_ok()); + + h_enc.push_event(gst::event::FlushStart::new()); + h_enc.push_event(gst::event::FlushStop::new(true)); + + h_enc_fec.set_time(gst::ClockTime::SECOND * 10).unwrap(); + + loop { + let event = h_enc.pull_event(); + + if let Ok(event) = event { + match event.view() { + gst::EventView::FlushStart(_) => { + continue; + } + gst::EventView::FlushStop(_) => { + break; + } + _ => (), + } + } + } + + assert_eq!(h_enc_fec.buffers_in_queue(), 0); + assert_eq!(h_enc_fec.testclock().unwrap().peek_id_count(), 0); +} + +#[test] +fn test_raptorq_repair_window_tolerance() { + init(); + + let enc = gst::ElementFactory::make("raptorqenc", None).unwrap(); + + // Set delay to 5s, this way each buffer should be delayed by 1s + enc.set_property("repair-window", 1000u32); + enc.set_property("protected-packets", 5u32); + enc.set_property("repair-packets", 5u32); + + let mut h_enc = gst_check::Harness::with_element(&enc, Some("sink"), Some("src")); + let mut h_enc_fec = gst_check::Harness::with_element(&enc, None, Some("fec_0")); + + h_enc.set_src_caps_str("application/x-rtp,clock-rate=8000"); + + for i in 0u64..5 { + let mut buf = gst::Buffer::new_rtp_with_sizes(42, 0, 0).unwrap(); + + let buf_mut = buf.get_mut().unwrap(); + buf_mut.set_pts(gst::ClockTime::SECOND * i); + + let mut rtpbuf = RTPBuffer::from_buffer_writable(buf_mut).unwrap(); + rtpbuf.set_seq(i as u16); + + drop(rtpbuf); + + let result = h_enc.push(buf); + assert!(result.is_ok()); + } + + let dec = gst::ElementFactory::make("raptorqdec", None).unwrap(); + + dec.set_property("repair-window-tolerance", 1000u32); + + let mut h_dec = gst_check::Harness::with_element(&dec, Some("sink"), Some("src")); + let mut h_dec_fec = gst_check::Harness::with_element(&dec, Some("fec_0"), None); + + let caps = loop { + let event = h_enc_fec.pull_event(); + + if let Ok(event) = event { + #[allow(clippy::single_match)] + match event.view() { + gst::EventView::Caps(c) => { + break c.caps_owned(); + } + _ => (), + } + } + }; + + h_dec.set_src_caps_str("application/x-rtp"); + h_dec_fec.set_src_caps(caps); + + h_enc_fec.set_time(gst::ClockTime::from_seconds(1)).unwrap(); + + let result = h_enc.pull(); + assert!(result.is_ok()); + + let buf = result.unwrap(); + let result = h_dec.push(buf); + assert!(result.is_ok()); + + // Push some of repair packets to decoder, just not enough to recover + // media packets + for _ in 0..2 { + h_enc_fec.crank_single_clock_wait().unwrap(); + + let result = h_enc_fec.pull(); + assert!(result.is_ok()); + + let buf = result.unwrap(); + let result = h_dec_fec.push(buf); + assert!(result.is_ok()); + } + + let stats = h_dec.element().unwrap().property::("stats"); + assert_eq!( + stats + .get::("buffered-media-packets") + .expect("type error"), + 1 + ); + assert_eq!( + stats + .get::("buffered-repair-packets") + .expect("type error"), + 2 + ); + + // Media buffer is way beyond repair window which is 2 seconds, + // (repair_window (1s) + repair_window_tolerance (1s)), + // the decoder should drop buffered packets as they were kept for too long. + let mut buf = gst::Buffer::new_rtp_with_sizes(42, 0, 0).unwrap(); + let buf_mut = buf.get_mut().unwrap(); + buf_mut.set_pts(gst::ClockTime::SECOND * 10); + + let result = h_dec.push(buf); + assert!(result.is_ok()); + + let stats = h_dec.element().unwrap().property::("stats"); + assert_eq!( + stats + .get::("buffered-media-packets") + .expect("type error"), + 0 + ); + assert_eq!( + stats + .get::("buffered-repair-packets") + .expect("type error"), + 0 + ); +}