net/quinn: Use aggregator as base class for quinnroqmux

While at it, also update and fix the docs.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1775>
This commit is contained in:
Sanchayan Maity 2024-10-15 18:19:09 +05:30
parent 8dc8aa6f55
commit 28e66e150f
3 changed files with 150 additions and 43 deletions

View file

@ -5922,6 +5922,7 @@
"description": "Multiplexes multiple RTP streams over QUIC", "description": "Multiplexes multiple RTP streams over QUIC",
"hierarchy": [ "hierarchy": [
"GstQuinnRoqMux", "GstQuinnRoqMux",
"GstAggregator",
"GstElement", "GstElement",
"GstObject", "GstObject",
"GInitiallyUnowned", "GInitiallyUnowned",
@ -5935,16 +5936,16 @@
"presence": "request", "presence": "request",
"type": "QuinnRoqMuxPad" "type": "QuinnRoqMuxPad"
}, },
"src": {
"caps": "ANY",
"direction": "src",
"presence": "always"
},
"stream_%%u": { "stream_%%u": {
"caps": "application/x-rtp:\n", "caps": "application/x-rtp:\n",
"direction": "sink", "direction": "sink",
"presence": "request", "presence": "request",
"type": "QuinnRoqMuxPad" "type": "QuinnRoqMuxPad"
},
"src": {
"caps": "ANY",
"direction": "src",
"presence": "always"
} }
}, },
"rank": "none" "rank": "none"
@ -5994,6 +5995,47 @@
"writable": true "writable": true
} }
} }
},
"QuinnRoqMuxPad": {
"hierarchy": [
"QuinnRoqMuxPad",
"GstAggregatorPad",
"GstPad",
"GstObject",
"GInitiallyUnowned",
"GObject"
],
"kind": "object",
"properties": {
"flow-id": {
"blurb": "Flow identifier",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "1",
"max": "4611686018427387903",
"min": "1",
"mutable": "null",
"readable": true,
"type": "guint64",
"writable": true
},
"priority": {
"blurb": "Priority of the stream, ignored by datagrams",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "0",
"max": "2147483647",
"min": "-2147483648",
"mutable": "null",
"readable": true,
"type": "gint",
"writable": true
}
}
} }
}, },
"package": "gst-plugin-quinn", "package": "gst-plugin-quinn",
@ -15274,4 +15316,4 @@
"tracers": {}, "tracers": {},
"url": "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs" "url": "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs"
} }
} }

View file

@ -63,7 +63,7 @@ pub(crate) struct QuinnRoqMuxPad {
impl ObjectSubclass for QuinnRoqMuxPad { impl ObjectSubclass for QuinnRoqMuxPad {
const NAME: &'static str = "QuinnRoqMuxPad"; const NAME: &'static str = "QuinnRoqMuxPad";
type Type = super::QuinnRoqMuxPad; type Type = super::QuinnRoqMuxPad;
type ParentType = gst::Pad; type ParentType = gst_base::AggregatorPad;
} }
impl ObjectImpl for QuinnRoqMuxPad { impl ObjectImpl for QuinnRoqMuxPad {
@ -125,15 +125,17 @@ impl PadImpl for QuinnRoqMuxPad {}
impl ProxyPadImpl for QuinnRoqMuxPad {} impl ProxyPadImpl for QuinnRoqMuxPad {}
impl AggregatorPadImpl for QuinnRoqMuxPad {}
#[derive(Default)] #[derive(Default)]
struct State { struct State {
stream_uni_conns: u64,
datagrams: u64, datagrams: u64,
stream_uni_conns: u64,
segment_updated: bool,
} }
pub struct QuinnRoqMux { pub struct QuinnRoqMux {
state: Mutex<State>, state: Mutex<State>,
srcpad: gst::Pad,
} }
impl GstObjectImpl for QuinnRoqMux {} impl GstObjectImpl for QuinnRoqMux {}
@ -204,13 +206,6 @@ impl ElementImpl for QuinnRoqMux {
let sinkpad = gst::PadBuilder::<super::QuinnRoqMuxPad>::from_template(templ) let sinkpad = gst::PadBuilder::<super::QuinnRoqMuxPad>::from_template(templ)
.name(sink_pad_name.clone()) .name(sink_pad_name.clone())
.chain_function(|pad, parent, buffer| {
QuinnRoqMux::catch_panic_pad_function(
parent,
|| Err(gst::FlowError::Error),
|this| this.rtp_stream_sink_chain(pad, buffer),
)
})
.flags(gst::PadFlags::FIXED_CAPS) .flags(gst::PadFlags::FIXED_CAPS)
.build(); .build();
@ -223,7 +218,7 @@ impl ElementImpl for QuinnRoqMux {
Some(sinkpad.upcast()) Some(sinkpad.upcast())
} }
"datagram_%u" => { "datagram_%u" => {
if request_datagram(&self.srcpad) { if request_datagram(&self.srcpad()) {
gst::warning!(CAT, imp = self, "Datagram unsupported by peer"); gst::warning!(CAT, imp = self, "Datagram unsupported by peer");
return None; return None;
} }
@ -234,13 +229,6 @@ impl ElementImpl for QuinnRoqMux {
let sinkpad = gst::PadBuilder::<super::QuinnRoqMuxPad>::from_template(templ) let sinkpad = gst::PadBuilder::<super::QuinnRoqMuxPad>::from_template(templ)
.name(sink_pad_name.clone()) .name(sink_pad_name.clone())
.chain_function(|pad, parent, buffer| {
QuinnRoqMux::catch_panic_pad_function(
parent,
|| Err(gst::FlowError::Error),
|this| this.rtp_datagram_sink_chain(pad, buffer),
)
})
.flags(gst::PadFlags::FIXED_CAPS) .flags(gst::PadFlags::FIXED_CAPS)
.build(); .build();
@ -257,23 +245,18 @@ impl ElementImpl for QuinnRoqMux {
} }
fn release_pad(&self, pad: &gst::Pad) { fn release_pad(&self, pad: &gst::Pad) {
pad.set_active(false).unwrap();
if pad.name().starts_with("stream") { if pad.name().starts_with("stream") {
self.close_stream_for_pad(pad); self.close_stream_for_pad(pad);
} }
self.obj().remove_pad(pad).unwrap(); self.parent_release_pad(pad)
} }
} }
impl ObjectImpl for QuinnRoqMux { impl ObjectImpl for QuinnRoqMux {
fn constructed(&self) { fn constructed(&self) {
self.parent_constructed(); self.parent_constructed();
self.obj().set_force_live(true);
self.obj()
.add_pad(&self.srcpad)
.expect("Failed to add source pad");
} }
} }
@ -281,15 +264,11 @@ impl ObjectImpl for QuinnRoqMux {
impl ObjectSubclass for QuinnRoqMux { impl ObjectSubclass for QuinnRoqMux {
const NAME: &'static str = "GstQuinnRoqMux"; const NAME: &'static str = "GstQuinnRoqMux";
type Type = super::QuinnRoqMux; type Type = super::QuinnRoqMux;
type ParentType = gst::Element; type ParentType = gst_base::Aggregator;
fn with_class(klass: &Self::Class) -> Self {
let templ = klass.pad_template("src").unwrap();
let srcpad = gst::Pad::builder_from_template(&templ).build();
fn with_class(_klass: &Self::Class) -> Self {
Self { Self {
state: Mutex::new(State::default()), state: Mutex::new(State::default()),
srcpad,
} }
} }
} }
@ -319,6 +298,83 @@ impl ChildProxyImpl for QuinnRoqMux {
} }
} }
impl AggregatorImpl for QuinnRoqMux {
fn aggregate(&self, timeout: bool) -> Result<gst::FlowSuccess, gst::FlowError> {
gst::trace!(CAT, imp = self, "Aggregate (timeout: {timeout})");
let all_eos = self.obj().sink_pads().iter().all(|pad| {
pad.downcast_ref::<super::QuinnRoqMuxPad>()
.expect("Not a QuinnRoqMux pad")
.is_eos()
});
if all_eos {
gst::debug!(CAT, imp = self, "All pads are EOS now");
return Err(gst::FlowError::Eos);
}
// Buffers might arrive with different PTS on the pads. Consider
// a scenario where a buffer with PTS 1s arrives after a buffer
// with PTS 2s on one of the pads. Buffer with PTS 2s gets pushed
// and then buffer with PTS 1s gets pushed. Sink will wait to sync
// on 2s PTS. This effectively creates a scenario similar to HOL
// blocking which we want to avoid with QUIC streams in the first
// place.
//
// To avoid this, we order the pads based on the buffer PTS and
// then push the buffers downstream.
let mut pad_pts: Vec<(Option<gst::ClockTime>, gst::Pad)> = Vec::new();
let mut state = self.state.lock().unwrap();
for pad in self.obj().sink_pads() {
let muxpad = pad
.downcast_ref::<super::QuinnRoqMuxPad>()
.expect("Not a QuinnRoqMux pad");
if muxpad.is_eos() {
continue;
}
match muxpad.peek_buffer() {
Some(buffer) => {
pad_pts.push((buffer.pts(), pad));
}
None => continue,
}
}
let buffers_sorted_by_pts = pad_pts.into_iter().sorted_by_key(|x| x.0);
gst::trace!(
CAT,
imp = self,
"Buffers PTS sorted: {buffers_sorted_by_pts:?}"
);
for (pts, pad) in buffers_sorted_by_pts {
if !state.segment_updated {
state.segment_updated = true;
let mut segment = gst::FormattedSegment::<gst::ClockTime>::new();
segment.set_start(pts);
self.obj().update_segment(&segment);
}
let muxpad = pad
.downcast_ref::<super::QuinnRoqMuxPad>()
.expect("Not a QuinnRoqMux pad");
let buffer = muxpad.pop_buffer().unwrap();
if muxpad.name().starts_with("stream_") {
self.rtp_stream_sink_chain(muxpad, buffer)?;
} else {
self.rtp_datagram_sink_chain(muxpad, buffer)?;
}
}
Ok(gst::FlowSuccess::Ok)
}
}
impl QuinnRoqMux { impl QuinnRoqMux {
fn rtp_datagram_sink_chain( fn rtp_datagram_sink_chain(
&self, &self,
@ -369,7 +425,7 @@ impl QuinnRoqMux {
outbuf.append(buffer); outbuf.append(buffer);
self.srcpad.push(outbuf) self.obj().finish_buffer(outbuf)
} }
fn rtp_stream_sink_chain( fn rtp_stream_sink_chain(
@ -403,7 +459,7 @@ impl QuinnRoqMux {
gst::info!(CAT, obj = pad, "Requesting stream with priority {priority}"); gst::info!(CAT, obj = pad, "Requesting stream with priority {priority}");
match request_stream(&self.srcpad, priority) { match request_stream(&self.srcpad(), priority) {
Some(stream_id) => { Some(stream_id) => {
pad_state.stream_id = Some(stream_id); pad_state.stream_id = Some(stream_id);
stream_id stream_id
@ -436,7 +492,7 @@ impl QuinnRoqMux {
QuinnQuicMeta::add(buffer, stream_id, false); QuinnQuicMeta::add(buffer, stream_id, false);
} }
if let Err(e) = self.srcpad.push(flow_id_buf) { if let Err(e) = self.obj().finish_buffer(flow_id_buf) {
gst::error!(CAT, obj = pad, "Failed to push flow id buffer: {e:?}"); gst::error!(CAT, obj = pad, "Failed to push flow id buffer: {e:?}");
return Err(gst::FlowError::Error); return Err(gst::FlowError::Error);
} }
@ -481,7 +537,7 @@ impl QuinnRoqMux {
outbuf.size(), outbuf.size(),
); );
self.srcpad.push(outbuf) self.obj().finish_buffer(outbuf)
} }
fn close_stream_for_pad(&self, pad: &gst::Pad) { fn close_stream_for_pad(&self, pad: &gst::Pad) {
@ -489,11 +545,16 @@ impl QuinnRoqMux {
let pad_state = mux_pad.imp().state.lock().unwrap(); let pad_state = mux_pad.imp().state.lock().unwrap();
if let Some(stream_id) = pad_state.stream_id { if let Some(stream_id) = pad_state.stream_id {
if close_stream(&self.srcpad, stream_id) { if close_stream(&self.srcpad(), stream_id) {
gst::info!(CAT, obj = pad, "Closed connection"); gst::info!(CAT, obj = pad, "Closed connection");
} else { } else {
gst::warning!(CAT, obj = pad, "Failed to close connection"); gst::warning!(CAT, obj = pad, "Failed to close connection");
} }
} }
} }
fn srcpad(&self) -> gst::Pad {
let obj = self.obj();
obj.src_pad().upcast_ref::<gst::Pad>().clone()
}
} }

View file

@ -22,10 +22,14 @@ glib::wrapper! {
} }
glib::wrapper! { glib::wrapper! {
pub(crate) struct QuinnRoqMuxPad(ObjectSubclass<imp::QuinnRoqMuxPad>) @extends gst::ProxyPad, gst::Pad, gst::Object; pub(crate) struct QuinnRoqMuxPad(ObjectSubclass<imp::QuinnRoqMuxPad>) @extends gst_base::AggregatorPad, gst::ProxyPad, gst::Pad, gst::Object;
} }
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
#[cfg(feature = "doc")]
{
QuinnRoqMuxPad::static_type().mark_as_plugin_api(gst::PluginAPIFlags::empty());
}
gst::Element::register( gst::Element::register(
Some(plugin), Some(plugin),
"quinnroqmux", "quinnroqmux",