diff --git a/net/quinn/src/quinnwtclientsrc/imp.rs b/net/quinn/src/quinnwtclientsrc/imp.rs index 32e924a3..47a22387 100644 --- a/net/quinn/src/quinnwtclientsrc/imp.rs +++ b/net/quinn/src/quinnwtclientsrc/imp.rs @@ -55,7 +55,6 @@ enum State { struct Settings { bind_address: String, bind_port: u16, - caps: gst::Caps, certificate_file: Option, keep_alive_interval: u64, timeout: u32, @@ -66,13 +65,14 @@ struct Settings { impl Default for Settings { fn default() -> Self { - let mut transport_config = QuinnQuicTransportConfig::default(); // Required for the WebTransport handshake - transport_config.max_concurrent_bidi_streams = 2u32.into(); - transport_config.max_concurrent_uni_streams = 1u32.into(); + let transport_config = QuinnQuicTransportConfig { + max_concurrent_bidi_streams: 2u32.into(), + max_concurrent_uni_streams: 1u32.into(), + ..Default::default() + }; Settings { - caps: gst::Caps::new_any(), bind_address: DEFAULT_BIND_ADDR.to_string(), bind_port: DEFAULT_BIND_PORT, certificate_file: None, @@ -144,10 +144,6 @@ impl ObjectImpl for QuinnWebTransportClientSrc { fn properties() -> &'static [glib::ParamSpec] { static PROPERTIES: LazyLock> = LazyLock::new(|| { vec![ - glib::ParamSpecBoxed::builder::("caps") - .nick("caps") - .blurb("The caps of the source pad") - .build(), glib::ParamSpecString::builder("certificate-file") .nick("Certificate file") .blurb("Path to certificate chain in single file") @@ -189,15 +185,6 @@ impl ObjectImpl for QuinnWebTransportClientSrc { let mut settings = self.settings.lock().unwrap(); match pspec.name() { - "caps" => { - settings.caps = value - .get::>() - .expect("type checked upstream") - .unwrap_or_else(gst::Caps::new_any); - - let srcpad = self.obj().static_pad("src").expect("source pad expected"); - srcpad.mark_reconfigure(); - } "certificate-file" => { let value: String = value.get().unwrap(); settings.certificate_file = Some(value.into()); @@ -222,7 +209,6 @@ impl ObjectImpl for QuinnWebTransportClientSrc { let settings = self.settings.lock().unwrap(); match pspec.name() { - "caps" => settings.caps.to_value(), "certificate-file" => { let certfile = settings.certificate_file.as_ref(); certfile.and_then(|file| file.to_str()).to_value() @@ -247,7 +233,7 @@ impl ObjectImpl for QuinnWebTransportClientSrc { impl ObjectSubclass for QuinnWebTransportClientSrc { const NAME: &'static str = "GstQuinnWebTransportClientSrc"; type Type = super::QuinnWebTransportClientSrc; - type ParentType = gst_base::BaseSrc; + type ParentType = gst_base::PushSrc; } impl BaseSrcImpl for QuinnWebTransportClientSrc { @@ -310,28 +296,24 @@ impl BaseSrcImpl for QuinnWebTransportClientSrc { Ok(()) } - fn query(&self, query: &mut gst::QueryRef) -> bool { - if let gst::QueryViewMut::Scheduling(q) = query.view_mut() { - q.set( - gst::SchedulingFlags::SEQUENTIAL | gst::SchedulingFlags::BANDWIDTH_LIMITED, - 1, - -1, - 0, - ); - q.add_scheduling_modes(&[gst::PadMode::Pull, gst::PadMode::Push]); - return true; - } - - BaseSrcImplExt::parent_query(self, query) + fn unlock(&self) -> Result<(), gst::ErrorMessage> { + let mut canceller = self.canceller.lock().unwrap(); + canceller.abort(); + Ok(()) } - fn create( - &self, - offset: u64, - buffer: Option<&mut gst::BufferRef>, - length: u32, - ) -> Result { - let data = self.get(offset, u64::from(length)); + fn unlock_stop(&self) -> Result<(), gst::ErrorMessage> { + let mut canceller = self.canceller.lock().unwrap(); + if matches!(&*canceller, Canceller::Cancelled) { + *canceller = Canceller::None; + } + Ok(()) + } +} + +impl PushSrcImpl for QuinnWebTransportClientSrc { + fn create(&self, buffer: Option<&mut gst::BufferRef>) -> Result { + let data = self.get(); match data { Ok(bytes) => { @@ -356,43 +338,6 @@ impl BaseSrcImpl for QuinnWebTransportClientSrc { } } } - - fn unlock(&self) -> Result<(), gst::ErrorMessage> { - let mut canceller = self.canceller.lock().unwrap(); - canceller.abort(); - Ok(()) - } - - fn unlock_stop(&self) -> Result<(), gst::ErrorMessage> { - let mut canceller = self.canceller.lock().unwrap(); - if matches!(&*canceller, Canceller::Cancelled) { - *canceller = Canceller::None; - } - Ok(()) - } - - fn caps(&self, filter: Option<&gst::Caps>) -> Option { - let settings = self.settings.lock().unwrap(); - - let mut tmp_caps = settings.caps.clone(); - - gst::debug!(CAT, imp = self, "Advertising our own caps: {:?}", &tmp_caps); - - if let Some(filter_caps) = filter { - gst::debug!( - CAT, - imp = self, - "Intersecting with filter caps: {:?}", - &filter_caps - ); - - tmp_caps = filter_caps.intersect_with_mode(&tmp_caps, gst::CapsIntersectMode::First); - }; - - gst::debug!(CAT, imp = self, "Returning caps: {:?}", &tmp_caps); - - Some(tmp_caps) - } } impl QuinnWebTransportClientSrc { @@ -459,7 +404,7 @@ impl QuinnWebTransportClientSrc { } } - fn get(&self, _offset: u64, length: u64) -> Result> { + fn get(&self) -> Result> { let settings = self.settings.lock().unwrap(); let timeout = settings.timeout; let use_datagram = settings.use_datagram; @@ -485,7 +430,7 @@ impl QuinnWebTransportClientSrc { self.read_datagram(session).await } else { let recv = stream.as_mut().unwrap(); - self.read_stream(recv, length as usize).await + self.read_stream(recv, 1024usize).await } }; diff --git a/net/quinn/src/quinnwtclientsrc/mod.rs b/net/quinn/src/quinnwtclientsrc/mod.rs index c8481af0..b8866e79 100644 --- a/net/quinn/src/quinnwtclientsrc/mod.rs +++ b/net/quinn/src/quinnwtclientsrc/mod.rs @@ -26,7 +26,7 @@ use gst::prelude::*; mod imp; glib::wrapper! { - pub struct QuinnWebTransportClientSrc(ObjectSubclass) @extends gst_base::BaseSrc, gst::Element, gst::Object; + pub struct QuinnWebTransportClientSrc(ObjectSubclass) @extends gst_base::PushSrc, gst_base::BaseSrc, gst::Element, gst::Object; } pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { diff --git a/net/quinn/src/quinnwtserversink/imp.rs b/net/quinn/src/quinnwtserversink/imp.rs index a8a09b3a..967e9af7 100644 --- a/net/quinn/src/quinnwtserversink/imp.rs +++ b/net/quinn/src/quinnwtserversink/imp.rs @@ -472,6 +472,7 @@ impl BaseSinkImpl for QuinnWebTransportServerSink { }, } } + fn unlock(&self) -> Result<(), gst::ErrorMessage> { let mut canceller = self.canceller.lock().unwrap(); canceller.abort();