diff --git a/net/quinn/src/common.rs b/net/quinn/src/common.rs index 89bf8e9b..877e557f 100644 --- a/net/quinn/src/common.rs +++ b/net/quinn/src/common.rs @@ -6,6 +6,7 @@ // . // // SPDX-License-Identifier: MPL-2.0 +use bytes::BufMut; use gst::glib; use quinn::VarInt; @@ -84,3 +85,93 @@ impl Default for QuinnQuicTransportConfig { } } } + +// Taken from quinn-rs. +pub fn get_varint_size(val: u64) -> usize { + if val < 2u64.pow(6) { + 1 + } else if val < 2u64.pow(14) { + 2 + } else if val < 2u64.pow(30) { + 4 + } else if val < 2u64.pow(62) { + 8 + } else { + unreachable!("malformed VarInt"); + } +} + +// Adapted from quinn-rs. +pub fn get_varint(data: &[u8]) -> Option<(u64 /* VarInt value */, usize /* VarInt length */)> { + if data.is_empty() { + return None; + } + + let data_length = data.len(); + let tag = data[0] >> 6; + + match tag { + 0b00 => { + let mut slice = [0; 1]; + slice.clone_from_slice(&data[..1]); + slice[0] &= 0b0011_1111; + + Some((u64::from(slice[0]), 1)) + } + 0b01 => { + if data_length < 2 { + return None; + } + + let mut buf = [0; 2]; + buf.clone_from_slice(&data[..2]); + buf[0] &= 0b0011_1111; + + Some(( + u64::from(u16::from_be_bytes(buf[..2].try_into().unwrap())), + 2, + )) + } + 0b10 => { + if data_length < 4 { + return None; + } + + let mut buf = [0; 4]; + buf.clone_from_slice(&data[..4]); + buf[0] &= 0b0011_1111; + + Some(( + u64::from(u32::from_be_bytes(buf[..4].try_into().unwrap())), + 4, + )) + } + 0b11 => { + if data_length < 8 { + return None; + } + + let mut buf = [0; 8]; + buf.clone_from_slice(&data[..8]); + buf[0] &= 0b0011_1111; + + Some((u64::from_be_bytes(buf[..8].try_into().unwrap()), 8)) + } + _ => unreachable!(), + } +} + +// Taken from quinn-rs. +pub fn set_varint(data: &mut B, val: u64) { + if val < 2u64.pow(6) { + data.put_u8(val as u8); + } else if val < 2u64.pow(14) { + data.put_u16(0b01 << 14 | val as u16); + } else if val < 2u64.pow(30) { + data.put_u32(0b10 << 30 | val as u32); + } else if val < 2u64.pow(62) { + data.put_u64(0b11 << 62 | val); + } else { + unreachable!("malformed varint"); + } +} diff --git a/net/quinn/src/lib.rs b/net/quinn/src/lib.rs index 432ca5f0..5ee69b31 100644 --- a/net/quinn/src/lib.rs +++ b/net/quinn/src/lib.rs @@ -26,6 +26,8 @@ mod quinnquicmux; pub mod quinnquicquery; mod quinnquicsink; mod quinnquicsrc; +mod quinnroqdemux; +mod quinnroqmux; mod utils; fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { @@ -36,6 +38,8 @@ fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { } quinnquicdemux::register(plugin)?; quinnquicmux::register(plugin)?; + quinnroqmux::register(plugin)?; + quinnroqdemux::register(plugin)?; quinnquicsink::register(plugin)?; quinnquicsrc::register(plugin)?; diff --git a/net/quinn/src/quinnroqdemux/imp.rs b/net/quinn/src/quinnroqdemux/imp.rs new file mode 100644 index 00000000..9248ebd8 --- /dev/null +++ b/net/quinn/src/quinnroqdemux/imp.rs @@ -0,0 +1,664 @@ +// Copyright (C) 2024, Asymptotic Inc. +// Author: Sanchayan Maity +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +// Implements RTP over QUIC as per the following specification, +// https://datatracker.ietf.org/doc/draft-ietf-avtcore-rtp-over-quic/ + +use crate::common::*; +use crate::quinnquicmeta::QuinnQuicMeta; +use crate::quinnquicquery::*; +use bytes::{Buf, BytesMut}; +use gst::{glib, prelude::*, subclass::prelude::*}; +use std::collections::{HashMap, VecDeque}; +use std::io::{self, Cursor, Read}; +use std::ops::{Range, RangeBounds, RangeFrom}; +use std::sync::{LazyLock, Mutex}; + +const SIGNAL_FLOW_ID_MAP: &str = "request-flow-id-map"; + +static CAT: LazyLock = LazyLock::new(|| { + gst::DebugCategory::new( + "quinnroqdemux", + gst::DebugColorFlags::empty(), + Some("Quinn RTP over QUIC Demuxer"), + ) +}); + +struct Reassembler { + buffer: BytesMut, + last_packet_len: u64, + // Track the timestamp of the last buffer pushed. Chunk boundaries + // in QUIC do not correspond to peer writes, and hence cannot be + // used for framing. BaseSrc will timestamp the buffers captured + // with running time. We make a best case effort to timestamp the + // reassembled packets by using the timestamp of the last buffer + // pushed after which a reassembled packet becomes available. + last_ts: Option, + // Source pad which this reassembler is for, primarily used for + // trace logging. + pad: gst::Pad, +} + +impl Reassembler { + fn new(pad: gst::Pad) -> Self { + Self { + // `quinnquicsrc` reads 4096 bytes at a time by default + // which is the default BaseSrc blocksize. + buffer: BytesMut::with_capacity(4096), + // Track the length of the packet we are reassembling + last_packet_len: 0, + last_ts: None, + pad, + } + } + + fn build_buffer(&mut self, packet_sz: u64) -> gst::Buffer { + let packet = self.buffer.split_to(packet_sz as usize); + + let mut buffer = gst::Buffer::with_size(packet_sz as usize).unwrap(); + { + let buffer_mut = buffer.get_mut().unwrap(); + { + let mut buf_mut = buffer_mut.map_writable().unwrap(); + buf_mut.clone_from_slice(&packet); + } + + // Set DTS and let downstream manage PTS using jitterbuffer + // as jitterbuffer will do the DTS -> PTS work. + buffer_mut.set_dts(self.last_ts); + } + + buffer.to_owned() + } + + fn push_buffer( + &mut self, + buffer: &mut gst::Buffer, + ) -> Result { + if buffer.size() == 0 { + return Ok(gst::FlowSuccess::Ok); + } + + self.last_ts = buffer.dts(); + + let buffer_mut = buffer.get_mut().ok_or(gst::FlowError::Error)?; + let map = buffer_mut + .map_readable() + .map_err(|_| gst::FlowError::Error)?; + let buffer_slice = map.as_slice(); + + self.buffer.extend_from_slice(buffer_slice); + + gst::trace!( + CAT, + obj = self.pad, + "Added buffer of {} bytes, current buffer size: {}", + buffer_slice.len(), + self.buffer.len() + ); + + Ok(gst::FlowSuccess::Ok) + } + + fn push_bytes( + &mut self, + buffer: &[u8], + dts: Option, + ) -> Result { + if buffer.is_empty() { + return Ok(gst::FlowSuccess::Ok); + } + + self.last_ts = dts; + + self.buffer.extend_from_slice(buffer); + + gst::trace!( + CAT, + obj = self.pad, + "Added {} bytes, current buffer size: {}", + buffer.len(), + self.buffer.len() + ); + + Ok(gst::FlowSuccess::Ok) + } + + fn pop(&mut self) -> Option { + if self.buffer.is_empty() { + return None; + } + + self.last_ts?; + + if self.last_packet_len != 0 { + // In the middle of reassembling a packet + if self.buffer.len() > self.last_packet_len as usize { + // Enough data to reassemble the packet + let buffer = self.build_buffer(self.last_packet_len); + + self.last_packet_len = 0; + + gst::trace!( + CAT, + obj = self.pad, + "Reassembled packet of size: {}, current buffer size: {}", + buffer.size(), + self.buffer.len() + ); + + return Some(buffer.to_owned()); + } else { + // Do not have enough data yet to reassemble the packet + return None; + } + } + + let (packet_sz, packet_sz_len) = get_varint(&self.buffer)?; + + gst::trace!( + CAT, + obj = self.pad, + "Reassembler, packet size length: {}, packet: {}, buffer: {}", + packet_sz_len, + packet_sz, + self.buffer.len(), + ); + + self.buffer.advance(packet_sz_len); + + if packet_sz > self.buffer.len() as u64 { + // Packet will span multiple buffers + self.last_packet_len = packet_sz; + + gst::trace!( + CAT, + obj = self.pad, + "Accumulating for packet of size: {}, current buffer size: {}", + packet_sz, + self.buffer.len() + ); + + return None; + } + + let buffer = self.build_buffer(packet_sz); + + gst::trace!( + CAT, + obj = self.pad, + "Reassembled packet of size: {}, current buffer size: {}", + buffer.size(), + self.buffer.len() + ); + + Some(buffer.to_owned()) + } +} + +#[derive(Default)] +struct Started { + stream_pads_map: HashMap, + datagram_pads_map: HashMap, +} + +#[derive(Default)] +enum State { + #[default] + Stopped, + Started(Started), +} + +pub struct QuinnRoqDemux { + state: Mutex, + sinkpad: gst::Pad, +} + +impl GstObjectImpl for QuinnRoqDemux {} + +impl ElementImpl for QuinnRoqDemux { + fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { + static ELEMENT_METADATA: LazyLock = LazyLock::new(|| { + gst::subclass::ElementMetadata::new( + "Quinn RTP over QUIC Demultiplexer", + "Source/Network/QUIC", + "Demultiplexes multiple RTP streams over QUIC", + "Sanchayan Maity ", + ) + }); + + Some(&*ELEMENT_METADATA) + } + + fn pad_templates() -> &'static [gst::PadTemplate] { + static PAD_TEMPLATES: LazyLock> = LazyLock::new(|| { + let sink_pad_template = gst::PadTemplate::new( + "sink", + gst::PadDirection::Sink, + gst::PadPresence::Always, + &gst::Caps::new_any(), + ) + .unwrap(); + + let src_caps = gst::Caps::builder("application/x-rtp").build(); + + let src_pad_template = gst::PadTemplate::new( + "src_%u", // src_ + gst::PadDirection::Src, + gst::PadPresence::Sometimes, + &src_caps, + ) + .unwrap(); + + vec![sink_pad_template, src_pad_template] + }); + + PAD_TEMPLATES.as_ref() + } + + fn change_state( + &self, + transition: gst::StateChange, + ) -> Result { + let ret = self.parent_change_state(transition)?; + + if let gst::StateChange::NullToReady = transition { + let mut state = self.state.lock().unwrap(); + *state = State::Started(Started::default()); + } + + Ok(ret) + } +} + +impl ObjectImpl for QuinnRoqDemux { + fn signals() -> &'static [glib::subclass::Signal] { + static SIGNALS: LazyLock> = LazyLock::new(|| { + /* + * See section 5.1 of RTP over QUIC specification. + * + * Endpoints need to associate flow identifiers with RTP + * streams. Depending on the context of the application, + * the association can be statically configured, signaled + * using an out-of-band signaling mechanism (e.g., SDP), + * or applications might be able to identify the stream + * based on the RTP packets sent on the stream (e.g., by + * inspecting the payload type). + * + * In this implementation, we use this signal to associate + * the flow-id with an incoming stream by requesting caps. + * If no caps are provided, the pipeline will fail with a + * flow error. + * + * TODO: Close the connection with ROQ_UNKNOWN_FLOW_ID error + * code if the signal fails. This will have to be communicated + * upstream to quinnquicsrc. + */ + vec![glib::subclass::Signal::builder(SIGNAL_FLOW_ID_MAP) + .param_types([u64::static_type()]) + .return_type::() + .build()] + }); + + SIGNALS.as_ref() + } + + fn constructed(&self) { + self.parent_constructed(); + + self.obj() + .add_pad(&self.sinkpad) + .expect("Failed to add sink pad"); + } +} + +#[glib::object_subclass] +impl ObjectSubclass for QuinnRoqDemux { + const NAME: &'static str = "GstQuinnQuicRtpDemux"; + type Type = super::QuinnRoqDemux; + type ParentType = gst::Element; + + fn with_class(klass: &Self::Class) -> Self { + let sinkpad = gst::Pad::builder_from_template(&klass.pad_template("sink").unwrap()) + .chain_function(|_pad, parent, buffer| { + QuinnRoqDemux::catch_panic_pad_function( + parent, + || Err(gst::FlowError::Error), + |demux| demux.sink_chain(buffer), + ) + }) + .event_function(|pad, parent, event| { + QuinnRoqDemux::catch_panic_pad_function( + parent, + || false, + |demux| demux.sink_event(pad, event), + ) + }) + .build(); + + Self { + state: Mutex::new(State::default()), + sinkpad, + } + } +} + +impl ChildProxyImpl for QuinnRoqDemux { + fn children_count(&self) -> u32 { + let object = self.obj(); + object.num_pads() as u32 + } + + fn child_by_name(&self, name: &str) -> Option { + let object = self.obj(); + object + .pads() + .into_iter() + .find(|p| p.name() == name) + .map(|p| p.upcast()) + } + + fn child_by_index(&self, index: u32) -> Option { + let object = self.obj(); + object + .pads() + .into_iter() + .nth(index as usize) + .map(|p| p.upcast()) + } +} + +impl QuinnRoqDemux { + fn add_srcpad_for_flowid(&self, flow_id: u64) -> Result { + let caps = self + .obj() + .emit_by_name::>(SIGNAL_FLOW_ID_MAP, &[&(flow_id)]) + .ok_or_else(|| { + gst::error!(CAT, imp = self, "Could not get caps for flow-id {flow_id}"); + gst::FlowError::Error + })?; + + let src_pad_name = format!("src_{flow_id}"); + let templ = self.obj().pad_template("src_%u").unwrap(); + let srcpad = gst::Pad::builder_from_template(&templ) + .name(src_pad_name.clone()) + .build(); + + srcpad.set_active(true).unwrap(); + + let stream_start_evt = gst::event::StreamStart::builder(&flow_id.to_string()) + .group_id(gst::GroupId::next()) + .build(); + srcpad.push_event(stream_start_evt); + + gst::log!( + CAT, + imp = self, + "Caps {caps:?}, received for pad {src_pad_name} for flow-id {flow_id}" + ); + + srcpad.push_event(gst::event::Caps::new(&caps)); + + let segment_evt = gst::event::Segment::new(&gst::FormattedSegment::::new()); + srcpad.push_event(segment_evt); + + self.obj().add_pad(&srcpad).expect("Failed to add pad"); + + gst::trace!( + CAT, + imp = self, + "Added pad {src_pad_name} for flow-id {flow_id}" + ); + + Ok(srcpad) + } + + fn remove_pad(&self, stream_id: u64) -> bool { + gst::debug!(CAT, imp = self, "Removing pad for stream id {stream_id}"); + + let mut state = self.state.lock().unwrap(); + let stream_pad = if let State::Started(ref mut state) = *state { + state.stream_pads_map.remove(&stream_id) + } else { + None + }; + drop(state); + + if let Some((pad, reassembler)) = stream_pad { + drop(reassembler); + + let _ = pad.set_active(false); + + if let Err(err) = self.obj().remove_pad(&pad) { + gst::error!( + CAT, + imp = self, + "Failed to remove pad {} for stream id {stream_id}, error: {err:?}", + pad.name() + ); + return false; + } else { + gst::log!( + CAT, + imp = self, + "Pad {} removed for stream id {stream_id}", + pad.name() + ); + return true; + } + } + + false + } + + fn sink_chain(&self, buffer: gst::Buffer) -> Result { + let meta = buffer.meta::(); + + match meta { + Some(m) => { + if m.is_datagram() { + self.datagram_sink_chain(buffer) + } else { + let stream_id = m.stream_id(); + + self.stream_sink_chain(buffer, stream_id) + } + } + None => { + gst::warning!(CAT, imp = self, "Buffer dropped, no metadata"); + Ok(gst::FlowSuccess::Ok) + } + } + } + + fn datagram_sink_chain( + &self, + mut buffer: gst::Buffer, + ) -> Result { + /* + * See section 5.3 of RTP over QUIC specification. + * + * DATAGRAMs preserve application frame boundaries. Thus, a + * single RTP packet can be mapped to a single DATAGRAM without + * additional framing. + * + * Since datagrams preserve framing we do not need packet + * reassembly here. + */ + let mut state = self.state.lock().unwrap(); + + if let State::Started(ref mut started) = *state { + let dts = buffer.dts(); + let buf_mut = buffer.get_mut().ok_or(gst::FlowError::Error)?; + let map = buf_mut.map_readable().map_err(|_| gst::FlowError::Error)?; + + let data = map.as_slice(); + let varint = get_varint(data); + + if varint.is_none() { + gst::error!(CAT, imp = self, "Unexpected VarInt parse error"); + return Err(gst::FlowError::Error); + } + + let (flow_id, flow_id_len) = { + let (flow_id, flow_id_len) = varint.unwrap(); + (flow_id, flow_id_len) + }; + + gst::trace!(CAT, imp = self, "Got buffer with flow-id {flow_id}",); + + let mut outbuf = buf_mut + .copy_region(gst::BufferCopyFlags::all(), flow_id_len..) + .unwrap(); + { + let outbuf_mut = outbuf.get_mut().ok_or(gst::FlowError::Error)?; + // Set DTS and let downstream manage PTS using jitterbuffer + // as jitterbuffer will do the DTS -> PTS work. + outbuf_mut.set_dts(dts); + } + + match started.datagram_pads_map.get_mut(&flow_id) { + Some(srcpad) => { + gst::trace!( + CAT, + obj = srcpad, + "Pushing buffer of {} bytes with dts: {:?}", + outbuf.size(), + outbuf.dts(), + ); + + return srcpad.push(outbuf); + } + None => { + let srcpad = self.add_srcpad_for_flowid(flow_id)?; + + gst::trace!( + CAT, + obj = srcpad, + "Pushing buffer of {} bytes with dts: {:?}", + outbuf.size(), + outbuf.dts(), + ); + + started.datagram_pads_map.insert(flow_id, srcpad.clone()); + + return srcpad.push(outbuf); + } + } + } + + Ok(gst::FlowSuccess::Ok) + } + + fn stream_sink_chain( + &self, + mut buffer: gst::Buffer, + stream_id: u64, + ) -> Result { + let mut state = self.state.lock().unwrap(); + + if let State::Started(ref mut started) = *state { + match started.stream_pads_map.get_mut(&stream_id) { + Some((srcpad, reassembler)) => { + reassembler.push_buffer(&mut buffer)?; + + while let Some(buffer) = reassembler.pop() { + gst::trace!( + CAT, + obj = srcpad, + "Pushing buffer of {} bytes with dts: {:?} for stream: {stream_id}", + buffer.size(), + buffer.dts(), + ); + + if let Err(err) = srcpad.push(buffer) { + gst::error!(CAT, imp = self, "Failed to push buffer: {err}"); + return Err(gst::FlowError::Error); + } + } + + return Ok(gst::FlowSuccess::Ok); + } + None => { + let dts = buffer.dts(); + + let buf_mut = buffer.get_mut().ok_or(gst::FlowError::Error)?; + let map = buf_mut.map_readable().map_err(|_| gst::FlowError::Error)?; + + let data = map.as_slice(); + let varint = get_varint(data); + + if varint.is_none() { + gst::error!(CAT, imp = self, "Unexpected VarInt parse error"); + return Err(gst::FlowError::Error); + } + + // We have not seen the flow id for this pad + let (flow_id, flow_id_len) = { + let (flow_id, flow_id_len) = varint.unwrap(); + (flow_id, flow_id_len) + }; + + gst::trace!( + CAT, + imp = self, + "Got stream-id {stream_id} with flow-id {flow_id} of length {flow_id_len} {}", + data.len() + ); + + let srcpad = self.add_srcpad_for_flowid(flow_id)?; + + let mut reassembler = Reassembler::new(srcpad.clone()); + reassembler.push_bytes(&data[flow_id_len..], dts)?; + + while let Some(buffer) = reassembler.pop() { + gst::trace!( + CAT, + obj = srcpad, + "Pushing output buffer of size: {} with dts: {:?} for stream: {stream_id}", + buffer.size(), + buffer.dts(), + ); + + if let Err(err) = srcpad.push(buffer) { + gst::error!(CAT, imp = self, "Failed to push buffer: {err}"); + return Err(gst::FlowError::Error); + } + } + + started + .stream_pads_map + .insert(stream_id, (srcpad, reassembler)); + + return Ok(gst::FlowSuccess::Ok); + } + } + } + + Ok(gst::FlowSuccess::Ok) + } + + fn sink_event(&self, pad: &gst::Pad, event: gst::Event) -> bool { + use gst::EventView; + + gst::debug!(CAT, imp = self, "Handling event {:?}", event); + + if let EventView::CustomDownstream(ev) = event.view() { + if let Some(s) = ev.structure() { + if s.name() == QUIC_STREAM_CLOSE_CUSTOMDOWNSTREAM_EVENT { + if let Ok(stream_id) = s.get::(QUIC_STREAM_ID) { + return self.remove_pad(stream_id); + } + } + } + } + + gst::Pad::event_default(pad, Some(&*self.obj()), event) + } +} diff --git a/net/quinn/src/quinnroqdemux/mod.rs b/net/quinn/src/quinnroqdemux/mod.rs new file mode 100644 index 00000000..af6a3620 --- /dev/null +++ b/net/quinn/src/quinnroqdemux/mod.rs @@ -0,0 +1,31 @@ +// Copyright (C) 2024, Asymptotic Inc. +// Author: Sanchayan Maity +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +/** + * element-quinnroqdemux: + * @short-description: Supports stream demultiplexing of RTP packets over QUIC + * + */ +use gst::glib; +use gst::prelude::*; + +pub mod imp; + +glib::wrapper! { + pub struct QuinnRoqDemux(ObjectSubclass) @extends gst::Element, gst::Object, @implements gst::ChildProxy; +} + +pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + gst::Element::register( + Some(plugin), + "quinnroqdemux", + gst::Rank::NONE, + QuinnRoqDemux::static_type(), + ) +} diff --git a/net/quinn/src/quinnroqmux/imp.rs b/net/quinn/src/quinnroqmux/imp.rs new file mode 100644 index 00000000..9f229cd0 --- /dev/null +++ b/net/quinn/src/quinnroqmux/imp.rs @@ -0,0 +1,499 @@ +// Copyright (C) 2024, Asymptotic Inc. +// Author: Sanchayan Maity +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +// Implements RTP over QUIC as per the following specification, +// https://datatracker.ietf.org/doc/draft-ietf-avtcore-rtp-over-quic/ + +use crate::common::*; +use crate::quinnquicmeta::QuinnQuicMeta; +use crate::quinnquicquery::*; +use gst::{glib, prelude::*, subclass::prelude::*}; +use gst_base::prelude::*; +use gst_base::subclass::prelude::*; +use itertools::Itertools; +use std::collections::HashMap; +use std::io::Read; +use std::sync::{LazyLock, Mutex}; + +const INITIAL_FLOW_ID: u64 = 1; +const MAXIMUM_FLOW_ID: u64 = (1 << 62) - 1; +const DEFAULT_STREAM_PRIORITY: i32 = 0; + +static CAT: LazyLock = LazyLock::new(|| { + gst::DebugCategory::new( + "quinnroqmux", + gst::DebugColorFlags::empty(), + Some("Quinn RTP over QUIC Muxer"), + ) +}); + +#[derive(Default)] +struct PadState { + flow_id_sent: bool, + stream_id: Option, +} + +struct QuinnRoqMuxPadSettings { + flow_id: u64, + priority: i32, +} + +impl Default for QuinnRoqMuxPadSettings { + fn default() -> Self { + Self { + flow_id: INITIAL_FLOW_ID, + priority: 0, + } + } +} + +#[derive(Default)] +pub(crate) struct QuinnRoqMuxPad { + settings: Mutex, + state: Mutex, +} + +#[glib::object_subclass] +impl ObjectSubclass for QuinnRoqMuxPad { + const NAME: &'static str = "QuinnRoqMuxPad"; + type Type = super::QuinnRoqMuxPad; + type ParentType = gst::Pad; +} + +impl ObjectImpl for QuinnRoqMuxPad { + fn properties() -> &'static [glib::ParamSpec] { + static PROPERTIES: LazyLock> = LazyLock::new(|| { + vec![ + glib::ParamSpecUInt64::builder("flow-id") + .nick("Flow identifier") + .blurb("Flow identifier") + .default_value(INITIAL_FLOW_ID) + .minimum(INITIAL_FLOW_ID) + .maximum(MAXIMUM_FLOW_ID) + .readwrite() + .build(), + glib::ParamSpecInt::builder("priority") + .nick("Priority of the stream, ignored by datagrams") + .blurb("Priority of the stream, ignored by datagrams") + .default_value(DEFAULT_STREAM_PRIORITY) + .readwrite() + .build(), + ] + }); + + PROPERTIES.as_ref() + } + + fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { + match pspec.name() { + "flow-id" => { + let mut settings = self.settings.lock().unwrap(); + settings.flow_id = value.get::().expect("type checked upstream"); + } + "priority" => { + let mut settings = self.settings.lock().unwrap(); + settings.priority = value.get::().expect("type checked upstream"); + } + _ => unimplemented!(), + } + } + + fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { + match pspec.name() { + "flow-id" => { + let settings = self.settings.lock().unwrap(); + settings.flow_id.to_value() + } + "priority" => { + let settings = self.settings.lock().unwrap(); + settings.priority.to_value() + } + _ => unimplemented!(), + } + } +} + +impl GstObjectImpl for QuinnRoqMuxPad {} + +impl PadImpl for QuinnRoqMuxPad {} + +impl ProxyPadImpl for QuinnRoqMuxPad {} + +#[derive(Default)] +struct State { + stream_uni_conns: u64, + datagrams: u64, +} + +pub struct QuinnRoqMux { + state: Mutex, + srcpad: gst::Pad, +} + +impl GstObjectImpl for QuinnRoqMux {} + +impl ElementImpl for QuinnRoqMux { + fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { + static ELEMENT_METADATA: LazyLock = LazyLock::new(|| { + gst::subclass::ElementMetadata::new( + "Quinn RTP over QUIC Multiplexer", + "Source/Network/QUIC", + "Multiplexes multiple RTP streams over QUIC", + "Sanchayan Maity ", + ) + }); + + Some(&*ELEMENT_METADATA) + } + + fn pad_templates() -> &'static [gst::PadTemplate] { + static PAD_TEMPLATES: LazyLock> = LazyLock::new(|| { + let sink_caps = gst::Caps::builder("application/x-rtp").build(); + + let stream_pad_template = gst::PadTemplate::with_gtype( + "stream_%u", + gst::PadDirection::Sink, + gst::PadPresence::Request, + &sink_caps, + super::QuinnRoqMuxPad::static_type(), + ) + .unwrap(); + + let datagram_pad_template = gst::PadTemplate::with_gtype( + "datagram_%u", + gst::PadDirection::Sink, + gst::PadPresence::Request, + &sink_caps, + super::QuinnRoqMuxPad::static_type(), + ) + .unwrap(); + + let src_pad_template = gst::PadTemplate::new( + "src", + gst::PadDirection::Src, + gst::PadPresence::Always, + &gst::Caps::new_any(), + ) + .unwrap(); + + vec![stream_pad_template, datagram_pad_template, src_pad_template] + }); + + PAD_TEMPLATES.as_ref() + } + + fn request_new_pad( + &self, + templ: &gst::PadTemplate, + _name: Option<&str>, + _caps: Option<&gst::Caps>, + ) -> Option { + let mut state = self.state.lock().unwrap(); + + match templ.name_template() { + "stream_%u" => { + let sink_pad_name = format!("stream_{}", state.stream_uni_conns); + + gst::debug!(CAT, imp = self, "Requesting pad {}", sink_pad_name); + + let sinkpad = gst::PadBuilder::::from_template(templ) + .name(sink_pad_name.clone()) + .chain_function(|pad, parent, buffer| { + QuinnRoqMux::catch_panic_pad_function( + parent, + || Err(gst::FlowError::Error), + |this| this.rtp_stream_sink_chain(pad, buffer), + ) + }) + .flags(gst::PadFlags::FIXED_CAPS) + .build(); + + self.obj() + .add_pad(&sinkpad) + .expect("Failed to add sink pad"); + + state.stream_uni_conns += 1; + + Some(sinkpad.upcast()) + } + "datagram_%u" => { + if request_datagram(&self.srcpad) { + gst::warning!(CAT, imp = self, "Datagram unsupported by peer"); + return None; + } + + let sink_pad_name = format!("datagram_{}", state.datagrams); + + gst::debug!(CAT, imp = self, "Requesting pad {}", sink_pad_name); + + let sinkpad = gst::PadBuilder::::from_template(templ) + .name(sink_pad_name.clone()) + .chain_function(|pad, parent, buffer| { + QuinnRoqMux::catch_panic_pad_function( + parent, + || Err(gst::FlowError::Error), + |this| this.rtp_datagram_sink_chain(pad, buffer), + ) + }) + .flags(gst::PadFlags::FIXED_CAPS) + .build(); + + self.obj() + .add_pad(&sinkpad) + .expect("Failed to add sink pad"); + + state.datagrams += 1; + + Some(sinkpad.upcast()) + } + _ => None, + } + } + + fn release_pad(&self, pad: &gst::Pad) { + pad.set_active(false).unwrap(); + + if pad.name().starts_with("stream") { + self.close_stream_for_pad(pad); + } + + self.obj().remove_pad(pad).unwrap(); + } +} + +impl ObjectImpl for QuinnRoqMux { + fn constructed(&self) { + self.parent_constructed(); + + self.obj() + .add_pad(&self.srcpad) + .expect("Failed to add source pad"); + } +} + +#[glib::object_subclass] +impl ObjectSubclass for QuinnRoqMux { + const NAME: &'static str = "GstQuinnRoqMux"; + type Type = super::QuinnRoqMux; + type ParentType = gst::Element; + + fn with_class(klass: &Self::Class) -> Self { + let templ = klass.pad_template("src").unwrap(); + let srcpad = gst::Pad::builder_from_template(&templ).build(); + + Self { + state: Mutex::new(State::default()), + srcpad, + } + } +} + +impl ChildProxyImpl for QuinnRoqMux { + fn children_count(&self) -> u32 { + let object = self.obj(); + object.num_pads() as u32 + } + + fn child_by_name(&self, name: &str) -> Option { + let object = self.obj(); + object + .pads() + .into_iter() + .find(|p| p.name() == name) + .map(|p| p.upcast()) + } + + fn child_by_index(&self, index: u32) -> Option { + let object = self.obj(); + object + .pads() + .into_iter() + .nth(index as usize) + .map(|p| p.upcast()) + } +} + +impl QuinnRoqMux { + fn rtp_datagram_sink_chain( + &self, + pad: &super::QuinnRoqMuxPad, + buffer: gst::Buffer, + ) -> Result { + /* + * As per section 5.2.1 of RTP over QUIC specification. + * Stream encapsulation format for ROQ datagrams is as + * follows: + * + * Payload { + * Flow Identifier(i) + * RTP Packet(..) + * } + * + * See section 5.3 of RTP over QUIC specification. + * + * DATAGRAMs preserve application frame boundaries. Thus, a + * single RTP packet can be mapped to a single DATAGRAM without + * additional framing. Because QUIC DATAGRAMs cannot be + * IP-fragmented (Section 5 of [RFC9221]), senders need to + * consider the header overhead associated with DATAGRAMs, and + * ensure that the RTP packets, including their payloads, flow + * identifier, QUIC, and IP headers, will fit into the Path MTU. + */ + + let mux_pad_settings = pad.imp().settings.lock().unwrap(); + let flow_id = mux_pad_settings.flow_id; + drop(mux_pad_settings); + + let size = get_varint_size(flow_id); + let mut outbuf = gst::Buffer::with_size(size).unwrap(); + { + let outbuffer = outbuf.get_mut().unwrap(); + { + let mut map = outbuffer.map_writable().unwrap(); + let mut data = map.as_mut_slice(); + + set_varint(&mut data, flow_id); + } + + outbuffer.set_pts(buffer.pts()); + outbuffer.set_dts(buffer.dts()); + + QuinnQuicMeta::add(outbuffer, 0, true); + } + + outbuf.append(buffer); + + self.srcpad.push(outbuf) + } + + fn rtp_stream_sink_chain( + &self, + pad: &super::QuinnRoqMuxPad, + buffer: gst::Buffer, + ) -> Result { + /* + * As per section 5.2.1 of RTP over QUIC specification. + * Stream encapsulation format for ROQ streams is as + * follows: + * + * Payload { + * Flow Identifier(i) + * RTP Payload(..) + * } + * + * RTP Payload { + * Length(i) + * RTP Packet(..) + * } + */ + + let mut pad_state = pad.imp().state.lock().unwrap(); + let stream_id = match pad_state.stream_id { + Some(stream_id) => stream_id, + None => { + let mux_pad_settings = pad.imp().settings.lock().unwrap(); + let priority = mux_pad_settings.priority; + drop(mux_pad_settings); + + gst::info!(CAT, obj = pad, "Requesting stream with priority {priority}"); + + match request_stream(&self.srcpad, priority) { + Some(stream_id) => { + pad_state.stream_id = Some(stream_id); + stream_id + } + None => { + gst::error!(CAT, obj = pad, "Failed to request stream"); + + return Err(gst::FlowError::Error); + } + } + } + }; + + if !pad_state.flow_id_sent { + let mux_pad_settings = pad.imp().settings.lock().unwrap(); + let flow_id = mux_pad_settings.flow_id; + drop(mux_pad_settings); + + let size = get_varint_size(flow_id); + let mut flow_id_buf = gst::Buffer::with_size(size).unwrap(); + { + let buffer = flow_id_buf.get_mut().unwrap(); + { + let mut map = buffer.map_writable().unwrap(); + let mut data = map.as_mut_slice(); + + set_varint(&mut data, flow_id); + } + + QuinnQuicMeta::add(buffer, stream_id, false); + } + + if let Err(e) = self.srcpad.push(flow_id_buf) { + gst::error!(CAT, obj = pad, "Failed to push flow id buffer: {e:?}"); + return Err(gst::FlowError::Error); + } + + pad_state.flow_id_sent = true; + } + + drop(pad_state); + + let buf_sz_len = get_varint_size(buffer.size() as u64); + let mut outbuf = gst::Buffer::with_size(buf_sz_len).unwrap(); + + gst::trace!( + CAT, + obj = pad, + "Got input buffer of size: {}, pts: {:?}, dts: {:?} for stream: {stream_id}", + buffer.size(), + buffer.pts(), + buffer.dts(), + ); + + { + let outbuf = outbuf.get_mut().unwrap(); + { + let mut obuf = outbuf.map_writable().unwrap(); + let mut obuf_slice = obuf.as_mut_slice(); + set_varint(&mut obuf_slice, buffer.size() as u64); + } + + QuinnQuicMeta::add(outbuf, stream_id, false); + + outbuf.set_pts(buffer.pts()); + outbuf.set_dts(buffer.dts()); + } + + outbuf.append(buffer); + + gst::trace!( + CAT, + obj = pad, + "Pushing buffer of {} bytes for stream: {stream_id}", + outbuf.size(), + ); + + self.srcpad.push(outbuf) + } + + fn close_stream_for_pad(&self, pad: &gst::Pad) { + let mux_pad = pad.downcast_ref::().unwrap(); + let pad_state = mux_pad.imp().state.lock().unwrap(); + + if let Some(stream_id) = pad_state.stream_id { + if close_stream(&self.srcpad, stream_id) { + gst::info!(CAT, obj = pad, "Closed connection"); + } else { + gst::warning!(CAT, obj = pad, "Failed to close connection"); + } + } + } +} diff --git a/net/quinn/src/quinnroqmux/mod.rs b/net/quinn/src/quinnroqmux/mod.rs new file mode 100644 index 00000000..4806bf4d --- /dev/null +++ b/net/quinn/src/quinnroqmux/mod.rs @@ -0,0 +1,35 @@ +// Copyright (C) 2024, Asymptotic Inc. +// Author: Sanchayan Maity +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +/** + * element-quinnroqmux: + * @short-description: Supports stream multiplexing of RTP packets over QUIC + * + */ +use gst::glib; +use gst::prelude::*; + +pub mod imp; + +glib::wrapper! { + pub struct QuinnRoqMux(ObjectSubclass) @extends gst_base::Aggregator, gst::Element, gst::Object, @implements gst::ChildProxy; +} + +glib::wrapper! { + pub(crate) struct QuinnRoqMuxPad(ObjectSubclass) @extends gst::ProxyPad, gst::Pad, gst::Object; +} + +pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + gst::Element::register( + Some(plugin), + "quinnroqmux", + gst::Rank::NONE, + QuinnRoqMux::static_type(), + ) +}