net/quinn: Improve stream shutdown process

Co-authored-by: Sanchayan Maity <sanchayan@asymptotic.io>
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1565>
This commit is contained in:
Tamas Levai 2024-05-09 16:43:26 +02:00
parent 13c3db7857
commit 5884c00bd0
2 changed files with 54 additions and 9 deletions

View file

@ -377,6 +377,7 @@ impl BaseSinkImpl for QuinnQuicSink {
fn stop(&self) -> Result<(), gst::ErrorMessage> { fn stop(&self) -> Result<(), gst::ErrorMessage> {
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
let timeout = settings.timeout;
let use_datagram = settings.use_datagram; let use_datagram = settings.use_datagram;
drop(settings); drop(settings);
@ -384,14 +385,32 @@ impl BaseSinkImpl for QuinnQuicSink {
if let State::Started(ref mut state) = *state { if let State::Started(ref mut state) = *state {
let connection = &state.connection; let connection = &state.connection;
let close_msg = CONNECTION_CLOSE_MSG.to_string(); let mut close_msg = CONNECTION_CLOSE_MSG.to_string();
if !use_datagram { if !use_datagram {
let send = &mut state.stream.as_mut().unwrap(); let send = &mut state.stream.as_mut().unwrap();
// Shutdown stream gracefully // Shutdown stream gracefully
// This may fail, but the error is harmless. // send.finish() may fail, but the error is harmless.
let _ = send.finish(); let _ = send.finish();
match wait(&self.canceller, send.stopped(), timeout) {
Ok(r) => {
if let Err(e) = r {
close_msg = format!("Stream finish request error: {}", e);
gst::error!(CAT, imp: self, "{}", close_msg);
}
}
Err(e) => match e {
WaitError::FutureAborted => {
close_msg = "Stream finish request aborted".to_string();
gst::warning!(CAT, imp: self, "{}", close_msg);
}
WaitError::FutureError(e) => {
close_msg = format!("Stream finish request future error: {}", e);
gst::error!(CAT, imp: self, "{}", close_msg);
}
},
};
} }
connection.close(CONNECTION_CLOSE_CODE.into(), close_msg.as_bytes()); connection.close(CONNECTION_CLOSE_CODE.into(), close_msg.as_bytes());

View file

@ -17,7 +17,7 @@ use gst_base::prelude::*;
use gst_base::subclass::base_src::CreateSuccess; use gst_base::subclass::base_src::CreateSuccess;
use gst_base::subclass::prelude::*; use gst_base::subclass::prelude::*;
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use quinn::{Connection, ConnectionError, RecvStream}; use quinn::{Connection, ConnectionError, ReadError, RecvStream};
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::Mutex; use std::sync::Mutex;
@ -485,8 +485,14 @@ impl QuinnQuicSrc {
match conn.read_datagram().await { match conn.read_datagram().await {
Ok(bytes) => Ok(bytes), Ok(bytes) => Ok(bytes),
Err(err) => match err { Err(err) => match err {
ConnectionError::ApplicationClosed(_) ConnectionError::ApplicationClosed(ac) => {
| ConnectionError::ConnectionClosed(_) => Ok(Bytes::new()), gst::info!(CAT, imp: self, "Application closed connection, {}", ac);
Ok(Bytes::new())
}
ConnectionError::ConnectionClosed(cc) => {
gst::info!(CAT, imp: self, "Transport closed connection, {}", cc);
Ok(Bytes::new())
}
_ => Err(WaitError::FutureError(gst::error_msg!( _ => Err(WaitError::FutureError(gst::error_msg!(
gst::ResourceError::Failed, gst::ResourceError::Failed,
["Datagram read error: {}", err] ["Datagram read error: {}", err]
@ -499,10 +505,30 @@ impl QuinnQuicSrc {
match recv.read_chunk(length as usize, true).await { match recv.read_chunk(length as usize, true).await {
Ok(Some(chunk)) => Ok(chunk.bytes), Ok(Some(chunk)) => Ok(chunk.bytes),
Ok(None) => Ok(Bytes::new()), Ok(None) => Ok(Bytes::new()),
Err(err) => Err(WaitError::FutureError(gst::error_msg!( Err(err) => match err {
gst::ResourceError::Failed, ReadError::ConnectionLost(conn_err) => match conn_err {
["Stream read error: {}", err] ConnectionError::ConnectionClosed(cc) => {
))), gst::info!(CAT, imp: self, "Transport closed connection, {}", cc);
Ok(Bytes::new())
}
ConnectionError::ApplicationClosed(ac) => {
gst::info!(CAT, imp: self, "Application closed connection, {}", ac);
Ok(Bytes::new())
}
_ => Err(WaitError::FutureError(gst::error_msg!(
gst::ResourceError::Failed,
["Stream read error: {}", conn_err]
))),
},
ReadError::ClosedStream => {
gst::info!(CAT, imp: self, "Stream closed");
Ok(Bytes::new())
}
_ => Err(WaitError::FutureError(gst::error_msg!(
gst::ResourceError::Failed,
["Stream read error: {}", err]
))),
},
} }
} }
}; };