mirror of
https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git
synced 2024-11-22 03:21:00 +00:00
net/quinn: Support stream multiplexing in quinnquicsink
Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1634>
This commit is contained in:
parent
1cc2682b55
commit
babb6f360b
3 changed files with 401 additions and 107 deletions
|
@ -7,6 +7,7 @@
|
||||||
//
|
//
|
||||||
// SPDX-License-Identifier: MPL-2.0
|
// SPDX-License-Identifier: MPL-2.0
|
||||||
use gst::glib;
|
use gst::glib;
|
||||||
|
use quinn::VarInt;
|
||||||
|
|
||||||
pub(crate) static DEFAULT_SERVER_NAME: &str = "localhost";
|
pub(crate) static DEFAULT_SERVER_NAME: &str = "localhost";
|
||||||
pub(crate) static DEFAULT_ADDR: &str = "127.0.0.1";
|
pub(crate) static DEFAULT_ADDR: &str = "127.0.0.1";
|
||||||
|
@ -21,6 +22,7 @@ pub(crate) static DEFAULT_UDP_PAYLOAD_SIZE: u16 = 1452;
|
||||||
pub(crate) static DEFAULT_MIN_UDP_PAYLOAD_SIZE: u16 = 1200;
|
pub(crate) static DEFAULT_MIN_UDP_PAYLOAD_SIZE: u16 = 1200;
|
||||||
pub(crate) static DEFAULT_MAX_UDP_PAYLOAD_SIZE: u16 = 65527;
|
pub(crate) static DEFAULT_MAX_UDP_PAYLOAD_SIZE: u16 = 65527;
|
||||||
pub(crate) static DEFAULT_DROP_BUFFER_FOR_DATAGRAM: bool = false;
|
pub(crate) static DEFAULT_DROP_BUFFER_FOR_DATAGRAM: bool = false;
|
||||||
|
pub(crate) static DEFAULT_MAX_CONCURRENT_UNI_STREAMS: VarInt = VarInt::from_u32(32);
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* For QUIC transport parameters
|
* For QUIC transport parameters
|
||||||
|
@ -53,6 +55,8 @@ pub struct QuinnQuicTransportConfig {
|
||||||
pub max_udp_payload_size: u16,
|
pub max_udp_payload_size: u16,
|
||||||
pub min_mtu: u16,
|
pub min_mtu: u16,
|
||||||
pub upper_bound_mtu: u16,
|
pub upper_bound_mtu: u16,
|
||||||
|
pub max_concurrent_uni_streams: VarInt,
|
||||||
|
pub send_window: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for QuinnQuicTransportConfig {
|
impl Default for QuinnQuicTransportConfig {
|
||||||
|
@ -71,6 +75,8 @@ impl Default for QuinnQuicTransportConfig {
|
||||||
max_udp_payload_size: DEFAULT_MAX_UDP_PAYLOAD_SIZE,
|
max_udp_payload_size: DEFAULT_MAX_UDP_PAYLOAD_SIZE,
|
||||||
min_mtu: DEFAULT_MINIMUM_MTU,
|
min_mtu: DEFAULT_MINIMUM_MTU,
|
||||||
upper_bound_mtu: DEFAULT_UPPER_BOUND_MTU,
|
upper_bound_mtu: DEFAULT_UPPER_BOUND_MTU,
|
||||||
|
max_concurrent_uni_streams: DEFAULT_MAX_CONCURRENT_UNI_STREAMS,
|
||||||
|
send_window: (8 * STREAM_RWND).into(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -7,16 +7,22 @@
|
||||||
//
|
//
|
||||||
// SPDX-License-Identifier: MPL-2.0
|
// SPDX-License-Identifier: MPL-2.0
|
||||||
|
|
||||||
|
use crate::common::*;
|
||||||
|
use crate::quinnquicmeta::*;
|
||||||
|
use crate::quinnquicquery::*;
|
||||||
use crate::utils::{
|
use crate::utils::{
|
||||||
client_endpoint, get_stats, make_socket_addr, server_endpoint, wait, QuinnQuicEndpointConfig,
|
client_endpoint, get_stats, make_socket_addr, server_endpoint, wait, QuinnQuicEndpointConfig,
|
||||||
WaitError, CONNECTION_CLOSE_CODE, CONNECTION_CLOSE_MSG,
|
WaitError, CONNECTION_CLOSE_CODE, CONNECTION_CLOSE_MSG, RUNTIME,
|
||||||
};
|
};
|
||||||
use crate::{common::*, utils};
|
use crate::{common::*, utils};
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use futures::future;
|
use futures::future;
|
||||||
use gst::{glib, prelude::*, subclass::prelude::*};
|
use gst::{glib, prelude::*, subclass::prelude::*};
|
||||||
use gst_base::subclass::prelude::*;
|
use gst_base::subclass::prelude::*;
|
||||||
use quinn::{Connection, SendStream, TransportConfig};
|
use quinn::{
|
||||||
|
Connection, SendDatagramError, SendStream, StreamId, TransportConfig, VarInt, WriteError,
|
||||||
|
};
|
||||||
|
use std::collections::HashMap;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use std::sync::LazyLock;
|
use std::sync::LazyLock;
|
||||||
use std::sync::Mutex;
|
use std::sync::Mutex;
|
||||||
|
@ -34,6 +40,7 @@ static CAT: LazyLock<gst::DebugCategory> = LazyLock::new(|| {
|
||||||
struct Started {
|
struct Started {
|
||||||
connection: Connection,
|
connection: Connection,
|
||||||
stream: Option<SendStream>,
|
stream: Option<SendStream>,
|
||||||
|
stream_map: HashMap<u64, SendStream>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Default)]
|
#[derive(Default)]
|
||||||
|
@ -276,6 +283,17 @@ impl ObjectImpl for QuinnQuicSink {
|
||||||
.blurb("Drop buffers when using datagram if buffer size > max datagram size")
|
.blurb("Drop buffers when using datagram if buffer size > max datagram size")
|
||||||
.default_value(DEFAULT_DROP_BUFFER_FOR_DATAGRAM)
|
.default_value(DEFAULT_DROP_BUFFER_FOR_DATAGRAM)
|
||||||
.build(),
|
.build(),
|
||||||
|
glib::ParamSpecUInt64::builder("max-concurrent-uni-streams")
|
||||||
|
.nick("Maximum concurrent uni-directional streams")
|
||||||
|
.blurb("Maximum number of incoming unidirectional streams that may be open concurrently")
|
||||||
|
.default_value(DEFAULT_MAX_CONCURRENT_UNI_STREAMS.into())
|
||||||
|
.readwrite()
|
||||||
|
.build(),
|
||||||
|
glib::ParamSpecUInt64::builder("send-window")
|
||||||
|
.nick("Send Window")
|
||||||
|
.blurb("Maximum number of bytes to transmit to a peer without acknowledgment")
|
||||||
|
.readwrite()
|
||||||
|
.build(),
|
||||||
]
|
]
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -366,6 +384,15 @@ impl ObjectImpl for QuinnQuicSink {
|
||||||
"drop-buffer-for-datagram" => {
|
"drop-buffer-for-datagram" => {
|
||||||
settings.drop_buffer_for_datagram = value.get().expect("type checked upstream");
|
settings.drop_buffer_for_datagram = value.get().expect("type checked upstream");
|
||||||
}
|
}
|
||||||
|
"max-concurrent-uni-streams" => {
|
||||||
|
let value = value.get::<u64>().expect("type checked upstream");
|
||||||
|
settings.transport_config.max_concurrent_uni_streams =
|
||||||
|
VarInt::from_u64(value.max(VarInt::MAX.into())).unwrap();
|
||||||
|
}
|
||||||
|
"send-window" => {
|
||||||
|
settings.transport_config.send_window =
|
||||||
|
value.get::<u64>().expect("type checked upstream");
|
||||||
|
}
|
||||||
_ => unimplemented!(),
|
_ => unimplemented!(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -425,6 +452,10 @@ impl ObjectImpl for QuinnQuicSink {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
"drop-buffer-for-datagram" => settings.drop_buffer_for_datagram.to_value(),
|
"drop-buffer-for-datagram" => settings.drop_buffer_for_datagram.to_value(),
|
||||||
|
"max-concurrent-uni-streams" => {
|
||||||
|
u64::from(settings.transport_config.max_concurrent_uni_streams).to_value()
|
||||||
|
}
|
||||||
|
"send-window" => settings.transport_config.send_window.to_value(),
|
||||||
_ => unimplemented!(),
|
_ => unimplemented!(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -450,10 +481,11 @@ impl BaseSinkImpl for QuinnQuicSink {
|
||||||
}
|
}
|
||||||
|
|
||||||
match wait(&self.canceller, self.init_connection(), timeout) {
|
match wait(&self.canceller, self.init_connection(), timeout) {
|
||||||
Ok(Ok((c, s))) => {
|
Ok(Ok(c)) => {
|
||||||
*state = State::Started(Started {
|
*state = State::Started(Started {
|
||||||
connection: c,
|
connection: c,
|
||||||
stream: s,
|
stream: None,
|
||||||
|
stream_map: HashMap::new(),
|
||||||
});
|
});
|
||||||
|
|
||||||
gst::info!(CAT, imp = self, "Started");
|
gst::info!(CAT, imp = self, "Started");
|
||||||
|
@ -493,35 +525,21 @@ impl BaseSinkImpl for QuinnQuicSink {
|
||||||
|
|
||||||
if let State::Started(ref mut state) = *state {
|
if let State::Started(ref mut state) = *state {
|
||||||
let connection = &state.connection;
|
let connection = &state.connection;
|
||||||
let mut close_msg = CONNECTION_CLOSE_MSG.to_string();
|
|
||||||
|
|
||||||
if !use_datagram {
|
if !use_datagram {
|
||||||
let send = &mut state.stream.as_mut().unwrap();
|
if let Some(ref mut send) = state.stream.take() {
|
||||||
|
self.close_stream(send, timeout);
|
||||||
// Shutdown stream gracefully
|
|
||||||
// send.finish() may fail, but the error is harmless.
|
|
||||||
let _ = send.finish();
|
|
||||||
match wait(&self.canceller, send.stopped(), timeout) {
|
|
||||||
Ok(r) => {
|
|
||||||
if let Err(e) = r {
|
|
||||||
close_msg = format!("Stream finish request error: {}", e);
|
|
||||||
gst::error!(CAT, imp = self, "{}", close_msg);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(e) => match e {
|
|
||||||
WaitError::FutureAborted => {
|
|
||||||
close_msg = "Stream finish request aborted".to_string();
|
|
||||||
gst::warning!(CAT, imp = self, "{}", close_msg);
|
|
||||||
}
|
|
||||||
WaitError::FutureError(e) => {
|
|
||||||
close_msg = format!("Stream finish request future error: {}", e);
|
|
||||||
gst::error!(CAT, imp = self, "{}", close_msg);
|
|
||||||
}
|
|
||||||
},
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
connection.close(CONNECTION_CLOSE_CODE.into(), close_msg.as_bytes());
|
for stream in state.stream_map.values_mut() {
|
||||||
|
self.close_stream(stream, timeout);
|
||||||
|
}
|
||||||
|
|
||||||
|
connection.close(
|
||||||
|
CONNECTION_CLOSE_CODE.into(),
|
||||||
|
CONNECTION_CLOSE_MSG.as_bytes(),
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
*state = State::Stopped;
|
*state = State::Stopped;
|
||||||
|
@ -544,7 +562,9 @@ impl BaseSinkImpl for QuinnQuicSink {
|
||||||
gst::FlowError::Error
|
gst::FlowError::Error
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
match self.send_buffer(&map) {
|
let meta = buffer.meta::<QuinnQuicMeta>();
|
||||||
|
|
||||||
|
match self.send_buffer(&map, meta) {
|
||||||
Ok(_) => Ok(gst::FlowSuccess::Ok),
|
Ok(_) => Ok(gst::FlowSuccess::Ok),
|
||||||
Err(err) => match err {
|
Err(err) => match err {
|
||||||
Some(error_message) => {
|
Some(error_message) => {
|
||||||
|
@ -560,6 +580,13 @@ impl BaseSinkImpl for QuinnQuicSink {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn query(&self, query: &mut gst::QueryRef) -> bool {
|
||||||
|
match query.view_mut() {
|
||||||
|
gst::QueryViewMut::Custom(q) => self.sink_query(q),
|
||||||
|
_ => BaseSinkImplExt::parent_query(self, query),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
fn unlock(&self) -> Result<(), gst::ErrorMessage> {
|
fn unlock(&self) -> Result<(), gst::ErrorMessage> {
|
||||||
let mut canceller = self.canceller.lock().unwrap();
|
let mut canceller = self.canceller.lock().unwrap();
|
||||||
canceller.abort();
|
canceller.abort();
|
||||||
|
@ -573,10 +600,42 @@ impl BaseSinkImpl for QuinnQuicSink {
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn event(&self, event: gst::Event) -> bool {
|
||||||
|
use gst::EventView;
|
||||||
|
|
||||||
|
gst::debug!(CAT, imp = self, "Handling event {:?}", event);
|
||||||
|
|
||||||
|
let settings = self.settings.lock().unwrap();
|
||||||
|
let timeout = settings.timeout;
|
||||||
|
drop(settings);
|
||||||
|
|
||||||
|
let mut state = self.state.lock().unwrap();
|
||||||
|
if let State::Started(ref mut state) = *state {
|
||||||
|
if let EventView::CustomDownstream(ev) = event.view() {
|
||||||
|
if let Some(s) = ev.structure() {
|
||||||
|
if s.name() == QUIC_STREAM_CLOSE_CUSTOMDOWNSTREAM_EVENT {
|
||||||
|
if let Ok(stream_id) = s.get::<u64>(QUIC_STREAM_ID) {
|
||||||
|
if let Some(mut stream) = state.stream_map.remove(&stream_id) {
|
||||||
|
self.close_stream(&mut stream, timeout);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
self.parent_event(event)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl QuinnQuicSink {
|
impl QuinnQuicSink {
|
||||||
fn send_buffer(&self, src: &[u8]) -> Result<(), Option<gst::ErrorMessage>> {
|
fn send_buffer(
|
||||||
|
&self,
|
||||||
|
src: &[u8],
|
||||||
|
meta: Option<gst::MetaRef<'_, QuinnQuicMeta>>,
|
||||||
|
) -> Result<(), Option<gst::ErrorMessage>> {
|
||||||
let settings = self.settings.lock().unwrap();
|
let settings = self.settings.lock().unwrap();
|
||||||
let timeout = settings.timeout;
|
let timeout = settings.timeout;
|
||||||
let use_datagram = settings.use_datagram;
|
let use_datagram = settings.use_datagram;
|
||||||
|
@ -585,11 +644,8 @@ impl QuinnQuicSink {
|
||||||
|
|
||||||
let mut state = self.state.lock().unwrap();
|
let mut state = self.state.lock().unwrap();
|
||||||
|
|
||||||
let (conn, stream) = match *state {
|
let started = match *state {
|
||||||
State::Started(Started {
|
State::Started(ref mut started) => started,
|
||||||
ref connection,
|
|
||||||
ref mut stream,
|
|
||||||
}) => (connection, stream),
|
|
||||||
State::Stopped => {
|
State::Stopped => {
|
||||||
return Err(Some(gst::error_msg!(
|
return Err(Some(gst::error_msg!(
|
||||||
gst::LibraryError::Failed,
|
gst::LibraryError::Failed,
|
||||||
|
@ -597,61 +653,51 @@ impl QuinnQuicSink {
|
||||||
)));
|
)));
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
let connection = started.connection.clone();
|
||||||
|
|
||||||
if use_datagram {
|
if let Some(m) = meta {
|
||||||
match conn.max_datagram_size() {
|
if m.is_datagram() {
|
||||||
Some(size) => {
|
self.write_datagram(connection, src, drop_buffer_for_datagram)
|
||||||
if src.len() > size {
|
|
||||||
if drop_buffer_for_datagram {
|
|
||||||
gst::warning!(CAT, imp = self, "Buffer dropped, current max datagram size: {size} > buffer size: {}", src.len());
|
|
||||||
return Ok(());
|
|
||||||
} else {
|
} else {
|
||||||
return Err(Some(gst::error_msg!(
|
let stream_id = m.stream_id();
|
||||||
gst::ResourceError::Failed,
|
|
||||||
["Sending data failed, current max datagram size: {size}, buffer size: {}", src.len()]
|
|
||||||
)));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
match conn.send_datagram(Bytes::copy_from_slice(src)) {
|
if let Some(send) = started.stream_map.get_mut(&stream_id) {
|
||||||
Ok(_) => Ok(()),
|
gst::trace!(CAT, imp = self, "Writing buffer for stream {stream_id:?}");
|
||||||
Err(e) => Err(Some(gst::error_msg!(
|
self.write_stream(send, src, timeout)
|
||||||
gst::ResourceError::Failed,
|
|
||||||
["Sending data failed: {}", e]
|
|
||||||
))),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
/*
|
|
||||||
* We check for datagram being unsupported by peer in
|
|
||||||
* start/init_connection, so we should never reach here.
|
|
||||||
*/
|
|
||||||
None => unreachable!(),
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
let send = &mut stream.as_mut().unwrap();
|
Err(Some(gst::error_msg!(
|
||||||
|
|
||||||
match wait(&self.canceller, send.write_all(src), timeout) {
|
|
||||||
Ok(Ok(_)) => Ok(()),
|
|
||||||
Ok(Err(e)) => Err(Some(gst::error_msg!(
|
|
||||||
gst::ResourceError::Failed,
|
gst::ResourceError::Failed,
|
||||||
["Sending data failed: {}", e]
|
["No stream for buffer with stream id {}", stream_id]
|
||||||
))),
|
)))
|
||||||
Err(e) => match e {
|
|
||||||
WaitError::FutureAborted => {
|
|
||||||
gst::warning!(CAT, imp = self, "Sending aborted");
|
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
WaitError::FutureError(e) => Err(Some(gst::error_msg!(
|
}
|
||||||
gst::ResourceError::Failed,
|
} else if use_datagram {
|
||||||
["Sending data failed: {}", e]
|
self.write_datagram(connection, src, drop_buffer_for_datagram)
|
||||||
))),
|
} else {
|
||||||
},
|
{
|
||||||
|
if started.stream.is_none() {
|
||||||
|
match self.open_stream(connection, timeout) {
|
||||||
|
Ok(stream) => {
|
||||||
|
gst::debug!(
|
||||||
|
CAT,
|
||||||
|
imp = self,
|
||||||
|
"Opened connection, stream: {}",
|
||||||
|
stream.id()
|
||||||
|
);
|
||||||
|
started.stream = Some(stream);
|
||||||
|
}
|
||||||
|
Err(err) => return Err(Some(err)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn init_connection(&self) -> Result<(Connection, Option<SendStream>), WaitError> {
|
let send = started.stream.as_mut().expect("Stream must be valid here");
|
||||||
let (role, use_datagram, endpoint_config) = {
|
self.write_stream(send, src, timeout)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn init_connection(&self) -> Result<Connection, WaitError> {
|
||||||
|
let (role, endpoint_config) = {
|
||||||
let settings = self.settings.lock().unwrap();
|
let settings = self.settings.lock().unwrap();
|
||||||
|
|
||||||
let client_addr = make_socket_addr(
|
let client_addr = make_socket_addr(
|
||||||
|
@ -664,7 +710,6 @@ impl QuinnQuicSink {
|
||||||
let server_name = settings.server_name.clone();
|
let server_name = settings.server_name.clone();
|
||||||
let alpns = settings.alpns.clone();
|
let alpns = settings.alpns.clone();
|
||||||
let role = settings.role;
|
let role = settings.role;
|
||||||
let use_datagram = settings.use_datagram;
|
|
||||||
let keep_alive_interval = settings.keep_alive_interval;
|
let keep_alive_interval = settings.keep_alive_interval;
|
||||||
let secure_conn = settings.secure_conn;
|
let secure_conn = settings.secure_conn;
|
||||||
let certificate_file = settings.certificate_file.clone();
|
let certificate_file = settings.certificate_file.clone();
|
||||||
|
@ -673,7 +718,6 @@ impl QuinnQuicSink {
|
||||||
|
|
||||||
(
|
(
|
||||||
role,
|
role,
|
||||||
use_datagram,
|
|
||||||
QuinnQuicEndpointConfig {
|
QuinnQuicEndpointConfig {
|
||||||
server_addr,
|
server_addr,
|
||||||
server_name,
|
server_name,
|
||||||
|
@ -726,35 +770,276 @@ impl QuinnQuicSink {
|
||||||
})?,
|
})?,
|
||||||
};
|
};
|
||||||
|
|
||||||
let stream = if !use_datagram {
|
|
||||||
let res = connection.open_uni().await.map_err(|err| {
|
|
||||||
WaitError::FutureError(gst::error_msg!(
|
|
||||||
gst::ResourceError::Failed,
|
|
||||||
["Failed to open stream: {}", err]
|
|
||||||
))
|
|
||||||
})?;
|
|
||||||
|
|
||||||
Some(res)
|
|
||||||
} else {
|
|
||||||
match connection.max_datagram_size() {
|
|
||||||
Some(datagram_size) => {
|
|
||||||
gst::info!(
|
gst::info!(
|
||||||
CAT,
|
CAT,
|
||||||
imp = self,
|
imp = self,
|
||||||
"Datagram size reported by peer: {datagram_size}"
|
"Remote connection established: {}",
|
||||||
|
connection.remote_address()
|
||||||
);
|
);
|
||||||
|
|
||||||
|
Ok(connection)
|
||||||
}
|
}
|
||||||
None => {
|
|
||||||
return Err(WaitError::FutureError(gst::error_msg!(
|
fn handle_open_stream_query(&self, s: &mut gst::StructureRef) -> bool {
|
||||||
|
gst::debug!(CAT, imp = self, "Handling open stream query: {s:?}");
|
||||||
|
|
||||||
|
let settings = self.settings.lock().unwrap();
|
||||||
|
let timeout = settings.timeout;
|
||||||
|
drop(settings);
|
||||||
|
|
||||||
|
let mut state = self.state.lock().unwrap();
|
||||||
|
if let State::Started(ref mut state) = *state {
|
||||||
|
let connection = state.connection.clone();
|
||||||
|
|
||||||
|
gst::debug!(
|
||||||
|
CAT,
|
||||||
|
imp = self,
|
||||||
|
"Attempting to open connection for stream query: {s:?}"
|
||||||
|
);
|
||||||
|
|
||||||
|
match self.open_stream(connection, timeout) {
|
||||||
|
Ok(stream) => {
|
||||||
|
let index = stream.id().index();
|
||||||
|
|
||||||
|
if let Ok(priority) = s.get::<i32>(QUIC_STREAM_PRIORITY) {
|
||||||
|
// Default value of priority for Stream is already 0.
|
||||||
|
if priority != 0 {
|
||||||
|
let _ = stream.set_priority(priority);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
gst::debug!(
|
||||||
|
CAT,
|
||||||
|
imp = self,
|
||||||
|
"Opened connection for stream query: {s:?}, stream: {}, priority: {:?}",
|
||||||
|
stream.id(),
|
||||||
|
stream.priority()
|
||||||
|
);
|
||||||
|
|
||||||
|
state.stream_map.insert(index, stream);
|
||||||
|
s.set_value(QUIC_STREAM_ID, index.to_send_value());
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
gst::error!(
|
||||||
|
CAT,
|
||||||
|
imp = self,
|
||||||
|
"Failed to handle open stream query, {err:?}"
|
||||||
|
);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
false
|
||||||
|
}
|
||||||
|
|
||||||
|
fn handle_datagram_query(&self, s: &mut gst::StructureRef) -> bool {
|
||||||
|
gst::debug!(CAT, imp = self, "Handling datagram query: {s:?}");
|
||||||
|
|
||||||
|
let state = self.state.lock().unwrap();
|
||||||
|
if let State::Started(ref state) = *state {
|
||||||
|
if state.connection.max_datagram_size().is_some() {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
gst::warning!(CAT, imp = self, "Datagram unsupported by peer");
|
||||||
|
}
|
||||||
|
|
||||||
|
false
|
||||||
|
}
|
||||||
|
|
||||||
|
fn open_stream(
|
||||||
|
&self,
|
||||||
|
connection: Connection,
|
||||||
|
timeout: u32,
|
||||||
|
) -> Result<SendStream, gst::ErrorMessage> {
|
||||||
|
match wait(&self.canceller, connection.open_uni(), timeout) {
|
||||||
|
Ok(Ok(stream)) => {
|
||||||
|
gst::debug!(
|
||||||
|
CAT,
|
||||||
|
imp = self,
|
||||||
|
"Opened connection, stream: {}",
|
||||||
|
stream.id()
|
||||||
|
);
|
||||||
|
Ok(stream)
|
||||||
|
}
|
||||||
|
Ok(Err(err)) => {
|
||||||
|
gst::error!(CAT, imp = self, "Failed to open connection {err}");
|
||||||
|
Err(gst::error_msg!(
|
||||||
gst::ResourceError::Failed,
|
gst::ResourceError::Failed,
|
||||||
["Datagram unsupported by the peer"]
|
["Failed to open connection, {err}"]
|
||||||
|
))
|
||||||
|
}
|
||||||
|
Err(err) => {
|
||||||
|
gst::error!(CAT, imp = self, "Failed to open connection {err}");
|
||||||
|
Err(gst::error_msg!(
|
||||||
|
gst::ResourceError::Failed,
|
||||||
|
["Failed to open connection, {err}"]
|
||||||
|
))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn sink_query(&self, query: &mut gst::QueryRef) -> bool {
|
||||||
|
gst::debug!(CAT, imp = self, "Handling sink query: {query:?}");
|
||||||
|
|
||||||
|
let s = query.structure_mut();
|
||||||
|
|
||||||
|
match s.name().as_str() {
|
||||||
|
QUIC_DATAGRAM_PROBE => self.handle_datagram_query(s),
|
||||||
|
QUIC_STREAM_OPEN => self.handle_open_stream_query(s),
|
||||||
|
_ => false,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn write_datagram(
|
||||||
|
&self,
|
||||||
|
conn: Connection,
|
||||||
|
src: &[u8],
|
||||||
|
drop_buffer_for_datagram: bool,
|
||||||
|
) -> Result<(), Option<gst::ErrorMessage>> {
|
||||||
|
match conn.max_datagram_size() {
|
||||||
|
Some(size) => {
|
||||||
|
if src.len() > size {
|
||||||
|
if drop_buffer_for_datagram {
|
||||||
|
gst::warning!(
|
||||||
|
CAT,
|
||||||
|
imp = self,
|
||||||
|
"Buffer dropped, current max datagram size: {size} > buffer size: {}",
|
||||||
|
src.len()
|
||||||
|
);
|
||||||
|
return Ok(());
|
||||||
|
} else {
|
||||||
|
return Err(Some(gst::error_msg!(
|
||||||
|
gst::ResourceError::Failed,
|
||||||
|
["Sending data failed, current max datagram size: {size}, buffer size: {}", src.len()]
|
||||||
)));
|
)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
None
|
match conn.send_datagram(Bytes::copy_from_slice(src)) {
|
||||||
};
|
Ok(_) => Ok(()),
|
||||||
|
Err(err) => match err {
|
||||||
|
SendDatagramError::ConnectionLost(cerr) => Err(Some(gst::error_msg!(
|
||||||
|
gst::ResourceError::Failed,
|
||||||
|
["Sending datagram failed: {}", cerr]
|
||||||
|
))),
|
||||||
|
/*
|
||||||
|
* Sending datagram can fail due to change in
|
||||||
|
* max_datagram_size even though we checked
|
||||||
|
* just before trying to send. So check here
|
||||||
|
* again if we should drop buffers if requested
|
||||||
|
* or return an error.
|
||||||
|
*/
|
||||||
|
_ => {
|
||||||
|
if drop_buffer_for_datagram {
|
||||||
|
gst::warning!(CAT, imp = self, "Buffer dropped, error: {err:?}");
|
||||||
|
Ok(())
|
||||||
|
} else {
|
||||||
|
Err(Some(gst::error_msg!(
|
||||||
|
gst::ResourceError::Failed,
|
||||||
|
["Sending datagram failed, error: {err:?}"]
|
||||||
|
)))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
gst::warning!(CAT, imp = self, "Datagram unsupported by peer");
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
Ok((connection, stream))
|
fn write_stream(
|
||||||
|
&self,
|
||||||
|
stream: &mut SendStream,
|
||||||
|
src: &[u8],
|
||||||
|
timeout: u32,
|
||||||
|
) -> Result<(), Option<gst::ErrorMessage>> {
|
||||||
|
let stream_id = stream.id().index();
|
||||||
|
|
||||||
|
match wait(&self.canceller, stream.write(src), timeout) {
|
||||||
|
Ok(Ok(bytes_written)) => {
|
||||||
|
gst::trace!(
|
||||||
|
CAT,
|
||||||
|
imp = self,
|
||||||
|
"Stream {stream_id} wrote {bytes_written} bytes"
|
||||||
|
);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
Ok(Err(e)) => match e {
|
||||||
|
/*
|
||||||
|
* We do not expect Streams to be stopped or closed by
|
||||||
|
* remote peer but add a warning and drop buffers for
|
||||||
|
* now. This can be used in future to signal an error
|
||||||
|
* on the stream on peer side and then send a query
|
||||||
|
* upstream to signal multiplexer to release the pad
|
||||||
|
* and close the stream.
|
||||||
|
*/
|
||||||
|
WriteError::Stopped(code) => {
|
||||||
|
gst::warning!(
|
||||||
|
CAT,
|
||||||
|
imp = self,
|
||||||
|
"Dropping buffer, stream {stream_id} stopped: {code}"
|
||||||
|
);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
WriteError::ClosedStream => {
|
||||||
|
gst::warning!(
|
||||||
|
CAT,
|
||||||
|
imp = self,
|
||||||
|
"Dropping buffer, stream {stream_id} closed"
|
||||||
|
);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
_ => Err(Some(gst::error_msg!(
|
||||||
|
gst::ResourceError::Failed,
|
||||||
|
["Sending data for stream {stream_id} failed: {e}"]
|
||||||
|
))),
|
||||||
|
},
|
||||||
|
Err(e) => match e {
|
||||||
|
WaitError::FutureAborted => {
|
||||||
|
gst::warning!(CAT, imp = self, "Sending aborted");
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
WaitError::FutureError(e) => Err(Some(gst::error_msg!(
|
||||||
|
gst::ResourceError::Failed,
|
||||||
|
["Sending for stream {stream_id} failed: {e}"]
|
||||||
|
))),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn close_stream(&self, stream: &mut SendStream, timeout: u32) {
|
||||||
|
/*
|
||||||
|
* Shutdown stream gracefully
|
||||||
|
* send.finish() may fail, but the error is harmless.
|
||||||
|
*/
|
||||||
|
let _ = stream.finish();
|
||||||
|
|
||||||
|
match wait(&self.canceller, stream.stopped(), timeout) {
|
||||||
|
Ok(r) => {
|
||||||
|
if let Err(e) = r {
|
||||||
|
let err_msg = format!("Stream finish request error: {}", e);
|
||||||
|
gst::error!(CAT, imp = self, "{}", err_msg);
|
||||||
|
} else {
|
||||||
|
gst::info!(CAT, imp = self, "Stream {} finished", stream.id());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => match e {
|
||||||
|
WaitError::FutureAborted => {
|
||||||
|
let err_msg = "Stream finish request aborted".to_string();
|
||||||
|
gst::warning!(CAT, imp = self, "{}", err_msg);
|
||||||
|
}
|
||||||
|
WaitError::FutureError(e) => {
|
||||||
|
let err_msg = format!("Stream finish request future error: {}", e);
|
||||||
|
gst::error!(CAT, imp = self, "{}", err_msg);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -258,8 +258,10 @@ fn configure_client(ep_config: &QuinnQuicEndpointConfig) -> Result<ClientConfig,
|
||||||
transport_config
|
transport_config
|
||||||
.datagram_send_buffer_size(ep_config.transport_config.datagram_send_buffer_size);
|
.datagram_send_buffer_size(ep_config.transport_config.datagram_send_buffer_size);
|
||||||
transport_config.max_concurrent_bidi_streams(0u32.into());
|
transport_config.max_concurrent_bidi_streams(0u32.into());
|
||||||
transport_config.max_concurrent_uni_streams(1u32.into());
|
transport_config
|
||||||
|
.max_concurrent_uni_streams(ep_config.transport_config.max_concurrent_uni_streams);
|
||||||
transport_config.mtu_discovery_config(Some(mtu_config));
|
transport_config.mtu_discovery_config(Some(mtu_config));
|
||||||
|
transport_config.send_window(ep_config.transport_config.send_window);
|
||||||
|
|
||||||
transport_config
|
transport_config
|
||||||
};
|
};
|
||||||
|
@ -393,7 +395,8 @@ fn configure_server(
|
||||||
transport_config
|
transport_config
|
||||||
.datagram_send_buffer_size(ep_config.transport_config.datagram_send_buffer_size);
|
.datagram_send_buffer_size(ep_config.transport_config.datagram_send_buffer_size);
|
||||||
transport_config.max_concurrent_bidi_streams(0u32.into());
|
transport_config.max_concurrent_bidi_streams(0u32.into());
|
||||||
transport_config.max_concurrent_uni_streams(1u32.into());
|
transport_config
|
||||||
|
.max_concurrent_uni_streams(ep_config.transport_config.max_concurrent_uni_streams);
|
||||||
transport_config.mtu_discovery_config(Some(mtu_config));
|
transport_config.mtu_discovery_config(Some(mtu_config));
|
||||||
|
|
||||||
transport_config
|
transport_config
|
||||||
|
|
Loading…
Reference in a new issue