diff --git a/Cargo.lock b/Cargo.lock index 2daf9800..93f09275 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2852,6 +2852,7 @@ dependencies = [ "gstreamer", "gstreamer-base", "gstreamer-check", + "itertools 0.12.1", "quinn", "quinn-proto", "rcgen", diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index ad7e1cd3..bd2c4a52 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -5115,6 +5115,7 @@ "description": "Multiplexes multiple streams and datagram for QUIC", "hierarchy": [ "GstQuinnQuicMux", + "GstAggregator", "GstElement", "GstObject", "GInitiallyUnowned", @@ -5125,7 +5126,8 @@ "datagram": { "caps": "ANY", "direction": "sink", - "presence": "request" + "presence": "request", + "type": "GstAggregatorPad" }, "src": { "caps": "ANY", @@ -5896,6 +5898,33 @@ "value": "1" } ] + }, + "QuinnQuicMuxPad": { + "hierarchy": [ + "QuinnQuicMuxPad", + "GstAggregatorPad", + "GstPad", + "GstObject", + "GInitiallyUnowned", + "GObject" + ], + "kind": "object", + "properties": { + "priority": { + "blurb": "Priority of the stream", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "0", + "max": "2147483647", + "min": "-2147483648", + "mutable": "null", + "readable": true, + "type": "gint", + "writable": true + } + } } }, "package": "gst-plugin-quinn", diff --git a/net/quinn/Cargo.toml b/net/quinn/Cargo.toml index 5aac314c..994cfc52 100644 --- a/net/quinn/Cargo.toml +++ b/net/quinn/Cargo.toml @@ -25,6 +25,7 @@ rcgen = "0.13" bytes = "1.5.0" thiserror = "2" async-channel = "2.3" +itertools = "0.12" [dev-dependencies] gst-check = { workspace = true, features = ["v1_20"] } diff --git a/net/quinn/src/quinnquicmux/imp.rs b/net/quinn/src/quinnquicmux/imp.rs index 1effc09d..f3714692 100644 --- a/net/quinn/src/quinnquicmux/imp.rs +++ b/net/quinn/src/quinnquicmux/imp.rs @@ -41,7 +41,7 @@ pub(crate) struct QuinnQuicMuxPad { impl ObjectSubclass for QuinnQuicMuxPad { const NAME: &'static str = "QuinnQuicMuxPad"; type Type = super::QuinnQuicMuxPad; - type ParentType = gst::Pad; + type ParentType = gst_base::AggregatorPad; } impl ObjectImpl for QuinnQuicMuxPad { @@ -85,16 +85,18 @@ impl PadImpl for QuinnQuicMuxPad {} impl ProxyPadImpl for QuinnQuicMuxPad {} +impl AggregatorPadImpl for QuinnQuicMuxPad {} + #[derive(Default)] struct State { stream_uni_conns: u64, datagram_requested: bool, stream_id_map: HashMap, + segment_updated: bool, } pub struct QuinnQuicMux { state: Mutex, - srcpad: gst::Pad, } impl GstObjectImpl for QuinnQuicMux {} @@ -124,11 +126,12 @@ impl ElementImpl for QuinnQuicMux { ) .unwrap(); - let datagram_pad_template = gst::PadTemplate::new( + let datagram_pad_template = gst::PadTemplate::with_gtype( "datagram", gst::PadDirection::Sink, gst::PadPresence::Request, &gst::Caps::new_any(), + gst_base::AggregatorPad::static_type(), ) .unwrap(); @@ -171,13 +174,6 @@ impl ElementImpl for QuinnQuicMux { let stream_pad = gst::PadBuilder::::from_template(templ) .name(stream_pad_name) - .chain_function(|pad, parent, buffer| { - QuinnQuicMux::catch_panic_pad_function( - parent, - || Err(gst::FlowError::Error), - |this| this.stream_uni_sink_chain(pad, buffer), - ) - }) .flags(gst::PadFlags::FIXED_CAPS) .build(); @@ -198,13 +194,6 @@ impl ElementImpl for QuinnQuicMux { let datagram_pad = gst::Pad::builder_from_template(templ) .name("datagram") - .chain_function(|pad, parent, buffer| { - QuinnQuicMux::catch_panic_pad_function( - parent, - || Err(gst::FlowError::Error), - |this| this.datagram_sink_chain(pad, buffer), - ) - }) .flags(gst::PadFlags::FIXED_CAPS) .build(); @@ -223,15 +212,13 @@ impl ElementImpl for QuinnQuicMux { fn release_pad(&self, pad: &gst::Pad) { let mut state = self.state.lock().unwrap(); - pad.set_active(false).unwrap(); - if pad.name() == "datagram" { state.datagram_requested = false; } else if pad.name().starts_with("stream") { self.close_stream_for_pad(pad, &mut state); } - self.obj().remove_pad(pad).unwrap(); + self.parent_release_pad(pad) } fn change_state( @@ -240,7 +227,7 @@ impl ElementImpl for QuinnQuicMux { ) -> Result { if let gst::StateChange::NullToReady = transition { for pad in self.obj().sink_pads() { - if pad.name() == "datagram" && !request_datagram(&self.srcpad) { + if pad.name() == "datagram" && !request_datagram(&self.srcpad()) { gst::warning!(CAT, imp = self, "Datagram unsupported by the peer"); return Err(gst::StateChangeError); @@ -252,13 +239,140 @@ impl ElementImpl for QuinnQuicMux { } } +impl AggregatorImpl for QuinnQuicMux { + fn aggregate(&self, timeout: bool) -> Result { + gst::trace!(CAT, imp = self, "Aggregate (timeout: {timeout})"); + + let all_eos = self.obj().sink_pads().iter().all(|pad| { + pad.downcast_ref::() + .expect("Not a QuinnQuicMux pad") + .is_eos() + }); + + if all_eos { + gst::debug!(CAT, imp = self, "All pads are EOS now"); + return Err(gst::FlowError::Eos); + } + + // Buffers might arrive with different PTS on the pads. Consider + // a scenario where a buffer with PTS 1s arrives after a buffer + // with PTS 2s on one of the pads. Buffer with PTS 2s gets pushed + // and then buffer with PTS 1s gets pushed. Sink will wait to sync + // on 2s PTS. This effectively creates a scenario similar to HOL + // blocking which we want to avoid with QUIC streams in the first + // place. + // + // To avoid this, we order the pads based on the buffer PTS and + // then push the buffers downstream. + let mut pad_pts: Vec<(Option, gst::Pad)> = Vec::new(); + let mut state = self.state.lock().unwrap(); + + for pad in self.obj().sink_pads() { + let muxpad = pad + .downcast_ref::() + .expect("Not a QuinnQuicMux pad"); + + if muxpad.is_eos() { + continue; + } + + match muxpad.peek_buffer() { + Some(buffer) => { + pad_pts.push((buffer.pts(), pad)); + } + None => continue, + } + } + + let buffers_sorted_by_pts = pad_pts.into_iter().sorted_by_key(|x| x.0); + gst::trace!( + CAT, + imp = self, + "Buffers PTS sorted: {buffers_sorted_by_pts:?}" + ); + + for (pts, pad) in buffers_sorted_by_pts { + if !state.segment_updated { + state.segment_updated = true; + let mut segment = gst::FormattedSegment::::new(); + segment.set_start(pts); + self.obj().update_segment(&segment); + } + + let muxpad = pad + .downcast_ref::() + .expect("Not a QuinnQuicMux pad"); + + if muxpad.name().starts_with("stream_") { + let stream_id = match state.stream_id_map.get(&pad) { + Some(stream_id) => *stream_id, + None => { + let mux_pad_settings = muxpad.imp().settings.lock().unwrap(); + let priority = mux_pad_settings.priority; + drop(mux_pad_settings); + + gst::info!( + CAT, + obj = pad, + "Requesting stream connection with priority {priority}" + ); + + match request_stream(&self.srcpad(), priority) { + Some(stream_id) => { + state.stream_id_map.insert(pad.clone(), stream_id); + stream_id + } + None => { + gst::error!(CAT, obj = pad, "Failed to request stream"); + + return Err(gst::FlowError::Error); + } + } + } + }; + + let buffer = { + let mut buffer = muxpad.pop_buffer().unwrap(); + let outbuf = buffer.make_mut(); + + QuinnQuicMeta::add(outbuf, stream_id, false); + + buffer + }; + + gst::trace!( + CAT, + obj = pad, + "Finishing buffer {buffer:?} for stream {stream_id}" + ); + + self.obj().finish_buffer(buffer)?; + + gst::trace!(CAT, obj = pad, "Finished buffer for stream {stream_id}"); + } else { + let buffer = { + let mut buffer = muxpad.pop_buffer().unwrap(); + let outbuf = buffer.make_mut(); + + QuinnQuicMeta::add(outbuf, 0, true); + + buffer + }; + + gst::trace!(CAT, obj = pad, "Finishing buffer {buffer:?} for datagram"); + + self.obj().finish_buffer(buffer)?; + } + } + + Ok(gst::FlowSuccess::Ok) + } +} + impl ObjectImpl for QuinnQuicMux { fn constructed(&self) { self.parent_constructed(); - - self.obj() - .add_pad(&self.srcpad) - .expect("Failed to add source pad"); + self.obj().set_force_live(true); } } @@ -266,15 +380,11 @@ impl ObjectImpl for QuinnQuicMux { impl ObjectSubclass for QuinnQuicMux { const NAME: &'static str = "GstQuinnQuicMux"; type Type = super::QuinnQuicMux; - type ParentType = gst::Element; - - fn with_class(klass: &Self::Class) -> Self { - let templ = klass.pad_template("src").unwrap(); - let srcpad = gst::Pad::builder_from_template(&templ).build(); + type ParentType = gst_base::Aggregator; + fn with_class(_klass: &Self::Class) -> Self { Self { state: Mutex::new(State::default()), - srcpad, } } } @@ -305,75 +415,18 @@ impl ChildProxyImpl for QuinnQuicMux { } impl QuinnQuicMux { - fn stream_uni_sink_chain( - &self, - pad: &super::QuinnQuicMuxPad, - mut buffer: gst::Buffer, - ) -> Result { - let mut state = self.state.lock().unwrap(); - let gst_pad = pad.upcast_ref::(); - - let stream_id = match state.stream_id_map.get(gst_pad) { - Some(stream_id) => *stream_id, - None => { - let mux_pad_settings = pad.imp().settings.lock().unwrap(); - let priority = mux_pad_settings.priority; - drop(mux_pad_settings); - - gst::info!( - CAT, - obj = pad, - "Requesting stream connection with priority {priority}" - ); - - match request_stream(&self.srcpad, priority) { - Some(stream_id) => { - state.stream_id_map.insert(gst_pad.clone(), stream_id); - stream_id - } - None => { - gst::error!(CAT, obj = pad, "Failed to request stream"); - - return Err(gst::FlowError::Error); - } - } - } - }; - - gst::trace!( - CAT, - obj = pad, - "Sending buffer {buffer:?} for stream {stream_id}" - ); - - let outbuf = buffer.make_mut(); - - QuinnQuicMeta::add(outbuf, stream_id, false); - - self.srcpad.push(outbuf.to_owned()) - } - - fn datagram_sink_chain( - &self, - pad: &gst::Pad, - mut buffer: gst::Buffer, - ) -> Result { - gst::trace!(CAT, obj = pad, "Handling buffer for datagram {:?}", buffer); - - let outbuf = buffer.make_mut(); - - QuinnQuicMeta::add(outbuf, 0, true); - - self.srcpad.push(outbuf.to_owned()) - } - fn close_stream_for_pad(&self, pad: &gst::Pad, state: &mut State) { if let Some(stream_id) = state.stream_id_map.remove(pad) { - if close_stream(&self.srcpad, stream_id) { + if close_stream(&self.srcpad(), stream_id) { gst::info!(CAT, obj = pad, "Closed connection"); } else { gst::warning!(CAT, obj = pad, "Failed to close connection"); } } } + + fn srcpad(&self) -> gst::Pad { + let obj = self.obj(); + obj.src_pad().upcast_ref::().clone() + } } diff --git a/net/quinn/src/quinnquicmux/mod.rs b/net/quinn/src/quinnquicmux/mod.rs index ac8e28b7..347c1106 100644 --- a/net/quinn/src/quinnquicmux/mod.rs +++ b/net/quinn/src/quinnquicmux/mod.rs @@ -22,10 +22,14 @@ glib::wrapper! { } glib::wrapper! { - pub(crate) struct QuinnQuicMuxPad(ObjectSubclass) @extends gst::ProxyPad, gst::Pad, gst::Object; + pub(crate) struct QuinnQuicMuxPad(ObjectSubclass) @extends gst_base::AggregatorPad, gst::ProxyPad, gst::Pad, gst::Object; } pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + #[cfg(feature = "doc")] + { + QuinnQuicMuxPad::static_type().mark_as_plugin_api(gst::PluginAPIFlags::empty()); + } gst::Element::register( Some(plugin), "quinnquicmux",