mirror of
https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git
synced 2025-01-15 13:45:28 +00:00
net/quinn: Support stream multiplexing in quinnwtserversink
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1966>
This commit is contained in:
parent
53160cb6ab
commit
a02296eb95
1 changed files with 308 additions and 111 deletions
|
@ -11,6 +11,8 @@
|
|||
//
|
||||
// SPDX-License-Identifier: MPL-2.0
|
||||
|
||||
use crate::quinnquicmeta::*;
|
||||
use crate::quinnquicquery::*;
|
||||
use crate::utils::{
|
||||
client_endpoint, get_stats, make_socket_addr, server_endpoint, wait, QuinnQuicEndpointConfig,
|
||||
WaitError, CONNECTION_CLOSE_CODE, CONNECTION_CLOSE_MSG,
|
||||
|
@ -21,6 +23,7 @@ use futures::future;
|
|||
use gst::{glib, prelude::*, subclass::prelude::*};
|
||||
use gst_base::subclass::prelude::*;
|
||||
use quinn::{Connection, TransportConfig};
|
||||
use std::collections::HashMap;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::{LazyLock, Mutex};
|
||||
use web_transport_quinn::{Request, SendStream, Session};
|
||||
|
@ -36,6 +39,8 @@ static CAT: LazyLock<gst::DebugCategory> = LazyLock::new(|| {
|
|||
struct Started {
|
||||
session: Session,
|
||||
stream: Option<SendStream>,
|
||||
stream_map: HashMap<u64, SendStream>,
|
||||
stream_idx: u64,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
|
@ -73,7 +78,7 @@ impl Default for Settings {
|
|||
port: DEFAULT_PORT,
|
||||
server_name: DEFAULT_SERVER_NAME.to_string(),
|
||||
timeout: 0,
|
||||
use_datagram: false,
|
||||
use_datagram: DEFAULT_USE_DATAGRAM,
|
||||
certificate_file: None,
|
||||
private_key_file: None,
|
||||
secure_conn: DEFAULT_SECURE_CONNECTION,
|
||||
|
@ -383,8 +388,13 @@ impl BaseSinkImpl for QuinnWebTransportServerSink {
|
|||
}
|
||||
|
||||
match wait(&self.canceller, self.init_connection(), timeout) {
|
||||
Ok(Ok((session, stream))) => {
|
||||
*state = State::Started(Started { session, stream });
|
||||
Ok(Ok(session)) => {
|
||||
*state = State::Started(Started {
|
||||
session,
|
||||
stream: None,
|
||||
stream_map: HashMap::new(),
|
||||
stream_idx: 0,
|
||||
});
|
||||
gst::info!(CAT, imp = self, "Started");
|
||||
Ok(())
|
||||
}
|
||||
|
@ -420,36 +430,19 @@ impl BaseSinkImpl for QuinnWebTransportServerSink {
|
|||
let mut state = self.state.lock().unwrap();
|
||||
|
||||
if let State::Started(ref mut state) = *state {
|
||||
let session = &state.session;
|
||||
let mut close_msg = CONNECTION_CLOSE_MSG.to_string();
|
||||
|
||||
if !use_datagram {
|
||||
let send = &mut state.stream.as_mut().unwrap();
|
||||
|
||||
// Shutdown stream gracefully
|
||||
// send.finish() may fail, but the error is harmless.
|
||||
let _ = send.finish();
|
||||
match wait(&self.canceller, send.stopped(), timeout) {
|
||||
Ok(r) => {
|
||||
if let Err(e) = r {
|
||||
close_msg = format!("Stream finish request error: {}", e);
|
||||
gst::error!(CAT, imp = self, "{}", close_msg);
|
||||
}
|
||||
}
|
||||
Err(e) => match e {
|
||||
WaitError::FutureAborted => {
|
||||
close_msg = "Stream finish request aborted".to_string();
|
||||
gst::warning!(CAT, imp = self, "{}", close_msg);
|
||||
}
|
||||
WaitError::FutureError(e) => {
|
||||
close_msg = format!("Stream finish request future error: {}", e);
|
||||
gst::error!(CAT, imp = self, "{}", close_msg);
|
||||
}
|
||||
},
|
||||
};
|
||||
if let Some(ref mut send) = state.stream.take() {
|
||||
self.close_stream(send, timeout);
|
||||
}
|
||||
}
|
||||
|
||||
session.close(CONNECTION_CLOSE_CODE, close_msg.as_bytes());
|
||||
for stream in state.stream_map.values_mut() {
|
||||
self.close_stream(stream, timeout);
|
||||
}
|
||||
|
||||
state
|
||||
.session
|
||||
.close(CONNECTION_CLOSE_CODE, CONNECTION_CLOSE_MSG.as_bytes());
|
||||
}
|
||||
|
||||
*state = State::Stopped;
|
||||
|
@ -472,7 +465,9 @@ impl BaseSinkImpl for QuinnWebTransportServerSink {
|
|||
gst::FlowError::Error
|
||||
})?;
|
||||
|
||||
match self.send_buffer(&map) {
|
||||
let meta = buffer.meta::<QuinnQuicMeta>();
|
||||
|
||||
match self.send_buffer(&map, meta) {
|
||||
Ok(_) => Ok(gst::FlowSuccess::Ok),
|
||||
Err(err) => match err {
|
||||
Some(error_message) => {
|
||||
|
@ -488,6 +483,13 @@ impl BaseSinkImpl for QuinnWebTransportServerSink {
|
|||
}
|
||||
}
|
||||
|
||||
fn query(&self, query: &mut gst::QueryRef) -> bool {
|
||||
match query.view_mut() {
|
||||
gst::QueryViewMut::Custom(q) => self.sink_query(q),
|
||||
_ => BaseSinkImplExt::parent_query(self, query),
|
||||
}
|
||||
}
|
||||
|
||||
fn unlock(&self) -> Result<(), gst::ErrorMessage> {
|
||||
let mut canceller = self.canceller.lock().unwrap();
|
||||
canceller.abort();
|
||||
|
@ -501,10 +503,42 @@ impl BaseSinkImpl for QuinnWebTransportServerSink {
|
|||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn event(&self, event: gst::Event) -> bool {
|
||||
use gst::EventView;
|
||||
|
||||
gst::debug!(CAT, imp = self, "Handling event {:?}", event);
|
||||
|
||||
let settings = self.settings.lock().unwrap();
|
||||
let timeout = settings.timeout;
|
||||
drop(settings);
|
||||
|
||||
let mut state = self.state.lock().unwrap();
|
||||
if let State::Started(ref mut state) = *state {
|
||||
if let EventView::CustomDownstream(ev) = event.view() {
|
||||
if let Some(s) = ev.structure() {
|
||||
if s.name() == QUIC_STREAM_CLOSE_CUSTOMDOWNSTREAM_EVENT {
|
||||
if let Ok(stream_id) = s.get::<u64>(QUIC_STREAM_ID) {
|
||||
if let Some(mut stream) = state.stream_map.remove(&stream_id) {
|
||||
self.close_stream(&mut stream, timeout);
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
self.parent_event(event)
|
||||
}
|
||||
}
|
||||
|
||||
impl QuinnWebTransportServerSink {
|
||||
fn send_buffer(&self, src: &[u8]) -> Result<(), Option<gst::ErrorMessage>> {
|
||||
fn send_buffer(
|
||||
&self,
|
||||
src: &[u8],
|
||||
meta: Option<gst::MetaRef<'_, QuinnQuicMeta>>,
|
||||
) -> Result<(), Option<gst::ErrorMessage>> {
|
||||
let settings = self.settings.lock().unwrap();
|
||||
let timeout = settings.timeout;
|
||||
let use_datagram = settings.use_datagram;
|
||||
|
@ -513,11 +547,8 @@ impl QuinnWebTransportServerSink {
|
|||
|
||||
let mut state = self.state.lock().unwrap();
|
||||
|
||||
let (session, stream) = match *state {
|
||||
State::Started(Started {
|
||||
ref session,
|
||||
ref mut stream,
|
||||
}) => (session, stream),
|
||||
let started = match *state {
|
||||
State::Started(ref mut started) => started,
|
||||
State::Stopped => {
|
||||
return Err(Some(gst::error_msg!(
|
||||
gst::LibraryError::Failed,
|
||||
|
@ -525,79 +556,64 @@ impl QuinnWebTransportServerSink {
|
|||
)));
|
||||
}
|
||||
};
|
||||
let session = started.session.clone();
|
||||
|
||||
if use_datagram {
|
||||
let size = session.max_datagram_size();
|
||||
if src.len() > size {
|
||||
if drop_buffer_for_datagram {
|
||||
gst::warning!(
|
||||
CAT,
|
||||
imp = self,
|
||||
"Buffer dropped, current max datagram size: {size} > buffer size: {}",
|
||||
src.len()
|
||||
);
|
||||
return Ok(());
|
||||
if let Some(m) = meta {
|
||||
if m.is_datagram() {
|
||||
self.write_datagram(session, src, drop_buffer_for_datagram)
|
||||
} else {
|
||||
let stream_id = m.stream_id();
|
||||
|
||||
if let Some(send) = started.stream_map.get_mut(&stream_id) {
|
||||
gst::trace!(CAT, imp = self, "Writing buffer for stream {stream_id:?}");
|
||||
self.write_stream(send, src, timeout)
|
||||
} else {
|
||||
return Err(Some(gst::error_msg!(
|
||||
gst::ResourceError::Failed,
|
||||
["Sending data failed, current max datagram size: {size}, buffer size: {}", src.len()]
|
||||
)));
|
||||
Err(Some(gst::error_msg!(
|
||||
gst::ResourceError::Failed,
|
||||
["No stream for buffer with stream id {}", stream_id]
|
||||
)))
|
||||
}
|
||||
}
|
||||
} else if use_datagram {
|
||||
self.write_datagram(session, src, drop_buffer_for_datagram)
|
||||
} else {
|
||||
{
|
||||
if started.stream.is_none() {
|
||||
match self.open_stream(session, timeout) {
|
||||
Ok(stream) => {
|
||||
gst::debug!(CAT, imp = self, "Opened stream: {:?}", stream);
|
||||
started.stream = Some(stream);
|
||||
}
|
||||
Err(err) => return Err(Some(err)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
match session.send_datagram(Bytes::copy_from_slice(src)) {
|
||||
Ok(_) => Ok(()),
|
||||
Err(e) => Err(Some(gst::error_msg!(
|
||||
gst::ResourceError::Failed,
|
||||
["Sending data failed: {}", e]
|
||||
))),
|
||||
}
|
||||
} else {
|
||||
let send = &mut stream.as_mut().unwrap();
|
||||
|
||||
match wait(&self.canceller, send.write_all(src), timeout) {
|
||||
Ok(Ok(_)) => Ok(()),
|
||||
Ok(Err(e)) => Err(Some(gst::error_msg!(
|
||||
gst::ResourceError::Failed,
|
||||
["Sending data failed: {}", e]
|
||||
))),
|
||||
Err(e) => match e {
|
||||
WaitError::FutureAborted => {
|
||||
gst::warning!(CAT, imp = self, "Sending aborted");
|
||||
Ok(())
|
||||
}
|
||||
WaitError::FutureError(e) => Err(Some(gst::error_msg!(
|
||||
gst::ResourceError::Failed,
|
||||
["Sending data failed: {}", e]
|
||||
))),
|
||||
},
|
||||
}
|
||||
let send = started.stream.as_mut().expect("Stream must be valid here");
|
||||
self.write_stream(send, src, timeout)
|
||||
}
|
||||
}
|
||||
|
||||
async fn init_connection(&self) -> Result<(Session, Option<SendStream>), WaitError> {
|
||||
let (use_datagram, endpoint_config) = {
|
||||
async fn init_connection(&self) -> Result<Session, WaitError> {
|
||||
let endpoint_config = {
|
||||
let settings = self.settings.lock().unwrap();
|
||||
|
||||
let secure_conn = settings.secure_conn;
|
||||
let server_addr =
|
||||
make_socket_addr(format!("{}:{}", settings.address, settings.port).as_str())?;
|
||||
|
||||
(
|
||||
settings.use_datagram,
|
||||
QuinnQuicEndpointConfig {
|
||||
server_addr,
|
||||
server_name: settings.server_name.clone(),
|
||||
client_addr: None,
|
||||
secure_conn,
|
||||
alpns: vec![HTTP3_ALPN.to_string()],
|
||||
certificate_file: settings.certificate_file.clone(),
|
||||
private_key_file: settings.private_key_file.clone(),
|
||||
keep_alive_interval: 0,
|
||||
transport_config: settings.transport_config,
|
||||
with_client_auth: false,
|
||||
},
|
||||
)
|
||||
QuinnQuicEndpointConfig {
|
||||
server_addr,
|
||||
server_name: settings.server_name.clone(),
|
||||
client_addr: None,
|
||||
secure_conn,
|
||||
alpns: vec![HTTP3_ALPN.to_string()],
|
||||
certificate_file: settings.certificate_file.clone(),
|
||||
private_key_file: settings.private_key_file.clone(),
|
||||
keep_alive_interval: 0,
|
||||
transport_config: settings.transport_config,
|
||||
with_client_auth: false,
|
||||
}
|
||||
};
|
||||
|
||||
let endpoint = server_endpoint(&endpoint_config).map_err(|err| {
|
||||
|
@ -637,23 +653,204 @@ impl QuinnWebTransportServerSink {
|
|||
|
||||
gst::info!(CAT, imp = self, "accepted session");
|
||||
|
||||
let stream = if !use_datagram {
|
||||
let (stream, _) = session.open_bi().await.map_err(|err| {
|
||||
WaitError::FutureError(gst::error_msg!(
|
||||
gst::ResourceError::Failed,
|
||||
["Failed to open stream: {}", err]
|
||||
))
|
||||
})?;
|
||||
Ok(Some(stream))
|
||||
} else {
|
||||
let max_datagram_size = session.max_datagram_size();
|
||||
gst::info!(
|
||||
Ok(session)
|
||||
}
|
||||
|
||||
fn sink_query(&self, query: &mut gst::QueryRef) -> bool {
|
||||
gst::debug!(CAT, imp = self, "Handling sink query: {query:?}");
|
||||
|
||||
let s = query.structure_mut();
|
||||
|
||||
match s.name().as_str() {
|
||||
QUIC_DATAGRAM_PROBE => self.handle_datagram_query(s),
|
||||
QUIC_STREAM_OPEN => self.handle_open_stream_query(s),
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
fn handle_open_stream_query(&self, s: &mut gst::StructureRef) -> bool {
|
||||
gst::debug!(CAT, imp = self, "Handling open stream query: {s:?}");
|
||||
|
||||
let settings = self.settings.lock().unwrap();
|
||||
let timeout = settings.timeout;
|
||||
drop(settings);
|
||||
|
||||
let mut state = self.state.lock().unwrap();
|
||||
if let State::Started(ref mut state) = *state {
|
||||
let session = state.session.clone();
|
||||
|
||||
gst::debug!(
|
||||
CAT,
|
||||
imp = self,
|
||||
"Datagram size reported by peer: {max_datagram_size}"
|
||||
"Attempting to open stream for stream query: {s:?}"
|
||||
);
|
||||
Ok(None)
|
||||
}?;
|
||||
Ok((session, stream))
|
||||
|
||||
match self.open_stream(session, timeout) {
|
||||
Ok(stream) => {
|
||||
let index = state.stream_idx;
|
||||
|
||||
if let Ok(priority) = s.get::<i32>(QUIC_STREAM_PRIORITY) {
|
||||
// Default value of priority for Stream is already 0.
|
||||
if priority != 0 {
|
||||
let _ = stream.set_priority(priority);
|
||||
}
|
||||
}
|
||||
|
||||
gst::debug!(
|
||||
CAT,
|
||||
imp = self,
|
||||
"Opened stream for query: {s:?}, stream: {:?}, priority: {:?}",
|
||||
stream,
|
||||
stream.priority()
|
||||
);
|
||||
|
||||
state.stream_map.insert(index, stream);
|
||||
s.set_value(QUIC_STREAM_ID, index.to_send_value());
|
||||
|
||||
state.stream_idx += 1;
|
||||
|
||||
return true;
|
||||
}
|
||||
Err(err) => {
|
||||
gst::error!(
|
||||
CAT,
|
||||
imp = self,
|
||||
"Failed to handle open stream query, {err:?}"
|
||||
);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
false
|
||||
}
|
||||
|
||||
fn handle_datagram_query(&self, s: &mut gst::StructureRef) -> bool {
|
||||
gst::debug!(CAT, imp = self, "Handling datagram query: {s:?}");
|
||||
|
||||
let state = self.state.lock().unwrap();
|
||||
if let State::Started(ref state) = *state {
|
||||
if state.session.max_datagram_size() > 0 {
|
||||
return true;
|
||||
}
|
||||
|
||||
gst::warning!(CAT, imp = self, "Datagram unsupported by peer");
|
||||
}
|
||||
|
||||
false
|
||||
}
|
||||
|
||||
fn open_stream(&self, session: Session, timeout: u32) -> Result<SendStream, gst::ErrorMessage> {
|
||||
match wait(&self.canceller, session.open_uni(), timeout) {
|
||||
Ok(Ok(stream)) => {
|
||||
gst::debug!(CAT, imp = self, "Opened stream: {:?}", stream);
|
||||
|
||||
Ok(stream)
|
||||
}
|
||||
Ok(Err(err)) => {
|
||||
gst::error!(CAT, imp = self, "Failed to open stream {err}");
|
||||
Err(gst::error_msg!(
|
||||
gst::ResourceError::Failed,
|
||||
["Failed to open stream, {err}"]
|
||||
))
|
||||
}
|
||||
Err(err) => {
|
||||
gst::error!(CAT, imp = self, "Failed to open stream {err}");
|
||||
Err(gst::error_msg!(
|
||||
gst::ResourceError::Failed,
|
||||
["Failed to open stream, {err}"]
|
||||
))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn write_datagram(
|
||||
&self,
|
||||
session: Session,
|
||||
src: &[u8],
|
||||
drop_buffer_for_datagram: bool,
|
||||
) -> Result<(), Option<gst::ErrorMessage>> {
|
||||
let size = session.max_datagram_size();
|
||||
if src.len() > size {
|
||||
if drop_buffer_for_datagram {
|
||||
gst::warning!(
|
||||
CAT,
|
||||
imp = self,
|
||||
"Buffer dropped, current max datagram size: {size} > buffer size: {}",
|
||||
src.len()
|
||||
);
|
||||
return Ok(());
|
||||
} else {
|
||||
return Err(Some(gst::error_msg!(
|
||||
gst::ResourceError::Failed,
|
||||
[
|
||||
"Sending data failed, current max datagram size: {size}, buffer size: {}",
|
||||
src.len()
|
||||
]
|
||||
)));
|
||||
}
|
||||
}
|
||||
|
||||
match session.send_datagram(Bytes::copy_from_slice(src)) {
|
||||
Ok(_) => Ok(()),
|
||||
Err(e) => Err(Some(gst::error_msg!(
|
||||
gst::ResourceError::Failed,
|
||||
["Sending data failed: {}", e]
|
||||
))),
|
||||
}
|
||||
}
|
||||
|
||||
fn write_stream(
|
||||
&self,
|
||||
send: &mut SendStream,
|
||||
src: &[u8],
|
||||
timeout: u32,
|
||||
) -> Result<(), Option<gst::ErrorMessage>> {
|
||||
match wait(&self.canceller, send.write_all(src), timeout) {
|
||||
Ok(Ok(_)) => Ok(()),
|
||||
Ok(Err(e)) => Err(Some(gst::error_msg!(
|
||||
gst::ResourceError::Failed,
|
||||
["Sending data failed: {}", e]
|
||||
))),
|
||||
Err(e) => match e {
|
||||
WaitError::FutureAborted => {
|
||||
gst::warning!(CAT, imp = self, "Sending aborted");
|
||||
Ok(())
|
||||
}
|
||||
WaitError::FutureError(e) => Err(Some(gst::error_msg!(
|
||||
gst::ResourceError::Failed,
|
||||
["Sending data failed: {}", e]
|
||||
))),
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn close_stream(&self, stream: &mut SendStream, timeout: u32) {
|
||||
/*
|
||||
* Shutdown stream gracefully
|
||||
* send.finish() may fail, but the error is harmless.
|
||||
*/
|
||||
let _ = stream.finish();
|
||||
|
||||
match wait(&self.canceller, stream.stopped(), timeout) {
|
||||
Ok(r) => {
|
||||
if let Err(e) = r {
|
||||
let err_msg = format!("Stream finish request error: {}", e);
|
||||
gst::error!(CAT, imp = self, "{}", err_msg);
|
||||
} else {
|
||||
gst::info!(CAT, imp = self, "Stream {:?} finished", stream);
|
||||
}
|
||||
}
|
||||
Err(e) => match e {
|
||||
WaitError::FutureAborted => {
|
||||
let err_msg = "Stream finish request aborted".to_string();
|
||||
gst::warning!(CAT, imp = self, "{}", err_msg);
|
||||
}
|
||||
WaitError::FutureError(e) => {
|
||||
let err_msg = format!("Stream finish request future error: {}", e);
|
||||
gst::error!(CAT, imp = self, "{}", err_msg);
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue