net/quinn: Add muxer and demuxer for RTP over QUIC

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1775>
This commit is contained in:
Sanchayan Maity 2024-08-08 22:06:10 +05:30
parent d5425c5225
commit accb6b02ea
6 changed files with 1324 additions and 0 deletions

View file

@ -6,6 +6,7 @@
// <https://mozilla.org/MPL/2.0/>. // <https://mozilla.org/MPL/2.0/>.
// //
// SPDX-License-Identifier: MPL-2.0 // SPDX-License-Identifier: MPL-2.0
use bytes::BufMut;
use gst::glib; use gst::glib;
use quinn::VarInt; use quinn::VarInt;
@ -84,3 +85,93 @@ impl Default for QuinnQuicTransportConfig {
} }
} }
} }
// Taken from quinn-rs.
pub fn get_varint_size(val: u64) -> usize {
if val < 2u64.pow(6) {
1
} else if val < 2u64.pow(14) {
2
} else if val < 2u64.pow(30) {
4
} else if val < 2u64.pow(62) {
8
} else {
unreachable!("malformed VarInt");
}
}
// Adapted from quinn-rs.
pub fn get_varint(data: &[u8]) -> Option<(u64 /* VarInt value */, usize /* VarInt length */)> {
if data.is_empty() {
return None;
}
let data_length = data.len();
let tag = data[0] >> 6;
match tag {
0b00 => {
let mut slice = [0; 1];
slice.clone_from_slice(&data[..1]);
slice[0] &= 0b0011_1111;
Some((u64::from(slice[0]), 1))
}
0b01 => {
if data_length < 2 {
return None;
}
let mut buf = [0; 2];
buf.clone_from_slice(&data[..2]);
buf[0] &= 0b0011_1111;
Some((
u64::from(u16::from_be_bytes(buf[..2].try_into().unwrap())),
2,
))
}
0b10 => {
if data_length < 4 {
return None;
}
let mut buf = [0; 4];
buf.clone_from_slice(&data[..4]);
buf[0] &= 0b0011_1111;
Some((
u64::from(u32::from_be_bytes(buf[..4].try_into().unwrap())),
4,
))
}
0b11 => {
if data_length < 8 {
return None;
}
let mut buf = [0; 8];
buf.clone_from_slice(&data[..8]);
buf[0] &= 0b0011_1111;
Some((u64::from_be_bytes(buf[..8].try_into().unwrap()), 8))
}
_ => unreachable!(),
}
}
// Taken from quinn-rs.
pub fn set_varint<B: BufMut>(data: &mut B, val: u64) {
if val < 2u64.pow(6) {
data.put_u8(val as u8);
} else if val < 2u64.pow(14) {
data.put_u16(0b01 << 14 | val as u16);
} else if val < 2u64.pow(30) {
data.put_u32(0b10 << 30 | val as u32);
} else if val < 2u64.pow(62) {
data.put_u64(0b11 << 62 | val);
} else {
unreachable!("malformed varint");
}
}

View file

@ -26,6 +26,8 @@ mod quinnquicmux;
pub mod quinnquicquery; pub mod quinnquicquery;
mod quinnquicsink; mod quinnquicsink;
mod quinnquicsrc; mod quinnquicsrc;
mod quinnroqdemux;
mod quinnroqmux;
mod utils; mod utils;
fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
@ -36,6 +38,8 @@ fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
} }
quinnquicdemux::register(plugin)?; quinnquicdemux::register(plugin)?;
quinnquicmux::register(plugin)?; quinnquicmux::register(plugin)?;
quinnroqmux::register(plugin)?;
quinnroqdemux::register(plugin)?;
quinnquicsink::register(plugin)?; quinnquicsink::register(plugin)?;
quinnquicsrc::register(plugin)?; quinnquicsrc::register(plugin)?;

View file

@ -0,0 +1,664 @@
// Copyright (C) 2024, Asymptotic Inc.
// Author: Sanchayan Maity <sanchayan@asymptotic.io>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
// Implements RTP over QUIC as per the following specification,
// https://datatracker.ietf.org/doc/draft-ietf-avtcore-rtp-over-quic/
use crate::common::*;
use crate::quinnquicmeta::QuinnQuicMeta;
use crate::quinnquicquery::*;
use bytes::{Buf, BytesMut};
use gst::{glib, prelude::*, subclass::prelude::*};
use std::collections::{HashMap, VecDeque};
use std::io::{self, Cursor, Read};
use std::ops::{Range, RangeBounds, RangeFrom};
use std::sync::{LazyLock, Mutex};
const SIGNAL_FLOW_ID_MAP: &str = "request-flow-id-map";
static CAT: LazyLock<gst::DebugCategory> = LazyLock::new(|| {
gst::DebugCategory::new(
"quinnroqdemux",
gst::DebugColorFlags::empty(),
Some("Quinn RTP over QUIC Demuxer"),
)
});
struct Reassembler {
buffer: BytesMut,
last_packet_len: u64,
// Track the timestamp of the last buffer pushed. Chunk boundaries
// in QUIC do not correspond to peer writes, and hence cannot be
// used for framing. BaseSrc will timestamp the buffers captured
// with running time. We make a best case effort to timestamp the
// reassembled packets by using the timestamp of the last buffer
// pushed after which a reassembled packet becomes available.
last_ts: Option<gst::ClockTime>,
// Source pad which this reassembler is for, primarily used for
// trace logging.
pad: gst::Pad,
}
impl Reassembler {
fn new(pad: gst::Pad) -> Self {
Self {
// `quinnquicsrc` reads 4096 bytes at a time by default
// which is the default BaseSrc blocksize.
buffer: BytesMut::with_capacity(4096),
// Track the length of the packet we are reassembling
last_packet_len: 0,
last_ts: None,
pad,
}
}
fn build_buffer(&mut self, packet_sz: u64) -> gst::Buffer {
let packet = self.buffer.split_to(packet_sz as usize);
let mut buffer = gst::Buffer::with_size(packet_sz as usize).unwrap();
{
let buffer_mut = buffer.get_mut().unwrap();
{
let mut buf_mut = buffer_mut.map_writable().unwrap();
buf_mut.clone_from_slice(&packet);
}
// Set DTS and let downstream manage PTS using jitterbuffer
// as jitterbuffer will do the DTS -> PTS work.
buffer_mut.set_dts(self.last_ts);
}
buffer.to_owned()
}
fn push_buffer(
&mut self,
buffer: &mut gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError> {
if buffer.size() == 0 {
return Ok(gst::FlowSuccess::Ok);
}
self.last_ts = buffer.dts();
let buffer_mut = buffer.get_mut().ok_or(gst::FlowError::Error)?;
let map = buffer_mut
.map_readable()
.map_err(|_| gst::FlowError::Error)?;
let buffer_slice = map.as_slice();
self.buffer.extend_from_slice(buffer_slice);
gst::trace!(
CAT,
obj = self.pad,
"Added buffer of {} bytes, current buffer size: {}",
buffer_slice.len(),
self.buffer.len()
);
Ok(gst::FlowSuccess::Ok)
}
fn push_bytes(
&mut self,
buffer: &[u8],
dts: Option<gst::ClockTime>,
) -> Result<gst::FlowSuccess, gst::FlowError> {
if buffer.is_empty() {
return Ok(gst::FlowSuccess::Ok);
}
self.last_ts = dts;
self.buffer.extend_from_slice(buffer);
gst::trace!(
CAT,
obj = self.pad,
"Added {} bytes, current buffer size: {}",
buffer.len(),
self.buffer.len()
);
Ok(gst::FlowSuccess::Ok)
}
fn pop(&mut self) -> Option<gst::Buffer> {
if self.buffer.is_empty() {
return None;
}
self.last_ts?;
if self.last_packet_len != 0 {
// In the middle of reassembling a packet
if self.buffer.len() > self.last_packet_len as usize {
// Enough data to reassemble the packet
let buffer = self.build_buffer(self.last_packet_len);
self.last_packet_len = 0;
gst::trace!(
CAT,
obj = self.pad,
"Reassembled packet of size: {}, current buffer size: {}",
buffer.size(),
self.buffer.len()
);
return Some(buffer.to_owned());
} else {
// Do not have enough data yet to reassemble the packet
return None;
}
}
let (packet_sz, packet_sz_len) = get_varint(&self.buffer)?;
gst::trace!(
CAT,
obj = self.pad,
"Reassembler, packet size length: {}, packet: {}, buffer: {}",
packet_sz_len,
packet_sz,
self.buffer.len(),
);
self.buffer.advance(packet_sz_len);
if packet_sz > self.buffer.len() as u64 {
// Packet will span multiple buffers
self.last_packet_len = packet_sz;
gst::trace!(
CAT,
obj = self.pad,
"Accumulating for packet of size: {}, current buffer size: {}",
packet_sz,
self.buffer.len()
);
return None;
}
let buffer = self.build_buffer(packet_sz);
gst::trace!(
CAT,
obj = self.pad,
"Reassembled packet of size: {}, current buffer size: {}",
buffer.size(),
self.buffer.len()
);
Some(buffer.to_owned())
}
}
#[derive(Default)]
struct Started {
stream_pads_map: HashMap<u64 /* Stream ID */, (gst::Pad, Reassembler)>,
datagram_pads_map: HashMap<u64 /* Flow ID */, gst::Pad>,
}
#[derive(Default)]
enum State {
#[default]
Stopped,
Started(Started),
}
pub struct QuinnRoqDemux {
state: Mutex<State>,
sinkpad: gst::Pad,
}
impl GstObjectImpl for QuinnRoqDemux {}
impl ElementImpl for QuinnRoqDemux {
fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
static ELEMENT_METADATA: LazyLock<gst::subclass::ElementMetadata> = LazyLock::new(|| {
gst::subclass::ElementMetadata::new(
"Quinn RTP over QUIC Demultiplexer",
"Source/Network/QUIC",
"Demultiplexes multiple RTP streams over QUIC",
"Sanchayan Maity <sanchayan@asymptotic.io>",
)
});
Some(&*ELEMENT_METADATA)
}
fn pad_templates() -> &'static [gst::PadTemplate] {
static PAD_TEMPLATES: LazyLock<Vec<gst::PadTemplate>> = LazyLock::new(|| {
let sink_pad_template = gst::PadTemplate::new(
"sink",
gst::PadDirection::Sink,
gst::PadPresence::Always,
&gst::Caps::new_any(),
)
.unwrap();
let src_caps = gst::Caps::builder("application/x-rtp").build();
let src_pad_template = gst::PadTemplate::new(
"src_%u", // src_<flow-id>
gst::PadDirection::Src,
gst::PadPresence::Sometimes,
&src_caps,
)
.unwrap();
vec![sink_pad_template, src_pad_template]
});
PAD_TEMPLATES.as_ref()
}
fn change_state(
&self,
transition: gst::StateChange,
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
let ret = self.parent_change_state(transition)?;
if let gst::StateChange::NullToReady = transition {
let mut state = self.state.lock().unwrap();
*state = State::Started(Started::default());
}
Ok(ret)
}
}
impl ObjectImpl for QuinnRoqDemux {
fn signals() -> &'static [glib::subclass::Signal] {
static SIGNALS: LazyLock<Vec<glib::subclass::Signal>> = LazyLock::new(|| {
/*
* See section 5.1 of RTP over QUIC specification.
*
* Endpoints need to associate flow identifiers with RTP
* streams. Depending on the context of the application,
* the association can be statically configured, signaled
* using an out-of-band signaling mechanism (e.g., SDP),
* or applications might be able to identify the stream
* based on the RTP packets sent on the stream (e.g., by
* inspecting the payload type).
*
* In this implementation, we use this signal to associate
* the flow-id with an incoming stream by requesting caps.
* If no caps are provided, the pipeline will fail with a
* flow error.
*
* TODO: Close the connection with ROQ_UNKNOWN_FLOW_ID error
* code if the signal fails. This will have to be communicated
* upstream to quinnquicsrc.
*/
vec![glib::subclass::Signal::builder(SIGNAL_FLOW_ID_MAP)
.param_types([u64::static_type()])
.return_type::<gst::Caps>()
.build()]
});
SIGNALS.as_ref()
}
fn constructed(&self) {
self.parent_constructed();
self.obj()
.add_pad(&self.sinkpad)
.expect("Failed to add sink pad");
}
}
#[glib::object_subclass]
impl ObjectSubclass for QuinnRoqDemux {
const NAME: &'static str = "GstQuinnQuicRtpDemux";
type Type = super::QuinnRoqDemux;
type ParentType = gst::Element;
fn with_class(klass: &Self::Class) -> Self {
let sinkpad = gst::Pad::builder_from_template(&klass.pad_template("sink").unwrap())
.chain_function(|_pad, parent, buffer| {
QuinnRoqDemux::catch_panic_pad_function(
parent,
|| Err(gst::FlowError::Error),
|demux| demux.sink_chain(buffer),
)
})
.event_function(|pad, parent, event| {
QuinnRoqDemux::catch_panic_pad_function(
parent,
|| false,
|demux| demux.sink_event(pad, event),
)
})
.build();
Self {
state: Mutex::new(State::default()),
sinkpad,
}
}
}
impl ChildProxyImpl for QuinnRoqDemux {
fn children_count(&self) -> u32 {
let object = self.obj();
object.num_pads() as u32
}
fn child_by_name(&self, name: &str) -> Option<glib::Object> {
let object = self.obj();
object
.pads()
.into_iter()
.find(|p| p.name() == name)
.map(|p| p.upcast())
}
fn child_by_index(&self, index: u32) -> Option<glib::Object> {
let object = self.obj();
object
.pads()
.into_iter()
.nth(index as usize)
.map(|p| p.upcast())
}
}
impl QuinnRoqDemux {
fn add_srcpad_for_flowid(&self, flow_id: u64) -> Result<gst::Pad, gst::FlowError> {
let caps = self
.obj()
.emit_by_name::<Option<gst::Caps>>(SIGNAL_FLOW_ID_MAP, &[&(flow_id)])
.ok_or_else(|| {
gst::error!(CAT, imp = self, "Could not get caps for flow-id {flow_id}");
gst::FlowError::Error
})?;
let src_pad_name = format!("src_{flow_id}");
let templ = self.obj().pad_template("src_%u").unwrap();
let srcpad = gst::Pad::builder_from_template(&templ)
.name(src_pad_name.clone())
.build();
srcpad.set_active(true).unwrap();
let stream_start_evt = gst::event::StreamStart::builder(&flow_id.to_string())
.group_id(gst::GroupId::next())
.build();
srcpad.push_event(stream_start_evt);
gst::log!(
CAT,
imp = self,
"Caps {caps:?}, received for pad {src_pad_name} for flow-id {flow_id}"
);
srcpad.push_event(gst::event::Caps::new(&caps));
let segment_evt = gst::event::Segment::new(&gst::FormattedSegment::<gst::ClockTime>::new());
srcpad.push_event(segment_evt);
self.obj().add_pad(&srcpad).expect("Failed to add pad");
gst::trace!(
CAT,
imp = self,
"Added pad {src_pad_name} for flow-id {flow_id}"
);
Ok(srcpad)
}
fn remove_pad(&self, stream_id: u64) -> bool {
gst::debug!(CAT, imp = self, "Removing pad for stream id {stream_id}");
let mut state = self.state.lock().unwrap();
let stream_pad = if let State::Started(ref mut state) = *state {
state.stream_pads_map.remove(&stream_id)
} else {
None
};
drop(state);
if let Some((pad, reassembler)) = stream_pad {
drop(reassembler);
let _ = pad.set_active(false);
if let Err(err) = self.obj().remove_pad(&pad) {
gst::error!(
CAT,
imp = self,
"Failed to remove pad {} for stream id {stream_id}, error: {err:?}",
pad.name()
);
return false;
} else {
gst::log!(
CAT,
imp = self,
"Pad {} removed for stream id {stream_id}",
pad.name()
);
return true;
}
}
false
}
fn sink_chain(&self, buffer: gst::Buffer) -> Result<gst::FlowSuccess, gst::FlowError> {
let meta = buffer.meta::<QuinnQuicMeta>();
match meta {
Some(m) => {
if m.is_datagram() {
self.datagram_sink_chain(buffer)
} else {
let stream_id = m.stream_id();
self.stream_sink_chain(buffer, stream_id)
}
}
None => {
gst::warning!(CAT, imp = self, "Buffer dropped, no metadata");
Ok(gst::FlowSuccess::Ok)
}
}
}
fn datagram_sink_chain(
&self,
mut buffer: gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError> {
/*
* See section 5.3 of RTP over QUIC specification.
*
* DATAGRAMs preserve application frame boundaries. Thus, a
* single RTP packet can be mapped to a single DATAGRAM without
* additional framing.
*
* Since datagrams preserve framing we do not need packet
* reassembly here.
*/
let mut state = self.state.lock().unwrap();
if let State::Started(ref mut started) = *state {
let dts = buffer.dts();
let buf_mut = buffer.get_mut().ok_or(gst::FlowError::Error)?;
let map = buf_mut.map_readable().map_err(|_| gst::FlowError::Error)?;
let data = map.as_slice();
let varint = get_varint(data);
if varint.is_none() {
gst::error!(CAT, imp = self, "Unexpected VarInt parse error");
return Err(gst::FlowError::Error);
}
let (flow_id, flow_id_len) = {
let (flow_id, flow_id_len) = varint.unwrap();
(flow_id, flow_id_len)
};
gst::trace!(CAT, imp = self, "Got buffer with flow-id {flow_id}",);
let mut outbuf = buf_mut
.copy_region(gst::BufferCopyFlags::all(), flow_id_len..)
.unwrap();
{
let outbuf_mut = outbuf.get_mut().ok_or(gst::FlowError::Error)?;
// Set DTS and let downstream manage PTS using jitterbuffer
// as jitterbuffer will do the DTS -> PTS work.
outbuf_mut.set_dts(dts);
}
match started.datagram_pads_map.get_mut(&flow_id) {
Some(srcpad) => {
gst::trace!(
CAT,
obj = srcpad,
"Pushing buffer of {} bytes with dts: {:?}",
outbuf.size(),
outbuf.dts(),
);
return srcpad.push(outbuf);
}
None => {
let srcpad = self.add_srcpad_for_flowid(flow_id)?;
gst::trace!(
CAT,
obj = srcpad,
"Pushing buffer of {} bytes with dts: {:?}",
outbuf.size(),
outbuf.dts(),
);
started.datagram_pads_map.insert(flow_id, srcpad.clone());
return srcpad.push(outbuf);
}
}
}
Ok(gst::FlowSuccess::Ok)
}
fn stream_sink_chain(
&self,
mut buffer: gst::Buffer,
stream_id: u64,
) -> Result<gst::FlowSuccess, gst::FlowError> {
let mut state = self.state.lock().unwrap();
if let State::Started(ref mut started) = *state {
match started.stream_pads_map.get_mut(&stream_id) {
Some((srcpad, reassembler)) => {
reassembler.push_buffer(&mut buffer)?;
while let Some(buffer) = reassembler.pop() {
gst::trace!(
CAT,
obj = srcpad,
"Pushing buffer of {} bytes with dts: {:?} for stream: {stream_id}",
buffer.size(),
buffer.dts(),
);
if let Err(err) = srcpad.push(buffer) {
gst::error!(CAT, imp = self, "Failed to push buffer: {err}");
return Err(gst::FlowError::Error);
}
}
return Ok(gst::FlowSuccess::Ok);
}
None => {
let dts = buffer.dts();
let buf_mut = buffer.get_mut().ok_or(gst::FlowError::Error)?;
let map = buf_mut.map_readable().map_err(|_| gst::FlowError::Error)?;
let data = map.as_slice();
let varint = get_varint(data);
if varint.is_none() {
gst::error!(CAT, imp = self, "Unexpected VarInt parse error");
return Err(gst::FlowError::Error);
}
// We have not seen the flow id for this pad
let (flow_id, flow_id_len) = {
let (flow_id, flow_id_len) = varint.unwrap();
(flow_id, flow_id_len)
};
gst::trace!(
CAT,
imp = self,
"Got stream-id {stream_id} with flow-id {flow_id} of length {flow_id_len} {}",
data.len()
);
let srcpad = self.add_srcpad_for_flowid(flow_id)?;
let mut reassembler = Reassembler::new(srcpad.clone());
reassembler.push_bytes(&data[flow_id_len..], dts)?;
while let Some(buffer) = reassembler.pop() {
gst::trace!(
CAT,
obj = srcpad,
"Pushing output buffer of size: {} with dts: {:?} for stream: {stream_id}",
buffer.size(),
buffer.dts(),
);
if let Err(err) = srcpad.push(buffer) {
gst::error!(CAT, imp = self, "Failed to push buffer: {err}");
return Err(gst::FlowError::Error);
}
}
started
.stream_pads_map
.insert(stream_id, (srcpad, reassembler));
return Ok(gst::FlowSuccess::Ok);
}
}
}
Ok(gst::FlowSuccess::Ok)
}
fn sink_event(&self, pad: &gst::Pad, event: gst::Event) -> bool {
use gst::EventView;
gst::debug!(CAT, imp = self, "Handling event {:?}", event);
if let EventView::CustomDownstream(ev) = event.view() {
if let Some(s) = ev.structure() {
if s.name() == QUIC_STREAM_CLOSE_CUSTOMDOWNSTREAM_EVENT {
if let Ok(stream_id) = s.get::<u64>(QUIC_STREAM_ID) {
return self.remove_pad(stream_id);
}
}
}
}
gst::Pad::event_default(pad, Some(&*self.obj()), event)
}
}

View file

@ -0,0 +1,31 @@
// Copyright (C) 2024, Asymptotic Inc.
// Author: Sanchayan Maity <sanchayan@asymptotic.io>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
/**
* element-quinnroqdemux:
* @short-description: Supports stream demultiplexing of RTP packets over QUIC
*
*/
use gst::glib;
use gst::prelude::*;
pub mod imp;
glib::wrapper! {
pub struct QuinnRoqDemux(ObjectSubclass<imp::QuinnRoqDemux>) @extends gst::Element, gst::Object, @implements gst::ChildProxy;
}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::Element::register(
Some(plugin),
"quinnroqdemux",
gst::Rank::NONE,
QuinnRoqDemux::static_type(),
)
}

View file

@ -0,0 +1,499 @@
// Copyright (C) 2024, Asymptotic Inc.
// Author: Sanchayan Maity <sanchayan@asymptotic.io>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
// Implements RTP over QUIC as per the following specification,
// https://datatracker.ietf.org/doc/draft-ietf-avtcore-rtp-over-quic/
use crate::common::*;
use crate::quinnquicmeta::QuinnQuicMeta;
use crate::quinnquicquery::*;
use gst::{glib, prelude::*, subclass::prelude::*};
use gst_base::prelude::*;
use gst_base::subclass::prelude::*;
use itertools::Itertools;
use std::collections::HashMap;
use std::io::Read;
use std::sync::{LazyLock, Mutex};
const INITIAL_FLOW_ID: u64 = 1;
const MAXIMUM_FLOW_ID: u64 = (1 << 62) - 1;
const DEFAULT_STREAM_PRIORITY: i32 = 0;
static CAT: LazyLock<gst::DebugCategory> = LazyLock::new(|| {
gst::DebugCategory::new(
"quinnroqmux",
gst::DebugColorFlags::empty(),
Some("Quinn RTP over QUIC Muxer"),
)
});
#[derive(Default)]
struct PadState {
flow_id_sent: bool,
stream_id: Option<u64>,
}
struct QuinnRoqMuxPadSettings {
flow_id: u64,
priority: i32,
}
impl Default for QuinnRoqMuxPadSettings {
fn default() -> Self {
Self {
flow_id: INITIAL_FLOW_ID,
priority: 0,
}
}
}
#[derive(Default)]
pub(crate) struct QuinnRoqMuxPad {
settings: Mutex<QuinnRoqMuxPadSettings>,
state: Mutex<PadState>,
}
#[glib::object_subclass]
impl ObjectSubclass for QuinnRoqMuxPad {
const NAME: &'static str = "QuinnRoqMuxPad";
type Type = super::QuinnRoqMuxPad;
type ParentType = gst::Pad;
}
impl ObjectImpl for QuinnRoqMuxPad {
fn properties() -> &'static [glib::ParamSpec] {
static PROPERTIES: LazyLock<Vec<glib::ParamSpec>> = LazyLock::new(|| {
vec![
glib::ParamSpecUInt64::builder("flow-id")
.nick("Flow identifier")
.blurb("Flow identifier")
.default_value(INITIAL_FLOW_ID)
.minimum(INITIAL_FLOW_ID)
.maximum(MAXIMUM_FLOW_ID)
.readwrite()
.build(),
glib::ParamSpecInt::builder("priority")
.nick("Priority of the stream, ignored by datagrams")
.blurb("Priority of the stream, ignored by datagrams")
.default_value(DEFAULT_STREAM_PRIORITY)
.readwrite()
.build(),
]
});
PROPERTIES.as_ref()
}
fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
match pspec.name() {
"flow-id" => {
let mut settings = self.settings.lock().unwrap();
settings.flow_id = value.get::<u64>().expect("type checked upstream");
}
"priority" => {
let mut settings = self.settings.lock().unwrap();
settings.priority = value.get::<i32>().expect("type checked upstream");
}
_ => unimplemented!(),
}
}
fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
match pspec.name() {
"flow-id" => {
let settings = self.settings.lock().unwrap();
settings.flow_id.to_value()
}
"priority" => {
let settings = self.settings.lock().unwrap();
settings.priority.to_value()
}
_ => unimplemented!(),
}
}
}
impl GstObjectImpl for QuinnRoqMuxPad {}
impl PadImpl for QuinnRoqMuxPad {}
impl ProxyPadImpl for QuinnRoqMuxPad {}
#[derive(Default)]
struct State {
stream_uni_conns: u64,
datagrams: u64,
}
pub struct QuinnRoqMux {
state: Mutex<State>,
srcpad: gst::Pad,
}
impl GstObjectImpl for QuinnRoqMux {}
impl ElementImpl for QuinnRoqMux {
fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
static ELEMENT_METADATA: LazyLock<gst::subclass::ElementMetadata> = LazyLock::new(|| {
gst::subclass::ElementMetadata::new(
"Quinn RTP over QUIC Multiplexer",
"Source/Network/QUIC",
"Multiplexes multiple RTP streams over QUIC",
"Sanchayan Maity <sanchayan@asymptotic.io>",
)
});
Some(&*ELEMENT_METADATA)
}
fn pad_templates() -> &'static [gst::PadTemplate] {
static PAD_TEMPLATES: LazyLock<Vec<gst::PadTemplate>> = LazyLock::new(|| {
let sink_caps = gst::Caps::builder("application/x-rtp").build();
let stream_pad_template = gst::PadTemplate::with_gtype(
"stream_%u",
gst::PadDirection::Sink,
gst::PadPresence::Request,
&sink_caps,
super::QuinnRoqMuxPad::static_type(),
)
.unwrap();
let datagram_pad_template = gst::PadTemplate::with_gtype(
"datagram_%u",
gst::PadDirection::Sink,
gst::PadPresence::Request,
&sink_caps,
super::QuinnRoqMuxPad::static_type(),
)
.unwrap();
let src_pad_template = gst::PadTemplate::new(
"src",
gst::PadDirection::Src,
gst::PadPresence::Always,
&gst::Caps::new_any(),
)
.unwrap();
vec![stream_pad_template, datagram_pad_template, src_pad_template]
});
PAD_TEMPLATES.as_ref()
}
fn request_new_pad(
&self,
templ: &gst::PadTemplate,
_name: Option<&str>,
_caps: Option<&gst::Caps>,
) -> Option<gst::Pad> {
let mut state = self.state.lock().unwrap();
match templ.name_template() {
"stream_%u" => {
let sink_pad_name = format!("stream_{}", state.stream_uni_conns);
gst::debug!(CAT, imp = self, "Requesting pad {}", sink_pad_name);
let sinkpad = gst::PadBuilder::<super::QuinnRoqMuxPad>::from_template(templ)
.name(sink_pad_name.clone())
.chain_function(|pad, parent, buffer| {
QuinnRoqMux::catch_panic_pad_function(
parent,
|| Err(gst::FlowError::Error),
|this| this.rtp_stream_sink_chain(pad, buffer),
)
})
.flags(gst::PadFlags::FIXED_CAPS)
.build();
self.obj()
.add_pad(&sinkpad)
.expect("Failed to add sink pad");
state.stream_uni_conns += 1;
Some(sinkpad.upcast())
}
"datagram_%u" => {
if request_datagram(&self.srcpad) {
gst::warning!(CAT, imp = self, "Datagram unsupported by peer");
return None;
}
let sink_pad_name = format!("datagram_{}", state.datagrams);
gst::debug!(CAT, imp = self, "Requesting pad {}", sink_pad_name);
let sinkpad = gst::PadBuilder::<super::QuinnRoqMuxPad>::from_template(templ)
.name(sink_pad_name.clone())
.chain_function(|pad, parent, buffer| {
QuinnRoqMux::catch_panic_pad_function(
parent,
|| Err(gst::FlowError::Error),
|this| this.rtp_datagram_sink_chain(pad, buffer),
)
})
.flags(gst::PadFlags::FIXED_CAPS)
.build();
self.obj()
.add_pad(&sinkpad)
.expect("Failed to add sink pad");
state.datagrams += 1;
Some(sinkpad.upcast())
}
_ => None,
}
}
fn release_pad(&self, pad: &gst::Pad) {
pad.set_active(false).unwrap();
if pad.name().starts_with("stream") {
self.close_stream_for_pad(pad);
}
self.obj().remove_pad(pad).unwrap();
}
}
impl ObjectImpl for QuinnRoqMux {
fn constructed(&self) {
self.parent_constructed();
self.obj()
.add_pad(&self.srcpad)
.expect("Failed to add source pad");
}
}
#[glib::object_subclass]
impl ObjectSubclass for QuinnRoqMux {
const NAME: &'static str = "GstQuinnRoqMux";
type Type = super::QuinnRoqMux;
type ParentType = gst::Element;
fn with_class(klass: &Self::Class) -> Self {
let templ = klass.pad_template("src").unwrap();
let srcpad = gst::Pad::builder_from_template(&templ).build();
Self {
state: Mutex::new(State::default()),
srcpad,
}
}
}
impl ChildProxyImpl for QuinnRoqMux {
fn children_count(&self) -> u32 {
let object = self.obj();
object.num_pads() as u32
}
fn child_by_name(&self, name: &str) -> Option<glib::Object> {
let object = self.obj();
object
.pads()
.into_iter()
.find(|p| p.name() == name)
.map(|p| p.upcast())
}
fn child_by_index(&self, index: u32) -> Option<glib::Object> {
let object = self.obj();
object
.pads()
.into_iter()
.nth(index as usize)
.map(|p| p.upcast())
}
}
impl QuinnRoqMux {
fn rtp_datagram_sink_chain(
&self,
pad: &super::QuinnRoqMuxPad,
buffer: gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError> {
/*
* As per section 5.2.1 of RTP over QUIC specification.
* Stream encapsulation format for ROQ datagrams is as
* follows:
*
* Payload {
* Flow Identifier(i)
* RTP Packet(..)
* }
*
* See section 5.3 of RTP over QUIC specification.
*
* DATAGRAMs preserve application frame boundaries. Thus, a
* single RTP packet can be mapped to a single DATAGRAM without
* additional framing. Because QUIC DATAGRAMs cannot be
* IP-fragmented (Section 5 of [RFC9221]), senders need to
* consider the header overhead associated with DATAGRAMs, and
* ensure that the RTP packets, including their payloads, flow
* identifier, QUIC, and IP headers, will fit into the Path MTU.
*/
let mux_pad_settings = pad.imp().settings.lock().unwrap();
let flow_id = mux_pad_settings.flow_id;
drop(mux_pad_settings);
let size = get_varint_size(flow_id);
let mut outbuf = gst::Buffer::with_size(size).unwrap();
{
let outbuffer = outbuf.get_mut().unwrap();
{
let mut map = outbuffer.map_writable().unwrap();
let mut data = map.as_mut_slice();
set_varint(&mut data, flow_id);
}
outbuffer.set_pts(buffer.pts());
outbuffer.set_dts(buffer.dts());
QuinnQuicMeta::add(outbuffer, 0, true);
}
outbuf.append(buffer);
self.srcpad.push(outbuf)
}
fn rtp_stream_sink_chain(
&self,
pad: &super::QuinnRoqMuxPad,
buffer: gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError> {
/*
* As per section 5.2.1 of RTP over QUIC specification.
* Stream encapsulation format for ROQ streams is as
* follows:
*
* Payload {
* Flow Identifier(i)
* RTP Payload(..)
* }
*
* RTP Payload {
* Length(i)
* RTP Packet(..)
* }
*/
let mut pad_state = pad.imp().state.lock().unwrap();
let stream_id = match pad_state.stream_id {
Some(stream_id) => stream_id,
None => {
let mux_pad_settings = pad.imp().settings.lock().unwrap();
let priority = mux_pad_settings.priority;
drop(mux_pad_settings);
gst::info!(CAT, obj = pad, "Requesting stream with priority {priority}");
match request_stream(&self.srcpad, priority) {
Some(stream_id) => {
pad_state.stream_id = Some(stream_id);
stream_id
}
None => {
gst::error!(CAT, obj = pad, "Failed to request stream");
return Err(gst::FlowError::Error);
}
}
}
};
if !pad_state.flow_id_sent {
let mux_pad_settings = pad.imp().settings.lock().unwrap();
let flow_id = mux_pad_settings.flow_id;
drop(mux_pad_settings);
let size = get_varint_size(flow_id);
let mut flow_id_buf = gst::Buffer::with_size(size).unwrap();
{
let buffer = flow_id_buf.get_mut().unwrap();
{
let mut map = buffer.map_writable().unwrap();
let mut data = map.as_mut_slice();
set_varint(&mut data, flow_id);
}
QuinnQuicMeta::add(buffer, stream_id, false);
}
if let Err(e) = self.srcpad.push(flow_id_buf) {
gst::error!(CAT, obj = pad, "Failed to push flow id buffer: {e:?}");
return Err(gst::FlowError::Error);
}
pad_state.flow_id_sent = true;
}
drop(pad_state);
let buf_sz_len = get_varint_size(buffer.size() as u64);
let mut outbuf = gst::Buffer::with_size(buf_sz_len).unwrap();
gst::trace!(
CAT,
obj = pad,
"Got input buffer of size: {}, pts: {:?}, dts: {:?} for stream: {stream_id}",
buffer.size(),
buffer.pts(),
buffer.dts(),
);
{
let outbuf = outbuf.get_mut().unwrap();
{
let mut obuf = outbuf.map_writable().unwrap();
let mut obuf_slice = obuf.as_mut_slice();
set_varint(&mut obuf_slice, buffer.size() as u64);
}
QuinnQuicMeta::add(outbuf, stream_id, false);
outbuf.set_pts(buffer.pts());
outbuf.set_dts(buffer.dts());
}
outbuf.append(buffer);
gst::trace!(
CAT,
obj = pad,
"Pushing buffer of {} bytes for stream: {stream_id}",
outbuf.size(),
);
self.srcpad.push(outbuf)
}
fn close_stream_for_pad(&self, pad: &gst::Pad) {
let mux_pad = pad.downcast_ref::<super::QuinnRoqMuxPad>().unwrap();
let pad_state = mux_pad.imp().state.lock().unwrap();
if let Some(stream_id) = pad_state.stream_id {
if close_stream(&self.srcpad, stream_id) {
gst::info!(CAT, obj = pad, "Closed connection");
} else {
gst::warning!(CAT, obj = pad, "Failed to close connection");
}
}
}
}

View file

@ -0,0 +1,35 @@
// Copyright (C) 2024, Asymptotic Inc.
// Author: Sanchayan Maity <sanchayan@asymptotic.io>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
/**
* element-quinnroqmux:
* @short-description: Supports stream multiplexing of RTP packets over QUIC
*
*/
use gst::glib;
use gst::prelude::*;
pub mod imp;
glib::wrapper! {
pub struct QuinnRoqMux(ObjectSubclass<imp::QuinnRoqMux>) @extends gst_base::Aggregator, gst::Element, gst::Object, @implements gst::ChildProxy;
}
glib::wrapper! {
pub(crate) struct QuinnRoqMuxPad(ObjectSubclass<imp::QuinnRoqMuxPad>) @extends gst::ProxyPad, gst::Pad, gst::Object;
}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::Element::register(
Some(plugin),
"quinnroqmux",
gst::Rank::NONE,
QuinnRoqMux::static_type(),
)
}