From cf7172248cfe4e52a339789325742893c14d9ce8 Mon Sep 17 00:00:00 2001 From: Sanchayan Maity Date: Thu, 13 Jun 2024 21:36:50 +0530 Subject: [PATCH] net/quinn: Allow setting some parameters from TransportConfig As of now, we expose the below four properties from `TransportConfig`. - Initial MTU - Minimum MTU - Datagram receive buffer size - Datagram send buffer size Maximum UDP payload size from `EndpointConfig` and upper bound from `MtuDiscoveryConfig` are also exposed as properties. See the below documentation for further details. - https://docs.rs/quinn/latest/quinn/struct.TransportConfig.html - https://docs.rs/quinn/latest/quinn/struct.MtuDiscoveryConfig.html - https://docs.rs/quinn/latest/quinn/struct.EndpointConfig.html While at it, also clean up passing function parameters to the functions in utils.rs. Part-of: --- docs/plugins/gst_plugins_cache.json | 168 ++++++++++++++++++++++++ net/quinn/src/common.rs | 37 ++++++ net/quinn/src/quinnquicsink/imp.rs | 197 ++++++++++++++++++---------- net/quinn/src/quinnquicsrc/imp.rs | 195 +++++++++++++++++---------- net/quinn/src/utils.rs | 162 +++++++++++++---------- 5 files changed, 558 insertions(+), 201 deletions(-) diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index f72f0657..13164fbf 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -4071,6 +4071,48 @@ "type": "guint", "writable": true }, + "datagram-receive-buffer-size": { + "blurb": "Maximum number of incoming application datagram bytes to buffer", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "1250000", + "max": "18446744073709551615", + "min": "0", + "mutable": "null", + "readable": true, + "type": "guint64", + "writable": true + }, + "datagram-send-buffer-size": { + "blurb": "Maximum number of outgoing application datagram bytes to buffer", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "1048576", + "max": "18446744073709551615", + "min": "0", + "mutable": "null", + "readable": true, + "type": "guint64", + "writable": true + }, + "initial-mtu": { + "blurb": "Initial value to be used as the maximum UDP payload size", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "1200", + "max": "-1", + "min": "1200", + "mutable": "null", + "readable": true, + "type": "guint", + "writable": true + }, "keep-alive-interval": { "blurb": "Keeps QUIC connection alive by periodically pinging the server. Value set in ms, 0 disables this feature", "conditionally-available": false, @@ -4085,6 +4127,34 @@ "type": "guint64", "writable": true }, + "max-udp-payload-size": { + "blurb": "Maximum UDP payload size accepted from peers (excluding UDP and IP overhead)", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "65527", + "max": "65527", + "min": "1200", + "mutable": "null", + "readable": true, + "type": "guint", + "writable": true + }, + "min-mtu": { + "blurb": "Maximum UDP payload size guaranteed to be supported by the network, must be <= initial-mtu", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "1200", + "max": "-1", + "min": "1200", + "mutable": "null", + "readable": true, + "type": "guint", + "writable": true + }, "port": { "blurb": "Port of the QUIC server e.g. 5000", "conditionally-available": false, @@ -4187,6 +4257,20 @@ "type": "guint", "writable": true }, + "upper-bound-mtu": { + "blurb": "Upper bound to the max UDP payload size that MTU discovery will search for", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "1452", + "max": "65527", + "min": "1452", + "mutable": "null", + "readable": true, + "type": "guint", + "writable": true + }, "use-datagram": { "blurb": "Use datagram for lower latency, unreliable messaging", "conditionally-available": false, @@ -4295,6 +4379,48 @@ "type": "gchararray", "writable": true }, + "datagram-receive-buffer-size": { + "blurb": "Maximum number of incoming application datagram bytes to buffer", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "1250000", + "max": "18446744073709551615", + "min": "0", + "mutable": "null", + "readable": true, + "type": "guint64", + "writable": true + }, + "datagram-send-buffer-size": { + "blurb": "Maximum number of outgoing application datagram bytes to buffer", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "1048576", + "max": "18446744073709551615", + "min": "0", + "mutable": "null", + "readable": true, + "type": "guint64", + "writable": true + }, + "initial-mtu": { + "blurb": "Initial value to be used as the maximum UDP payload size", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "1200", + "max": "-1", + "min": "1200", + "mutable": "null", + "readable": true, + "type": "guint", + "writable": true + }, "keep-alive-interval": { "blurb": "Keeps QUIC connection alive by periodically pinging the server. Value set in ms, 0 disables this feature", "conditionally-available": false, @@ -4309,6 +4435,34 @@ "type": "guint64", "writable": true }, + "max-udp-payload-size": { + "blurb": "Maximum UDP payload size accepted from peers (excluding UDP and IP overhead)", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "65527", + "max": "65527", + "min": "1200", + "mutable": "null", + "readable": true, + "type": "guint", + "writable": true + }, + "min-mtu": { + "blurb": "Maximum UDP payload size guaranteed to be supported by the network, must be <= initial-mtu", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "1200", + "max": "-1", + "min": "1200", + "mutable": "null", + "readable": true, + "type": "guint", + "writable": true + }, "port": { "blurb": "Port of the QUIC server e.g. 5000", "conditionally-available": false, @@ -4411,6 +4565,20 @@ "type": "guint", "writable": true }, + "upper-bound-mtu": { + "blurb": "Upper bound to the max UDP payload size that MTU discovery will search for", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "1452", + "max": "65527", + "min": "1452", + "mutable": "null", + "readable": true, + "type": "guint", + "writable": true + }, "use-datagram": { "blurb": "Use datagram for lower latency, unreliable messaging", "conditionally-available": false, diff --git a/net/quinn/src/common.rs b/net/quinn/src/common.rs index 35424e51..8f23dbb3 100644 --- a/net/quinn/src/common.rs +++ b/net/quinn/src/common.rs @@ -13,6 +13,13 @@ pub(crate) static DEFAULT_ADDR: &str = "127.0.0.1"; pub(crate) static DEFAULT_PORT: u16 = 5000; pub(crate) static DEFAULT_BIND_ADDR: &str = "0.0.0.0"; pub(crate) static DEFAULT_BIND_PORT: u16 = 0; +pub(crate) static DEFAULT_INITIAL_MTU: u16 = 1200; +pub(crate) static DEFAULT_MINIMUM_MTU: u16 = 1200; +pub(crate) static DEFAULT_UPPER_BOUND_MTU: u16 = 1452; +pub(crate) static DEFAULT_MAX_UPPER_BOUND_MTU: u16 = 65527; +pub(crate) static DEFAULT_UDP_PAYLOAD_SIZE: u16 = 1452; +pub(crate) static DEFAULT_MIN_UDP_PAYLOAD_SIZE: u16 = 1200; +pub(crate) static DEFAULT_MAX_UDP_PAYLOAD_SIZE: u16 = 65527; /* * For QUIC transport parameters @@ -36,3 +43,33 @@ pub enum QuinnQuicRole { #[enum_value(name = "Client: Act as QUIC client.", nick = "client")] Client, } + +#[derive(Debug, PartialEq, Clone, Copy)] +pub struct QuinnQuicTransportConfig { + pub datagram_receive_buffer_size: usize, + pub datagram_send_buffer_size: usize, + pub initial_mtu: u16, + pub max_udp_payload_size: u16, + pub min_mtu: u16, + pub upper_bound_mtu: u16, +} + +impl Default for QuinnQuicTransportConfig { + fn default() -> Self { + // Copied from Quinn::TransportConfig defaults + const EXPECTED_RTT: u32 = 100; // ms + const MAX_STREAM_BANDWIDTH: u32 = 12500 * 1000; // bytes/s + // Window size needed to avoid pipeline + // stalls + const STREAM_RWND: u32 = MAX_STREAM_BANDWIDTH / 1000 * EXPECTED_RTT; + + Self { + datagram_receive_buffer_size: STREAM_RWND as usize, + datagram_send_buffer_size: 1024 * 1024, + initial_mtu: DEFAULT_INITIAL_MTU, + max_udp_payload_size: DEFAULT_MAX_UDP_PAYLOAD_SIZE, + min_mtu: DEFAULT_MINIMUM_MTU, + upper_bound_mtu: DEFAULT_UPPER_BOUND_MTU, + } + } +} diff --git a/net/quinn/src/quinnquicsink/imp.rs b/net/quinn/src/quinnquicsink/imp.rs index f731df7f..59ebf567 100644 --- a/net/quinn/src/quinnquicsink/imp.rs +++ b/net/quinn/src/quinnquicsink/imp.rs @@ -8,8 +8,8 @@ // SPDX-License-Identifier: MPL-2.0 use crate::utils::{ - client_endpoint, make_socket_addr, server_endpoint, wait, WaitError, CONNECTION_CLOSE_CODE, - CONNECTION_CLOSE_MSG, + client_endpoint, make_socket_addr, server_endpoint, wait, QuinnQuicEndpointConfig, WaitError, + CONNECTION_CLOSE_CODE, CONNECTION_CLOSE_MSG, }; use crate::{common::*, utils}; use bytes::Bytes; @@ -17,7 +17,7 @@ use futures::future; use gst::{glib, prelude::*, subclass::prelude::*}; use gst_base::subclass::prelude::*; use once_cell::sync::Lazy; -use quinn::{Connection, SendStream}; +use quinn::{Connection, SendStream, TransportConfig}; use std::path::PathBuf; use std::sync::Mutex; @@ -43,7 +43,7 @@ enum State { Started(Started), } -#[derive(Clone, Debug)] +#[derive(Debug)] struct Settings { bind_address: String, bind_port: u16, @@ -58,6 +58,7 @@ struct Settings { use_datagram: bool, certificate_file: Option, private_key_file: Option, + transport_config: QuinnQuicTransportConfig, } impl Default for Settings { @@ -76,6 +77,7 @@ impl Default for Settings { use_datagram: false, certificate_file: None, private_key_file: None, + transport_config: QuinnQuicTransportConfig::default(), } } } @@ -228,6 +230,40 @@ impl ObjectImpl for QuinnQuicSink { .blurb("Use datagram for lower latency, unreliable messaging") .default_value(false) .build(), + glib::ParamSpecUInt::builder("initial-mtu") + .nick("Initial MTU") + .blurb("Initial value to be used as the maximum UDP payload size") + .minimum(DEFAULT_INITIAL_MTU.into()) + .default_value(DEFAULT_INITIAL_MTU.into()) + .build(), + glib::ParamSpecUInt::builder("min-mtu") + .nick("Minimum MTU") + .blurb("Maximum UDP payload size guaranteed to be supported by the network, must be <= initial-mtu") + .minimum(DEFAULT_MINIMUM_MTU.into()) + .default_value(DEFAULT_MINIMUM_MTU.into()) + .build(), + glib::ParamSpecUInt::builder("upper-bound-mtu") + .nick("Upper bound MTU") + .blurb("Upper bound to the max UDP payload size that MTU discovery will search for") + .minimum(DEFAULT_UPPER_BOUND_MTU.into()) + .maximum(DEFAULT_MAX_UPPER_BOUND_MTU.into()) + .default_value(DEFAULT_UPPER_BOUND_MTU.into()) + .build(), + glib::ParamSpecUInt::builder("max-udp-payload-size") + .nick("Maximum UDP payload size") + .blurb("Maximum UDP payload size accepted from peers (excluding UDP and IP overhead)") + .minimum(DEFAULT_MIN_UDP_PAYLOAD_SIZE.into()) + .maximum(DEFAULT_MAX_UDP_PAYLOAD_SIZE.into()) + .default_value(DEFAULT_UDP_PAYLOAD_SIZE.into()) + .build(), + glib::ParamSpecUInt64::builder("datagram-receive-buffer-size") + .nick("Datagram Receiver Buffer Size") + .blurb("Maximum number of incoming application datagram bytes to buffer") + .build(), + glib::ParamSpecUInt64::builder("datagram-send-buffer-size") + .nick("Datagram Send Buffer Size") + .blurb("Maximum number of outgoing application datagram bytes to buffer") + .build(), ] }); @@ -289,6 +325,32 @@ impl ObjectImpl for QuinnQuicSink { "use-datagram" => { settings.use_datagram = value.get().expect("type checked upstream"); } + "initial-mtu" => { + let value = value.get::().expect("type checked upstream"); + settings.transport_config.initial_mtu = + value.max(DEFAULT_INITIAL_MTU.into()) as u16; + } + "min-mtu" => { + let value = value.get::().expect("type checked upstream"); + let initial_mtu = settings.transport_config.initial_mtu; + settings.transport_config.min_mtu = value.min(initial_mtu.into()) as u16; + } + "upper-bound-mtu" => { + let value = value.get::().expect("type checked upstream"); + settings.transport_config.upper_bound_mtu = value as u16; + } + "max-udp-payload-size" => { + let value = value.get::().expect("type checked upstream"); + settings.transport_config.max_udp_payload_size = value as u16; + } + "datagram-receive-buffer-size" => { + let value = value.get::().expect("type checked upstream"); + settings.transport_config.datagram_receive_buffer_size = value as usize; + } + "datagram-send-buffer-size" => { + let value = value.get::().expect("type checked upstream"); + settings.transport_config.datagram_send_buffer_size = value as usize; + } _ => unimplemented!(), } } @@ -325,6 +387,18 @@ impl ObjectImpl for QuinnQuicSink { privkey.and_then(|file| file.to_str()).to_value() } "use-datagram" => settings.use_datagram.to_value(), + "initial-mtu" => (settings.transport_config.initial_mtu as u32).to_value(), + "min-mtu" => (settings.transport_config.min_mtu as u32).to_value(), + "upper-bound-mtu" => (settings.transport_config.upper_bound_mtu as u32).to_value(), + "max-udp-payload-size" => { + (settings.transport_config.max_udp_payload_size as u32).to_value() + } + "datagram-receive-buffer-size" => { + (settings.transport_config.datagram_receive_buffer_size as u64).to_value() + } + "datagram-send-buffer-size" => { + (settings.transport_config.datagram_send_buffer_size as u64).to_value() + } _ => unimplemented!(), } } @@ -503,7 +577,7 @@ impl QuinnQuicSink { if src.len() > size { return Err(Some(gst::error_msg!( gst::ResourceError::Failed, - ["Sending data failed, current max datagram size: {size}"] + ["Sending data failed, current max datagram size: {size}, buffer size: {}", src.len()] ))); } @@ -545,93 +619,80 @@ impl QuinnQuicSink { } async fn init_connection(&self) -> Result<(Connection, Option), WaitError> { - let client_addr; - let server_addr; - let server_name; - let alpns; - let role; - let use_datagram; - let keep_alive_interval; - let secure_conn; - let cert_file; - let private_key_file; - - { + let (role, use_datagram, endpoint_config) = { let settings = self.settings.lock().unwrap(); - client_addr = make_socket_addr( + let client_addr = make_socket_addr( format!("{}:{}", settings.bind_address, settings.bind_port).as_str(), )?; - server_addr = + let server_addr = make_socket_addr(format!("{}:{}", settings.address, settings.port).as_str())?; - server_name = settings.server_name.clone(); - alpns = settings.alpns.clone(); - role = settings.role; - use_datagram = settings.use_datagram; - keep_alive_interval = settings.keep_alive_interval; - secure_conn = settings.secure_conn; - cert_file = settings.certificate_file.clone(); - private_key_file = settings.private_key_file.clone(); - } + let server_name = settings.server_name.clone(); + let alpns = settings.alpns.clone(); + let role = settings.role; + let use_datagram = settings.use_datagram; + let keep_alive_interval = settings.keep_alive_interval; + let secure_conn = settings.secure_conn; + let certificate_file = settings.certificate_file.clone(); + let private_key_file = settings.private_key_file.clone(); + let transport_config = settings.transport_config; - let connection; - - match role { - QuinnQuicRole::Server => { - let endpoint = server_endpoint( + ( + role, + use_datagram, + QuinnQuicEndpointConfig { server_addr, - &server_name, + server_name, + client_addr, secure_conn, alpns, - cert_file, + certificate_file, private_key_file, - ) - .map_err(|err| { - WaitError::FutureError(gst::error_msg!( - gst::ResourceError::Failed, - ["Failed to configure endpoint: {}", err] - )) - })?; + keep_alive_interval, + transport_config, + }, + ) + }; + let endpoint = match role { + QuinnQuicRole::Server => server_endpoint(&endpoint_config).map_err(|err| { + WaitError::FutureError(gst::error_msg!( + gst::ResourceError::Failed, + ["Failed to configure endpoint: {}", err] + )) + })?, + QuinnQuicRole::Client => client_endpoint(&endpoint_config).map_err(|err| { + WaitError::FutureError(gst::error_msg!( + gst::ResourceError::Failed, + ["Failed to configure endpoint: {}", err] + )) + })?, + }; + + let connection = match role { + QuinnQuicRole::Server => { let incoming_conn = endpoint.accept().await.unwrap(); - connection = incoming_conn.await.map_err(|err| { + incoming_conn.await.map_err(|err| { WaitError::FutureError(gst::error_msg!( gst::ResourceError::Failed, ["Connection error: {}", err] )) - })?; + })? } - QuinnQuicRole::Client => { - let endpoint = client_endpoint( - client_addr, - secure_conn, - alpns, - cert_file, - private_key_file, - keep_alive_interval, - ) + QuinnQuicRole::Client => endpoint + .connect(endpoint_config.server_addr, &endpoint_config.server_name) + .unwrap() + .await .map_err(|err| { WaitError::FutureError(gst::error_msg!( gst::ResourceError::Failed, - ["Failed to configure endpoint: {}", err] + ["Connection error: {}", err] )) - })?; - - connection = endpoint - .connect(server_addr, &server_name) - .unwrap() - .await - .map_err(|err| { - WaitError::FutureError(gst::error_msg!( - gst::ResourceError::Failed, - ["Connection error: {}", err] - )) - })?; - } - } + })?, + }; let stream = if !use_datagram { let res = connection.open_uni().await.map_err(|err| { diff --git a/net/quinn/src/quinnquicsrc/imp.rs b/net/quinn/src/quinnquicsrc/imp.rs index 5bdd8cc2..627de638 100644 --- a/net/quinn/src/quinnquicsrc/imp.rs +++ b/net/quinn/src/quinnquicsrc/imp.rs @@ -8,8 +8,8 @@ // SPDX-License-Identifier: MPL-2.0 use crate::utils::{ - client_endpoint, make_socket_addr, server_endpoint, wait, Canceller, WaitError, - CONNECTION_CLOSE_CODE, CONNECTION_CLOSE_MSG, + client_endpoint, make_socket_addr, server_endpoint, wait, Canceller, QuinnQuicEndpointConfig, + WaitError, CONNECTION_CLOSE_CODE, CONNECTION_CLOSE_MSG, }; use crate::{common::*, utils}; use bytes::Bytes; @@ -19,7 +19,7 @@ use gst_base::prelude::*; use gst_base::subclass::base_src::CreateSuccess; use gst_base::subclass::prelude::*; use once_cell::sync::Lazy; -use quinn::{Connection, ConnectionError, ReadError, RecvStream}; +use quinn::{Connection, ConnectionError, ReadError, RecvStream, TransportConfig}; use std::path::PathBuf; use std::sync::Mutex; @@ -45,7 +45,7 @@ enum State { Started(Started), } -#[derive(Clone, Debug)] +#[derive(Debug)] struct Settings { address: String, port: u16, @@ -61,6 +61,7 @@ struct Settings { use_datagram: bool, certificate_file: Option, private_key_file: Option, + transport_config: QuinnQuicTransportConfig, } impl Default for Settings { @@ -80,6 +81,7 @@ impl Default for Settings { use_datagram: false, certificate_file: None, private_key_file: None, + transport_config: QuinnQuicTransportConfig::default(), } } } @@ -237,6 +239,40 @@ impl ObjectImpl for QuinnQuicSrc { .blurb("Use datagram for lower latency, unreliable messaging") .default_value(false) .build(), + glib::ParamSpecUInt::builder("initial-mtu") + .nick("Initial MTU") + .blurb("Initial value to be used as the maximum UDP payload size") + .minimum(DEFAULT_INITIAL_MTU.into()) + .default_value(DEFAULT_INITIAL_MTU.into()) + .build(), + glib::ParamSpecUInt::builder("min-mtu") + .nick("Minimum MTU") + .blurb("Maximum UDP payload size guaranteed to be supported by the network, must be <= initial-mtu") + .minimum(DEFAULT_MINIMUM_MTU.into()) + .default_value(DEFAULT_MINIMUM_MTU.into()) + .build(), + glib::ParamSpecUInt::builder("upper-bound-mtu") + .nick("Upper bound MTU") + .blurb("Upper bound to the max UDP payload size that MTU discovery will search for") + .minimum(DEFAULT_UPPER_BOUND_MTU.into()) + .maximum(DEFAULT_MAX_UPPER_BOUND_MTU.into()) + .default_value(DEFAULT_UPPER_BOUND_MTU.into()) + .build(), + glib::ParamSpecUInt::builder("max-udp-payload-size") + .nick("Maximum UDP payload size") + .blurb("Maximum UDP payload size accepted from peers (excluding UDP and IP overhead)") + .minimum(DEFAULT_MIN_UDP_PAYLOAD_SIZE.into()) + .maximum(DEFAULT_MAX_UDP_PAYLOAD_SIZE.into()) + .default_value(DEFAULT_UDP_PAYLOAD_SIZE.into()) + .build(), + glib::ParamSpecUInt64::builder("datagram-receive-buffer-size") + .nick("Datagram Receiver Buffer Size") + .blurb("Maximum number of incoming application datagram bytes to buffer") + .build(), + glib::ParamSpecUInt64::builder("datagram-send-buffer-size") + .nick("Datagram Send Buffer Size") + .blurb("Maximum number of outgoing application datagram bytes to buffer") + .build(), ] }); @@ -307,6 +343,32 @@ impl ObjectImpl for QuinnQuicSrc { "use-datagram" => { settings.use_datagram = value.get().expect("type checked upstream"); } + "initial-mtu" => { + let value = value.get::().expect("type checked upstream"); + settings.transport_config.initial_mtu = + value.max(DEFAULT_INITIAL_MTU.into()) as u16; + } + "min-mtu" => { + let value = value.get::().expect("type checked upstream"); + let initial_mtu = settings.transport_config.initial_mtu; + settings.transport_config.min_mtu = value.min(initial_mtu.into()) as u16; + } + "upper-bound-mtu" => { + let value = value.get::().expect("type checked upstream"); + settings.transport_config.upper_bound_mtu = value as u16; + } + "max-udp-payload-size" => { + let value = value.get::().expect("type checked upstream"); + settings.transport_config.max_udp_payload_size = value as u16; + } + "datagram-receive-buffer-size" => { + let value = value.get::().expect("type checked upstream"); + settings.transport_config.datagram_receive_buffer_size = value as usize; + } + "datagram-send-buffer-size" => { + let value = value.get::().expect("type checked upstream"); + settings.transport_config.datagram_send_buffer_size = value as usize; + } _ => unimplemented!(), } } @@ -344,6 +406,18 @@ impl ObjectImpl for QuinnQuicSrc { privkey.and_then(|file| file.to_str()).to_value() } "use-datagram" => settings.use_datagram.to_value(), + "initial-mtu" => (settings.transport_config.initial_mtu as u32).to_value(), + "min-mtu" => (settings.transport_config.min_mtu as u32).to_value(), + "upper-bound-mtu" => (settings.transport_config.upper_bound_mtu as u32).to_value(), + "max-udp-payload-size" => { + (settings.transport_config.max_udp_payload_size as u32).to_value() + } + "datagram-receive-buffer-size" => { + (settings.transport_config.datagram_receive_buffer_size as u64).to_value() + } + "datagram-send-buffer-size" => { + (settings.transport_config.datagram_send_buffer_size as u64).to_value() + } _ => unimplemented!(), } } @@ -592,93 +666,80 @@ impl QuinnQuicSrc { } async fn init_connection(&self) -> Result<(Connection, Option), WaitError> { - let server_addr; - let server_name; - let client_addr; - let alpns; - let role; - let use_datagram; - let keep_alive_interval; - let secure_conn; - let cert_file; - let private_key_file; - - { + let (role, use_datagram, endpoint_config) = { let settings = self.settings.lock().unwrap(); - client_addr = make_socket_addr( + let client_addr = make_socket_addr( format!("{}:{}", settings.bind_address, settings.bind_port).as_str(), )?; - server_addr = + let server_addr = make_socket_addr(format!("{}:{}", settings.address, settings.port).as_str())?; - server_name = settings.server_name.clone(); - alpns = settings.alpns.clone(); - role = settings.role; - use_datagram = settings.use_datagram; - keep_alive_interval = settings.keep_alive_interval; - secure_conn = settings.secure_conn; - cert_file = settings.certificate_file.clone(); - private_key_file = settings.private_key_file.clone(); - } + let server_name = settings.server_name.clone(); + let alpns = settings.alpns.clone(); + let role = settings.role; + let use_datagram = settings.use_datagram; + let keep_alive_interval = settings.keep_alive_interval; + let secure_conn = settings.secure_conn; + let certificate_file = settings.certificate_file.clone(); + let private_key_file = settings.private_key_file.clone(); + let transport_config = settings.transport_config; - let connection; - - match role { - QuinnQuicRole::Server => { - let endpoint = server_endpoint( + ( + role, + use_datagram, + QuinnQuicEndpointConfig { server_addr, - &server_name, + server_name, + client_addr, secure_conn, alpns, - cert_file, + certificate_file, private_key_file, - ) - .map_err(|err| { - WaitError::FutureError(gst::error_msg!( - gst::ResourceError::Failed, - ["Failed to configure endpoint: {}", err] - )) - })?; + keep_alive_interval, + transport_config, + }, + ) + }; + let endpoint = match role { + QuinnQuicRole::Server => server_endpoint(&endpoint_config).map_err(|err| { + WaitError::FutureError(gst::error_msg!( + gst::ResourceError::Failed, + ["Failed to configure endpoint: {}", err] + )) + })?, + QuinnQuicRole::Client => client_endpoint(&endpoint_config).map_err(|err| { + WaitError::FutureError(gst::error_msg!( + gst::ResourceError::Failed, + ["Failed to configure endpoint: {}", err] + )) + })?, + }; + + let connection = match role { + QuinnQuicRole::Server => { let incoming_conn = endpoint.accept().await.unwrap(); - connection = incoming_conn.await.map_err(|err| { + incoming_conn.await.map_err(|err| { WaitError::FutureError(gst::error_msg!( gst::ResourceError::Failed, ["Connection error: {}", err] )) - })?; + })? } - QuinnQuicRole::Client => { - let endpoint = client_endpoint( - client_addr, - secure_conn, - alpns, - cert_file, - private_key_file, - keep_alive_interval, - ) + QuinnQuicRole::Client => endpoint + .connect(endpoint_config.server_addr, &endpoint_config.server_name) + .unwrap() + .await .map_err(|err| { WaitError::FutureError(gst::error_msg!( gst::ResourceError::Failed, - ["Failed to configure endpoint: {}", err] + ["Connection error: {}", err] )) - })?; - - connection = endpoint - .connect(server_addr, &server_name) - .unwrap() - .await - .map_err(|err| { - WaitError::FutureError(gst::error_msg!( - gst::ResourceError::Failed, - ["Connection error: {}", err] - )) - })?; - } - } + })?, + }; let stream = if !use_datagram { let res = connection.accept_uni().await.map_err(|err| { diff --git a/net/quinn/src/utils.rs b/net/quinn/src/utils.rs index 39a05c9e..e5a1533e 100644 --- a/net/quinn/src/utils.rs +++ b/net/quinn/src/utils.rs @@ -7,13 +7,14 @@ // // SPDX-License-Identifier: MPL-2.0 +use crate::common::*; use futures::future; use futures::prelude::*; use gst::ErrorMessage; use once_cell::sync::Lazy; use quinn::{ - crypto::rustls::QuicClientConfig, crypto::rustls::QuicServerConfig, ClientConfig, Endpoint, - ServerConfig, TransportConfig, + crypto::rustls::QuicClientConfig, crypto::rustls::QuicServerConfig, default_runtime, + ClientConfig, Endpoint, EndpointConfig, MtuDiscoveryConfig, ServerConfig, TransportConfig, }; use std::error::Error; use std::fs::File; @@ -28,6 +29,19 @@ use tokio::runtime; pub const CONNECTION_CLOSE_CODE: u32 = 0; pub const CONNECTION_CLOSE_MSG: &str = "Stopped"; +#[derive(Debug)] +pub struct QuinnQuicEndpointConfig { + pub server_addr: SocketAddr, + pub server_name: String, + pub client_addr: SocketAddr, + pub secure_conn: bool, + pub alpns: Vec, + pub certificate_file: Option, + pub private_key_file: Option, + pub keep_alive_interval: u64, + pub transport_config: QuinnQuicTransportConfig, +} + #[derive(Error, Debug)] pub enum WaitError { #[error("Future aborted")] @@ -196,15 +210,12 @@ impl rustls::client::danger::ServerCertVerifier for SkipServerVerification { } } -fn configure_client( - secure_conn: bool, - certificate_file: Option, - private_key_file: Option, - alpns: Vec, - keep_alive_interval_ms: u64, -) -> Result> { - let mut crypto = if secure_conn { - let (certs, key) = read_certs_from_file(certificate_file, private_key_file)?; +fn configure_client(ep_config: &QuinnQuicEndpointConfig) -> Result> { + let mut crypto = if ep_config.secure_conn { + let (certs, key) = read_certs_from_file( + ep_config.certificate_file.clone(), + ep_config.private_key_file.clone(), + )?; let mut cert_store = rustls::RootCertStore::empty(); cert_store.add_parsable_certificates(certs.clone()); @@ -218,20 +229,41 @@ fn configure_client( .with_no_client_auth() }; - let alpn_protocols: Vec> = alpns + let alpn_protocols: Vec> = ep_config + .alpns .iter() .map(|x| x.as_bytes().to_vec()) .collect::>(); crypto.alpn_protocols = alpn_protocols; crypto.key_log = Arc::new(rustls::KeyLogFile::new()); - let mut client_config = ClientConfig::new(Arc::new(QuicClientConfig::try_from(crypto)?)); - - if keep_alive_interval_ms > 0 { + let transport_config = { + let mtu_config = MtuDiscoveryConfig::default() + .upper_bound(ep_config.transport_config.upper_bound_mtu) + .to_owned(); let mut transport_config = TransportConfig::default(); - transport_config.keep_alive_interval(Some(Duration::from_millis(keep_alive_interval_ms))); - client_config.transport_config(Arc::new(transport_config)); - } + + if ep_config.keep_alive_interval > 0 { + transport_config + .keep_alive_interval(Some(Duration::from_millis(ep_config.keep_alive_interval))); + } + transport_config.initial_mtu(ep_config.transport_config.initial_mtu); + transport_config.min_mtu(ep_config.transport_config.min_mtu); + transport_config.datagram_receive_buffer_size(Some( + ep_config.transport_config.datagram_receive_buffer_size, + )); + transport_config + .datagram_send_buffer_size(ep_config.transport_config.datagram_send_buffer_size); + transport_config.max_concurrent_bidi_streams(0u32.into()); + transport_config.max_concurrent_uni_streams(1u32.into()); + transport_config.mtu_discovery_config(Some(mtu_config)); + + transport_config + }; + + let client_config = ClientConfig::new(Arc::new(QuicClientConfig::try_from(crypto)?)) + .transport_config(Arc::new(transport_config)) + .to_owned(); Ok(client_config) } @@ -294,17 +326,16 @@ fn read_certs_from_file( } fn configure_server( - server_name: &str, - secure_conn: bool, - certificate_file: Option, - private_key_file: Option, - alpns: Vec, + ep_config: &QuinnQuicEndpointConfig, ) -> Result<(ServerConfig, Vec), Box> { - let (certs, key) = if secure_conn { - read_certs_from_file(certificate_file, private_key_file)? + let (certs, key) = if ep_config.secure_conn { + read_certs_from_file( + ep_config.certificate_file.clone(), + ep_config.private_key_file.clone(), + )? } else { let rcgen::CertifiedKey { cert: _, key_pair } = - rcgen::generate_simple_self_signed(vec![server_name.into()]).unwrap(); + rcgen::generate_simple_self_signed(vec![ep_config.server_name.clone()]).unwrap(); let cert_der = key_pair.serialize_der(); let priv_key = rustls_pki_types::PrivateKeyDer::try_from(cert_der.clone()).unwrap(); let cert_chain = vec![rustls_pki_types::CertificateDer::from(cert_der)]; @@ -312,7 +343,7 @@ fn configure_server( (cert_chain, priv_key) }; - let mut crypto = if secure_conn { + let mut crypto = if ep_config.secure_conn { let mut cert_store = rustls::RootCertStore::empty(); cert_store.add_parsable_certificates(certs.clone()); @@ -328,59 +359,58 @@ fn configure_server( .with_single_cert(certs.clone(), key) }?; - let alpn_protocols: Vec> = alpns + let alpn_protocols: Vec> = ep_config + .alpns .iter() .map(|x| x.as_bytes().to_vec()) .collect::>(); crypto.alpn_protocols = alpn_protocols; crypto.key_log = Arc::new(rustls::KeyLogFile::new()); - let mut server_config = - ServerConfig::with_crypto(Arc::new(QuicServerConfig::try_from(crypto)?)); - Arc::get_mut(&mut server_config.transport) - .unwrap() - .max_concurrent_bidi_streams(0_u8.into()) - .max_concurrent_uni_streams(1_u8.into()); + let transport_config = { + let mtu_config = MtuDiscoveryConfig::default() + .upper_bound(ep_config.transport_config.upper_bound_mtu) + .to_owned(); + let mut transport_config = TransportConfig::default(); + + transport_config.initial_mtu(ep_config.transport_config.initial_mtu); + transport_config.min_mtu(ep_config.transport_config.min_mtu); + transport_config.datagram_receive_buffer_size(Some( + ep_config.transport_config.datagram_receive_buffer_size, + )); + transport_config + .datagram_send_buffer_size(ep_config.transport_config.datagram_send_buffer_size); + transport_config.max_concurrent_bidi_streams(0u32.into()); + transport_config.max_concurrent_uni_streams(1u32.into()); + transport_config.mtu_discovery_config(Some(mtu_config)); + + transport_config + }; + + let server_config = ServerConfig::with_crypto(Arc::new(QuicServerConfig::try_from(crypto)?)) + .transport_config(Arc::new(transport_config)) + .to_owned(); Ok((server_config, certs)) } -pub fn server_endpoint( - server_addr: SocketAddr, - server_name: &str, - secure_conn: bool, - alpns: Vec, - certificate_file: Option, - private_key_file: Option, -) -> Result> { - let (server_config, _) = configure_server( - server_name, - secure_conn, - certificate_file, - private_key_file, - alpns, - )?; - let endpoint = Endpoint::server(server_config, server_addr)?; +pub fn server_endpoint(ep_config: &QuinnQuicEndpointConfig) -> Result> { + let (server_config, _) = configure_server(ep_config)?; + let socket = std::net::UdpSocket::bind(ep_config.server_addr)?; + let runtime = default_runtime() + .ok_or_else(|| std::io::Error::new(std::io::ErrorKind::Other, "No async runtime found"))?; + let endpoint_config = EndpointConfig::default() + .max_udp_payload_size(ep_config.transport_config.max_udp_payload_size) + .unwrap() + .to_owned(); + let endpoint = Endpoint::new(endpoint_config, Some(server_config), socket, runtime)?; Ok(endpoint) } -pub fn client_endpoint( - client_addr: SocketAddr, - secure_conn: bool, - alpns: Vec, - certificate_file: Option, - private_key_file: Option, - keep_alive_interval_ms: u64, -) -> Result> { - let client_cfg = configure_client( - secure_conn, - certificate_file, - private_key_file, - alpns, - keep_alive_interval_ms, - )?; - let mut endpoint = Endpoint::client(client_addr)?; +pub fn client_endpoint(ep_config: &QuinnQuicEndpointConfig) -> Result> { + let client_cfg = configure_client(ep_config)?; + let mut endpoint = Endpoint::client(ep_config.client_addr)?; endpoint.set_default_client_config(client_cfg);