diff --git a/net/quinn/src/quinnquicsink/imp.rs b/net/quinn/src/quinnquicsink/imp.rs index 2fac3429..fedcf3ad 100644 --- a/net/quinn/src/quinnquicsink/imp.rs +++ b/net/quinn/src/quinnquicsink/imp.rs @@ -377,6 +377,7 @@ impl BaseSinkImpl for QuinnQuicSink { fn stop(&self) -> Result<(), gst::ErrorMessage> { let settings = self.settings.lock().unwrap(); + let timeout = settings.timeout; let use_datagram = settings.use_datagram; drop(settings); @@ -384,14 +385,32 @@ impl BaseSinkImpl for QuinnQuicSink { if let State::Started(ref mut state) = *state { let connection = &state.connection; - let close_msg = CONNECTION_CLOSE_MSG.to_string(); + let mut close_msg = CONNECTION_CLOSE_MSG.to_string(); if !use_datagram { let send = &mut state.stream.as_mut().unwrap(); // Shutdown stream gracefully - // This may fail, but the error is harmless. + // send.finish() may fail, but the error is harmless. let _ = send.finish(); + match wait(&self.canceller, send.stopped(), timeout) { + Ok(r) => { + if let Err(e) = r { + close_msg = format!("Stream finish request error: {}", e); + gst::error!(CAT, imp: self, "{}", close_msg); + } + } + Err(e) => match e { + WaitError::FutureAborted => { + close_msg = "Stream finish request aborted".to_string(); + gst::warning!(CAT, imp: self, "{}", close_msg); + } + WaitError::FutureError(e) => { + close_msg = format!("Stream finish request future error: {}", e); + gst::error!(CAT, imp: self, "{}", close_msg); + } + }, + }; } connection.close(CONNECTION_CLOSE_CODE.into(), close_msg.as_bytes()); diff --git a/net/quinn/src/quinnquicsrc/imp.rs b/net/quinn/src/quinnquicsrc/imp.rs index 9af42d23..cf22ca1b 100644 --- a/net/quinn/src/quinnquicsrc/imp.rs +++ b/net/quinn/src/quinnquicsrc/imp.rs @@ -17,7 +17,7 @@ use gst_base::prelude::*; use gst_base::subclass::base_src::CreateSuccess; use gst_base::subclass::prelude::*; use once_cell::sync::Lazy; -use quinn::{Connection, ConnectionError, RecvStream}; +use quinn::{Connection, ConnectionError, ReadError, RecvStream}; use std::path::PathBuf; use std::sync::Mutex; @@ -485,8 +485,14 @@ impl QuinnQuicSrc { match conn.read_datagram().await { Ok(bytes) => Ok(bytes), Err(err) => match err { - ConnectionError::ApplicationClosed(_) - | ConnectionError::ConnectionClosed(_) => Ok(Bytes::new()), + ConnectionError::ApplicationClosed(ac) => { + gst::info!(CAT, imp: self, "Application closed connection, {}", ac); + Ok(Bytes::new()) + } + ConnectionError::ConnectionClosed(cc) => { + gst::info!(CAT, imp: self, "Transport closed connection, {}", cc); + Ok(Bytes::new()) + } _ => Err(WaitError::FutureError(gst::error_msg!( gst::ResourceError::Failed, ["Datagram read error: {}", err] @@ -499,10 +505,30 @@ impl QuinnQuicSrc { match recv.read_chunk(length as usize, true).await { Ok(Some(chunk)) => Ok(chunk.bytes), Ok(None) => Ok(Bytes::new()), - Err(err) => Err(WaitError::FutureError(gst::error_msg!( - gst::ResourceError::Failed, - ["Stream read error: {}", err] - ))), + Err(err) => match err { + ReadError::ConnectionLost(conn_err) => match conn_err { + ConnectionError::ConnectionClosed(cc) => { + gst::info!(CAT, imp: self, "Transport closed connection, {}", cc); + Ok(Bytes::new()) + } + ConnectionError::ApplicationClosed(ac) => { + gst::info!(CAT, imp: self, "Application closed connection, {}", ac); + Ok(Bytes::new()) + } + _ => Err(WaitError::FutureError(gst::error_msg!( + gst::ResourceError::Failed, + ["Stream read error: {}", conn_err] + ))), + }, + ReadError::ClosedStream => { + gst::info!(CAT, imp: self, "Stream closed"); + Ok(Bytes::new()) + } + _ => Err(WaitError::FutureError(gst::error_msg!( + gst::ResourceError::Failed, + ["Stream read error: {}", err] + ))), + }, } } };