diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index f3a72025..d014c3b2 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -5922,6 +5922,7 @@ "description": "Multiplexes multiple RTP streams over QUIC", "hierarchy": [ "GstQuinnRoqMux", + "GstAggregator", "GstElement", "GstObject", "GInitiallyUnowned", @@ -5935,16 +5936,16 @@ "presence": "request", "type": "QuinnRoqMuxPad" }, + "src": { + "caps": "ANY", + "direction": "src", + "presence": "always" + }, "stream_%%u": { "caps": "application/x-rtp:\n", "direction": "sink", "presence": "request", "type": "QuinnRoqMuxPad" - }, - "src": { - "caps": "ANY", - "direction": "src", - "presence": "always" } }, "rank": "none" @@ -5994,6 +5995,47 @@ "writable": true } } + }, + "QuinnRoqMuxPad": { + "hierarchy": [ + "QuinnRoqMuxPad", + "GstAggregatorPad", + "GstPad", + "GstObject", + "GInitiallyUnowned", + "GObject" + ], + "kind": "object", + "properties": { + "flow-id": { + "blurb": "Flow identifier", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "1", + "max": "4611686018427387903", + "min": "1", + "mutable": "null", + "readable": true, + "type": "guint64", + "writable": true + }, + "priority": { + "blurb": "Priority of the stream, ignored by datagrams", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "0", + "max": "2147483647", + "min": "-2147483648", + "mutable": "null", + "readable": true, + "type": "gint", + "writable": true + } + } } }, "package": "gst-plugin-quinn", @@ -15274,4 +15316,4 @@ "tracers": {}, "url": "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs" } -} +} \ No newline at end of file diff --git a/net/quinn/src/quinnroqmux/imp.rs b/net/quinn/src/quinnroqmux/imp.rs index 9f229cd0..acb110b2 100644 --- a/net/quinn/src/quinnroqmux/imp.rs +++ b/net/quinn/src/quinnroqmux/imp.rs @@ -63,7 +63,7 @@ pub(crate) struct QuinnRoqMuxPad { impl ObjectSubclass for QuinnRoqMuxPad { const NAME: &'static str = "QuinnRoqMuxPad"; type Type = super::QuinnRoqMuxPad; - type ParentType = gst::Pad; + type ParentType = gst_base::AggregatorPad; } impl ObjectImpl for QuinnRoqMuxPad { @@ -125,15 +125,17 @@ impl PadImpl for QuinnRoqMuxPad {} impl ProxyPadImpl for QuinnRoqMuxPad {} +impl AggregatorPadImpl for QuinnRoqMuxPad {} + #[derive(Default)] struct State { - stream_uni_conns: u64, datagrams: u64, + stream_uni_conns: u64, + segment_updated: bool, } pub struct QuinnRoqMux { state: Mutex, - srcpad: gst::Pad, } impl GstObjectImpl for QuinnRoqMux {} @@ -204,13 +206,6 @@ impl ElementImpl for QuinnRoqMux { let sinkpad = gst::PadBuilder::::from_template(templ) .name(sink_pad_name.clone()) - .chain_function(|pad, parent, buffer| { - QuinnRoqMux::catch_panic_pad_function( - parent, - || Err(gst::FlowError::Error), - |this| this.rtp_stream_sink_chain(pad, buffer), - ) - }) .flags(gst::PadFlags::FIXED_CAPS) .build(); @@ -223,7 +218,7 @@ impl ElementImpl for QuinnRoqMux { Some(sinkpad.upcast()) } "datagram_%u" => { - if request_datagram(&self.srcpad) { + if request_datagram(&self.srcpad()) { gst::warning!(CAT, imp = self, "Datagram unsupported by peer"); return None; } @@ -234,13 +229,6 @@ impl ElementImpl for QuinnRoqMux { let sinkpad = gst::PadBuilder::::from_template(templ) .name(sink_pad_name.clone()) - .chain_function(|pad, parent, buffer| { - QuinnRoqMux::catch_panic_pad_function( - parent, - || Err(gst::FlowError::Error), - |this| this.rtp_datagram_sink_chain(pad, buffer), - ) - }) .flags(gst::PadFlags::FIXED_CAPS) .build(); @@ -257,23 +245,18 @@ impl ElementImpl for QuinnRoqMux { } fn release_pad(&self, pad: &gst::Pad) { - pad.set_active(false).unwrap(); - if pad.name().starts_with("stream") { self.close_stream_for_pad(pad); } - self.obj().remove_pad(pad).unwrap(); + self.parent_release_pad(pad) } } impl ObjectImpl for QuinnRoqMux { fn constructed(&self) { self.parent_constructed(); - - self.obj() - .add_pad(&self.srcpad) - .expect("Failed to add source pad"); + self.obj().set_force_live(true); } } @@ -281,15 +264,11 @@ impl ObjectImpl for QuinnRoqMux { impl ObjectSubclass for QuinnRoqMux { const NAME: &'static str = "GstQuinnRoqMux"; type Type = super::QuinnRoqMux; - type ParentType = gst::Element; - - fn with_class(klass: &Self::Class) -> Self { - let templ = klass.pad_template("src").unwrap(); - let srcpad = gst::Pad::builder_from_template(&templ).build(); + type ParentType = gst_base::Aggregator; + fn with_class(_klass: &Self::Class) -> Self { Self { state: Mutex::new(State::default()), - srcpad, } } } @@ -319,6 +298,83 @@ impl ChildProxyImpl for QuinnRoqMux { } } +impl AggregatorImpl for QuinnRoqMux { + fn aggregate(&self, timeout: bool) -> Result { + gst::trace!(CAT, imp = self, "Aggregate (timeout: {timeout})"); + + let all_eos = self.obj().sink_pads().iter().all(|pad| { + pad.downcast_ref::() + .expect("Not a QuinnRoqMux pad") + .is_eos() + }); + + if all_eos { + gst::debug!(CAT, imp = self, "All pads are EOS now"); + return Err(gst::FlowError::Eos); + } + + // Buffers might arrive with different PTS on the pads. Consider + // a scenario where a buffer with PTS 1s arrives after a buffer + // with PTS 2s on one of the pads. Buffer with PTS 2s gets pushed + // and then buffer with PTS 1s gets pushed. Sink will wait to sync + // on 2s PTS. This effectively creates a scenario similar to HOL + // blocking which we want to avoid with QUIC streams in the first + // place. + // + // To avoid this, we order the pads based on the buffer PTS and + // then push the buffers downstream. + let mut pad_pts: Vec<(Option, gst::Pad)> = Vec::new(); + let mut state = self.state.lock().unwrap(); + + for pad in self.obj().sink_pads() { + let muxpad = pad + .downcast_ref::() + .expect("Not a QuinnRoqMux pad"); + + if muxpad.is_eos() { + continue; + } + + match muxpad.peek_buffer() { + Some(buffer) => { + pad_pts.push((buffer.pts(), pad)); + } + None => continue, + } + } + + let buffers_sorted_by_pts = pad_pts.into_iter().sorted_by_key(|x| x.0); + gst::trace!( + CAT, + imp = self, + "Buffers PTS sorted: {buffers_sorted_by_pts:?}" + ); + + for (pts, pad) in buffers_sorted_by_pts { + if !state.segment_updated { + state.segment_updated = true; + let mut segment = gst::FormattedSegment::::new(); + segment.set_start(pts); + self.obj().update_segment(&segment); + } + + let muxpad = pad + .downcast_ref::() + .expect("Not a QuinnRoqMux pad"); + + let buffer = muxpad.pop_buffer().unwrap(); + + if muxpad.name().starts_with("stream_") { + self.rtp_stream_sink_chain(muxpad, buffer)?; + } else { + self.rtp_datagram_sink_chain(muxpad, buffer)?; + } + } + + Ok(gst::FlowSuccess::Ok) + } +} + impl QuinnRoqMux { fn rtp_datagram_sink_chain( &self, @@ -369,7 +425,7 @@ impl QuinnRoqMux { outbuf.append(buffer); - self.srcpad.push(outbuf) + self.obj().finish_buffer(outbuf) } fn rtp_stream_sink_chain( @@ -403,7 +459,7 @@ impl QuinnRoqMux { gst::info!(CAT, obj = pad, "Requesting stream with priority {priority}"); - match request_stream(&self.srcpad, priority) { + match request_stream(&self.srcpad(), priority) { Some(stream_id) => { pad_state.stream_id = Some(stream_id); stream_id @@ -436,7 +492,7 @@ impl QuinnRoqMux { QuinnQuicMeta::add(buffer, stream_id, false); } - if let Err(e) = self.srcpad.push(flow_id_buf) { + if let Err(e) = self.obj().finish_buffer(flow_id_buf) { gst::error!(CAT, obj = pad, "Failed to push flow id buffer: {e:?}"); return Err(gst::FlowError::Error); } @@ -481,7 +537,7 @@ impl QuinnRoqMux { outbuf.size(), ); - self.srcpad.push(outbuf) + self.obj().finish_buffer(outbuf) } fn close_stream_for_pad(&self, pad: &gst::Pad) { @@ -489,11 +545,16 @@ impl QuinnRoqMux { let pad_state = mux_pad.imp().state.lock().unwrap(); if let Some(stream_id) = pad_state.stream_id { - if close_stream(&self.srcpad, stream_id) { + if close_stream(&self.srcpad(), stream_id) { gst::info!(CAT, obj = pad, "Closed connection"); } else { gst::warning!(CAT, obj = pad, "Failed to close connection"); } } } + + fn srcpad(&self) -> gst::Pad { + let obj = self.obj(); + obj.src_pad().upcast_ref::().clone() + } } diff --git a/net/quinn/src/quinnroqmux/mod.rs b/net/quinn/src/quinnroqmux/mod.rs index 4806bf4d..d3153065 100644 --- a/net/quinn/src/quinnroqmux/mod.rs +++ b/net/quinn/src/quinnroqmux/mod.rs @@ -22,10 +22,14 @@ glib::wrapper! { } glib::wrapper! { - pub(crate) struct QuinnRoqMuxPad(ObjectSubclass) @extends gst::ProxyPad, gst::Pad, gst::Object; + pub(crate) struct QuinnRoqMuxPad(ObjectSubclass) @extends gst_base::AggregatorPad, gst::ProxyPad, gst::Pad, gst::Object; } pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + #[cfg(feature = "doc")] + { + QuinnRoqMuxPad::static_type().mark_as_plugin_api(gst::PluginAPIFlags::empty()); + } gst::Element::register( Some(plugin), "quinnroqmux",