diff --git a/net/quinn/src/quinnwtserversink/imp.rs b/net/quinn/src/quinnwtserversink/imp.rs index 89c9be0cd..0c36ea986 100644 --- a/net/quinn/src/quinnwtserversink/imp.rs +++ b/net/quinn/src/quinnwtserversink/imp.rs @@ -11,6 +11,8 @@ // // SPDX-License-Identifier: MPL-2.0 +use crate::quinnquicmeta::*; +use crate::quinnquicquery::*; use crate::utils::{ client_endpoint, get_stats, make_socket_addr, server_endpoint, wait, QuinnQuicEndpointConfig, WaitError, CONNECTION_CLOSE_CODE, CONNECTION_CLOSE_MSG, @@ -21,6 +23,7 @@ use futures::future; use gst::{glib, prelude::*, subclass::prelude::*}; use gst_base::subclass::prelude::*; use quinn::{Connection, TransportConfig}; +use std::collections::HashMap; use std::path::PathBuf; use std::sync::{LazyLock, Mutex}; use web_transport_quinn::{Request, SendStream, Session}; @@ -36,6 +39,8 @@ static CAT: LazyLock = LazyLock::new(|| { struct Started { session: Session, stream: Option, + stream_map: HashMap, + stream_idx: u64, } #[derive(Default)] @@ -73,7 +78,7 @@ impl Default for Settings { port: DEFAULT_PORT, server_name: DEFAULT_SERVER_NAME.to_string(), timeout: 0, - use_datagram: false, + use_datagram: DEFAULT_USE_DATAGRAM, certificate_file: None, private_key_file: None, secure_conn: DEFAULT_SECURE_CONNECTION, @@ -383,8 +388,13 @@ impl BaseSinkImpl for QuinnWebTransportServerSink { } match wait(&self.canceller, self.init_connection(), timeout) { - Ok(Ok((session, stream))) => { - *state = State::Started(Started { session, stream }); + Ok(Ok(session)) => { + *state = State::Started(Started { + session, + stream: None, + stream_map: HashMap::new(), + stream_idx: 0, + }); gst::info!(CAT, imp = self, "Started"); Ok(()) } @@ -420,36 +430,19 @@ impl BaseSinkImpl for QuinnWebTransportServerSink { let mut state = self.state.lock().unwrap(); if let State::Started(ref mut state) = *state { - let session = &state.session; - let mut close_msg = CONNECTION_CLOSE_MSG.to_string(); - if !use_datagram { - let send = &mut state.stream.as_mut().unwrap(); - - // Shutdown stream gracefully - // send.finish() may fail, but the error is harmless. - let _ = send.finish(); - match wait(&self.canceller, send.stopped(), timeout) { - Ok(r) => { - if let Err(e) = r { - close_msg = format!("Stream finish request error: {}", e); - gst::error!(CAT, imp = self, "{}", close_msg); - } - } - Err(e) => match e { - WaitError::FutureAborted => { - close_msg = "Stream finish request aborted".to_string(); - gst::warning!(CAT, imp = self, "{}", close_msg); - } - WaitError::FutureError(e) => { - close_msg = format!("Stream finish request future error: {}", e); - gst::error!(CAT, imp = self, "{}", close_msg); - } - }, - }; + if let Some(ref mut send) = state.stream.take() { + self.close_stream(send, timeout); + } } - session.close(CONNECTION_CLOSE_CODE, close_msg.as_bytes()); + for stream in state.stream_map.values_mut() { + self.close_stream(stream, timeout); + } + + state + .session + .close(CONNECTION_CLOSE_CODE, CONNECTION_CLOSE_MSG.as_bytes()); } *state = State::Stopped; @@ -472,7 +465,9 @@ impl BaseSinkImpl for QuinnWebTransportServerSink { gst::FlowError::Error })?; - match self.send_buffer(&map) { + let meta = buffer.meta::(); + + match self.send_buffer(&map, meta) { Ok(_) => Ok(gst::FlowSuccess::Ok), Err(err) => match err { Some(error_message) => { @@ -488,6 +483,13 @@ impl BaseSinkImpl for QuinnWebTransportServerSink { } } + fn query(&self, query: &mut gst::QueryRef) -> bool { + match query.view_mut() { + gst::QueryViewMut::Custom(q) => self.sink_query(q), + _ => BaseSinkImplExt::parent_query(self, query), + } + } + fn unlock(&self) -> Result<(), gst::ErrorMessage> { let mut canceller = self.canceller.lock().unwrap(); canceller.abort(); @@ -501,10 +503,42 @@ impl BaseSinkImpl for QuinnWebTransportServerSink { } Ok(()) } + + fn event(&self, event: gst::Event) -> bool { + use gst::EventView; + + gst::debug!(CAT, imp = self, "Handling event {:?}", event); + + let settings = self.settings.lock().unwrap(); + let timeout = settings.timeout; + drop(settings); + + let mut state = self.state.lock().unwrap(); + if let State::Started(ref mut state) = *state { + if let EventView::CustomDownstream(ev) = event.view() { + if let Some(s) = ev.structure() { + if s.name() == QUIC_STREAM_CLOSE_CUSTOMDOWNSTREAM_EVENT { + if let Ok(stream_id) = s.get::(QUIC_STREAM_ID) { + if let Some(mut stream) = state.stream_map.remove(&stream_id) { + self.close_stream(&mut stream, timeout); + return true; + } + } + } + } + } + } + + self.parent_event(event) + } } impl QuinnWebTransportServerSink { - fn send_buffer(&self, src: &[u8]) -> Result<(), Option> { + fn send_buffer( + &self, + src: &[u8], + meta: Option>, + ) -> Result<(), Option> { let settings = self.settings.lock().unwrap(); let timeout = settings.timeout; let use_datagram = settings.use_datagram; @@ -513,11 +547,8 @@ impl QuinnWebTransportServerSink { let mut state = self.state.lock().unwrap(); - let (session, stream) = match *state { - State::Started(Started { - ref session, - ref mut stream, - }) => (session, stream), + let started = match *state { + State::Started(ref mut started) => started, State::Stopped => { return Err(Some(gst::error_msg!( gst::LibraryError::Failed, @@ -525,79 +556,64 @@ impl QuinnWebTransportServerSink { ))); } }; + let session = started.session.clone(); - if use_datagram { - let size = session.max_datagram_size(); - if src.len() > size { - if drop_buffer_for_datagram { - gst::warning!( - CAT, - imp = self, - "Buffer dropped, current max datagram size: {size} > buffer size: {}", - src.len() - ); - return Ok(()); + if let Some(m) = meta { + if m.is_datagram() { + self.write_datagram(session, src, drop_buffer_for_datagram) + } else { + let stream_id = m.stream_id(); + + if let Some(send) = started.stream_map.get_mut(&stream_id) { + gst::trace!(CAT, imp = self, "Writing buffer for stream {stream_id:?}"); + self.write_stream(send, src, timeout) } else { - return Err(Some(gst::error_msg!( - gst::ResourceError::Failed, - ["Sending data failed, current max datagram size: {size}, buffer size: {}", src.len()] - ))); + Err(Some(gst::error_msg!( + gst::ResourceError::Failed, + ["No stream for buffer with stream id {}", stream_id] + ))) + } + } + } else if use_datagram { + self.write_datagram(session, src, drop_buffer_for_datagram) + } else { + { + if started.stream.is_none() { + match self.open_stream(session, timeout) { + Ok(stream) => { + gst::debug!(CAT, imp = self, "Opened stream: {:?}", stream); + started.stream = Some(stream); + } + Err(err) => return Err(Some(err)), + } } } - match session.send_datagram(Bytes::copy_from_slice(src)) { - Ok(_) => Ok(()), - Err(e) => Err(Some(gst::error_msg!( - gst::ResourceError::Failed, - ["Sending data failed: {}", e] - ))), - } - } else { - let send = &mut stream.as_mut().unwrap(); - - match wait(&self.canceller, send.write_all(src), timeout) { - Ok(Ok(_)) => Ok(()), - Ok(Err(e)) => Err(Some(gst::error_msg!( - gst::ResourceError::Failed, - ["Sending data failed: {}", e] - ))), - Err(e) => match e { - WaitError::FutureAborted => { - gst::warning!(CAT, imp = self, "Sending aborted"); - Ok(()) - } - WaitError::FutureError(e) => Err(Some(gst::error_msg!( - gst::ResourceError::Failed, - ["Sending data failed: {}", e] - ))), - }, - } + let send = started.stream.as_mut().expect("Stream must be valid here"); + self.write_stream(send, src, timeout) } } - async fn init_connection(&self) -> Result<(Session, Option), WaitError> { - let (use_datagram, endpoint_config) = { + async fn init_connection(&self) -> Result { + let endpoint_config = { let settings = self.settings.lock().unwrap(); let secure_conn = settings.secure_conn; let server_addr = make_socket_addr(format!("{}:{}", settings.address, settings.port).as_str())?; - ( - settings.use_datagram, - QuinnQuicEndpointConfig { - server_addr, - server_name: settings.server_name.clone(), - client_addr: None, - secure_conn, - alpns: vec![HTTP3_ALPN.to_string()], - certificate_file: settings.certificate_file.clone(), - private_key_file: settings.private_key_file.clone(), - keep_alive_interval: 0, - transport_config: settings.transport_config, - with_client_auth: false, - }, - ) + QuinnQuicEndpointConfig { + server_addr, + server_name: settings.server_name.clone(), + client_addr: None, + secure_conn, + alpns: vec![HTTP3_ALPN.to_string()], + certificate_file: settings.certificate_file.clone(), + private_key_file: settings.private_key_file.clone(), + keep_alive_interval: 0, + transport_config: settings.transport_config, + with_client_auth: false, + } }; let endpoint = server_endpoint(&endpoint_config).map_err(|err| { @@ -637,23 +653,204 @@ impl QuinnWebTransportServerSink { gst::info!(CAT, imp = self, "accepted session"); - let stream = if !use_datagram { - let (stream, _) = session.open_bi().await.map_err(|err| { - WaitError::FutureError(gst::error_msg!( - gst::ResourceError::Failed, - ["Failed to open stream: {}", err] - )) - })?; - Ok(Some(stream)) - } else { - let max_datagram_size = session.max_datagram_size(); - gst::info!( + Ok(session) + } + + fn sink_query(&self, query: &mut gst::QueryRef) -> bool { + gst::debug!(CAT, imp = self, "Handling sink query: {query:?}"); + + let s = query.structure_mut(); + + match s.name().as_str() { + QUIC_DATAGRAM_PROBE => self.handle_datagram_query(s), + QUIC_STREAM_OPEN => self.handle_open_stream_query(s), + _ => false, + } + } + + fn handle_open_stream_query(&self, s: &mut gst::StructureRef) -> bool { + gst::debug!(CAT, imp = self, "Handling open stream query: {s:?}"); + + let settings = self.settings.lock().unwrap(); + let timeout = settings.timeout; + drop(settings); + + let mut state = self.state.lock().unwrap(); + if let State::Started(ref mut state) = *state { + let session = state.session.clone(); + + gst::debug!( CAT, imp = self, - "Datagram size reported by peer: {max_datagram_size}" + "Attempting to open stream for stream query: {s:?}" ); - Ok(None) - }?; - Ok((session, stream)) + + match self.open_stream(session, timeout) { + Ok(stream) => { + let index = state.stream_idx; + + if let Ok(priority) = s.get::(QUIC_STREAM_PRIORITY) { + // Default value of priority for Stream is already 0. + if priority != 0 { + let _ = stream.set_priority(priority); + } + } + + gst::debug!( + CAT, + imp = self, + "Opened stream for query: {s:?}, stream: {:?}, priority: {:?}", + stream, + stream.priority() + ); + + state.stream_map.insert(index, stream); + s.set_value(QUIC_STREAM_ID, index.to_send_value()); + + state.stream_idx += 1; + + return true; + } + Err(err) => { + gst::error!( + CAT, + imp = self, + "Failed to handle open stream query, {err:?}" + ); + return false; + } + } + } + + false + } + + fn handle_datagram_query(&self, s: &mut gst::StructureRef) -> bool { + gst::debug!(CAT, imp = self, "Handling datagram query: {s:?}"); + + let state = self.state.lock().unwrap(); + if let State::Started(ref state) = *state { + if state.session.max_datagram_size() > 0 { + return true; + } + + gst::warning!(CAT, imp = self, "Datagram unsupported by peer"); + } + + false + } + + fn open_stream(&self, session: Session, timeout: u32) -> Result { + match wait(&self.canceller, session.open_uni(), timeout) { + Ok(Ok(stream)) => { + gst::debug!(CAT, imp = self, "Opened stream: {:?}", stream); + + Ok(stream) + } + Ok(Err(err)) => { + gst::error!(CAT, imp = self, "Failed to open stream {err}"); + Err(gst::error_msg!( + gst::ResourceError::Failed, + ["Failed to open stream, {err}"] + )) + } + Err(err) => { + gst::error!(CAT, imp = self, "Failed to open stream {err}"); + Err(gst::error_msg!( + gst::ResourceError::Failed, + ["Failed to open stream, {err}"] + )) + } + } + } + + fn write_datagram( + &self, + session: Session, + src: &[u8], + drop_buffer_for_datagram: bool, + ) -> Result<(), Option> { + let size = session.max_datagram_size(); + if src.len() > size { + if drop_buffer_for_datagram { + gst::warning!( + CAT, + imp = self, + "Buffer dropped, current max datagram size: {size} > buffer size: {}", + src.len() + ); + return Ok(()); + } else { + return Err(Some(gst::error_msg!( + gst::ResourceError::Failed, + [ + "Sending data failed, current max datagram size: {size}, buffer size: {}", + src.len() + ] + ))); + } + } + + match session.send_datagram(Bytes::copy_from_slice(src)) { + Ok(_) => Ok(()), + Err(e) => Err(Some(gst::error_msg!( + gst::ResourceError::Failed, + ["Sending data failed: {}", e] + ))), + } + } + + fn write_stream( + &self, + send: &mut SendStream, + src: &[u8], + timeout: u32, + ) -> Result<(), Option> { + match wait(&self.canceller, send.write_all(src), timeout) { + Ok(Ok(_)) => Ok(()), + Ok(Err(e)) => Err(Some(gst::error_msg!( + gst::ResourceError::Failed, + ["Sending data failed: {}", e] + ))), + Err(e) => match e { + WaitError::FutureAborted => { + gst::warning!(CAT, imp = self, "Sending aborted"); + Ok(()) + } + WaitError::FutureError(e) => Err(Some(gst::error_msg!( + gst::ResourceError::Failed, + ["Sending data failed: {}", e] + ))), + }, + } + } + + fn close_stream(&self, stream: &mut SendStream, timeout: u32) { + /* + * Shutdown stream gracefully + * send.finish() may fail, but the error is harmless. + */ + let _ = stream.finish(); + + match wait(&self.canceller, stream.stopped(), timeout) { + Ok(r) => { + if let Err(e) = r { + let err_msg = format!("Stream finish request error: {}", e); + gst::error!(CAT, imp = self, "{}", err_msg); + } else { + gst::info!(CAT, imp = self, "Stream {:?} finished", stream); + } + } + Err(e) => match e { + WaitError::FutureAborted => { + let err_msg = "Stream finish request aborted".to_string(); + gst::warning!(CAT, imp = self, "{}", err_msg); + } + WaitError::FutureError(e) => { + let err_msg = format!("Stream finish request future error: {}", e); + gst::error!(CAT, imp = self, "{}", err_msg); + } + }, + } } }