net/quinn: Move quinnwtclientsrc to PushSrc

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1867>
This commit is contained in:
Sanchayan Maity 2024-11-26 18:42:01 +05:30 committed by GStreamer Marge Bot
parent 850331699a
commit be02c0e388
3 changed files with 26 additions and 80 deletions

View file

@ -55,7 +55,6 @@ enum State {
struct Settings {
bind_address: String,
bind_port: u16,
caps: gst::Caps,
certificate_file: Option<PathBuf>,
keep_alive_interval: u64,
timeout: u32,
@ -66,13 +65,14 @@ struct Settings {
impl Default for Settings {
fn default() -> Self {
let mut transport_config = QuinnQuicTransportConfig::default();
// Required for the WebTransport handshake
transport_config.max_concurrent_bidi_streams = 2u32.into();
transport_config.max_concurrent_uni_streams = 1u32.into();
let transport_config = QuinnQuicTransportConfig {
max_concurrent_bidi_streams: 2u32.into(),
max_concurrent_uni_streams: 1u32.into(),
..Default::default()
};
Settings {
caps: gst::Caps::new_any(),
bind_address: DEFAULT_BIND_ADDR.to_string(),
bind_port: DEFAULT_BIND_PORT,
certificate_file: None,
@ -144,10 +144,6 @@ impl ObjectImpl for QuinnWebTransportClientSrc {
fn properties() -> &'static [glib::ParamSpec] {
static PROPERTIES: LazyLock<Vec<glib::ParamSpec>> = LazyLock::new(|| {
vec![
glib::ParamSpecBoxed::builder::<gst::Caps>("caps")
.nick("caps")
.blurb("The caps of the source pad")
.build(),
glib::ParamSpecString::builder("certificate-file")
.nick("Certificate file")
.blurb("Path to certificate chain in single file")
@ -189,15 +185,6 @@ impl ObjectImpl for QuinnWebTransportClientSrc {
let mut settings = self.settings.lock().unwrap();
match pspec.name() {
"caps" => {
settings.caps = value
.get::<Option<gst::Caps>>()
.expect("type checked upstream")
.unwrap_or_else(gst::Caps::new_any);
let srcpad = self.obj().static_pad("src").expect("source pad expected");
srcpad.mark_reconfigure();
}
"certificate-file" => {
let value: String = value.get().unwrap();
settings.certificate_file = Some(value.into());
@ -222,7 +209,6 @@ impl ObjectImpl for QuinnWebTransportClientSrc {
let settings = self.settings.lock().unwrap();
match pspec.name() {
"caps" => settings.caps.to_value(),
"certificate-file" => {
let certfile = settings.certificate_file.as_ref();
certfile.and_then(|file| file.to_str()).to_value()
@ -247,7 +233,7 @@ impl ObjectImpl for QuinnWebTransportClientSrc {
impl ObjectSubclass for QuinnWebTransportClientSrc {
const NAME: &'static str = "GstQuinnWebTransportClientSrc";
type Type = super::QuinnWebTransportClientSrc;
type ParentType = gst_base::BaseSrc;
type ParentType = gst_base::PushSrc;
}
impl BaseSrcImpl for QuinnWebTransportClientSrc {
@ -310,28 +296,24 @@ impl BaseSrcImpl for QuinnWebTransportClientSrc {
Ok(())
}
fn query(&self, query: &mut gst::QueryRef) -> bool {
if let gst::QueryViewMut::Scheduling(q) = query.view_mut() {
q.set(
gst::SchedulingFlags::SEQUENTIAL | gst::SchedulingFlags::BANDWIDTH_LIMITED,
1,
-1,
0,
);
q.add_scheduling_modes(&[gst::PadMode::Pull, gst::PadMode::Push]);
return true;
}
BaseSrcImplExt::parent_query(self, query)
fn unlock(&self) -> Result<(), gst::ErrorMessage> {
let mut canceller = self.canceller.lock().unwrap();
canceller.abort();
Ok(())
}
fn create(
&self,
offset: u64,
buffer: Option<&mut gst::BufferRef>,
length: u32,
) -> Result<CreateSuccess, gst::FlowError> {
let data = self.get(offset, u64::from(length));
fn unlock_stop(&self) -> Result<(), gst::ErrorMessage> {
let mut canceller = self.canceller.lock().unwrap();
if matches!(&*canceller, Canceller::Cancelled) {
*canceller = Canceller::None;
}
Ok(())
}
}
impl PushSrcImpl for QuinnWebTransportClientSrc {
fn create(&self, buffer: Option<&mut gst::BufferRef>) -> Result<CreateSuccess, gst::FlowError> {
let data = self.get();
match data {
Ok(bytes) => {
@ -356,43 +338,6 @@ impl BaseSrcImpl for QuinnWebTransportClientSrc {
}
}
}
fn unlock(&self) -> Result<(), gst::ErrorMessage> {
let mut canceller = self.canceller.lock().unwrap();
canceller.abort();
Ok(())
}
fn unlock_stop(&self) -> Result<(), gst::ErrorMessage> {
let mut canceller = self.canceller.lock().unwrap();
if matches!(&*canceller, Canceller::Cancelled) {
*canceller = Canceller::None;
}
Ok(())
}
fn caps(&self, filter: Option<&gst::Caps>) -> Option<gst::Caps> {
let settings = self.settings.lock().unwrap();
let mut tmp_caps = settings.caps.clone();
gst::debug!(CAT, imp = self, "Advertising our own caps: {:?}", &tmp_caps);
if let Some(filter_caps) = filter {
gst::debug!(
CAT,
imp = self,
"Intersecting with filter caps: {:?}",
&filter_caps
);
tmp_caps = filter_caps.intersect_with_mode(&tmp_caps, gst::CapsIntersectMode::First);
};
gst::debug!(CAT, imp = self, "Returning caps: {:?}", &tmp_caps);
Some(tmp_caps)
}
}
impl QuinnWebTransportClientSrc {
@ -459,7 +404,7 @@ impl QuinnWebTransportClientSrc {
}
}
fn get(&self, _offset: u64, length: u64) -> Result<Bytes, Option<gst::ErrorMessage>> {
fn get(&self) -> Result<Bytes, Option<gst::ErrorMessage>> {
let settings = self.settings.lock().unwrap();
let timeout = settings.timeout;
let use_datagram = settings.use_datagram;
@ -485,7 +430,7 @@ impl QuinnWebTransportClientSrc {
self.read_datagram(session).await
} else {
let recv = stream.as_mut().unwrap();
self.read_stream(recv, length as usize).await
self.read_stream(recv, 1024usize).await
}
};

View file

@ -26,7 +26,7 @@ use gst::prelude::*;
mod imp;
glib::wrapper! {
pub struct QuinnWebTransportClientSrc(ObjectSubclass<imp::QuinnWebTransportClientSrc>) @extends gst_base::BaseSrc, gst::Element, gst::Object;
pub struct QuinnWebTransportClientSrc(ObjectSubclass<imp::QuinnWebTransportClientSrc>) @extends gst_base::PushSrc, gst_base::BaseSrc, gst::Element, gst::Object;
}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {

View file

@ -472,6 +472,7 @@ impl BaseSinkImpl for QuinnWebTransportServerSink {
},
}
}
fn unlock(&self) -> Result<(), gst::ErrorMessage> {
let mut canceller = self.canceller.lock().unwrap();
canceller.abort();