net/quinn: Use aggregator as base class for quinnquicmux

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1634>
This commit is contained in:
Sanchayan Maity 2024-10-11 02:22:46 +05:30 committed by GStreamer Marge Bot
parent 46f1fb4430
commit 324f3531be
5 changed files with 184 additions and 96 deletions

1
Cargo.lock generated
View file

@ -2852,6 +2852,7 @@ dependencies = [
"gstreamer", "gstreamer",
"gstreamer-base", "gstreamer-base",
"gstreamer-check", "gstreamer-check",
"itertools 0.12.1",
"quinn", "quinn",
"quinn-proto", "quinn-proto",
"rcgen", "rcgen",

View file

@ -5115,6 +5115,7 @@
"description": "Multiplexes multiple streams and datagram for QUIC", "description": "Multiplexes multiple streams and datagram for QUIC",
"hierarchy": [ "hierarchy": [
"GstQuinnQuicMux", "GstQuinnQuicMux",
"GstAggregator",
"GstElement", "GstElement",
"GstObject", "GstObject",
"GInitiallyUnowned", "GInitiallyUnowned",
@ -5125,7 +5126,8 @@
"datagram": { "datagram": {
"caps": "ANY", "caps": "ANY",
"direction": "sink", "direction": "sink",
"presence": "request" "presence": "request",
"type": "GstAggregatorPad"
}, },
"src": { "src": {
"caps": "ANY", "caps": "ANY",
@ -5896,6 +5898,33 @@
"value": "1" "value": "1"
} }
] ]
},
"QuinnQuicMuxPad": {
"hierarchy": [
"QuinnQuicMuxPad",
"GstAggregatorPad",
"GstPad",
"GstObject",
"GInitiallyUnowned",
"GObject"
],
"kind": "object",
"properties": {
"priority": {
"blurb": "Priority of the stream",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "0",
"max": "2147483647",
"min": "-2147483648",
"mutable": "null",
"readable": true,
"type": "gint",
"writable": true
}
}
} }
}, },
"package": "gst-plugin-quinn", "package": "gst-plugin-quinn",

View file

@ -25,6 +25,7 @@ rcgen = "0.13"
bytes = "1.5.0" bytes = "1.5.0"
thiserror = "2" thiserror = "2"
async-channel = "2.3" async-channel = "2.3"
itertools = "0.12"
[dev-dependencies] [dev-dependencies]
gst-check = { workspace = true, features = ["v1_20"] } gst-check = { workspace = true, features = ["v1_20"] }

View file

@ -41,7 +41,7 @@ pub(crate) struct QuinnQuicMuxPad {
impl ObjectSubclass for QuinnQuicMuxPad { impl ObjectSubclass for QuinnQuicMuxPad {
const NAME: &'static str = "QuinnQuicMuxPad"; const NAME: &'static str = "QuinnQuicMuxPad";
type Type = super::QuinnQuicMuxPad; type Type = super::QuinnQuicMuxPad;
type ParentType = gst::Pad; type ParentType = gst_base::AggregatorPad;
} }
impl ObjectImpl for QuinnQuicMuxPad { impl ObjectImpl for QuinnQuicMuxPad {
@ -85,16 +85,18 @@ impl PadImpl for QuinnQuicMuxPad {}
impl ProxyPadImpl for QuinnQuicMuxPad {} impl ProxyPadImpl for QuinnQuicMuxPad {}
impl AggregatorPadImpl for QuinnQuicMuxPad {}
#[derive(Default)] #[derive(Default)]
struct State { struct State {
stream_uni_conns: u64, stream_uni_conns: u64,
datagram_requested: bool, datagram_requested: bool,
stream_id_map: HashMap<gst::Pad, u64>, stream_id_map: HashMap<gst::Pad, u64>,
segment_updated: bool,
} }
pub struct QuinnQuicMux { pub struct QuinnQuicMux {
state: Mutex<State>, state: Mutex<State>,
srcpad: gst::Pad,
} }
impl GstObjectImpl for QuinnQuicMux {} impl GstObjectImpl for QuinnQuicMux {}
@ -124,11 +126,12 @@ impl ElementImpl for QuinnQuicMux {
) )
.unwrap(); .unwrap();
let datagram_pad_template = gst::PadTemplate::new( let datagram_pad_template = gst::PadTemplate::with_gtype(
"datagram", "datagram",
gst::PadDirection::Sink, gst::PadDirection::Sink,
gst::PadPresence::Request, gst::PadPresence::Request,
&gst::Caps::new_any(), &gst::Caps::new_any(),
gst_base::AggregatorPad::static_type(),
) )
.unwrap(); .unwrap();
@ -171,13 +174,6 @@ impl ElementImpl for QuinnQuicMux {
let stream_pad = gst::PadBuilder::<super::QuinnQuicMuxPad>::from_template(templ) let stream_pad = gst::PadBuilder::<super::QuinnQuicMuxPad>::from_template(templ)
.name(stream_pad_name) .name(stream_pad_name)
.chain_function(|pad, parent, buffer| {
QuinnQuicMux::catch_panic_pad_function(
parent,
|| Err(gst::FlowError::Error),
|this| this.stream_uni_sink_chain(pad, buffer),
)
})
.flags(gst::PadFlags::FIXED_CAPS) .flags(gst::PadFlags::FIXED_CAPS)
.build(); .build();
@ -198,13 +194,6 @@ impl ElementImpl for QuinnQuicMux {
let datagram_pad = gst::Pad::builder_from_template(templ) let datagram_pad = gst::Pad::builder_from_template(templ)
.name("datagram") .name("datagram")
.chain_function(|pad, parent, buffer| {
QuinnQuicMux::catch_panic_pad_function(
parent,
|| Err(gst::FlowError::Error),
|this| this.datagram_sink_chain(pad, buffer),
)
})
.flags(gst::PadFlags::FIXED_CAPS) .flags(gst::PadFlags::FIXED_CAPS)
.build(); .build();
@ -223,15 +212,13 @@ impl ElementImpl for QuinnQuicMux {
fn release_pad(&self, pad: &gst::Pad) { fn release_pad(&self, pad: &gst::Pad) {
let mut state = self.state.lock().unwrap(); let mut state = self.state.lock().unwrap();
pad.set_active(false).unwrap();
if pad.name() == "datagram" { if pad.name() == "datagram" {
state.datagram_requested = false; state.datagram_requested = false;
} else if pad.name().starts_with("stream") { } else if pad.name().starts_with("stream") {
self.close_stream_for_pad(pad, &mut state); self.close_stream_for_pad(pad, &mut state);
} }
self.obj().remove_pad(pad).unwrap(); self.parent_release_pad(pad)
} }
fn change_state( fn change_state(
@ -240,7 +227,7 @@ impl ElementImpl for QuinnQuicMux {
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> { ) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
if let gst::StateChange::NullToReady = transition { if let gst::StateChange::NullToReady = transition {
for pad in self.obj().sink_pads() { for pad in self.obj().sink_pads() {
if pad.name() == "datagram" && !request_datagram(&self.srcpad) { if pad.name() == "datagram" && !request_datagram(&self.srcpad()) {
gst::warning!(CAT, imp = self, "Datagram unsupported by the peer"); gst::warning!(CAT, imp = self, "Datagram unsupported by the peer");
return Err(gst::StateChangeError); return Err(gst::StateChangeError);
@ -252,13 +239,140 @@ impl ElementImpl for QuinnQuicMux {
} }
} }
impl AggregatorImpl for QuinnQuicMux {
fn aggregate(&self, timeout: bool) -> Result<gst::FlowSuccess, gst::FlowError> {
gst::trace!(CAT, imp = self, "Aggregate (timeout: {timeout})");
let all_eos = self.obj().sink_pads().iter().all(|pad| {
pad.downcast_ref::<super::QuinnQuicMuxPad>()
.expect("Not a QuinnQuicMux pad")
.is_eos()
});
if all_eos {
gst::debug!(CAT, imp = self, "All pads are EOS now");
return Err(gst::FlowError::Eos);
}
// Buffers might arrive with different PTS on the pads. Consider
// a scenario where a buffer with PTS 1s arrives after a buffer
// with PTS 2s on one of the pads. Buffer with PTS 2s gets pushed
// and then buffer with PTS 1s gets pushed. Sink will wait to sync
// on 2s PTS. This effectively creates a scenario similar to HOL
// blocking which we want to avoid with QUIC streams in the first
// place.
//
// To avoid this, we order the pads based on the buffer PTS and
// then push the buffers downstream.
let mut pad_pts: Vec<(Option<gst::ClockTime>, gst::Pad)> = Vec::new();
let mut state = self.state.lock().unwrap();
for pad in self.obj().sink_pads() {
let muxpad = pad
.downcast_ref::<super::QuinnQuicMuxPad>()
.expect("Not a QuinnQuicMux pad");
if muxpad.is_eos() {
continue;
}
match muxpad.peek_buffer() {
Some(buffer) => {
pad_pts.push((buffer.pts(), pad));
}
None => continue,
}
}
let buffers_sorted_by_pts = pad_pts.into_iter().sorted_by_key(|x| x.0);
gst::trace!(
CAT,
imp = self,
"Buffers PTS sorted: {buffers_sorted_by_pts:?}"
);
for (pts, pad) in buffers_sorted_by_pts {
if !state.segment_updated {
state.segment_updated = true;
let mut segment = gst::FormattedSegment::<gst::ClockTime>::new();
segment.set_start(pts);
self.obj().update_segment(&segment);
}
let muxpad = pad
.downcast_ref::<super::QuinnQuicMuxPad>()
.expect("Not a QuinnQuicMux pad");
if muxpad.name().starts_with("stream_") {
let stream_id = match state.stream_id_map.get(&pad) {
Some(stream_id) => *stream_id,
None => {
let mux_pad_settings = muxpad.imp().settings.lock().unwrap();
let priority = mux_pad_settings.priority;
drop(mux_pad_settings);
gst::info!(
CAT,
obj = pad,
"Requesting stream connection with priority {priority}"
);
match request_stream(&self.srcpad(), priority) {
Some(stream_id) => {
state.stream_id_map.insert(pad.clone(), stream_id);
stream_id
}
None => {
gst::error!(CAT, obj = pad, "Failed to request stream");
return Err(gst::FlowError::Error);
}
}
}
};
let buffer = {
let mut buffer = muxpad.pop_buffer().unwrap();
let outbuf = buffer.make_mut();
QuinnQuicMeta::add(outbuf, stream_id, false);
buffer
};
gst::trace!(
CAT,
obj = pad,
"Finishing buffer {buffer:?} for stream {stream_id}"
);
self.obj().finish_buffer(buffer)?;
gst::trace!(CAT, obj = pad, "Finished buffer for stream {stream_id}");
} else {
let buffer = {
let mut buffer = muxpad.pop_buffer().unwrap();
let outbuf = buffer.make_mut();
QuinnQuicMeta::add(outbuf, 0, true);
buffer
};
gst::trace!(CAT, obj = pad, "Finishing buffer {buffer:?} for datagram");
self.obj().finish_buffer(buffer)?;
}
}
Ok(gst::FlowSuccess::Ok)
}
}
impl ObjectImpl for QuinnQuicMux { impl ObjectImpl for QuinnQuicMux {
fn constructed(&self) { fn constructed(&self) {
self.parent_constructed(); self.parent_constructed();
self.obj().set_force_live(true);
self.obj()
.add_pad(&self.srcpad)
.expect("Failed to add source pad");
} }
} }
@ -266,15 +380,11 @@ impl ObjectImpl for QuinnQuicMux {
impl ObjectSubclass for QuinnQuicMux { impl ObjectSubclass for QuinnQuicMux {
const NAME: &'static str = "GstQuinnQuicMux"; const NAME: &'static str = "GstQuinnQuicMux";
type Type = super::QuinnQuicMux; type Type = super::QuinnQuicMux;
type ParentType = gst::Element; type ParentType = gst_base::Aggregator;
fn with_class(klass: &Self::Class) -> Self {
let templ = klass.pad_template("src").unwrap();
let srcpad = gst::Pad::builder_from_template(&templ).build();
fn with_class(_klass: &Self::Class) -> Self {
Self { Self {
state: Mutex::new(State::default()), state: Mutex::new(State::default()),
srcpad,
} }
} }
} }
@ -305,75 +415,18 @@ impl ChildProxyImpl for QuinnQuicMux {
} }
impl QuinnQuicMux { impl QuinnQuicMux {
fn stream_uni_sink_chain(
&self,
pad: &super::QuinnQuicMuxPad,
mut buffer: gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError> {
let mut state = self.state.lock().unwrap();
let gst_pad = pad.upcast_ref::<gst::Pad>();
let stream_id = match state.stream_id_map.get(gst_pad) {
Some(stream_id) => *stream_id,
None => {
let mux_pad_settings = pad.imp().settings.lock().unwrap();
let priority = mux_pad_settings.priority;
drop(mux_pad_settings);
gst::info!(
CAT,
obj = pad,
"Requesting stream connection with priority {priority}"
);
match request_stream(&self.srcpad, priority) {
Some(stream_id) => {
state.stream_id_map.insert(gst_pad.clone(), stream_id);
stream_id
}
None => {
gst::error!(CAT, obj = pad, "Failed to request stream");
return Err(gst::FlowError::Error);
}
}
}
};
gst::trace!(
CAT,
obj = pad,
"Sending buffer {buffer:?} for stream {stream_id}"
);
let outbuf = buffer.make_mut();
QuinnQuicMeta::add(outbuf, stream_id, false);
self.srcpad.push(outbuf.to_owned())
}
fn datagram_sink_chain(
&self,
pad: &gst::Pad,
mut buffer: gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError> {
gst::trace!(CAT, obj = pad, "Handling buffer for datagram {:?}", buffer);
let outbuf = buffer.make_mut();
QuinnQuicMeta::add(outbuf, 0, true);
self.srcpad.push(outbuf.to_owned())
}
fn close_stream_for_pad(&self, pad: &gst::Pad, state: &mut State) { fn close_stream_for_pad(&self, pad: &gst::Pad, state: &mut State) {
if let Some(stream_id) = state.stream_id_map.remove(pad) { if let Some(stream_id) = state.stream_id_map.remove(pad) {
if close_stream(&self.srcpad, stream_id) { if close_stream(&self.srcpad(), stream_id) {
gst::info!(CAT, obj = pad, "Closed connection"); gst::info!(CAT, obj = pad, "Closed connection");
} else { } else {
gst::warning!(CAT, obj = pad, "Failed to close connection"); gst::warning!(CAT, obj = pad, "Failed to close connection");
} }
} }
} }
fn srcpad(&self) -> gst::Pad {
let obj = self.obj();
obj.src_pad().upcast_ref::<gst::Pad>().clone()
}
} }

View file

@ -22,10 +22,14 @@ glib::wrapper! {
} }
glib::wrapper! { glib::wrapper! {
pub(crate) struct QuinnQuicMuxPad(ObjectSubclass<imp::QuinnQuicMuxPad>) @extends gst::ProxyPad, gst::Pad, gst::Object; pub(crate) struct QuinnQuicMuxPad(ObjectSubclass<imp::QuinnQuicMuxPad>) @extends gst_base::AggregatorPad, gst::ProxyPad, gst::Pad, gst::Object;
} }
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
#[cfg(feature = "doc")]
{
QuinnQuicMuxPad::static_type().mark_as_plugin_api(gst::PluginAPIFlags::empty());
}
gst::Element::register( gst::Element::register(
Some(plugin), Some(plugin),
"quinnquicmux", "quinnquicmux",