diff --git a/Cargo.lock b/Cargo.lock index a09a4618..2daf9800 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2844,6 +2844,7 @@ dependencies = [ name = "gst-plugin-quinn" version = "0.14.0-alpha.1" dependencies = [ + "async-channel", "bytes", "futures", "glib", diff --git a/net/quinn/Cargo.toml b/net/quinn/Cargo.toml index 7a03d218..5aac314c 100644 --- a/net/quinn/Cargo.toml +++ b/net/quinn/Cargo.toml @@ -13,8 +13,8 @@ rust-version.workspace = true [dependencies] glib.workspace = true gst.workspace = true -gst-base.workspace = true -tokio = { version = "1.36.0", default-features = false, features = ["time", "rt-multi-thread"] } +gst-base = { workspace = true, features = ["v1_22"] } +tokio = { version = "1.36.0", default-features = false, features = ["time", "rt-multi-thread", "macros"] } futures = "0.3.30" quinn = { version = "0.11.5", default-features = false, features = ["ring", "rustls", "runtime-tokio"] } quinn-proto ={ version = "0.11.8", default-features = false, features = ["rustls"] } @@ -24,6 +24,7 @@ rustls-pki-types = "1" rcgen = "0.13" bytes = "1.5.0" thiserror = "2" +async-channel = "2.3" [dev-dependencies] gst-check = { workspace = true, features = ["v1_20"] } diff --git a/net/quinn/src/common.rs b/net/quinn/src/common.rs index 5ffc1f40..89bf8e9b 100644 --- a/net/quinn/src/common.rs +++ b/net/quinn/src/common.rs @@ -57,6 +57,8 @@ pub struct QuinnQuicTransportConfig { pub upper_bound_mtu: u16, pub max_concurrent_uni_streams: VarInt, pub send_window: u64, + pub stream_receive_window: VarInt, + pub receive_window: VarInt, } impl Default for QuinnQuicTransportConfig { @@ -77,6 +79,8 @@ impl Default for QuinnQuicTransportConfig { upper_bound_mtu: DEFAULT_UPPER_BOUND_MTU, max_concurrent_uni_streams: DEFAULT_MAX_CONCURRENT_UNI_STREAMS, send_window: (8 * STREAM_RWND).into(), + stream_receive_window: STREAM_RWND.into(), + receive_window: VarInt::MAX, } } } diff --git a/net/quinn/src/quinnquicsrc/imp.rs b/net/quinn/src/quinnquicsrc/imp.rs index 5aeaabaf..b2a37efb 100644 --- a/net/quinn/src/quinnquicsrc/imp.rs +++ b/net/quinn/src/quinnquicsrc/imp.rs @@ -7,23 +7,29 @@ // // SPDX-License-Identifier: MPL-2.0 +use crate::quinnquicmeta::QuinnQuicMeta; +use crate::quinnquicquery::*; use crate::utils::{ client_endpoint, get_stats, make_socket_addr, server_endpoint, wait, Canceller, - QuinnQuicEndpointConfig, WaitError, CONNECTION_CLOSE_CODE, CONNECTION_CLOSE_MSG, + QuinnQuicEndpointConfig, WaitError, CONNECTION_CLOSE_CODE, CONNECTION_CLOSE_MSG, RUNTIME, }; use crate::{common::*, utils}; +use async_channel::{unbounded, Receiver, Sender}; use bytes::Bytes; -use futures::future; +use futures::{future::BoxFuture, stream::FuturesUnordered, StreamExt}; use gst::{glib, prelude::*, subclass::prelude::*}; use gst_base::prelude::*; use gst_base::subclass::base_src::CreateSuccess; use gst_base::subclass::prelude::*; -use quinn::{Connection, ConnectionError, ReadError, RecvStream, TransportConfig}; +use quinn::{Chunk, Connection, ConnectionError, ReadError, RecvStream, TransportConfig, VarInt}; use std::path::PathBuf; -use std::sync::LazyLock; -use std::sync::Mutex; +use std::sync::{LazyLock, Mutex}; +use std::thread::{spawn, Builder, JoinHandle}; +use tokio::sync::oneshot; const DEFAULT_ROLE: QuinnQuicRole = QuinnQuicRole::Server; +const DEFAULT_USE_DATAGRAM: bool = false; +const DATA_HANDLER_THREAD: &str = "data-handler"; static CAT: LazyLock = LazyLock::new(|| { gst::DebugCategory::new( @@ -33,9 +39,23 @@ static CAT: LazyLock = LazyLock::new(|| { ) }); +enum QuinnData { + Datagram(Bytes), + Stream(u64, Bytes), + Closed(u64), + Eos, +} + struct Started { connection: Connection, - stream: Option, + data_handler: Option>, + // TODO: Use tokio channel + // + // We use async-channel to keep a clone of the receive channel around + // for use in every `create` call. tokio's UnboundedReceiver does not + // implement clone. + data_rx: Option>, + thread_quit: Option>, } #[derive(Default)] @@ -78,7 +98,7 @@ impl Default for Settings { keep_alive_interval: 0, secure_conn: DEFAULT_SECURE_CONNECTION, caps: gst::Caps::new_any(), - use_datagram: false, + use_datagram: DEFAULT_USE_DATAGRAM, certificate_file: None, private_key_file: None, transport_config: QuinnQuicTransportConfig::default(), @@ -163,7 +183,9 @@ impl ElementImpl for QuinnQuicSrc { impl ObjectImpl for QuinnQuicSrc { fn constructed(&self) { self.parent_constructed(); - self.obj().set_format(gst::Format::Bytes); + self.obj().set_format(gst::Format::Time); + self.obj().set_do_timestamp(true); + self.obj().set_live(true); } fn properties() -> &'static [glib::ParamSpec] { @@ -237,7 +259,7 @@ impl ObjectImpl for QuinnQuicSrc { glib::ParamSpecBoolean::builder("use-datagram") .nick("Use datagram") .blurb("Use datagram for lower latency, unreliable messaging") - .default_value(false) + .default_value(DEFAULT_USE_DATAGRAM) .build(), glib::ParamSpecUInt::builder("initial-mtu") .nick("Initial MTU") @@ -277,7 +299,25 @@ impl ObjectImpl for QuinnQuicSrc { .nick("Connection statistics") .blurb("Connection statistics") .read_only() - .build() + .build(), + glib::ParamSpecUInt64::builder("max-concurrent-uni-streams") + .nick("Maximum concurrent uni-directional streams") + .blurb("Maximum number of incoming unidirectional streams that may be open concurrently") + .default_value(DEFAULT_MAX_CONCURRENT_UNI_STREAMS.into()) + .readwrite() + .build(), + glib::ParamSpecUInt64::builder("receive-window") + .nick("Receive Window") + .blurb("Maximum number of bytes the peer may transmit across all streams of a connection before becoming blocked") + .maximum(VarInt::MAX.into()) + .readwrite() + .build(), + glib::ParamSpecUInt64::builder("stream-receive-window") + .nick("Stream Receive Window") + .blurb("Maximum number of bytes the peer may transmit without ACK on any one stream before becoming blocked") + .maximum(VarInt::MAX.into()) + .readwrite() + .build(), ] }); @@ -374,6 +414,21 @@ impl ObjectImpl for QuinnQuicSrc { let value = value.get::().expect("type checked upstream"); settings.transport_config.datagram_send_buffer_size = value as usize; } + "max-concurrent-uni-streams" => { + let value = value.get::().expect("type checked upstream"); + settings.transport_config.max_concurrent_uni_streams = + VarInt::from_u64(value.max(VarInt::MAX.into())).unwrap(); + } + "receive-window" => { + let value = value.get::().expect("type checked upstream"); + settings.transport_config.receive_window = + VarInt::from_u64(value.max(VarInt::MAX.into())).unwrap(); + } + "stream-receive-window" => { + let value = value.get::().expect("type checked upstream"); + settings.transport_config.stream_receive_window = + VarInt::from_u64(value.max(VarInt::MAX.into())).unwrap(); + } _ => unimplemented!(), } } @@ -433,6 +488,13 @@ impl ObjectImpl for QuinnQuicSrc { State::Stopped => get_stats(None).to_value(), } } + "max-concurrent-uni-streams" => { + u64::from(settings.transport_config.max_concurrent_uni_streams).to_value() + } + "receive-window" => u64::from(settings.transport_config.receive_window).to_value(), + "stream-receive-window" => { + u64::from(settings.transport_config.stream_receive_window).to_value() + } _ => unimplemented!(), } } @@ -442,31 +504,47 @@ impl ObjectImpl for QuinnQuicSrc { impl ObjectSubclass for QuinnQuicSrc { const NAME: &'static str = "GstQuinnQuicSrc"; type Type = super::QuinnQuicSrc; - type ParentType = gst_base::BaseSrc; + type ParentType = gst_base::PushSrc; } impl BaseSrcImpl for QuinnQuicSrc { - fn is_seekable(&self) -> bool { - false - } - fn start(&self) -> Result<(), gst::ErrorMessage> { let settings = self.settings.lock().unwrap(); let timeout = settings.timeout; drop(settings); - let mut state = self.state.lock().unwrap(); - + let state = self.state.lock().unwrap(); if let State::Started { .. } = *state { unreachable!("QuicSrc already started"); } + drop(state); match wait(&self.canceller, self.init_connection(), timeout) { - Ok(Ok((c, s))) => { + Ok(Ok(conn)) => { + let connection = conn.clone(); + + let (tx_quit, rx_quit): (oneshot::Sender<()>, oneshot::Receiver<()>) = + oneshot::channel(); + let (data_tx, data_rx): (Sender, Receiver) = unbounded(); + + let conn_clone = conn.clone(); + let self_ = self.ref_counted(); + let data_handler = Builder::new() + .name(DATA_HANDLER_THREAD.to_string()) + .spawn(move || { + self_.handle_data(conn_clone, data_tx, rx_quit); + gst::debug!(CAT, imp = self_, "Data handler thread exit"); + }) + .unwrap(); + + let mut state = self.state.lock().unwrap(); *state = State::Started(Started { - connection: c, - stream: s, + connection, + data_handler: Some(data_handler), + data_rx: Some(data_rx), + thread_quit: Some(tx_quit), }); + drop(state); gst::info!(CAT, imp = self, "Started"); @@ -489,12 +567,27 @@ impl BaseSrcImpl for QuinnQuicSrc { } fn stop(&self) -> Result<(), gst::ErrorMessage> { + gst::info!(CAT, imp = self, "Stopping"); + let mut state = self.state.lock().unwrap(); if let State::Started(ref mut state) = *state { - let connection = &state.connection; + if let Some(channel) = state.thread_quit.take() { + gst::debug!(CAT, imp = self, "Signalling threads to exit"); + let _ = channel.send(()); + } - connection.close( + gst::debug!(CAT, imp = self, "Joining data handler thread"); + if let Some(handle) = state.data_handler.take() { + match handle.join() { + Ok(_) => gst::debug!(CAT, imp = self, "Joined data handler thread"), + Err(e) => { + gst::error!(CAT, imp = self, "Failed to join data handler thread: {e:?}") + } + } + } + + state.connection.close( CONNECTION_CLOSE_CODE.into(), CONNECTION_CLOSE_MSG.as_bytes(), ); @@ -502,56 +595,11 @@ impl BaseSrcImpl for QuinnQuicSrc { *state = State::Stopped; + gst::info!(CAT, imp = self, "Stopped"); + Ok(()) } - fn query(&self, query: &mut gst::QueryRef) -> bool { - if let gst::QueryViewMut::Scheduling(q) = query.view_mut() { - q.set( - gst::SchedulingFlags::SEQUENTIAL | gst::SchedulingFlags::BANDWIDTH_LIMITED, - 1, - -1, - 0, - ); - q.add_scheduling_modes(&[gst::PadMode::Pull, gst::PadMode::Push]); - return true; - } - - BaseSrcImplExt::parent_query(self, query) - } - - fn create( - &self, - offset: u64, - buffer: Option<&mut gst::BufferRef>, - length: u32, - ) -> Result { - let data = self.get(offset, u64::from(length)); - - match data { - Ok(bytes) => { - if bytes.is_empty() { - gst::debug!(CAT, imp = self, "End of stream"); - return Err(gst::FlowError::Eos); - } - - if let Some(buffer) = buffer { - if let Err(copied_bytes) = buffer.copy_from_slice(0, bytes.as_ref()) { - buffer.set_size(copied_bytes); - } - Ok(CreateSuccess::FilledBuffer) - } else { - Ok(CreateSuccess::NewBuffer(gst::Buffer::from_slice(bytes))) - } - } - Err(None) => Err(gst::FlowError::Flushing), - Err(Some(err)) => { - gst::error!(CAT, imp = self, "Could not GET: {}", err); - Err(gst::FlowError::Error) - } - } - } - fn unlock(&self) -> Result<(), gst::ErrorMessage> { let mut canceller = self.canceller.lock().unwrap(); canceller.abort(); @@ -588,22 +636,83 @@ impl BaseSrcImpl for QuinnQuicSrc { Some(tmp_caps) } + + fn is_seekable(&self) -> bool { + false + } +} + +impl PushSrcImpl for QuinnQuicSrc { + fn create( + &self, + _buffer: Option<&mut gst::BufferRef>, + ) -> Result { + loop { + // We do not want `create` to return when a stream is closed, + // but, wait for one of the other streams to receive data. + match self.get() { + Ok(Some(QuinnData::Stream(stream_id, bytes))) => { + break Ok(self.create_buffer(bytes, Some(stream_id))); + } + Ok(Some(QuinnData::Datagram(bytes))) => { + break Ok(self.create_buffer(bytes, None)); + } + Ok(Some(QuinnData::Eos)) => { + gst::debug!(CAT, imp = self, "End of stream"); + break Err(gst::FlowError::Eos); + } + Ok(None) => { + gst::debug!(CAT, imp = self, "End of stream"); + break Err(gst::FlowError::Eos); + } + Err(None) => { + gst::debug!(CAT, imp = self, "Flushing"); + break Err(gst::FlowError::Flushing); + } + Err(Some(err)) => { + gst::error!(CAT, imp = self, "Could not GET: {}", err); + break Err(gst::FlowError::Error); + } + Ok(Some(QuinnData::Closed(stream_id))) => { + // Send custom downstream event for demuxer to close + // and remove the stream. + let srcpad = self.obj().static_pad("src").expect("source pad expected"); + close_stream(&srcpad, stream_id); + } + } + } + } } impl QuinnQuicSrc { - fn get(&self, _offset: u64, length: u64) -> Result> { + fn create_buffer(&self, bytes: Bytes, stream_id: Option) -> CreateSuccess { + gst::trace!( + CAT, + imp = self, + "Pushing buffer of {} bytes for stream: {stream_id:?}", + bytes.len() + ); + + let mut buffer = gst::Buffer::from_slice(bytes); + { + let buffer = buffer.get_mut().unwrap(); + match stream_id { + Some(id) => QuinnQuicMeta::add(buffer, id, false), + None => QuinnQuicMeta::add(buffer, 0, true), + }; + } + + CreateSuccess::NewBuffer(buffer.to_owned()) + } + + fn get(&self) -> Result, Option> { let settings = self.settings.lock().unwrap(); let timeout = settings.timeout; - let use_datagram = settings.use_datagram; drop(settings); - let mut state = self.state.lock().unwrap(); - - let (conn, stream) = match *state { - State::Started(Started { - ref connection, - ref mut stream, - }) => (connection, stream), + let state = self.state.lock().unwrap(); + let rx_chan = match *state { + State::Started(ref started) => started.data_rx.clone(), State::Stopped => { return Err(Some(gst::error_msg!( gst::LibraryError::Failed, @@ -611,71 +720,17 @@ impl QuinnQuicSrc { ))); } }; + drop(state); - let future = async { - if use_datagram { - match conn.read_datagram().await { - Ok(bytes) => Ok(bytes), - Err(err) => match err { - ConnectionError::ApplicationClosed(ac) => { - gst::info!(CAT, imp = self, "Application closed connection, {}", ac); - Ok(Bytes::new()) - } - ConnectionError::ConnectionClosed(cc) => { - gst::info!(CAT, imp = self, "Transport closed connection, {}", cc); - Ok(Bytes::new()) - } - _ => Err(WaitError::FutureError(gst::error_msg!( - gst::ResourceError::Failed, - ["Datagram read error: {}", err] - ))), - }, - } - } else { - let recv = stream.as_mut().unwrap(); + let rx_chan = rx_chan.expect("Channel must be valid here"); - match recv.read_chunk(length as usize, true).await { - Ok(Some(chunk)) => Ok(chunk.bytes), - Ok(None) => Ok(Bytes::new()), - Err(err) => match err { - ReadError::ConnectionLost(conn_err) => match conn_err { - ConnectionError::ConnectionClosed(cc) => { - gst::info!(CAT, imp = self, "Transport closed connection, {}", cc); - Ok(Bytes::new()) - } - ConnectionError::ApplicationClosed(ac) => { - gst::info!( - CAT, - imp = self, - "Application closed connection, {}", - ac - ); - Ok(Bytes::new()) - } - _ => Err(WaitError::FutureError(gst::error_msg!( - gst::ResourceError::Failed, - ["Stream read error: {}", conn_err] - ))), - }, - ReadError::ClosedStream => { - gst::info!(CAT, imp = self, "Stream closed"); - Ok(Bytes::new()) - } - _ => Err(WaitError::FutureError(gst::error_msg!( - gst::ResourceError::Failed, - ["Stream read error: {}", err] - ))), - }, - } - } - }; - - match wait(&self.canceller, future, timeout) { - Ok(Ok(bytes)) => Ok(bytes), - Ok(Err(e)) | Err(e) => match e { + match wait(&self.canceller, rx_chan.recv(), timeout) { + Ok(Ok(bytes)) => Ok(Some(bytes)), + Ok(Err(_)) => Ok(None), + Err(e) => match e { WaitError::FutureAborted => { gst::warning!(CAT, imp = self, "Read from stream request aborted"); - Err(None) + Ok(None) } WaitError::FutureError(e) => { gst::error!(CAT, imp = self, "Failed to read from stream: {}", e); @@ -685,8 +740,158 @@ impl QuinnQuicSrc { } } - async fn init_connection(&self) -> Result<(Connection, Option), WaitError> { - let (role, use_datagram, endpoint_config) = { + fn handle_connection_error(&self, connection_err: ConnectionError) { + match connection_err { + ConnectionError::ConnectionClosed(cc) => { + gst::info!(CAT, imp = self, "Closed connection, {cc}"); + } + ConnectionError::ApplicationClosed(ac) => { + gst::info!(CAT, imp = self, "Application closed connection, {ac}"); + } + ConnectionError::LocallyClosed => { + gst::info!(CAT, imp = self, "Connection locally closed"); + } + ConnectionError::VersionMismatch => { + gst::error!(CAT, imp = self, "Version Mismatch"); + } + ConnectionError::TransportError(terr) => { + gst::error!(CAT, imp = self, "Transport error {terr:?}"); + } + ConnectionError::Reset => { + gst::error!(CAT, imp = self, "Connection Reset"); + } + ConnectionError::TimedOut => { + gst::error!(CAT, imp = self, "Connection Timedout"); + } + ConnectionError::CidsExhausted => { + gst::error!(CAT, imp = self, "Cids Exhausted"); + } + } + } + + fn handle_data( + &self, + connection: Connection, + sender: Sender, + receiver: oneshot::Receiver<()>, + ) { + // Unifies the Future return types + enum QuinnFuture { + Datagram(Bytes), + StreamData(RecvStream, QuinnData), + Stream(RecvStream), + Stop, + } + + let blocksize = self.obj().blocksize() as usize; + gst::info!(CAT, imp = self, "Using a blocksize of {blocksize} for read",); + + let incoming_stream = |conn: Connection| async move { + match conn.accept_uni().await { + Ok(recv_stream) => QuinnFuture::Stream(recv_stream), + Err(err) => { + self.handle_connection_error(err); + QuinnFuture::Stop + } + } + }; + + let datagram = |conn: Connection| async move { + match conn.read_datagram().await { + Ok(bytes) => QuinnFuture::Datagram(bytes), + Err(err) => { + self.handle_connection_error(err); + QuinnFuture::Stop + } + } + }; + + let recv_stream = |mut s: RecvStream| async move { + let stream_id = s.id().index(); + match s.read_chunk(blocksize, true).await { + Ok(Some(chunk)) => { + QuinnFuture::StreamData(s, QuinnData::Stream(stream_id, chunk.bytes)) + } + Ok(None) => QuinnFuture::StreamData(s, QuinnData::Closed(stream_id)), + Err(err) => match err { + ReadError::ClosedStream => { + gst::debug!(CAT, "Stream closed: {stream_id}"); + QuinnFuture::StreamData(s, QuinnData::Closed(stream_id)) + } + ReadError::ConnectionLost(err) => { + gst::error!(CAT, "Connection lost: {err:?}"); + QuinnFuture::StreamData(s, QuinnData::Eos) + } + rerr => { + gst::error!(CAT, "Read error on stream {stream_id}: {rerr:?}"); + QuinnFuture::StreamData(s, QuinnData::Eos) + } + }, + } + }; + + let tx_send = |sender: Sender, data: QuinnData| async move { + if let Err(err) = sender.send(data).await { + gst::error!(CAT, imp = self, "Error sending data: {err:?}"); + } + }; + + let mut tasks: FuturesUnordered> = FuturesUnordered::new(); + + tasks.push(Box::pin(datagram(connection.clone()))); + tasks.push(Box::pin(incoming_stream(connection.clone()))); + // We only ever expect to receive on this channel once, so we + // need not push this in the loop below. + tasks.push(Box::pin(async { + let _ = receiver.await; + gst::debug!(CAT, imp = self, "Quitting"); + QuinnFuture::Stop + })); + + RUNTIME.block_on(async { + while let Some(stream) = tasks.next().await { + match stream { + QuinnFuture::Stop => { + tx_send(sender.clone(), QuinnData::Eos).await; + break; + } + QuinnFuture::StreamData(s, data) => match data { + d @ QuinnData::Stream(stream_id, _) => { + gst::trace!(CAT, imp = self, "Sending data for stream: {stream_id}"); + tx_send(sender.clone(), d).await; + tasks.push(Box::pin(recv_stream(s))); + } + eos @ QuinnData::Eos => { + tx_send(sender.clone(), eos).await; + drop(s); + break; + } + c @ QuinnData::Closed(stream_id) => { + gst::trace!(CAT, imp = self, "Stream closed: {stream_id}"); + tx_send(sender.clone(), c).await; + drop(s); + } + QuinnData::Datagram(_) => unreachable!(), + }, + QuinnFuture::Stream(s) => { + gst::trace!(CAT, imp = self, "Incoming stream connection {:?}", s.id()); + tasks.push(Box::pin(recv_stream(s))); + tasks.push(Box::pin(incoming_stream(connection.clone()))); + } + QuinnFuture::Datagram(b) => { + gst::trace!(CAT, imp = self, "Received {} bytes on datagram", b.len()); + tx_send(sender.clone(), QuinnData::Datagram(b)).await; + tasks.push(Box::pin(datagram(connection.clone()))); + } + } + } + }); + + gst::info!(CAT, imp = self, "Quit data handler thread"); + } + + async fn init_connection(&self) -> Result { + let (role, endpoint_config) = { let settings = self.settings.lock().unwrap(); let client_addr = make_socket_addr( @@ -699,7 +904,6 @@ impl QuinnQuicSrc { let server_name = settings.server_name.clone(); let alpns = settings.alpns.clone(); let role = settings.role; - let use_datagram = settings.use_datagram; let keep_alive_interval = settings.keep_alive_interval; let secure_conn = settings.secure_conn; let certificate_file = settings.certificate_file.clone(); @@ -708,7 +912,6 @@ impl QuinnQuicSrc { ( role, - use_datagram, QuinnQuicEndpointConfig { server_addr, server_name, @@ -761,35 +964,6 @@ impl QuinnQuicSrc { })?, }; - let stream = if !use_datagram { - let res = connection.accept_uni().await.map_err(|err| { - WaitError::FutureError(gst::error_msg!( - gst::ResourceError::Failed, - ["Failed to open stream: {}", err] - )) - })?; - - Some(res) - } else { - match connection.max_datagram_size() { - Some(datagram_size) => { - gst::info!( - CAT, - imp = self, - "Datagram size reported by peer: {datagram_size}" - ); - } - None => { - return Err(WaitError::FutureError(gst::error_msg!( - gst::ResourceError::Failed, - ["Datagram unsupported by the peer"] - ))); - } - } - - None - }; - gst::info!( CAT, imp = self, @@ -797,6 +971,6 @@ impl QuinnQuicSrc { connection.remote_address() ); - Ok((connection, stream)) + Ok(connection) } } diff --git a/net/quinn/src/quinnquicsrc/mod.rs b/net/quinn/src/quinnquicsrc/mod.rs index 1e89c61e..321dd101 100644 --- a/net/quinn/src/quinnquicsrc/mod.rs +++ b/net/quinn/src/quinnquicsrc/mod.rs @@ -26,7 +26,7 @@ use gst::prelude::*; mod imp; glib::wrapper! { - pub struct QuinnQuicSrc(ObjectSubclass) @extends gst_base::BaseSrc, gst::Element, gst::Object; + pub struct QuinnQuicSrc(ObjectSubclass) @extends gst_base::PushSrc, gst_base::BaseSrc, gst::Element, gst::Object; } pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { diff --git a/net/quinn/src/utils.rs b/net/quinn/src/utils.rs index 54873b87..3722c776 100644 --- a/net/quinn/src/utils.rs +++ b/net/quinn/src/utils.rs @@ -398,6 +398,8 @@ fn configure_server( transport_config .max_concurrent_uni_streams(ep_config.transport_config.max_concurrent_uni_streams); transport_config.mtu_discovery_config(Some(mtu_config)); + transport_config.receive_window(ep_config.transport_config.receive_window); + transport_config.stream_receive_window(ep_config.transport_config.stream_receive_window); transport_config };