From babb6f360be2fba37d56277762500729268b7b5d Mon Sep 17 00:00:00 2001 From: Sanchayan Maity Date: Mon, 10 Jun 2024 20:21:24 +0530 Subject: [PATCH] net/quinn: Support stream multiplexing in quinnquicsink Part-of: --- net/quinn/src/common.rs | 6 + net/quinn/src/quinnquicsink/imp.rs | 495 +++++++++++++++++++++++------ net/quinn/src/utils.rs | 7 +- 3 files changed, 401 insertions(+), 107 deletions(-) diff --git a/net/quinn/src/common.rs b/net/quinn/src/common.rs index e2137997..5ffc1f40 100644 --- a/net/quinn/src/common.rs +++ b/net/quinn/src/common.rs @@ -7,6 +7,7 @@ // // SPDX-License-Identifier: MPL-2.0 use gst::glib; +use quinn::VarInt; pub(crate) static DEFAULT_SERVER_NAME: &str = "localhost"; pub(crate) static DEFAULT_ADDR: &str = "127.0.0.1"; @@ -21,6 +22,7 @@ pub(crate) static DEFAULT_UDP_PAYLOAD_SIZE: u16 = 1452; pub(crate) static DEFAULT_MIN_UDP_PAYLOAD_SIZE: u16 = 1200; pub(crate) static DEFAULT_MAX_UDP_PAYLOAD_SIZE: u16 = 65527; pub(crate) static DEFAULT_DROP_BUFFER_FOR_DATAGRAM: bool = false; +pub(crate) static DEFAULT_MAX_CONCURRENT_UNI_STREAMS: VarInt = VarInt::from_u32(32); /* * For QUIC transport parameters @@ -53,6 +55,8 @@ pub struct QuinnQuicTransportConfig { pub max_udp_payload_size: u16, pub min_mtu: u16, pub upper_bound_mtu: u16, + pub max_concurrent_uni_streams: VarInt, + pub send_window: u64, } impl Default for QuinnQuicTransportConfig { @@ -71,6 +75,8 @@ impl Default for QuinnQuicTransportConfig { max_udp_payload_size: DEFAULT_MAX_UDP_PAYLOAD_SIZE, min_mtu: DEFAULT_MINIMUM_MTU, upper_bound_mtu: DEFAULT_UPPER_BOUND_MTU, + max_concurrent_uni_streams: DEFAULT_MAX_CONCURRENT_UNI_STREAMS, + send_window: (8 * STREAM_RWND).into(), } } } diff --git a/net/quinn/src/quinnquicsink/imp.rs b/net/quinn/src/quinnquicsink/imp.rs index 39ec6c54..627ffe3a 100644 --- a/net/quinn/src/quinnquicsink/imp.rs +++ b/net/quinn/src/quinnquicsink/imp.rs @@ -7,16 +7,22 @@ // // SPDX-License-Identifier: MPL-2.0 +use crate::common::*; +use crate::quinnquicmeta::*; +use crate::quinnquicquery::*; use crate::utils::{ client_endpoint, get_stats, make_socket_addr, server_endpoint, wait, QuinnQuicEndpointConfig, - WaitError, CONNECTION_CLOSE_CODE, CONNECTION_CLOSE_MSG, + WaitError, CONNECTION_CLOSE_CODE, CONNECTION_CLOSE_MSG, RUNTIME, }; use crate::{common::*, utils}; use bytes::Bytes; use futures::future; use gst::{glib, prelude::*, subclass::prelude::*}; use gst_base::subclass::prelude::*; -use quinn::{Connection, SendStream, TransportConfig}; +use quinn::{ + Connection, SendDatagramError, SendStream, StreamId, TransportConfig, VarInt, WriteError, +}; +use std::collections::HashMap; use std::path::PathBuf; use std::sync::LazyLock; use std::sync::Mutex; @@ -34,6 +40,7 @@ static CAT: LazyLock = LazyLock::new(|| { struct Started { connection: Connection, stream: Option, + stream_map: HashMap, } #[derive(Default)] @@ -276,6 +283,17 @@ impl ObjectImpl for QuinnQuicSink { .blurb("Drop buffers when using datagram if buffer size > max datagram size") .default_value(DEFAULT_DROP_BUFFER_FOR_DATAGRAM) .build(), + glib::ParamSpecUInt64::builder("max-concurrent-uni-streams") + .nick("Maximum concurrent uni-directional streams") + .blurb("Maximum number of incoming unidirectional streams that may be open concurrently") + .default_value(DEFAULT_MAX_CONCURRENT_UNI_STREAMS.into()) + .readwrite() + .build(), + glib::ParamSpecUInt64::builder("send-window") + .nick("Send Window") + .blurb("Maximum number of bytes to transmit to a peer without acknowledgment") + .readwrite() + .build(), ] }); @@ -366,6 +384,15 @@ impl ObjectImpl for QuinnQuicSink { "drop-buffer-for-datagram" => { settings.drop_buffer_for_datagram = value.get().expect("type checked upstream"); } + "max-concurrent-uni-streams" => { + let value = value.get::().expect("type checked upstream"); + settings.transport_config.max_concurrent_uni_streams = + VarInt::from_u64(value.max(VarInt::MAX.into())).unwrap(); + } + "send-window" => { + settings.transport_config.send_window = + value.get::().expect("type checked upstream"); + } _ => unimplemented!(), } } @@ -425,6 +452,10 @@ impl ObjectImpl for QuinnQuicSink { } } "drop-buffer-for-datagram" => settings.drop_buffer_for_datagram.to_value(), + "max-concurrent-uni-streams" => { + u64::from(settings.transport_config.max_concurrent_uni_streams).to_value() + } + "send-window" => settings.transport_config.send_window.to_value(), _ => unimplemented!(), } } @@ -450,10 +481,11 @@ impl BaseSinkImpl for QuinnQuicSink { } match wait(&self.canceller, self.init_connection(), timeout) { - Ok(Ok((c, s))) => { + Ok(Ok(c)) => { *state = State::Started(Started { connection: c, - stream: s, + stream: None, + stream_map: HashMap::new(), }); gst::info!(CAT, imp = self, "Started"); @@ -493,35 +525,21 @@ impl BaseSinkImpl for QuinnQuicSink { if let State::Started(ref mut state) = *state { let connection = &state.connection; - let mut close_msg = CONNECTION_CLOSE_MSG.to_string(); if !use_datagram { - let send = &mut state.stream.as_mut().unwrap(); - - // Shutdown stream gracefully - // send.finish() may fail, but the error is harmless. - let _ = send.finish(); - match wait(&self.canceller, send.stopped(), timeout) { - Ok(r) => { - if let Err(e) = r { - close_msg = format!("Stream finish request error: {}", e); - gst::error!(CAT, imp = self, "{}", close_msg); - } - } - Err(e) => match e { - WaitError::FutureAborted => { - close_msg = "Stream finish request aborted".to_string(); - gst::warning!(CAT, imp = self, "{}", close_msg); - } - WaitError::FutureError(e) => { - close_msg = format!("Stream finish request future error: {}", e); - gst::error!(CAT, imp = self, "{}", close_msg); - } - }, - }; + if let Some(ref mut send) = state.stream.take() { + self.close_stream(send, timeout); + } } - connection.close(CONNECTION_CLOSE_CODE.into(), close_msg.as_bytes()); + for stream in state.stream_map.values_mut() { + self.close_stream(stream, timeout); + } + + connection.close( + CONNECTION_CLOSE_CODE.into(), + CONNECTION_CLOSE_MSG.as_bytes(), + ); } *state = State::Stopped; @@ -544,7 +562,9 @@ impl BaseSinkImpl for QuinnQuicSink { gst::FlowError::Error })?; - match self.send_buffer(&map) { + let meta = buffer.meta::(); + + match self.send_buffer(&map, meta) { Ok(_) => Ok(gst::FlowSuccess::Ok), Err(err) => match err { Some(error_message) => { @@ -560,6 +580,13 @@ impl BaseSinkImpl for QuinnQuicSink { } } + fn query(&self, query: &mut gst::QueryRef) -> bool { + match query.view_mut() { + gst::QueryViewMut::Custom(q) => self.sink_query(q), + _ => BaseSinkImplExt::parent_query(self, query), + } + } + fn unlock(&self) -> Result<(), gst::ErrorMessage> { let mut canceller = self.canceller.lock().unwrap(); canceller.abort(); @@ -573,10 +600,42 @@ impl BaseSinkImpl for QuinnQuicSink { } Ok(()) } + + fn event(&self, event: gst::Event) -> bool { + use gst::EventView; + + gst::debug!(CAT, imp = self, "Handling event {:?}", event); + + let settings = self.settings.lock().unwrap(); + let timeout = settings.timeout; + drop(settings); + + let mut state = self.state.lock().unwrap(); + if let State::Started(ref mut state) = *state { + if let EventView::CustomDownstream(ev) = event.view() { + if let Some(s) = ev.structure() { + if s.name() == QUIC_STREAM_CLOSE_CUSTOMDOWNSTREAM_EVENT { + if let Ok(stream_id) = s.get::(QUIC_STREAM_ID) { + if let Some(mut stream) = state.stream_map.remove(&stream_id) { + self.close_stream(&mut stream, timeout); + return true; + } + } + } + } + } + } + + self.parent_event(event) + } } impl QuinnQuicSink { - fn send_buffer(&self, src: &[u8]) -> Result<(), Option> { + fn send_buffer( + &self, + src: &[u8], + meta: Option>, + ) -> Result<(), Option> { let settings = self.settings.lock().unwrap(); let timeout = settings.timeout; let use_datagram = settings.use_datagram; @@ -585,11 +644,8 @@ impl QuinnQuicSink { let mut state = self.state.lock().unwrap(); - let (conn, stream) = match *state { - State::Started(Started { - ref connection, - ref mut stream, - }) => (connection, stream), + let started = match *state { + State::Started(ref mut started) => started, State::Stopped => { return Err(Some(gst::error_msg!( gst::LibraryError::Failed, @@ -597,61 +653,51 @@ impl QuinnQuicSink { ))); } }; + let connection = started.connection.clone(); - if use_datagram { - match conn.max_datagram_size() { - Some(size) => { - if src.len() > size { - if drop_buffer_for_datagram { - gst::warning!(CAT, imp = self, "Buffer dropped, current max datagram size: {size} > buffer size: {}", src.len()); - return Ok(()); - } else { - return Err(Some(gst::error_msg!( - gst::ResourceError::Failed, - ["Sending data failed, current max datagram size: {size}, buffer size: {}", src.len()] - ))); + if let Some(m) = meta { + if m.is_datagram() { + self.write_datagram(connection, src, drop_buffer_for_datagram) + } else { + let stream_id = m.stream_id(); + + if let Some(send) = started.stream_map.get_mut(&stream_id) { + gst::trace!(CAT, imp = self, "Writing buffer for stream {stream_id:?}"); + self.write_stream(send, src, timeout) + } else { + Err(Some(gst::error_msg!( + gst::ResourceError::Failed, + ["No stream for buffer with stream id {}", stream_id] + ))) + } + } + } else if use_datagram { + self.write_datagram(connection, src, drop_buffer_for_datagram) + } else { + { + if started.stream.is_none() { + match self.open_stream(connection, timeout) { + Ok(stream) => { + gst::debug!( + CAT, + imp = self, + "Opened connection, stream: {}", + stream.id() + ); + started.stream = Some(stream); } - } - - match conn.send_datagram(Bytes::copy_from_slice(src)) { - Ok(_) => Ok(()), - Err(e) => Err(Some(gst::error_msg!( - gst::ResourceError::Failed, - ["Sending data failed: {}", e] - ))), + Err(err) => return Err(Some(err)), } } - /* - * We check for datagram being unsupported by peer in - * start/init_connection, so we should never reach here. - */ - None => unreachable!(), } - } else { - let send = &mut stream.as_mut().unwrap(); - match wait(&self.canceller, send.write_all(src), timeout) { - Ok(Ok(_)) => Ok(()), - Ok(Err(e)) => Err(Some(gst::error_msg!( - gst::ResourceError::Failed, - ["Sending data failed: {}", e] - ))), - Err(e) => match e { - WaitError::FutureAborted => { - gst::warning!(CAT, imp = self, "Sending aborted"); - Ok(()) - } - WaitError::FutureError(e) => Err(Some(gst::error_msg!( - gst::ResourceError::Failed, - ["Sending data failed: {}", e] - ))), - }, - } + let send = started.stream.as_mut().expect("Stream must be valid here"); + self.write_stream(send, src, timeout) } } - async fn init_connection(&self) -> Result<(Connection, Option), WaitError> { - let (role, use_datagram, endpoint_config) = { + async fn init_connection(&self) -> Result { + let (role, endpoint_config) = { let settings = self.settings.lock().unwrap(); let client_addr = make_socket_addr( @@ -664,7 +710,6 @@ impl QuinnQuicSink { let server_name = settings.server_name.clone(); let alpns = settings.alpns.clone(); let role = settings.role; - let use_datagram = settings.use_datagram; let keep_alive_interval = settings.keep_alive_interval; let secure_conn = settings.secure_conn; let certificate_file = settings.certificate_file.clone(); @@ -673,7 +718,6 @@ impl QuinnQuicSink { ( role, - use_datagram, QuinnQuicEndpointConfig { server_addr, server_name, @@ -726,35 +770,276 @@ impl QuinnQuicSink { })?, }; - let stream = if !use_datagram { - let res = connection.open_uni().await.map_err(|err| { - WaitError::FutureError(gst::error_msg!( - gst::ResourceError::Failed, - ["Failed to open stream: {}", err] - )) - })?; + gst::info!( + CAT, + imp = self, + "Remote connection established: {}", + connection.remote_address() + ); - Some(res) - } else { - match connection.max_datagram_size() { - Some(datagram_size) => { - gst::info!( + Ok(connection) + } + + fn handle_open_stream_query(&self, s: &mut gst::StructureRef) -> bool { + gst::debug!(CAT, imp = self, "Handling open stream query: {s:?}"); + + let settings = self.settings.lock().unwrap(); + let timeout = settings.timeout; + drop(settings); + + let mut state = self.state.lock().unwrap(); + if let State::Started(ref mut state) = *state { + let connection = state.connection.clone(); + + gst::debug!( + CAT, + imp = self, + "Attempting to open connection for stream query: {s:?}" + ); + + match self.open_stream(connection, timeout) { + Ok(stream) => { + let index = stream.id().index(); + + if let Ok(priority) = s.get::(QUIC_STREAM_PRIORITY) { + // Default value of priority for Stream is already 0. + if priority != 0 { + let _ = stream.set_priority(priority); + } + } + + gst::debug!( CAT, imp = self, - "Datagram size reported by peer: {datagram_size}" + "Opened connection for stream query: {s:?}, stream: {}, priority: {:?}", + stream.id(), + stream.priority() ); + + state.stream_map.insert(index, stream); + s.set_value(QUIC_STREAM_ID, index.to_send_value()); + + return true; } - None => { - return Err(WaitError::FutureError(gst::error_msg!( - gst::ResourceError::Failed, - ["Datagram unsupported by the peer"] - ))); + Err(err) => { + gst::error!( + CAT, + imp = self, + "Failed to handle open stream query, {err:?}" + ); + return false; } } + } - None - }; + false + } - Ok((connection, stream)) + fn handle_datagram_query(&self, s: &mut gst::StructureRef) -> bool { + gst::debug!(CAT, imp = self, "Handling datagram query: {s:?}"); + + let state = self.state.lock().unwrap(); + if let State::Started(ref state) = *state { + if state.connection.max_datagram_size().is_some() { + return true; + } + + gst::warning!(CAT, imp = self, "Datagram unsupported by peer"); + } + + false + } + + fn open_stream( + &self, + connection: Connection, + timeout: u32, + ) -> Result { + match wait(&self.canceller, connection.open_uni(), timeout) { + Ok(Ok(stream)) => { + gst::debug!( + CAT, + imp = self, + "Opened connection, stream: {}", + stream.id() + ); + Ok(stream) + } + Ok(Err(err)) => { + gst::error!(CAT, imp = self, "Failed to open connection {err}"); + Err(gst::error_msg!( + gst::ResourceError::Failed, + ["Failed to open connection, {err}"] + )) + } + Err(err) => { + gst::error!(CAT, imp = self, "Failed to open connection {err}"); + Err(gst::error_msg!( + gst::ResourceError::Failed, + ["Failed to open connection, {err}"] + )) + } + } + } + + fn sink_query(&self, query: &mut gst::QueryRef) -> bool { + gst::debug!(CAT, imp = self, "Handling sink query: {query:?}"); + + let s = query.structure_mut(); + + match s.name().as_str() { + QUIC_DATAGRAM_PROBE => self.handle_datagram_query(s), + QUIC_STREAM_OPEN => self.handle_open_stream_query(s), + _ => false, + } + } + + fn write_datagram( + &self, + conn: Connection, + src: &[u8], + drop_buffer_for_datagram: bool, + ) -> Result<(), Option> { + match conn.max_datagram_size() { + Some(size) => { + if src.len() > size { + if drop_buffer_for_datagram { + gst::warning!( + CAT, + imp = self, + "Buffer dropped, current max datagram size: {size} > buffer size: {}", + src.len() + ); + return Ok(()); + } else { + return Err(Some(gst::error_msg!( + gst::ResourceError::Failed, + ["Sending data failed, current max datagram size: {size}, buffer size: {}", src.len()] + ))); + } + } + + match conn.send_datagram(Bytes::copy_from_slice(src)) { + Ok(_) => Ok(()), + Err(err) => match err { + SendDatagramError::ConnectionLost(cerr) => Err(Some(gst::error_msg!( + gst::ResourceError::Failed, + ["Sending datagram failed: {}", cerr] + ))), + /* + * Sending datagram can fail due to change in + * max_datagram_size even though we checked + * just before trying to send. So check here + * again if we should drop buffers if requested + * or return an error. + */ + _ => { + if drop_buffer_for_datagram { + gst::warning!(CAT, imp = self, "Buffer dropped, error: {err:?}"); + Ok(()) + } else { + Err(Some(gst::error_msg!( + gst::ResourceError::Failed, + ["Sending datagram failed, error: {err:?}"] + ))) + } + } + }, + } + } + None => { + gst::warning!(CAT, imp = self, "Datagram unsupported by peer"); + Ok(()) + } + } + } + + fn write_stream( + &self, + stream: &mut SendStream, + src: &[u8], + timeout: u32, + ) -> Result<(), Option> { + let stream_id = stream.id().index(); + + match wait(&self.canceller, stream.write(src), timeout) { + Ok(Ok(bytes_written)) => { + gst::trace!( + CAT, + imp = self, + "Stream {stream_id} wrote {bytes_written} bytes" + ); + Ok(()) + } + Ok(Err(e)) => match e { + /* + * We do not expect Streams to be stopped or closed by + * remote peer but add a warning and drop buffers for + * now. This can be used in future to signal an error + * on the stream on peer side and then send a query + * upstream to signal multiplexer to release the pad + * and close the stream. + */ + WriteError::Stopped(code) => { + gst::warning!( + CAT, + imp = self, + "Dropping buffer, stream {stream_id} stopped: {code}" + ); + Ok(()) + } + WriteError::ClosedStream => { + gst::warning!( + CAT, + imp = self, + "Dropping buffer, stream {stream_id} closed" + ); + Ok(()) + } + _ => Err(Some(gst::error_msg!( + gst::ResourceError::Failed, + ["Sending data for stream {stream_id} failed: {e}"] + ))), + }, + Err(e) => match e { + WaitError::FutureAborted => { + gst::warning!(CAT, imp = self, "Sending aborted"); + Ok(()) + } + WaitError::FutureError(e) => Err(Some(gst::error_msg!( + gst::ResourceError::Failed, + ["Sending for stream {stream_id} failed: {e}"] + ))), + }, + } + } + + fn close_stream(&self, stream: &mut SendStream, timeout: u32) { + /* + * Shutdown stream gracefully + * send.finish() may fail, but the error is harmless. + */ + let _ = stream.finish(); + + match wait(&self.canceller, stream.stopped(), timeout) { + Ok(r) => { + if let Err(e) = r { + let err_msg = format!("Stream finish request error: {}", e); + gst::error!(CAT, imp = self, "{}", err_msg); + } else { + gst::info!(CAT, imp = self, "Stream {} finished", stream.id()); + } + } + Err(e) => match e { + WaitError::FutureAborted => { + let err_msg = "Stream finish request aborted".to_string(); + gst::warning!(CAT, imp = self, "{}", err_msg); + } + WaitError::FutureError(e) => { + let err_msg = format!("Stream finish request future error: {}", e); + gst::error!(CAT, imp = self, "{}", err_msg); + } + }, + } } } diff --git a/net/quinn/src/utils.rs b/net/quinn/src/utils.rs index 40588693..54873b87 100644 --- a/net/quinn/src/utils.rs +++ b/net/quinn/src/utils.rs @@ -258,8 +258,10 @@ fn configure_client(ep_config: &QuinnQuicEndpointConfig) -> Result