net/quinn: Allow setting some parameters from TransportConfig

As of now, we expose the below four properties from `TransportConfig`.
- Initial MTU
- Minimum MTU
- Datagram receive buffer size
- Datagram send buffer size

Maximum UDP payload size from `EndpointConfig` and upper bound from
`MtuDiscoveryConfig` are also exposed as properties.

See the below documentation for further details.
- https://docs.rs/quinn/latest/quinn/struct.TransportConfig.html
- https://docs.rs/quinn/latest/quinn/struct.MtuDiscoveryConfig.html
- https://docs.rs/quinn/latest/quinn/struct.EndpointConfig.html

While at it, also clean up passing function parameters to the functions
in utils.rs.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1613>
This commit is contained in:
Sanchayan Maity 2024-06-13 21:36:50 +05:30
parent bc5ed023e4
commit cf7172248c
5 changed files with 558 additions and 201 deletions

View file

@ -4071,6 +4071,48 @@
"type": "guint",
"writable": true
},
"datagram-receive-buffer-size": {
"blurb": "Maximum number of incoming application datagram bytes to buffer",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "1250000",
"max": "18446744073709551615",
"min": "0",
"mutable": "null",
"readable": true,
"type": "guint64",
"writable": true
},
"datagram-send-buffer-size": {
"blurb": "Maximum number of outgoing application datagram bytes to buffer",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "1048576",
"max": "18446744073709551615",
"min": "0",
"mutable": "null",
"readable": true,
"type": "guint64",
"writable": true
},
"initial-mtu": {
"blurb": "Initial value to be used as the maximum UDP payload size",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "1200",
"max": "-1",
"min": "1200",
"mutable": "null",
"readable": true,
"type": "guint",
"writable": true
},
"keep-alive-interval": {
"blurb": "Keeps QUIC connection alive by periodically pinging the server. Value set in ms, 0 disables this feature",
"conditionally-available": false,
@ -4085,6 +4127,34 @@
"type": "guint64",
"writable": true
},
"max-udp-payload-size": {
"blurb": "Maximum UDP payload size accepted from peers (excluding UDP and IP overhead)",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "65527",
"max": "65527",
"min": "1200",
"mutable": "null",
"readable": true,
"type": "guint",
"writable": true
},
"min-mtu": {
"blurb": "Maximum UDP payload size guaranteed to be supported by the network, must be <= initial-mtu",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "1200",
"max": "-1",
"min": "1200",
"mutable": "null",
"readable": true,
"type": "guint",
"writable": true
},
"port": {
"blurb": "Port of the QUIC server e.g. 5000",
"conditionally-available": false,
@ -4187,6 +4257,20 @@
"type": "guint",
"writable": true
},
"upper-bound-mtu": {
"blurb": "Upper bound to the max UDP payload size that MTU discovery will search for",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "1452",
"max": "65527",
"min": "1452",
"mutable": "null",
"readable": true,
"type": "guint",
"writable": true
},
"use-datagram": {
"blurb": "Use datagram for lower latency, unreliable messaging",
"conditionally-available": false,
@ -4295,6 +4379,48 @@
"type": "gchararray",
"writable": true
},
"datagram-receive-buffer-size": {
"blurb": "Maximum number of incoming application datagram bytes to buffer",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "1250000",
"max": "18446744073709551615",
"min": "0",
"mutable": "null",
"readable": true,
"type": "guint64",
"writable": true
},
"datagram-send-buffer-size": {
"blurb": "Maximum number of outgoing application datagram bytes to buffer",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "1048576",
"max": "18446744073709551615",
"min": "0",
"mutable": "null",
"readable": true,
"type": "guint64",
"writable": true
},
"initial-mtu": {
"blurb": "Initial value to be used as the maximum UDP payload size",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "1200",
"max": "-1",
"min": "1200",
"mutable": "null",
"readable": true,
"type": "guint",
"writable": true
},
"keep-alive-interval": {
"blurb": "Keeps QUIC connection alive by periodically pinging the server. Value set in ms, 0 disables this feature",
"conditionally-available": false,
@ -4309,6 +4435,34 @@
"type": "guint64",
"writable": true
},
"max-udp-payload-size": {
"blurb": "Maximum UDP payload size accepted from peers (excluding UDP and IP overhead)",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "65527",
"max": "65527",
"min": "1200",
"mutable": "null",
"readable": true,
"type": "guint",
"writable": true
},
"min-mtu": {
"blurb": "Maximum UDP payload size guaranteed to be supported by the network, must be <= initial-mtu",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "1200",
"max": "-1",
"min": "1200",
"mutable": "null",
"readable": true,
"type": "guint",
"writable": true
},
"port": {
"blurb": "Port of the QUIC server e.g. 5000",
"conditionally-available": false,
@ -4411,6 +4565,20 @@
"type": "guint",
"writable": true
},
"upper-bound-mtu": {
"blurb": "Upper bound to the max UDP payload size that MTU discovery will search for",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "1452",
"max": "65527",
"min": "1452",
"mutable": "null",
"readable": true,
"type": "guint",
"writable": true
},
"use-datagram": {
"blurb": "Use datagram for lower latency, unreliable messaging",
"conditionally-available": false,

View file

@ -13,6 +13,13 @@ pub(crate) static DEFAULT_ADDR: &str = "127.0.0.1";
pub(crate) static DEFAULT_PORT: u16 = 5000;
pub(crate) static DEFAULT_BIND_ADDR: &str = "0.0.0.0";
pub(crate) static DEFAULT_BIND_PORT: u16 = 0;
pub(crate) static DEFAULT_INITIAL_MTU: u16 = 1200;
pub(crate) static DEFAULT_MINIMUM_MTU: u16 = 1200;
pub(crate) static DEFAULT_UPPER_BOUND_MTU: u16 = 1452;
pub(crate) static DEFAULT_MAX_UPPER_BOUND_MTU: u16 = 65527;
pub(crate) static DEFAULT_UDP_PAYLOAD_SIZE: u16 = 1452;
pub(crate) static DEFAULT_MIN_UDP_PAYLOAD_SIZE: u16 = 1200;
pub(crate) static DEFAULT_MAX_UDP_PAYLOAD_SIZE: u16 = 65527;
/*
* For QUIC transport parameters
@ -36,3 +43,33 @@ pub enum QuinnQuicRole {
#[enum_value(name = "Client: Act as QUIC client.", nick = "client")]
Client,
}
#[derive(Debug, PartialEq, Clone, Copy)]
pub struct QuinnQuicTransportConfig {
pub datagram_receive_buffer_size: usize,
pub datagram_send_buffer_size: usize,
pub initial_mtu: u16,
pub max_udp_payload_size: u16,
pub min_mtu: u16,
pub upper_bound_mtu: u16,
}
impl Default for QuinnQuicTransportConfig {
fn default() -> Self {
// Copied from Quinn::TransportConfig defaults
const EXPECTED_RTT: u32 = 100; // ms
const MAX_STREAM_BANDWIDTH: u32 = 12500 * 1000; // bytes/s
// Window size needed to avoid pipeline
// stalls
const STREAM_RWND: u32 = MAX_STREAM_BANDWIDTH / 1000 * EXPECTED_RTT;
Self {
datagram_receive_buffer_size: STREAM_RWND as usize,
datagram_send_buffer_size: 1024 * 1024,
initial_mtu: DEFAULT_INITIAL_MTU,
max_udp_payload_size: DEFAULT_MAX_UDP_PAYLOAD_SIZE,
min_mtu: DEFAULT_MINIMUM_MTU,
upper_bound_mtu: DEFAULT_UPPER_BOUND_MTU,
}
}
}

View file

@ -8,8 +8,8 @@
// SPDX-License-Identifier: MPL-2.0
use crate::utils::{
client_endpoint, make_socket_addr, server_endpoint, wait, WaitError, CONNECTION_CLOSE_CODE,
CONNECTION_CLOSE_MSG,
client_endpoint, make_socket_addr, server_endpoint, wait, QuinnQuicEndpointConfig, WaitError,
CONNECTION_CLOSE_CODE, CONNECTION_CLOSE_MSG,
};
use crate::{common::*, utils};
use bytes::Bytes;
@ -17,7 +17,7 @@ use futures::future;
use gst::{glib, prelude::*, subclass::prelude::*};
use gst_base::subclass::prelude::*;
use once_cell::sync::Lazy;
use quinn::{Connection, SendStream};
use quinn::{Connection, SendStream, TransportConfig};
use std::path::PathBuf;
use std::sync::Mutex;
@ -43,7 +43,7 @@ enum State {
Started(Started),
}
#[derive(Clone, Debug)]
#[derive(Debug)]
struct Settings {
bind_address: String,
bind_port: u16,
@ -58,6 +58,7 @@ struct Settings {
use_datagram: bool,
certificate_file: Option<PathBuf>,
private_key_file: Option<PathBuf>,
transport_config: QuinnQuicTransportConfig,
}
impl Default for Settings {
@ -76,6 +77,7 @@ impl Default for Settings {
use_datagram: false,
certificate_file: None,
private_key_file: None,
transport_config: QuinnQuicTransportConfig::default(),
}
}
}
@ -228,6 +230,40 @@ impl ObjectImpl for QuinnQuicSink {
.blurb("Use datagram for lower latency, unreliable messaging")
.default_value(false)
.build(),
glib::ParamSpecUInt::builder("initial-mtu")
.nick("Initial MTU")
.blurb("Initial value to be used as the maximum UDP payload size")
.minimum(DEFAULT_INITIAL_MTU.into())
.default_value(DEFAULT_INITIAL_MTU.into())
.build(),
glib::ParamSpecUInt::builder("min-mtu")
.nick("Minimum MTU")
.blurb("Maximum UDP payload size guaranteed to be supported by the network, must be <= initial-mtu")
.minimum(DEFAULT_MINIMUM_MTU.into())
.default_value(DEFAULT_MINIMUM_MTU.into())
.build(),
glib::ParamSpecUInt::builder("upper-bound-mtu")
.nick("Upper bound MTU")
.blurb("Upper bound to the max UDP payload size that MTU discovery will search for")
.minimum(DEFAULT_UPPER_BOUND_MTU.into())
.maximum(DEFAULT_MAX_UPPER_BOUND_MTU.into())
.default_value(DEFAULT_UPPER_BOUND_MTU.into())
.build(),
glib::ParamSpecUInt::builder("max-udp-payload-size")
.nick("Maximum UDP payload size")
.blurb("Maximum UDP payload size accepted from peers (excluding UDP and IP overhead)")
.minimum(DEFAULT_MIN_UDP_PAYLOAD_SIZE.into())
.maximum(DEFAULT_MAX_UDP_PAYLOAD_SIZE.into())
.default_value(DEFAULT_UDP_PAYLOAD_SIZE.into())
.build(),
glib::ParamSpecUInt64::builder("datagram-receive-buffer-size")
.nick("Datagram Receiver Buffer Size")
.blurb("Maximum number of incoming application datagram bytes to buffer")
.build(),
glib::ParamSpecUInt64::builder("datagram-send-buffer-size")
.nick("Datagram Send Buffer Size")
.blurb("Maximum number of outgoing application datagram bytes to buffer")
.build(),
]
});
@ -289,6 +325,32 @@ impl ObjectImpl for QuinnQuicSink {
"use-datagram" => {
settings.use_datagram = value.get().expect("type checked upstream");
}
"initial-mtu" => {
let value = value.get::<u32>().expect("type checked upstream");
settings.transport_config.initial_mtu =
value.max(DEFAULT_INITIAL_MTU.into()) as u16;
}
"min-mtu" => {
let value = value.get::<u32>().expect("type checked upstream");
let initial_mtu = settings.transport_config.initial_mtu;
settings.transport_config.min_mtu = value.min(initial_mtu.into()) as u16;
}
"upper-bound-mtu" => {
let value = value.get::<u32>().expect("type checked upstream");
settings.transport_config.upper_bound_mtu = value as u16;
}
"max-udp-payload-size" => {
let value = value.get::<u32>().expect("type checked upstream");
settings.transport_config.max_udp_payload_size = value as u16;
}
"datagram-receive-buffer-size" => {
let value = value.get::<u64>().expect("type checked upstream");
settings.transport_config.datagram_receive_buffer_size = value as usize;
}
"datagram-send-buffer-size" => {
let value = value.get::<u64>().expect("type checked upstream");
settings.transport_config.datagram_send_buffer_size = value as usize;
}
_ => unimplemented!(),
}
}
@ -325,6 +387,18 @@ impl ObjectImpl for QuinnQuicSink {
privkey.and_then(|file| file.to_str()).to_value()
}
"use-datagram" => settings.use_datagram.to_value(),
"initial-mtu" => (settings.transport_config.initial_mtu as u32).to_value(),
"min-mtu" => (settings.transport_config.min_mtu as u32).to_value(),
"upper-bound-mtu" => (settings.transport_config.upper_bound_mtu as u32).to_value(),
"max-udp-payload-size" => {
(settings.transport_config.max_udp_payload_size as u32).to_value()
}
"datagram-receive-buffer-size" => {
(settings.transport_config.datagram_receive_buffer_size as u64).to_value()
}
"datagram-send-buffer-size" => {
(settings.transport_config.datagram_send_buffer_size as u64).to_value()
}
_ => unimplemented!(),
}
}
@ -503,7 +577,7 @@ impl QuinnQuicSink {
if src.len() > size {
return Err(Some(gst::error_msg!(
gst::ResourceError::Failed,
["Sending data failed, current max datagram size: {size}"]
["Sending data failed, current max datagram size: {size}, buffer size: {}", src.len()]
)));
}
@ -545,93 +619,80 @@ impl QuinnQuicSink {
}
async fn init_connection(&self) -> Result<(Connection, Option<SendStream>), WaitError> {
let client_addr;
let server_addr;
let server_name;
let alpns;
let role;
let use_datagram;
let keep_alive_interval;
let secure_conn;
let cert_file;
let private_key_file;
{
let (role, use_datagram, endpoint_config) = {
let settings = self.settings.lock().unwrap();
client_addr = make_socket_addr(
let client_addr = make_socket_addr(
format!("{}:{}", settings.bind_address, settings.bind_port).as_str(),
)?;
server_addr =
let server_addr =
make_socket_addr(format!("{}:{}", settings.address, settings.port).as_str())?;
server_name = settings.server_name.clone();
alpns = settings.alpns.clone();
role = settings.role;
use_datagram = settings.use_datagram;
keep_alive_interval = settings.keep_alive_interval;
secure_conn = settings.secure_conn;
cert_file = settings.certificate_file.clone();
private_key_file = settings.private_key_file.clone();
}
let server_name = settings.server_name.clone();
let alpns = settings.alpns.clone();
let role = settings.role;
let use_datagram = settings.use_datagram;
let keep_alive_interval = settings.keep_alive_interval;
let secure_conn = settings.secure_conn;
let certificate_file = settings.certificate_file.clone();
let private_key_file = settings.private_key_file.clone();
let transport_config = settings.transport_config;
let connection;
match role {
QuinnQuicRole::Server => {
let endpoint = server_endpoint(
(
role,
use_datagram,
QuinnQuicEndpointConfig {
server_addr,
&server_name,
server_name,
client_addr,
secure_conn,
alpns,
cert_file,
certificate_file,
private_key_file,
)
.map_err(|err| {
WaitError::FutureError(gst::error_msg!(
gst::ResourceError::Failed,
["Failed to configure endpoint: {}", err]
))
})?;
keep_alive_interval,
transport_config,
},
)
};
let endpoint = match role {
QuinnQuicRole::Server => server_endpoint(&endpoint_config).map_err(|err| {
WaitError::FutureError(gst::error_msg!(
gst::ResourceError::Failed,
["Failed to configure endpoint: {}", err]
))
})?,
QuinnQuicRole::Client => client_endpoint(&endpoint_config).map_err(|err| {
WaitError::FutureError(gst::error_msg!(
gst::ResourceError::Failed,
["Failed to configure endpoint: {}", err]
))
})?,
};
let connection = match role {
QuinnQuicRole::Server => {
let incoming_conn = endpoint.accept().await.unwrap();
connection = incoming_conn.await.map_err(|err| {
incoming_conn.await.map_err(|err| {
WaitError::FutureError(gst::error_msg!(
gst::ResourceError::Failed,
["Connection error: {}", err]
))
})?;
})?
}
QuinnQuicRole::Client => {
let endpoint = client_endpoint(
client_addr,
secure_conn,
alpns,
cert_file,
private_key_file,
keep_alive_interval,
)
QuinnQuicRole::Client => endpoint
.connect(endpoint_config.server_addr, &endpoint_config.server_name)
.unwrap()
.await
.map_err(|err| {
WaitError::FutureError(gst::error_msg!(
gst::ResourceError::Failed,
["Failed to configure endpoint: {}", err]
["Connection error: {}", err]
))
})?;
connection = endpoint
.connect(server_addr, &server_name)
.unwrap()
.await
.map_err(|err| {
WaitError::FutureError(gst::error_msg!(
gst::ResourceError::Failed,
["Connection error: {}", err]
))
})?;
}
}
})?,
};
let stream = if !use_datagram {
let res = connection.open_uni().await.map_err(|err| {

View file

@ -8,8 +8,8 @@
// SPDX-License-Identifier: MPL-2.0
use crate::utils::{
client_endpoint, make_socket_addr, server_endpoint, wait, Canceller, WaitError,
CONNECTION_CLOSE_CODE, CONNECTION_CLOSE_MSG,
client_endpoint, make_socket_addr, server_endpoint, wait, Canceller, QuinnQuicEndpointConfig,
WaitError, CONNECTION_CLOSE_CODE, CONNECTION_CLOSE_MSG,
};
use crate::{common::*, utils};
use bytes::Bytes;
@ -19,7 +19,7 @@ use gst_base::prelude::*;
use gst_base::subclass::base_src::CreateSuccess;
use gst_base::subclass::prelude::*;
use once_cell::sync::Lazy;
use quinn::{Connection, ConnectionError, ReadError, RecvStream};
use quinn::{Connection, ConnectionError, ReadError, RecvStream, TransportConfig};
use std::path::PathBuf;
use std::sync::Mutex;
@ -45,7 +45,7 @@ enum State {
Started(Started),
}
#[derive(Clone, Debug)]
#[derive(Debug)]
struct Settings {
address: String,
port: u16,
@ -61,6 +61,7 @@ struct Settings {
use_datagram: bool,
certificate_file: Option<PathBuf>,
private_key_file: Option<PathBuf>,
transport_config: QuinnQuicTransportConfig,
}
impl Default for Settings {
@ -80,6 +81,7 @@ impl Default for Settings {
use_datagram: false,
certificate_file: None,
private_key_file: None,
transport_config: QuinnQuicTransportConfig::default(),
}
}
}
@ -237,6 +239,40 @@ impl ObjectImpl for QuinnQuicSrc {
.blurb("Use datagram for lower latency, unreliable messaging")
.default_value(false)
.build(),
glib::ParamSpecUInt::builder("initial-mtu")
.nick("Initial MTU")
.blurb("Initial value to be used as the maximum UDP payload size")
.minimum(DEFAULT_INITIAL_MTU.into())
.default_value(DEFAULT_INITIAL_MTU.into())
.build(),
glib::ParamSpecUInt::builder("min-mtu")
.nick("Minimum MTU")
.blurb("Maximum UDP payload size guaranteed to be supported by the network, must be <= initial-mtu")
.minimum(DEFAULT_MINIMUM_MTU.into())
.default_value(DEFAULT_MINIMUM_MTU.into())
.build(),
glib::ParamSpecUInt::builder("upper-bound-mtu")
.nick("Upper bound MTU")
.blurb("Upper bound to the max UDP payload size that MTU discovery will search for")
.minimum(DEFAULT_UPPER_BOUND_MTU.into())
.maximum(DEFAULT_MAX_UPPER_BOUND_MTU.into())
.default_value(DEFAULT_UPPER_BOUND_MTU.into())
.build(),
glib::ParamSpecUInt::builder("max-udp-payload-size")
.nick("Maximum UDP payload size")
.blurb("Maximum UDP payload size accepted from peers (excluding UDP and IP overhead)")
.minimum(DEFAULT_MIN_UDP_PAYLOAD_SIZE.into())
.maximum(DEFAULT_MAX_UDP_PAYLOAD_SIZE.into())
.default_value(DEFAULT_UDP_PAYLOAD_SIZE.into())
.build(),
glib::ParamSpecUInt64::builder("datagram-receive-buffer-size")
.nick("Datagram Receiver Buffer Size")
.blurb("Maximum number of incoming application datagram bytes to buffer")
.build(),
glib::ParamSpecUInt64::builder("datagram-send-buffer-size")
.nick("Datagram Send Buffer Size")
.blurb("Maximum number of outgoing application datagram bytes to buffer")
.build(),
]
});
@ -307,6 +343,32 @@ impl ObjectImpl for QuinnQuicSrc {
"use-datagram" => {
settings.use_datagram = value.get().expect("type checked upstream");
}
"initial-mtu" => {
let value = value.get::<u32>().expect("type checked upstream");
settings.transport_config.initial_mtu =
value.max(DEFAULT_INITIAL_MTU.into()) as u16;
}
"min-mtu" => {
let value = value.get::<u32>().expect("type checked upstream");
let initial_mtu = settings.transport_config.initial_mtu;
settings.transport_config.min_mtu = value.min(initial_mtu.into()) as u16;
}
"upper-bound-mtu" => {
let value = value.get::<u32>().expect("type checked upstream");
settings.transport_config.upper_bound_mtu = value as u16;
}
"max-udp-payload-size" => {
let value = value.get::<u32>().expect("type checked upstream");
settings.transport_config.max_udp_payload_size = value as u16;
}
"datagram-receive-buffer-size" => {
let value = value.get::<u64>().expect("type checked upstream");
settings.transport_config.datagram_receive_buffer_size = value as usize;
}
"datagram-send-buffer-size" => {
let value = value.get::<u64>().expect("type checked upstream");
settings.transport_config.datagram_send_buffer_size = value as usize;
}
_ => unimplemented!(),
}
}
@ -344,6 +406,18 @@ impl ObjectImpl for QuinnQuicSrc {
privkey.and_then(|file| file.to_str()).to_value()
}
"use-datagram" => settings.use_datagram.to_value(),
"initial-mtu" => (settings.transport_config.initial_mtu as u32).to_value(),
"min-mtu" => (settings.transport_config.min_mtu as u32).to_value(),
"upper-bound-mtu" => (settings.transport_config.upper_bound_mtu as u32).to_value(),
"max-udp-payload-size" => {
(settings.transport_config.max_udp_payload_size as u32).to_value()
}
"datagram-receive-buffer-size" => {
(settings.transport_config.datagram_receive_buffer_size as u64).to_value()
}
"datagram-send-buffer-size" => {
(settings.transport_config.datagram_send_buffer_size as u64).to_value()
}
_ => unimplemented!(),
}
}
@ -592,93 +666,80 @@ impl QuinnQuicSrc {
}
async fn init_connection(&self) -> Result<(Connection, Option<RecvStream>), WaitError> {
let server_addr;
let server_name;
let client_addr;
let alpns;
let role;
let use_datagram;
let keep_alive_interval;
let secure_conn;
let cert_file;
let private_key_file;
{
let (role, use_datagram, endpoint_config) = {
let settings = self.settings.lock().unwrap();
client_addr = make_socket_addr(
let client_addr = make_socket_addr(
format!("{}:{}", settings.bind_address, settings.bind_port).as_str(),
)?;
server_addr =
let server_addr =
make_socket_addr(format!("{}:{}", settings.address, settings.port).as_str())?;
server_name = settings.server_name.clone();
alpns = settings.alpns.clone();
role = settings.role;
use_datagram = settings.use_datagram;
keep_alive_interval = settings.keep_alive_interval;
secure_conn = settings.secure_conn;
cert_file = settings.certificate_file.clone();
private_key_file = settings.private_key_file.clone();
}
let server_name = settings.server_name.clone();
let alpns = settings.alpns.clone();
let role = settings.role;
let use_datagram = settings.use_datagram;
let keep_alive_interval = settings.keep_alive_interval;
let secure_conn = settings.secure_conn;
let certificate_file = settings.certificate_file.clone();
let private_key_file = settings.private_key_file.clone();
let transport_config = settings.transport_config;
let connection;
match role {
QuinnQuicRole::Server => {
let endpoint = server_endpoint(
(
role,
use_datagram,
QuinnQuicEndpointConfig {
server_addr,
&server_name,
server_name,
client_addr,
secure_conn,
alpns,
cert_file,
certificate_file,
private_key_file,
)
.map_err(|err| {
WaitError::FutureError(gst::error_msg!(
gst::ResourceError::Failed,
["Failed to configure endpoint: {}", err]
))
})?;
keep_alive_interval,
transport_config,
},
)
};
let endpoint = match role {
QuinnQuicRole::Server => server_endpoint(&endpoint_config).map_err(|err| {
WaitError::FutureError(gst::error_msg!(
gst::ResourceError::Failed,
["Failed to configure endpoint: {}", err]
))
})?,
QuinnQuicRole::Client => client_endpoint(&endpoint_config).map_err(|err| {
WaitError::FutureError(gst::error_msg!(
gst::ResourceError::Failed,
["Failed to configure endpoint: {}", err]
))
})?,
};
let connection = match role {
QuinnQuicRole::Server => {
let incoming_conn = endpoint.accept().await.unwrap();
connection = incoming_conn.await.map_err(|err| {
incoming_conn.await.map_err(|err| {
WaitError::FutureError(gst::error_msg!(
gst::ResourceError::Failed,
["Connection error: {}", err]
))
})?;
})?
}
QuinnQuicRole::Client => {
let endpoint = client_endpoint(
client_addr,
secure_conn,
alpns,
cert_file,
private_key_file,
keep_alive_interval,
)
QuinnQuicRole::Client => endpoint
.connect(endpoint_config.server_addr, &endpoint_config.server_name)
.unwrap()
.await
.map_err(|err| {
WaitError::FutureError(gst::error_msg!(
gst::ResourceError::Failed,
["Failed to configure endpoint: {}", err]
["Connection error: {}", err]
))
})?;
connection = endpoint
.connect(server_addr, &server_name)
.unwrap()
.await
.map_err(|err| {
WaitError::FutureError(gst::error_msg!(
gst::ResourceError::Failed,
["Connection error: {}", err]
))
})?;
}
}
})?,
};
let stream = if !use_datagram {
let res = connection.accept_uni().await.map_err(|err| {

View file

@ -7,13 +7,14 @@
//
// SPDX-License-Identifier: MPL-2.0
use crate::common::*;
use futures::future;
use futures::prelude::*;
use gst::ErrorMessage;
use once_cell::sync::Lazy;
use quinn::{
crypto::rustls::QuicClientConfig, crypto::rustls::QuicServerConfig, ClientConfig, Endpoint,
ServerConfig, TransportConfig,
crypto::rustls::QuicClientConfig, crypto::rustls::QuicServerConfig, default_runtime,
ClientConfig, Endpoint, EndpointConfig, MtuDiscoveryConfig, ServerConfig, TransportConfig,
};
use std::error::Error;
use std::fs::File;
@ -28,6 +29,19 @@ use tokio::runtime;
pub const CONNECTION_CLOSE_CODE: u32 = 0;
pub const CONNECTION_CLOSE_MSG: &str = "Stopped";
#[derive(Debug)]
pub struct QuinnQuicEndpointConfig {
pub server_addr: SocketAddr,
pub server_name: String,
pub client_addr: SocketAddr,
pub secure_conn: bool,
pub alpns: Vec<String>,
pub certificate_file: Option<PathBuf>,
pub private_key_file: Option<PathBuf>,
pub keep_alive_interval: u64,
pub transport_config: QuinnQuicTransportConfig,
}
#[derive(Error, Debug)]
pub enum WaitError {
#[error("Future aborted")]
@ -196,15 +210,12 @@ impl rustls::client::danger::ServerCertVerifier for SkipServerVerification {
}
}
fn configure_client(
secure_conn: bool,
certificate_file: Option<PathBuf>,
private_key_file: Option<PathBuf>,
alpns: Vec<String>,
keep_alive_interval_ms: u64,
) -> Result<ClientConfig, Box<dyn Error>> {
let mut crypto = if secure_conn {
let (certs, key) = read_certs_from_file(certificate_file, private_key_file)?;
fn configure_client(ep_config: &QuinnQuicEndpointConfig) -> Result<ClientConfig, Box<dyn Error>> {
let mut crypto = if ep_config.secure_conn {
let (certs, key) = read_certs_from_file(
ep_config.certificate_file.clone(),
ep_config.private_key_file.clone(),
)?;
let mut cert_store = rustls::RootCertStore::empty();
cert_store.add_parsable_certificates(certs.clone());
@ -218,20 +229,41 @@ fn configure_client(
.with_no_client_auth()
};
let alpn_protocols: Vec<Vec<u8>> = alpns
let alpn_protocols: Vec<Vec<u8>> = ep_config
.alpns
.iter()
.map(|x| x.as_bytes().to_vec())
.collect::<Vec<_>>();
crypto.alpn_protocols = alpn_protocols;
crypto.key_log = Arc::new(rustls::KeyLogFile::new());
let mut client_config = ClientConfig::new(Arc::new(QuicClientConfig::try_from(crypto)?));
if keep_alive_interval_ms > 0 {
let transport_config = {
let mtu_config = MtuDiscoveryConfig::default()
.upper_bound(ep_config.transport_config.upper_bound_mtu)
.to_owned();
let mut transport_config = TransportConfig::default();
transport_config.keep_alive_interval(Some(Duration::from_millis(keep_alive_interval_ms)));
client_config.transport_config(Arc::new(transport_config));
}
if ep_config.keep_alive_interval > 0 {
transport_config
.keep_alive_interval(Some(Duration::from_millis(ep_config.keep_alive_interval)));
}
transport_config.initial_mtu(ep_config.transport_config.initial_mtu);
transport_config.min_mtu(ep_config.transport_config.min_mtu);
transport_config.datagram_receive_buffer_size(Some(
ep_config.transport_config.datagram_receive_buffer_size,
));
transport_config
.datagram_send_buffer_size(ep_config.transport_config.datagram_send_buffer_size);
transport_config.max_concurrent_bidi_streams(0u32.into());
transport_config.max_concurrent_uni_streams(1u32.into());
transport_config.mtu_discovery_config(Some(mtu_config));
transport_config
};
let client_config = ClientConfig::new(Arc::new(QuicClientConfig::try_from(crypto)?))
.transport_config(Arc::new(transport_config))
.to_owned();
Ok(client_config)
}
@ -294,17 +326,16 @@ fn read_certs_from_file(
}
fn configure_server(
server_name: &str,
secure_conn: bool,
certificate_file: Option<PathBuf>,
private_key_file: Option<PathBuf>,
alpns: Vec<String>,
ep_config: &QuinnQuicEndpointConfig,
) -> Result<(ServerConfig, Vec<rustls_pki_types::CertificateDer>), Box<dyn Error>> {
let (certs, key) = if secure_conn {
read_certs_from_file(certificate_file, private_key_file)?
let (certs, key) = if ep_config.secure_conn {
read_certs_from_file(
ep_config.certificate_file.clone(),
ep_config.private_key_file.clone(),
)?
} else {
let rcgen::CertifiedKey { cert: _, key_pair } =
rcgen::generate_simple_self_signed(vec![server_name.into()]).unwrap();
rcgen::generate_simple_self_signed(vec![ep_config.server_name.clone()]).unwrap();
let cert_der = key_pair.serialize_der();
let priv_key = rustls_pki_types::PrivateKeyDer::try_from(cert_der.clone()).unwrap();
let cert_chain = vec![rustls_pki_types::CertificateDer::from(cert_der)];
@ -312,7 +343,7 @@ fn configure_server(
(cert_chain, priv_key)
};
let mut crypto = if secure_conn {
let mut crypto = if ep_config.secure_conn {
let mut cert_store = rustls::RootCertStore::empty();
cert_store.add_parsable_certificates(certs.clone());
@ -328,59 +359,58 @@ fn configure_server(
.with_single_cert(certs.clone(), key)
}?;
let alpn_protocols: Vec<Vec<u8>> = alpns
let alpn_protocols: Vec<Vec<u8>> = ep_config
.alpns
.iter()
.map(|x| x.as_bytes().to_vec())
.collect::<Vec<_>>();
crypto.alpn_protocols = alpn_protocols;
crypto.key_log = Arc::new(rustls::KeyLogFile::new());
let mut server_config =
ServerConfig::with_crypto(Arc::new(QuicServerConfig::try_from(crypto)?));
Arc::get_mut(&mut server_config.transport)
.unwrap()
.max_concurrent_bidi_streams(0_u8.into())
.max_concurrent_uni_streams(1_u8.into());
let transport_config = {
let mtu_config = MtuDiscoveryConfig::default()
.upper_bound(ep_config.transport_config.upper_bound_mtu)
.to_owned();
let mut transport_config = TransportConfig::default();
transport_config.initial_mtu(ep_config.transport_config.initial_mtu);
transport_config.min_mtu(ep_config.transport_config.min_mtu);
transport_config.datagram_receive_buffer_size(Some(
ep_config.transport_config.datagram_receive_buffer_size,
));
transport_config
.datagram_send_buffer_size(ep_config.transport_config.datagram_send_buffer_size);
transport_config.max_concurrent_bidi_streams(0u32.into());
transport_config.max_concurrent_uni_streams(1u32.into());
transport_config.mtu_discovery_config(Some(mtu_config));
transport_config
};
let server_config = ServerConfig::with_crypto(Arc::new(QuicServerConfig::try_from(crypto)?))
.transport_config(Arc::new(transport_config))
.to_owned();
Ok((server_config, certs))
}
pub fn server_endpoint(
server_addr: SocketAddr,
server_name: &str,
secure_conn: bool,
alpns: Vec<String>,
certificate_file: Option<PathBuf>,
private_key_file: Option<PathBuf>,
) -> Result<Endpoint, Box<dyn Error>> {
let (server_config, _) = configure_server(
server_name,
secure_conn,
certificate_file,
private_key_file,
alpns,
)?;
let endpoint = Endpoint::server(server_config, server_addr)?;
pub fn server_endpoint(ep_config: &QuinnQuicEndpointConfig) -> Result<Endpoint, Box<dyn Error>> {
let (server_config, _) = configure_server(ep_config)?;
let socket = std::net::UdpSocket::bind(ep_config.server_addr)?;
let runtime = default_runtime()
.ok_or_else(|| std::io::Error::new(std::io::ErrorKind::Other, "No async runtime found"))?;
let endpoint_config = EndpointConfig::default()
.max_udp_payload_size(ep_config.transport_config.max_udp_payload_size)
.unwrap()
.to_owned();
let endpoint = Endpoint::new(endpoint_config, Some(server_config), socket, runtime)?;
Ok(endpoint)
}
pub fn client_endpoint(
client_addr: SocketAddr,
secure_conn: bool,
alpns: Vec<String>,
certificate_file: Option<PathBuf>,
private_key_file: Option<PathBuf>,
keep_alive_interval_ms: u64,
) -> Result<Endpoint, Box<dyn Error>> {
let client_cfg = configure_client(
secure_conn,
certificate_file,
private_key_file,
alpns,
keep_alive_interval_ms,
)?;
let mut endpoint = Endpoint::client(client_addr)?;
pub fn client_endpoint(ep_config: &QuinnQuicEndpointConfig) -> Result<Endpoint, Box<dyn Error>> {
let client_cfg = configure_client(ep_config)?;
let mut endpoint = Endpoint::client(ep_config.client_addr)?;
endpoint.set_default_client_config(client_cfg);