rtp: Add KLV RTP payloader/depayloader

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1580>
This commit is contained in:
Tim-Philipp Müller 2023-11-10 12:25:02 +00:00
parent 4259d284bd
commit 566e6443f4
8 changed files with 720 additions and 0 deletions

View file

@ -7253,6 +7253,58 @@
}, },
"rank": "marginal" "rank": "marginal"
}, },
"rtpklvdepay2": {
"author": "Tim-Philipp Müller <tim centricular com>",
"description": "Depayload an SMPTE ST 336 KLV metadata stream from RTP packets (RFC 6597)",
"hierarchy": [
"GstRtpKlvDepay2",
"GstRtpBaseDepay2",
"GstElement",
"GstObject",
"GInitiallyUnowned",
"GObject"
],
"klass": "Codec/Depayloader/Network/RTP",
"pad-templates": {
"sink": {
"caps": "application/x-rtp:\n media: application\n clock-rate: [ 1, 2147483647 ]\n encoding-name: SMPTE336M\n",
"direction": "sink",
"presence": "always"
},
"src": {
"caps": "meta/x-klv:\n parsed: true\n",
"direction": "src",
"presence": "always"
}
},
"rank": "marginal"
},
"rtpklvpay2": {
"author": "Tim-Philipp Müller <tim centricular com>",
"description": "Payload an SMPTE ST 336 KLV metadata stream into RTP packets (RFC 6597)",
"hierarchy": [
"GstRtpKlvPay2",
"GstRtpBasePay2",
"GstElement",
"GstObject",
"GInitiallyUnowned",
"GObject"
],
"klass": "Codec/Payloader/Network/RTP",
"pad-templates": {
"sink": {
"caps": "meta/x-klv:\n parsed: true\n",
"direction": "sink",
"presence": "always"
},
"src": {
"caps": "application/x-rtp:\n media: application\n encoding-name: SMPTE336M\n clock-rate: 90000\n",
"direction": "src",
"presence": "always"
}
},
"rank": "marginal"
},
"rtpmp2tdepay2": { "rtpmp2tdepay2": {
"author": "Tim-Philipp Müller <tim centricular com>", "author": "Tim-Philipp Müller <tim centricular com>",
"description": "Depayload an MPEG Transport Stream from RTP packets (RFC 2250)", "description": "Depayload an MPEG Transport Stream from RTP packets (RFC 2250)",

View file

@ -0,0 +1,321 @@
// GStreamer RTP KLV Metadata Depayloader
//
// Copyright (C) 2023 Tim-Philipp Müller <tim centricular com>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
/**
* SECTION:element-rtpklvdepay2
* @see_also: rtpklvpay2, rtpklvdepay, rtpklvpay
*
* Depayload an SMPTE ST 336 KLV metadata stream from RTP packets as per [RFC 6597][rfc-6597].
*
* [rfc-6597]: https://www.rfc-editor.org/rfc/rfc6597.html
*
* ## Example pipeline
*
* |[
* gst-launch-1.0 udpsrc caps='application/x-rtp, media=(string)application, clock-rate=(int)90000, encoding-name=(string)SMPTE336M' ! rtpklvdepay2 ! fakesink dump=true
* ]| This will depayload an RTP KLV stream and display a hexdump of the KLV data on stdout.
* You can use the #rtpklvpay2 or #rtpklvpay elements to create such an RTP stream.
*
* Since: plugins-rs-0.13.0
*/
use atomic_refcell::AtomicRefCell;
use gst::{glib, subclass::prelude::*};
use once_cell::sync::Lazy;
use crate::basedepay::{
Packet, PacketToBufferRelation, RtpBaseDepay2Ext, RtpBaseDepay2Impl, RtpBaseDepay2ImplExt,
};
use crate::klv::klv_utils;
use std::cmp::Ordering;
#[derive(Debug, PartialEq)]
enum LooksLike {
Start,
SelfContained,
Undetermined,
}
#[derive(Default)]
pub struct RtpKlvDepay {
state: AtomicRefCell<State>,
}
#[derive(Default)]
struct State {
prev_marker_seqnum: Option<u64>,
accumulator: Vec<u8>,
acc_seqnum: Option<u64>,
acc_ts: Option<u64>,
}
impl State {
fn clear_accumulator(&mut self) {
self.accumulator.clear();
self.acc_seqnum = None;
self.acc_ts = None;
}
}
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
gst::DebugCategory::new(
"rtpklvdepay2",
gst::DebugColorFlags::empty(),
Some("RTP KLV Metadata Depayloader"),
)
});
#[glib::object_subclass]
impl ObjectSubclass for RtpKlvDepay {
const NAME: &'static str = "GstRtpKlvDepay2";
type Type = super::RtpKlvDepay;
type ParentType = crate::basedepay::RtpBaseDepay2;
}
impl ObjectImpl for RtpKlvDepay {}
impl GstObjectImpl for RtpKlvDepay {}
impl ElementImpl for RtpKlvDepay {
fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
gst::subclass::ElementMetadata::new(
"RTP KLV Metadata Depayloader",
"Codec/Depayloader/Network/RTP",
"Depayload an SMPTE ST 336 KLV metadata stream from RTP packets (RFC 6597)",
"Tim-Philipp Müller <tim centricular com>",
)
});
Some(&*ELEMENT_METADATA)
}
fn pad_templates() -> &'static [gst::PadTemplate] {
static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
let sink_pad_template = gst::PadTemplate::new(
"sink",
gst::PadDirection::Sink,
gst::PadPresence::Always,
&gst::Caps::builder_full()
.structure(
gst::Structure::builder("application/x-rtp")
.field("media", "application")
.field("clock-rate", gst::IntRange::new(1i32, i32::MAX))
.field("encoding-name", "SMPTE336M")
.build(),
)
.build(),
)
.unwrap();
let src_pad_template = gst::PadTemplate::new(
"src",
gst::PadDirection::Src,
gst::PadPresence::Always,
&gst::Caps::builder("meta/x-klv")
.field("parsed", true)
.build(),
)
.unwrap();
vec![src_pad_template, sink_pad_template]
});
PAD_TEMPLATES.as_ref()
}
}
impl RtpBaseDepay2Impl for RtpKlvDepay {
const ALLOWED_META_TAGS: &'static [&'static str] = &[];
fn set_sink_caps(&self, _caps: &gst::Caps) -> bool {
let src_caps = gst::Caps::builder("meta/x-klv")
.field("parsed", true)
.build();
self.obj().set_src_caps(&src_caps);
true
}
// https://www.rfc-editor.org/rfc/rfc6597.html#section-4.2
//
// We either get a full single KLV unit in an RTP packet, or a fragment of a single KLV unit.
//
fn handle_packet(&self, packet: &Packet) -> Result<gst::FlowSuccess, gst::FlowError> {
let mut state = self.state.borrow_mut();
let payload = packet.payload();
// Clear out any unused accumulated data on discont or timestamp changes
if !state.accumulator.is_empty()
&& (packet.discont() || state.acc_ts != Some(packet.ext_timestamp()))
{
gst::debug!(CAT, imp: self,
"Discontinuity, discarding {} bytes in accumulator",
state.accumulator.len());
state.clear_accumulator();
}
let looks_like = match klv_utils::peek_klv(payload) {
Ok(klv_unit_size) => match payload.len().cmp(&klv_unit_size) {
Ordering::Equal => LooksLike::SelfContained,
Ordering::Less => LooksLike::Start,
Ordering::Greater => LooksLike::Undetermined, // Questionable?
},
_ => LooksLike::Undetermined,
};
// Packet looks like start or self-contained, or is directly after one with marker bit set?
let start = looks_like != LooksLike::Undetermined
|| match state.prev_marker_seqnum {
Some(prev_marker_seqnum) => packet.ext_seqnum() == (prev_marker_seqnum + 1),
None => false,
};
let end = packet.marker_bit() || looks_like == LooksLike::SelfContained;
gst::trace!(CAT, imp: self, "start: {start}, end: {end}, looks like: {looks_like:?}");
if end {
state.prev_marker_seqnum = Some(packet.ext_seqnum());
}
if start && looks_like == LooksLike::Undetermined {
gst::warning!(CAT, imp: self,
"New start, but data doesn't look like the start of a KLV unit?! Discarding");
state.clear_accumulator();
self.obj().drop_packet(packet);
return Ok(gst::FlowSuccess::Ok);
}
// Self-contained? Push out as-is, re-using the input buffer
if looks_like == LooksLike::SelfContained {
state.clear_accumulator();
gst::debug!(CAT, imp: self, "Finished KLV unit, pushing out {} bytes", payload.len());
return self
.obj()
.queue_buffer(packet.into(), packet.payload_buffer());
}
// .. else accumulate
if looks_like == LooksLike::Start {
if !state.accumulator.is_empty() {
gst::debug!(CAT, imp: self,
"New start, but still {} bytes in accumulator, discarding",
state.accumulator.len());
state.clear_accumulator();
}
state.accumulator.extend_from_slice(payload);
state.acc_seqnum = Some(packet.ext_seqnum());
state.acc_ts = Some(packet.ext_timestamp());
// if it looks like a start we know we don't have enough bytes yet
gst::debug!(CAT, imp: self,
"Start. Have {} bytes, but want {} bytes, waiting for more data",
state.accumulator.len(),
klv_utils::peek_klv(payload).unwrap(),
);
return Ok(gst::FlowSuccess::Ok);
}
// Continuation fragment
assert_eq!(looks_like, LooksLike::Undetermined);
if state.accumulator.is_empty() {
gst::debug!(CAT, imp: self,
"Continuation fragment, but no data in accumulator. Need to wait for start of next unit, discarding.");
self.obj().drop_packet(packet);
return Ok(gst::FlowSuccess::Ok);
}
state.accumulator.extend_from_slice(payload);
let Ok(klv_unit_size) = klv_utils::peek_klv(&state.accumulator) else {
gst::warning!(CAT, imp: self,
"Accumulator does not contain KLV unit start?! Clearing.");
state.clear_accumulator();
self.obj().drop_packet(packet);
return Ok(gst::FlowSuccess::Ok);
};
gst::log!(CAT, imp: self,
"Continuation. Have {} bytes, want {} bytes",
state.accumulator.len(),
klv_unit_size,
);
// Push out once we have enough data
if state.accumulator.len() >= klv_unit_size || end {
if state.accumulator.len() != klv_unit_size {
if state.accumulator.len() > klv_unit_size {
gst::warning!(CAT, imp: self, "More bytes than expected in accumulator!");
} else {
// For now we'll honour the marker bit unconditionally and don't second-guess it
gst::warning!(CAT, imp: self, "Fewer bytes than expected in accumulator, but marker bit set!");
}
}
let accumulator = std::mem::replace(
&mut state.accumulator,
Vec::<u8>::with_capacity(klv_unit_size),
);
gst::debug!(CAT, imp: self,
"Finished KLV unit, pushing out {} bytes", accumulator.len());
let outbuf = gst::Buffer::from_mut_slice(accumulator);
let first_seqnum = state.acc_seqnum.unwrap();
return self.obj().queue_buffer(
PacketToBufferRelation::Seqnums(first_seqnum..=packet.ext_seqnum()),
outbuf,
);
}
// .. else wait for more data
Ok(gst::FlowSuccess::Ok)
}
fn sink_event(&self, mut event: gst::Event) -> Result<gst::FlowSuccess, gst::FlowError> {
#[allow(clippy::single_match)]
match event.view() {
// Add SPARSE flag to stream-start event stream flags
gst::EventView::StreamStart(stream_start) => {
let stream_flags = stream_start.stream_flags();
let ev = event.make_mut();
let s = ev.structure_mut();
s.set("stream-flags", stream_flags | gst::StreamFlags::SPARSE);
}
_ => (),
};
self.parent_sink_event(event)
}
fn stop(&self) -> Result<(), gst::ErrorMessage> {
*self.state.borrow_mut() = State::default();
Ok(())
}
}

View file

@ -0,0 +1,28 @@
// GStreamer RTP KLV Metadata Depayloader
//
// Copyright (C) 2023 Tim-Philipp Müller <tim centricular com>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
use gst::glib;
use gst::prelude::*;
pub mod imp;
glib::wrapper! {
pub struct RtpKlvDepay(ObjectSubclass<imp::RtpKlvDepay>)
@extends crate::basedepay::RtpBaseDepay2, gst::Element, gst::Object;
}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::Element::register(
Some(plugin),
"rtpklvdepay2",
gst::Rank::MARGINAL,
RtpKlvDepay::static_type(),
)
}

View file

@ -0,0 +1,86 @@
// GStreamer RTP KLV Metadata Utility Functions
//
// Copyright (C) 2023 Tim-Philipp Müller <tim centricular com>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
// Limit max KLV unit size allowed for now
const MAX_KLV_UNIT_LEN_ALLOWED: u64 = 32 * 1024 * 1024;
/// Errors that can be produced when peeking at KLV units
#[derive(thiserror::Error, Debug, PartialEq, Eq)]
pub(crate) enum KlvError {
#[error("Unexpected KLV unit length length {}", 0)]
UnexpectedLengthLength(usize),
#[error("Unexpectedly large KLV unit ({}), max allowed {}", 0, 1)]
UnitTooLarge(u64, u64),
#[error("Invalid header: {reason}")]
InvalidHeader { reason: &'static str },
}
// Or maybe return struct with named fields instead of tuple?
fn peek_klv_len(data: &[u8]) -> Result<(usize, usize), KlvError> {
use KlvError::*;
// Size already checked by caller peek_klv()
let first_byte = data[0];
if first_byte & 0x80 == 0 {
return Ok((1, first_byte as usize));
}
let len_len = (first_byte & 0x7f) as usize;
if len_len == 0 || len_len > 8 || data.len() < (1 + len_len) {
Err(UnexpectedLengthLength(len_len))?;
}
let len = data[1..=len_len]
.iter()
.fold(0u64, |acc, &elem| (acc << 8) + elem as u64);
assert!(MAX_KLV_UNIT_LEN_ALLOWED <= usize::MAX as u64);
// Check length in u64 before casting to usize (which might only be 4 bytes on some arches)
if len > MAX_KLV_UNIT_LEN_ALLOWED {
Err(UnitTooLarge(len, MAX_KLV_UNIT_LEN_ALLOWED))?;
}
let len = len as usize;
Ok((len_len + 1, len))
}
pub(crate) fn peek_klv(data: &[u8]) -> anyhow::Result<usize> {
use anyhow::Context;
use KlvError::*;
if data.len() < 17 {
Err(InvalidHeader {
reason: "Not enough data",
})?;
}
if !data.starts_with(&[0x06, 0x0e, 0x2b, 0x34]) {
Err(InvalidHeader {
reason: "No KLV Universal Label start code",
})?;
}
// UL Designator byte values shall be limited to the range 0x01 to 0x7F
if data[4..8].iter().any(|&b| b > 0x7f) {
Err(InvalidHeader {
reason: "Invalid KLV Universal Label designator",
})?;
}
let (len_len, value_len) = peek_klv_len(&data[16..]).context("length")?;
Ok(16 + len_len + value_len)
}

6
net/rtp/src/klv/mod.rs Normal file
View file

@ -0,0 +1,6 @@
// SPDX-License-Identifier: MPL-2.0
pub mod depay;
pub mod pay;
mod klv_utils;

195
net/rtp/src/klv/pay/imp.rs Normal file
View file

@ -0,0 +1,195 @@
// GStreamer RTP KLV Metadata Payloader
//
// Copyright (C) 2023 Tim-Philipp Müller <tim centricular com>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
/**
* SECTION:element-rtpklvpay2
* @see_also: rtpklvdepay2, rtpklvpay, rtpklvdepay
*
* Payload an SMPTE ST 336 KLV metadata stream into RTP packets as per [RFC 6597][rfc-6597].
*
* [rfc-6597]: https://www.rfc-editor.org/rfc/rfc6597.html
*
* ## Example pipeline
*
* |[
* gst-launch-1.0 filesrc location=video-with-klv.ts ! tsdemux ! rtpklvpay2 ! udpsink
* ]| This example pipeline will payload an RTP KLV stream extracted from an
* MPEG-TS stream and send it via UDP to an RTP receiver. Note that `rtpklvpay2` expects the
* incoming KLV packets to be timestamped, which may not always be the case when they come from
* an MPEG-TS file. For testing purposes you can add artificial timestamps with e.g.
* `identity datarate=2560` for example (then each 256 byte packet will be timestamped 100ms apart).
*
* Since: plugins-rs-0.13.0
*/
use gst::{glib, subclass::prelude::*};
use once_cell::sync::Lazy;
use crate::basepay::{RtpBasePay2Ext, RtpBasePay2Impl};
use crate::klv::klv_utils;
#[derive(Default)]
pub struct RtpKlvPay {}
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
gst::DebugCategory::new(
"rtpklvpay2",
gst::DebugColorFlags::empty(),
Some("RTP KLV Metadata Payloader"),
)
});
#[glib::object_subclass]
impl ObjectSubclass for RtpKlvPay {
const NAME: &'static str = "GstRtpKlvPay2";
type Type = super::RtpKlvPay;
type ParentType = crate::basepay::RtpBasePay2;
}
impl ObjectImpl for RtpKlvPay {}
impl GstObjectImpl for RtpKlvPay {}
impl ElementImpl for RtpKlvPay {
fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
gst::subclass::ElementMetadata::new(
"RTP KLV Metadata Payloader",
"Codec/Payloader/Network/RTP",
"Payload an SMPTE ST 336 KLV metadata stream into RTP packets (RFC 6597)",
"Tim-Philipp Müller <tim centricular com>",
)
});
Some(&*ELEMENT_METADATA)
}
fn pad_templates() -> &'static [gst::PadTemplate] {
static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
let sink_pad_template = gst::PadTemplate::new(
"sink",
gst::PadDirection::Sink,
gst::PadPresence::Always,
&gst::Caps::builder("meta/x-klv")
.field("parsed", true)
.build(),
)
.unwrap();
let src_pad_template = gst::PadTemplate::new(
"src",
gst::PadDirection::Src,
gst::PadPresence::Always,
&gst::Caps::builder_full()
.structure(
gst::Structure::builder("application/x-rtp")
.field("media", "application")
.field("encoding-name", "SMPTE336M")
.field("clock-rate", 90000i32)
.build(),
)
.build(),
)
.unwrap();
vec![src_pad_template, sink_pad_template]
});
PAD_TEMPLATES.as_ref()
}
}
impl RtpBasePay2Impl for RtpKlvPay {
const ALLOWED_META_TAGS: &'static [&'static str] = &[];
fn set_sink_caps(&self, _caps: &gst::Caps) -> bool {
let src_caps = gst::Caps::builder("application/x-rtp")
.field("media", "application")
.field("encoding-name", "SMPTE336M")
.field("clock-rate", 90000i32)
.build();
self.obj().set_src_caps(&src_caps);
true
}
// https://www.rfc-editor.org/rfc/rfc6597.html#section-4.2
//
// We either fit our KLV unit(s) into a single RTP packet or have to split up the KLV unit(s).
//
fn handle_buffer(
&self,
buffer: &gst::Buffer,
id: u64,
) -> Result<gst::FlowSuccess, gst::FlowError> {
let map = buffer.map_readable().map_err(|_| {
gst::error!(CAT, imp: self, "Can't map buffer readable");
gst::FlowError::Error
})?;
if map.size() == 0 {
gst::log!(CAT, imp: self, "Empty buffer, skipping");
self.obj().drop_buffers(id..=id);
return Ok(gst::FlowSuccess::Ok);
}
let max_payload_size = self.obj().max_payload_size() as usize;
let mut data = map.as_slice();
// KLV coding shall use and only use a fixed 16-byte SMPTE-administered
// Universal Label, according to SMPTE 298M as Key (Rec. ITU R-BT.1653-1)
let unit_len = match klv_utils::peek_klv(data) {
Ok(unit_len) => unit_len,
Err(err) => {
// Also post warning message?
gst::warning!(CAT, imp: self, "Input doesn't look like a KLV unit, ignoring. {err:?}");
return Ok(gst::FlowSuccess::Ok);
}
};
if unit_len != data.len() {
gst::error!(CAT, imp: self,
"Input is not properly framed: KLV unit of size {unit_len} but buffer is {} bytes",
data.len()
);
if unit_len > data.len() {
// Also post warning or error message?
return Ok(gst::FlowSuccess::Ok);
}
data = &data[0..unit_len];
}
// Data now contains exactly one KLV unit
while data.len() > max_payload_size {
self.obj().queue_packet(
id.into(),
rtp_types::RtpPacketBuilder::new().payload(&data[0..max_payload_size]),
)?;
data = &data[max_payload_size..];
}
// Single packet or last packet
self.obj().queue_packet(
id.into(),
rtp_types::RtpPacketBuilder::new()
.payload(data)
.marker_bit(true),
)
}
}
impl RtpKlvPay {}

View file

@ -0,0 +1,28 @@
// GStreamer RTP KLV Metadata Payloader
//
// Copyright (C) 2023 Tim-Philipp Müller <tim centricular com>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
use gst::glib;
use gst::prelude::*;
pub mod imp;
glib::wrapper! {
pub struct RtpKlvPay(ObjectSubclass<imp::RtpKlvPay>)
@extends crate::basepay::RtpBasePay2, gst::Element, gst::Object;
}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::Element::register(
Some(plugin),
"rtpklvpay2",
gst::Rank::MARGINAL,
RtpKlvPay::static_type(),
)
}

View file

@ -28,6 +28,7 @@ mod basepay;
mod av1; mod av1;
mod jpeg; mod jpeg;
mod klv;
mod mp2t; mod mp2t;
mod mp4a; mod mp4a;
mod mp4g; mod mp4g;
@ -60,6 +61,9 @@ fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
jpeg::depay::register(plugin)?; jpeg::depay::register(plugin)?;
jpeg::pay::register(plugin)?; jpeg::pay::register(plugin)?;
klv::depay::register(plugin)?;
klv::pay::register(plugin)?;
mp2t::depay::register(plugin)?; mp2t::depay::register(plugin)?;
mp2t::pay::register(plugin)?; mp2t::pay::register(plugin)?;