From f4ecf3873b0a773af43d6c603e030bceb489c388 Mon Sep 17 00:00:00 2001 From: Sanchayan Maity Date: Mon, 17 Jun 2024 21:48:33 +0530 Subject: [PATCH] net/quinn: Handle multiple stream connections in quinnquicsrc While at it, use PushSrc as base class. quinnquicsrc never supported seeking and only ever operated in push mode. Length and offset for create from BaseSrc was also never really honoured. Use PushSrc as the base class which is more appropriate. Part-of: --- Cargo.lock | 1 + net/quinn/Cargo.toml | 5 +- net/quinn/src/common.rs | 4 + net/quinn/src/quinnquicsrc/imp.rs | 520 ++++++++++++++++++++---------- net/quinn/src/quinnquicsrc/mod.rs | 2 +- net/quinn/src/utils.rs | 2 + 6 files changed, 358 insertions(+), 176 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index a09a4618..2daf9800 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2844,6 +2844,7 @@ dependencies = [ name = "gst-plugin-quinn" version = "0.14.0-alpha.1" dependencies = [ + "async-channel", "bytes", "futures", "glib", diff --git a/net/quinn/Cargo.toml b/net/quinn/Cargo.toml index 7a03d218..5aac314c 100644 --- a/net/quinn/Cargo.toml +++ b/net/quinn/Cargo.toml @@ -13,8 +13,8 @@ rust-version.workspace = true [dependencies] glib.workspace = true gst.workspace = true -gst-base.workspace = true -tokio = { version = "1.36.0", default-features = false, features = ["time", "rt-multi-thread"] } +gst-base = { workspace = true, features = ["v1_22"] } +tokio = { version = "1.36.0", default-features = false, features = ["time", "rt-multi-thread", "macros"] } futures = "0.3.30" quinn = { version = "0.11.5", default-features = false, features = ["ring", "rustls", "runtime-tokio"] } quinn-proto ={ version = "0.11.8", default-features = false, features = ["rustls"] } @@ -24,6 +24,7 @@ rustls-pki-types = "1" rcgen = "0.13" bytes = "1.5.0" thiserror = "2" +async-channel = "2.3" [dev-dependencies] gst-check = { workspace = true, features = ["v1_20"] } diff --git a/net/quinn/src/common.rs b/net/quinn/src/common.rs index 5ffc1f40..89bf8e9b 100644 --- a/net/quinn/src/common.rs +++ b/net/quinn/src/common.rs @@ -57,6 +57,8 @@ pub struct QuinnQuicTransportConfig { pub upper_bound_mtu: u16, pub max_concurrent_uni_streams: VarInt, pub send_window: u64, + pub stream_receive_window: VarInt, + pub receive_window: VarInt, } impl Default for QuinnQuicTransportConfig { @@ -77,6 +79,8 @@ impl Default for QuinnQuicTransportConfig { upper_bound_mtu: DEFAULT_UPPER_BOUND_MTU, max_concurrent_uni_streams: DEFAULT_MAX_CONCURRENT_UNI_STREAMS, send_window: (8 * STREAM_RWND).into(), + stream_receive_window: STREAM_RWND.into(), + receive_window: VarInt::MAX, } } } diff --git a/net/quinn/src/quinnquicsrc/imp.rs b/net/quinn/src/quinnquicsrc/imp.rs index 5aeaabaf..b2a37efb 100644 --- a/net/quinn/src/quinnquicsrc/imp.rs +++ b/net/quinn/src/quinnquicsrc/imp.rs @@ -7,23 +7,29 @@ // // SPDX-License-Identifier: MPL-2.0 +use crate::quinnquicmeta::QuinnQuicMeta; +use crate::quinnquicquery::*; use crate::utils::{ client_endpoint, get_stats, make_socket_addr, server_endpoint, wait, Canceller, - QuinnQuicEndpointConfig, WaitError, CONNECTION_CLOSE_CODE, CONNECTION_CLOSE_MSG, + QuinnQuicEndpointConfig, WaitError, CONNECTION_CLOSE_CODE, CONNECTION_CLOSE_MSG, RUNTIME, }; use crate::{common::*, utils}; +use async_channel::{unbounded, Receiver, Sender}; use bytes::Bytes; -use futures::future; +use futures::{future::BoxFuture, stream::FuturesUnordered, StreamExt}; use gst::{glib, prelude::*, subclass::prelude::*}; use gst_base::prelude::*; use gst_base::subclass::base_src::CreateSuccess; use gst_base::subclass::prelude::*; -use quinn::{Connection, ConnectionError, ReadError, RecvStream, TransportConfig}; +use quinn::{Chunk, Connection, ConnectionError, ReadError, RecvStream, TransportConfig, VarInt}; use std::path::PathBuf; -use std::sync::LazyLock; -use std::sync::Mutex; +use std::sync::{LazyLock, Mutex}; +use std::thread::{spawn, Builder, JoinHandle}; +use tokio::sync::oneshot; const DEFAULT_ROLE: QuinnQuicRole = QuinnQuicRole::Server; +const DEFAULT_USE_DATAGRAM: bool = false; +const DATA_HANDLER_THREAD: &str = "data-handler"; static CAT: LazyLock = LazyLock::new(|| { gst::DebugCategory::new( @@ -33,9 +39,23 @@ static CAT: LazyLock = LazyLock::new(|| { ) }); +enum QuinnData { + Datagram(Bytes), + Stream(u64, Bytes), + Closed(u64), + Eos, +} + struct Started { connection: Connection, - stream: Option, + data_handler: Option>, + // TODO: Use tokio channel + // + // We use async-channel to keep a clone of the receive channel around + // for use in every `create` call. tokio's UnboundedReceiver does not + // implement clone. + data_rx: Option>, + thread_quit: Option>, } #[derive(Default)] @@ -78,7 +98,7 @@ impl Default for Settings { keep_alive_interval: 0, secure_conn: DEFAULT_SECURE_CONNECTION, caps: gst::Caps::new_any(), - use_datagram: false, + use_datagram: DEFAULT_USE_DATAGRAM, certificate_file: None, private_key_file: None, transport_config: QuinnQuicTransportConfig::default(), @@ -163,7 +183,9 @@ impl ElementImpl for QuinnQuicSrc { impl ObjectImpl for QuinnQuicSrc { fn constructed(&self) { self.parent_constructed(); - self.obj().set_format(gst::Format::Bytes); + self.obj().set_format(gst::Format::Time); + self.obj().set_do_timestamp(true); + self.obj().set_live(true); } fn properties() -> &'static [glib::ParamSpec] { @@ -237,7 +259,7 @@ impl ObjectImpl for QuinnQuicSrc { glib::ParamSpecBoolean::builder("use-datagram") .nick("Use datagram") .blurb("Use datagram for lower latency, unreliable messaging") - .default_value(false) + .default_value(DEFAULT_USE_DATAGRAM) .build(), glib::ParamSpecUInt::builder("initial-mtu") .nick("Initial MTU") @@ -277,7 +299,25 @@ impl ObjectImpl for QuinnQuicSrc { .nick("Connection statistics") .blurb("Connection statistics") .read_only() - .build() + .build(), + glib::ParamSpecUInt64::builder("max-concurrent-uni-streams") + .nick("Maximum concurrent uni-directional streams") + .blurb("Maximum number of incoming unidirectional streams that may be open concurrently") + .default_value(DEFAULT_MAX_CONCURRENT_UNI_STREAMS.into()) + .readwrite() + .build(), + glib::ParamSpecUInt64::builder("receive-window") + .nick("Receive Window") + .blurb("Maximum number of bytes the peer may transmit across all streams of a connection before becoming blocked") + .maximum(VarInt::MAX.into()) + .readwrite() + .build(), + glib::ParamSpecUInt64::builder("stream-receive-window") + .nick("Stream Receive Window") + .blurb("Maximum number of bytes the peer may transmit without ACK on any one stream before becoming blocked") + .maximum(VarInt::MAX.into()) + .readwrite() + .build(), ] }); @@ -374,6 +414,21 @@ impl ObjectImpl for QuinnQuicSrc { let value = value.get::().expect("type checked upstream"); settings.transport_config.datagram_send_buffer_size = value as usize; } + "max-concurrent-uni-streams" => { + let value = value.get::().expect("type checked upstream"); + settings.transport_config.max_concurrent_uni_streams = + VarInt::from_u64(value.max(VarInt::MAX.into())).unwrap(); + } + "receive-window" => { + let value = value.get::().expect("type checked upstream"); + settings.transport_config.receive_window = + VarInt::from_u64(value.max(VarInt::MAX.into())).unwrap(); + } + "stream-receive-window" => { + let value = value.get::().expect("type checked upstream"); + settings.transport_config.stream_receive_window = + VarInt::from_u64(value.max(VarInt::MAX.into())).unwrap(); + } _ => unimplemented!(), } } @@ -433,6 +488,13 @@ impl ObjectImpl for QuinnQuicSrc { State::Stopped => get_stats(None).to_value(), } } + "max-concurrent-uni-streams" => { + u64::from(settings.transport_config.max_concurrent_uni_streams).to_value() + } + "receive-window" => u64::from(settings.transport_config.receive_window).to_value(), + "stream-receive-window" => { + u64::from(settings.transport_config.stream_receive_window).to_value() + } _ => unimplemented!(), } } @@ -442,31 +504,47 @@ impl ObjectImpl for QuinnQuicSrc { impl ObjectSubclass for QuinnQuicSrc { const NAME: &'static str = "GstQuinnQuicSrc"; type Type = super::QuinnQuicSrc; - type ParentType = gst_base::BaseSrc; + type ParentType = gst_base::PushSrc; } impl BaseSrcImpl for QuinnQuicSrc { - fn is_seekable(&self) -> bool { - false - } - fn start(&self) -> Result<(), gst::ErrorMessage> { let settings = self.settings.lock().unwrap(); let timeout = settings.timeout; drop(settings); - let mut state = self.state.lock().unwrap(); - + let state = self.state.lock().unwrap(); if let State::Started { .. } = *state { unreachable!("QuicSrc already started"); } + drop(state); match wait(&self.canceller, self.init_connection(), timeout) { - Ok(Ok((c, s))) => { + Ok(Ok(conn)) => { + let connection = conn.clone(); + + let (tx_quit, rx_quit): (oneshot::Sender<()>, oneshot::Receiver<()>) = + oneshot::channel(); + let (data_tx, data_rx): (Sender, Receiver) = unbounded(); + + let conn_clone = conn.clone(); + let self_ = self.ref_counted(); + let data_handler = Builder::new() + .name(DATA_HANDLER_THREAD.to_string()) + .spawn(move || { + self_.handle_data(conn_clone, data_tx, rx_quit); + gst::debug!(CAT, imp = self_, "Data handler thread exit"); + }) + .unwrap(); + + let mut state = self.state.lock().unwrap(); *state = State::Started(Started { - connection: c, - stream: s, + connection, + data_handler: Some(data_handler), + data_rx: Some(data_rx), + thread_quit: Some(tx_quit), }); + drop(state); gst::info!(CAT, imp = self, "Started"); @@ -489,12 +567,27 @@ impl BaseSrcImpl for QuinnQuicSrc { } fn stop(&self) -> Result<(), gst::ErrorMessage> { + gst::info!(CAT, imp = self, "Stopping"); + let mut state = self.state.lock().unwrap(); if let State::Started(ref mut state) = *state { - let connection = &state.connection; + if let Some(channel) = state.thread_quit.take() { + gst::debug!(CAT, imp = self, "Signalling threads to exit"); + let _ = channel.send(()); + } - connection.close( + gst::debug!(CAT, imp = self, "Joining data handler thread"); + if let Some(handle) = state.data_handler.take() { + match handle.join() { + Ok(_) => gst::debug!(CAT, imp = self, "Joined data handler thread"), + Err(e) => { + gst::error!(CAT, imp = self, "Failed to join data handler thread: {e:?}") + } + } + } + + state.connection.close( CONNECTION_CLOSE_CODE.into(), CONNECTION_CLOSE_MSG.as_bytes(), ); @@ -502,56 +595,11 @@ impl BaseSrcImpl for QuinnQuicSrc { *state = State::Stopped; + gst::info!(CAT, imp = self, "Stopped"); + Ok(()) } - fn query(&self, query: &mut gst::QueryRef) -> bool { - if let gst::QueryViewMut::Scheduling(q) = query.view_mut() { - q.set( - gst::SchedulingFlags::SEQUENTIAL | gst::SchedulingFlags::BANDWIDTH_LIMITED, - 1, - -1, - 0, - ); - q.add_scheduling_modes(&[gst::PadMode::Pull, gst::PadMode::Push]); - return true; - } - - BaseSrcImplExt::parent_query(self, query) - } - - fn create( - &self, - offset: u64, - buffer: Option<&mut gst::BufferRef>, - length: u32, - ) -> Result { - let data = self.get(offset, u64::from(length)); - - match data { - Ok(bytes) => { - if bytes.is_empty() { - gst::debug!(CAT, imp = self, "End of stream"); - return Err(gst::FlowError::Eos); - } - - if let Some(buffer) = buffer { - if let Err(copied_bytes) = buffer.copy_from_slice(0, bytes.as_ref()) { - buffer.set_size(copied_bytes); - } - Ok(CreateSuccess::FilledBuffer) - } else { - Ok(CreateSuccess::NewBuffer(gst::Buffer::from_slice(bytes))) - } - } - Err(None) => Err(gst::FlowError::Flushing), - Err(Some(err)) => { - gst::error!(CAT, imp = self, "Could not GET: {}", err); - Err(gst::FlowError::Error) - } - } - } - fn unlock(&self) -> Result<(), gst::ErrorMessage> { let mut canceller = self.canceller.lock().unwrap(); canceller.abort(); @@ -588,22 +636,83 @@ impl BaseSrcImpl for QuinnQuicSrc { Some(tmp_caps) } + + fn is_seekable(&self) -> bool { + false + } +} + +impl PushSrcImpl for QuinnQuicSrc { + fn create( + &self, + _buffer: Option<&mut gst::BufferRef>, + ) -> Result { + loop { + // We do not want `create` to return when a stream is closed, + // but, wait for one of the other streams to receive data. + match self.get() { + Ok(Some(QuinnData::Stream(stream_id, bytes))) => { + break Ok(self.create_buffer(bytes, Some(stream_id))); + } + Ok(Some(QuinnData::Datagram(bytes))) => { + break Ok(self.create_buffer(bytes, None)); + } + Ok(Some(QuinnData::Eos)) => { + gst::debug!(CAT, imp = self, "End of stream"); + break Err(gst::FlowError::Eos); + } + Ok(None) => { + gst::debug!(CAT, imp = self, "End of stream"); + break Err(gst::FlowError::Eos); + } + Err(None) => { + gst::debug!(CAT, imp = self, "Flushing"); + break Err(gst::FlowError::Flushing); + } + Err(Some(err)) => { + gst::error!(CAT, imp = self, "Could not GET: {}", err); + break Err(gst::FlowError::Error); + } + Ok(Some(QuinnData::Closed(stream_id))) => { + // Send custom downstream event for demuxer to close + // and remove the stream. + let srcpad = self.obj().static_pad("src").expect("source pad expected"); + close_stream(&srcpad, stream_id); + } + } + } + } } impl QuinnQuicSrc { - fn get(&self, _offset: u64, length: u64) -> Result> { + fn create_buffer(&self, bytes: Bytes, stream_id: Option) -> CreateSuccess { + gst::trace!( + CAT, + imp = self, + "Pushing buffer of {} bytes for stream: {stream_id:?}", + bytes.len() + ); + + let mut buffer = gst::Buffer::from_slice(bytes); + { + let buffer = buffer.get_mut().unwrap(); + match stream_id { + Some(id) => QuinnQuicMeta::add(buffer, id, false), + None => QuinnQuicMeta::add(buffer, 0, true), + }; + } + + CreateSuccess::NewBuffer(buffer.to_owned()) + } + + fn get(&self) -> Result, Option> { let settings = self.settings.lock().unwrap(); let timeout = settings.timeout; - let use_datagram = settings.use_datagram; drop(settings); - let mut state = self.state.lock().unwrap(); - - let (conn, stream) = match *state { - State::Started(Started { - ref connection, - ref mut stream, - }) => (connection, stream), + let state = self.state.lock().unwrap(); + let rx_chan = match *state { + State::Started(ref started) => started.data_rx.clone(), State::Stopped => { return Err(Some(gst::error_msg!( gst::LibraryError::Failed, @@ -611,71 +720,17 @@ impl QuinnQuicSrc { ))); } }; + drop(state); - let future = async { - if use_datagram { - match conn.read_datagram().await { - Ok(bytes) => Ok(bytes), - Err(err) => match err { - ConnectionError::ApplicationClosed(ac) => { - gst::info!(CAT, imp = self, "Application closed connection, {}", ac); - Ok(Bytes::new()) - } - ConnectionError::ConnectionClosed(cc) => { - gst::info!(CAT, imp = self, "Transport closed connection, {}", cc); - Ok(Bytes::new()) - } - _ => Err(WaitError::FutureError(gst::error_msg!( - gst::ResourceError::Failed, - ["Datagram read error: {}", err] - ))), - }, - } - } else { - let recv = stream.as_mut().unwrap(); + let rx_chan = rx_chan.expect("Channel must be valid here"); - match recv.read_chunk(length as usize, true).await { - Ok(Some(chunk)) => Ok(chunk.bytes), - Ok(None) => Ok(Bytes::new()), - Err(err) => match err { - ReadError::ConnectionLost(conn_err) => match conn_err { - ConnectionError::ConnectionClosed(cc) => { - gst::info!(CAT, imp = self, "Transport closed connection, {}", cc); - Ok(Bytes::new()) - } - ConnectionError::ApplicationClosed(ac) => { - gst::info!( - CAT, - imp = self, - "Application closed connection, {}", - ac - ); - Ok(Bytes::new()) - } - _ => Err(WaitError::FutureError(gst::error_msg!( - gst::ResourceError::Failed, - ["Stream read error: {}", conn_err] - ))), - }, - ReadError::ClosedStream => { - gst::info!(CAT, imp = self, "Stream closed"); - Ok(Bytes::new()) - } - _ => Err(WaitError::FutureError(gst::error_msg!( - gst::ResourceError::Failed, - ["Stream read error: {}", err] - ))), - }, - } - } - }; - - match wait(&self.canceller, future, timeout) { - Ok(Ok(bytes)) => Ok(bytes), - Ok(Err(e)) | Err(e) => match e { + match wait(&self.canceller, rx_chan.recv(), timeout) { + Ok(Ok(bytes)) => Ok(Some(bytes)), + Ok(Err(_)) => Ok(None), + Err(e) => match e { WaitError::FutureAborted => { gst::warning!(CAT, imp = self, "Read from stream request aborted"); - Err(None) + Ok(None) } WaitError::FutureError(e) => { gst::error!(CAT, imp = self, "Failed to read from stream: {}", e); @@ -685,8 +740,158 @@ impl QuinnQuicSrc { } } - async fn init_connection(&self) -> Result<(Connection, Option), WaitError> { - let (role, use_datagram, endpoint_config) = { + fn handle_connection_error(&self, connection_err: ConnectionError) { + match connection_err { + ConnectionError::ConnectionClosed(cc) => { + gst::info!(CAT, imp = self, "Closed connection, {cc}"); + } + ConnectionError::ApplicationClosed(ac) => { + gst::info!(CAT, imp = self, "Application closed connection, {ac}"); + } + ConnectionError::LocallyClosed => { + gst::info!(CAT, imp = self, "Connection locally closed"); + } + ConnectionError::VersionMismatch => { + gst::error!(CAT, imp = self, "Version Mismatch"); + } + ConnectionError::TransportError(terr) => { + gst::error!(CAT, imp = self, "Transport error {terr:?}"); + } + ConnectionError::Reset => { + gst::error!(CAT, imp = self, "Connection Reset"); + } + ConnectionError::TimedOut => { + gst::error!(CAT, imp = self, "Connection Timedout"); + } + ConnectionError::CidsExhausted => { + gst::error!(CAT, imp = self, "Cids Exhausted"); + } + } + } + + fn handle_data( + &self, + connection: Connection, + sender: Sender, + receiver: oneshot::Receiver<()>, + ) { + // Unifies the Future return types + enum QuinnFuture { + Datagram(Bytes), + StreamData(RecvStream, QuinnData), + Stream(RecvStream), + Stop, + } + + let blocksize = self.obj().blocksize() as usize; + gst::info!(CAT, imp = self, "Using a blocksize of {blocksize} for read",); + + let incoming_stream = |conn: Connection| async move { + match conn.accept_uni().await { + Ok(recv_stream) => QuinnFuture::Stream(recv_stream), + Err(err) => { + self.handle_connection_error(err); + QuinnFuture::Stop + } + } + }; + + let datagram = |conn: Connection| async move { + match conn.read_datagram().await { + Ok(bytes) => QuinnFuture::Datagram(bytes), + Err(err) => { + self.handle_connection_error(err); + QuinnFuture::Stop + } + } + }; + + let recv_stream = |mut s: RecvStream| async move { + let stream_id = s.id().index(); + match s.read_chunk(blocksize, true).await { + Ok(Some(chunk)) => { + QuinnFuture::StreamData(s, QuinnData::Stream(stream_id, chunk.bytes)) + } + Ok(None) => QuinnFuture::StreamData(s, QuinnData::Closed(stream_id)), + Err(err) => match err { + ReadError::ClosedStream => { + gst::debug!(CAT, "Stream closed: {stream_id}"); + QuinnFuture::StreamData(s, QuinnData::Closed(stream_id)) + } + ReadError::ConnectionLost(err) => { + gst::error!(CAT, "Connection lost: {err:?}"); + QuinnFuture::StreamData(s, QuinnData::Eos) + } + rerr => { + gst::error!(CAT, "Read error on stream {stream_id}: {rerr:?}"); + QuinnFuture::StreamData(s, QuinnData::Eos) + } + }, + } + }; + + let tx_send = |sender: Sender, data: QuinnData| async move { + if let Err(err) = sender.send(data).await { + gst::error!(CAT, imp = self, "Error sending data: {err:?}"); + } + }; + + let mut tasks: FuturesUnordered> = FuturesUnordered::new(); + + tasks.push(Box::pin(datagram(connection.clone()))); + tasks.push(Box::pin(incoming_stream(connection.clone()))); + // We only ever expect to receive on this channel once, so we + // need not push this in the loop below. + tasks.push(Box::pin(async { + let _ = receiver.await; + gst::debug!(CAT, imp = self, "Quitting"); + QuinnFuture::Stop + })); + + RUNTIME.block_on(async { + while let Some(stream) = tasks.next().await { + match stream { + QuinnFuture::Stop => { + tx_send(sender.clone(), QuinnData::Eos).await; + break; + } + QuinnFuture::StreamData(s, data) => match data { + d @ QuinnData::Stream(stream_id, _) => { + gst::trace!(CAT, imp = self, "Sending data for stream: {stream_id}"); + tx_send(sender.clone(), d).await; + tasks.push(Box::pin(recv_stream(s))); + } + eos @ QuinnData::Eos => { + tx_send(sender.clone(), eos).await; + drop(s); + break; + } + c @ QuinnData::Closed(stream_id) => { + gst::trace!(CAT, imp = self, "Stream closed: {stream_id}"); + tx_send(sender.clone(), c).await; + drop(s); + } + QuinnData::Datagram(_) => unreachable!(), + }, + QuinnFuture::Stream(s) => { + gst::trace!(CAT, imp = self, "Incoming stream connection {:?}", s.id()); + tasks.push(Box::pin(recv_stream(s))); + tasks.push(Box::pin(incoming_stream(connection.clone()))); + } + QuinnFuture::Datagram(b) => { + gst::trace!(CAT, imp = self, "Received {} bytes on datagram", b.len()); + tx_send(sender.clone(), QuinnData::Datagram(b)).await; + tasks.push(Box::pin(datagram(connection.clone()))); + } + } + } + }); + + gst::info!(CAT, imp = self, "Quit data handler thread"); + } + + async fn init_connection(&self) -> Result { + let (role, endpoint_config) = { let settings = self.settings.lock().unwrap(); let client_addr = make_socket_addr( @@ -699,7 +904,6 @@ impl QuinnQuicSrc { let server_name = settings.server_name.clone(); let alpns = settings.alpns.clone(); let role = settings.role; - let use_datagram = settings.use_datagram; let keep_alive_interval = settings.keep_alive_interval; let secure_conn = settings.secure_conn; let certificate_file = settings.certificate_file.clone(); @@ -708,7 +912,6 @@ impl QuinnQuicSrc { ( role, - use_datagram, QuinnQuicEndpointConfig { server_addr, server_name, @@ -761,35 +964,6 @@ impl QuinnQuicSrc { })?, }; - let stream = if !use_datagram { - let res = connection.accept_uni().await.map_err(|err| { - WaitError::FutureError(gst::error_msg!( - gst::ResourceError::Failed, - ["Failed to open stream: {}", err] - )) - })?; - - Some(res) - } else { - match connection.max_datagram_size() { - Some(datagram_size) => { - gst::info!( - CAT, - imp = self, - "Datagram size reported by peer: {datagram_size}" - ); - } - None => { - return Err(WaitError::FutureError(gst::error_msg!( - gst::ResourceError::Failed, - ["Datagram unsupported by the peer"] - ))); - } - } - - None - }; - gst::info!( CAT, imp = self, @@ -797,6 +971,6 @@ impl QuinnQuicSrc { connection.remote_address() ); - Ok((connection, stream)) + Ok(connection) } } diff --git a/net/quinn/src/quinnquicsrc/mod.rs b/net/quinn/src/quinnquicsrc/mod.rs index 1e89c61e..321dd101 100644 --- a/net/quinn/src/quinnquicsrc/mod.rs +++ b/net/quinn/src/quinnquicsrc/mod.rs @@ -26,7 +26,7 @@ use gst::prelude::*; mod imp; glib::wrapper! { - pub struct QuinnQuicSrc(ObjectSubclass) @extends gst_base::BaseSrc, gst::Element, gst::Object; + pub struct QuinnQuicSrc(ObjectSubclass) @extends gst_base::PushSrc, gst_base::BaseSrc, gst::Element, gst::Object; } pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { diff --git a/net/quinn/src/utils.rs b/net/quinn/src/utils.rs index 54873b87..3722c776 100644 --- a/net/quinn/src/utils.rs +++ b/net/quinn/src/utils.rs @@ -398,6 +398,8 @@ fn configure_server( transport_config .max_concurrent_uni_streams(ep_config.transport_config.max_concurrent_uni_streams); transport_config.mtu_discovery_config(Some(mtu_config)); + transport_config.receive_window(ep_config.transport_config.receive_window); + transport_config.stream_receive_window(ep_config.transport_config.stream_receive_window); transport_config };