net/quinn: Make QUIC role configurable

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1575>
This commit is contained in:
Tamas Levai 2024-05-15 10:53:10 +02:00
parent 8fc652f208
commit 802ff6a67c
9 changed files with 474 additions and 160 deletions

26
Cargo.lock generated
View file

@ -2680,7 +2680,7 @@ dependencies = [
"once_cell", "once_cell",
"quinn", "quinn",
"rcgen", "rcgen",
"rustls 0.23.7", "rustls 0.23.8",
"rustls-pemfile 2.1.2", "rustls-pemfile 2.1.2",
"rustls-pki-types", "rustls-pki-types",
"serial_test", "serial_test",
@ -3837,7 +3837,7 @@ dependencies = [
"httpdate", "httpdate",
"itoa", "itoa",
"pin-project-lite", "pin-project-lite",
"socket2 0.5.7", "socket2 0.4.10",
"tokio", "tokio",
"tower-service", "tower-service",
"tracing", "tracing",
@ -5469,16 +5469,16 @@ dependencies = [
[[package]] [[package]]
name = "quinn" name = "quinn"
version = "0.11.0" version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4bb80dc034523335a9fcc34271931dd97e9132d1fb078695db500339eb72e712" checksum = "904e3d3ba178131798c6d9375db2b13b34337d489b089fc5ba0825a2ff1bee73"
dependencies = [ dependencies = [
"bytes", "bytes",
"pin-project-lite", "pin-project-lite",
"quinn-proto", "quinn-proto",
"quinn-udp", "quinn-udp",
"rustc-hash", "rustc-hash",
"rustls 0.23.7", "rustls 0.23.8",
"thiserror", "thiserror",
"tokio", "tokio",
"tracing", "tracing",
@ -5486,15 +5486,15 @@ dependencies = [
[[package]] [[package]]
name = "quinn-proto" name = "quinn-proto"
version = "0.11.1" version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a063a47a1aaee4b3b1c2dd44edb7867c10107a2ef171f3543ac40ec5e9092002" checksum = "e974563a4b1c2206bbc61191ca4da9c22e4308b4c455e8906751cc7828393f08"
dependencies = [ dependencies = [
"bytes", "bytes",
"rand", "rand",
"ring", "ring",
"rustc-hash", "rustc-hash",
"rustls 0.23.7", "rustls 0.23.8",
"rustls-platform-verifier", "rustls-platform-verifier",
"slab", "slab",
"thiserror", "thiserror",
@ -5504,9 +5504,9 @@ dependencies = [
[[package]] [[package]]
name = "quinn-udp" name = "quinn-udp"
version = "0.5.0" version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cb7ad7bc932e4968523fa7d9c320ee135ff779de720e9350fee8728838551764" checksum = "e4f0def2590301f4f667db5a77f9694fb004f82796dc1a8b1508fafa3d0e8b72"
dependencies = [ dependencies = [
"libc", "libc",
"once_cell", "once_cell",
@ -5955,9 +5955,9 @@ dependencies = [
[[package]] [[package]]
name = "rustls" name = "rustls"
version = "0.23.7" version = "0.23.8"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ebbbdb961df0ad3f2652da8f3fdc4b36122f568f968f45ad3316f26c025c677b" checksum = "79adb16721f56eb2d843e67676896a61ce7a0fa622dc18d3e372477a029d2740"
dependencies = [ dependencies = [
"once_cell", "once_cell",
"ring", "ring",
@ -6028,7 +6028,7 @@ dependencies = [
"jni", "jni",
"log", "log",
"once_cell", "once_cell",
"rustls 0.23.7", "rustls 0.23.8",
"rustls-native-certs 0.7.0", "rustls-native-certs 0.7.0",
"rustls-platform-verifier-android", "rustls-platform-verifier-android",
"rustls-webpki 0.102.4", "rustls-webpki 0.102.4",

View file

@ -3984,6 +3984,18 @@
} }
}, },
"properties": { "properties": {
"address": {
"blurb": "Address of the QUIC server e.g. 127.0.0.1",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "127.0.0.1",
"mutable": "null",
"readable": true,
"type": "gchararray",
"writable": true
},
"alpn-protocols": { "alpn-protocols": {
"blurb": "QUIC connection Application-Layer Protocol Negotiation (ALPN) values", "blurb": "QUIC connection Application-Layer Protocol Negotiation (ALPN) values",
"conditionally-available": false, "conditionally-available": false,
@ -3995,6 +4007,32 @@
"type": "GstValueArray", "type": "GstValueArray",
"writable": true "writable": true
}, },
"bind-address": {
"blurb": "Address to bind QUIC client e.g. 0.0.0.0",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "0.0.0.0",
"mutable": "null",
"readable": true,
"type": "gchararray",
"writable": true
},
"bind-port": {
"blurb": "Port to bind QUIC client e.g. 5001",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "0",
"max": "65535",
"min": "0",
"mutable": "null",
"readable": true,
"type": "guint",
"writable": true
},
"certificate-file": { "certificate-file": {
"blurb": "Path to certificate chain in single file", "blurb": "Path to certificate chain in single file",
"conditionally-available": false, "conditionally-available": false,
@ -4047,6 +4085,20 @@
"type": "guint64", "type": "guint64",
"writable": true "writable": true
}, },
"port": {
"blurb": "Port of the QUIC server e.g. 5000",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "5000",
"max": "65535",
"min": "0",
"mutable": "null",
"readable": true,
"type": "guint",
"writable": true
},
"private-key-file": { "private-key-file": {
"blurb": "Path to a PKCS8 or RSA private key file", "blurb": "Path to a PKCS8 or RSA private key file",
"conditionally-available": false, "conditionally-available": false,
@ -4059,6 +4111,18 @@
"type": "gchararray", "type": "gchararray",
"writable": true "writable": true
}, },
"role": {
"blurb": "QUIC connection role to use.",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "client (1)",
"mutable": "null",
"readable": true,
"type": "GstQuinnQuicRole",
"writable": true
},
"secure-connection": { "secure-connection": {
"blurb": "Use certificates for QUIC connection. False: Insecure connection, True: Secure connection.", "blurb": "Use certificates for QUIC connection. False: Insecure connection, True: Secure connection.",
"conditionally-available": false, "conditionally-available": false,
@ -4084,7 +4148,7 @@
"writable": true "writable": true
}, },
"server-name": { "server-name": {
"blurb": "Name of the QUIC server which is in server certificate", "blurb": "Name of the QUIC server which is in server certificate in case of server role",
"conditionally-available": false, "conditionally-available": false,
"construct": false, "construct": false,
"construct-only": false, "construct-only": false,
@ -4158,6 +4222,18 @@
} }
}, },
"properties": { "properties": {
"address": {
"blurb": "Address of the QUIC server e.g. 127.0.0.1",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "127.0.0.1",
"mutable": "null",
"readable": true,
"type": "gchararray",
"writable": true
},
"alpn-protocols": { "alpn-protocols": {
"blurb": "QUIC connection Application-Layer Protocol Negotiation (ALPN) values", "blurb": "QUIC connection Application-Layer Protocol Negotiation (ALPN) values",
"conditionally-available": false, "conditionally-available": false,
@ -4169,6 +4245,32 @@
"type": "GstValueArray", "type": "GstValueArray",
"writable": true "writable": true
}, },
"bind-address": {
"blurb": "Address to bind QUIC client e.g. 0.0.0.0",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "0.0.0.0",
"mutable": "null",
"readable": true,
"type": "gchararray",
"writable": true
},
"bind-port": {
"blurb": "Port to bind QUIC client e.g. 5001",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "0",
"max": "65535",
"min": "0",
"mutable": "null",
"readable": true,
"type": "guint",
"writable": true
},
"caps": { "caps": {
"blurb": "The caps of the source pad", "blurb": "The caps of the source pad",
"conditionally-available": false, "conditionally-available": false,
@ -4193,6 +4295,34 @@
"type": "gchararray", "type": "gchararray",
"writable": true "writable": true
}, },
"keep-alive-interval": {
"blurb": "Keeps QUIC connection alive by periodically pinging the server. Value set in ms, 0 disables this feature",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "0",
"max": "18446744073709551615",
"min": "0",
"mutable": "null",
"readable": true,
"type": "guint64",
"writable": true
},
"port": {
"blurb": "Port of the QUIC server e.g. 5000",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "5000",
"max": "65535",
"min": "0",
"mutable": "null",
"readable": true,
"type": "guint",
"writable": true
},
"private-key-file": { "private-key-file": {
"blurb": "Path to a PKCS8 or RSA private key file", "blurb": "Path to a PKCS8 or RSA private key file",
"conditionally-available": false, "conditionally-available": false,
@ -4205,6 +4335,18 @@
"type": "gchararray", "type": "gchararray",
"writable": true "writable": true
}, },
"role": {
"blurb": "QUIC connection role to use.",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "server (0)",
"mutable": "null",
"readable": true,
"type": "GstQuinnQuicRole",
"writable": true
},
"secure-connection": { "secure-connection": {
"blurb": "Use certificates for QUIC connection. False: Insecure connection, True: Secure connection.", "blurb": "Use certificates for QUIC connection. False: Insecure connection, True: Secure connection.",
"conditionally-available": false, "conditionally-available": false,
@ -4230,7 +4372,7 @@
"writable": true "writable": true
}, },
"server-name": { "server-name": {
"blurb": "Name of the QUIC server which is in server certificate", "blurb": "Name of the QUIC server which is in server certificate in case of server role",
"conditionally-available": false, "conditionally-available": false,
"construct": false, "construct": false,
"construct-only": false, "construct-only": false,
@ -4287,7 +4429,23 @@
}, },
"filename": "gstquinn", "filename": "gstquinn",
"license": "MPL", "license": "MPL",
"other-types": {}, "other-types": {
"GstQuinnQuicRole": {
"kind": "enum",
"values": [
{
"desc": "Server: Act as QUIC server.",
"name": "server",
"value": "0"
},
{
"desc": "Client: Act as QUIC client.",
"name": "client",
"value": "1"
}
]
}
},
"package": "gst-plugin-quinn", "package": "gst-plugin-quinn",
"source": "gst-plugin-quinn", "source": "gst-plugin-quinn",
"tracers": {}, "tracers": {},

38
net/quinn/src/common.rs Normal file
View file

@ -0,0 +1,38 @@
// Copyright (C) 2024, Asymptotic Inc.
// Author: Sanchayan Maity <sanchayan@asymptotic.io>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
use gst::glib;
pub(crate) static DEFAULT_SERVER_NAME: &str = "localhost";
pub(crate) static DEFAULT_ADDR: &str = "127.0.0.1";
pub(crate) static DEFAULT_PORT: u16 = 5000;
pub(crate) static DEFAULT_BIND_ADDR: &str = "0.0.0.0";
pub(crate) static DEFAULT_BIND_PORT: u16 = 0;
/*
* For QUIC transport parameters
* <https://datatracker.ietf.org/doc/html/rfc9000#section-7.4>
*
* A HTTP client might specify "http/1.1" and/or "h2" or "h3".
* Other well-known values are listed in the at IANA registry at
* <https://www.iana.org/assignments/tls-extensiontype-values/tls-extensiontype-values.xhtml#alpn-protocol-ids>.
*/
pub(crate) const DEFAULT_ALPN: &str = "gst-quinn";
pub(crate) const DEFAULT_TIMEOUT: u32 = 15;
pub(crate) const DEFAULT_SECURE_CONNECTION: bool = true;
#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy, glib::Enum)]
#[repr(u32)]
#[enum_type(name = "GstQuinnQuicRole")]
pub enum QuinnQuicRole {
#[enum_value(name = "Server: Act as QUIC server.", nick = "server")]
Server,
#[enum_value(name = "Client: Act as QUIC client.", nick = "client")]
Client,
}

View file

@ -6,7 +6,11 @@
// <https://mozilla.org/MPL/2.0/>. // <https://mozilla.org/MPL/2.0/>.
// //
// SPDX-License-Identifier: MPL-2.0 // SPDX-License-Identifier: MPL-2.0
#![allow(clippy::non_send_fields_in_send_ty, unused_doc_comments)] #![allow(
clippy::non_send_fields_in_send_ty,
unused_doc_comments,
unused_imports
)]
/** /**
* plugin-quinn: * plugin-quinn:
@ -14,11 +18,17 @@
* Since: plugins-rs-0.13.0 * Since: plugins-rs-0.13.0
*/ */
use gst::glib; use gst::glib;
use gst::prelude::*;
mod common;
mod quinnquicsink; mod quinnquicsink;
mod quinnquicsrc; mod quinnquicsrc;
mod utils; mod utils;
fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
#[cfg(feature = "doc")]
{
common::QuinnQuicRole::static_type().mark_as_plugin_api(gst::PluginAPIFlags::empty());
}
quinnquicsink::register(plugin)?; quinnquicsink::register(plugin)?;
quinnquicsrc::register(plugin)?; quinnquicsrc::register(plugin)?;

View file

@ -7,8 +7,10 @@
// //
// SPDX-License-Identifier: MPL-2.0 // SPDX-License-Identifier: MPL-2.0
use crate::common::*;
use crate::utils::{ use crate::utils::{
client_endpoint, make_socket_addr, wait, WaitError, CONNECTION_CLOSE_CODE, CONNECTION_CLOSE_MSG, client_endpoint, make_socket_addr, server_endpoint, wait, WaitError, CONNECTION_CLOSE_CODE,
CONNECTION_CLOSE_MSG,
}; };
use bytes::Bytes; use bytes::Bytes;
use futures::future; use futures::future;
@ -19,23 +21,7 @@ use quinn::{Connection, SendStream};
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::Mutex; use std::sync::Mutex;
static DEFAULT_SERVER_NAME: &str = "localhost"; const DEFAULT_ROLE: QuinnQuicRole = QuinnQuicRole::Client;
static DEFAULT_SERVER_ADDR: &str = "127.0.0.1";
static DEFAULT_SERVER_PORT: u16 = 5000;
static DEFAULT_CLIENT_ADDR: &str = "127.0.0.1";
static DEFAULT_CLIENT_PORT: u16 = 5001;
/*
* For QUIC transport parameters
* <https://datatracker.ietf.org/doc/html/rfc9000#section-7.4>
*
* A HTTP client might specify "http/1.1" and/or "h2" or "h3".
* Other well-known values are listed in the at IANA registry at
* <https://www.iana.org/assignments/tls-extensiontype-values/tls-extensiontype-values.xhtml#alpn-protocol-ids>.
*/
const DEFAULT_ALPN: &str = "gst-quinn";
const DEFAULT_TIMEOUT: u32 = 15;
const DEFAULT_SECURE_CONNECTION: bool = true;
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| { static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
gst::DebugCategory::new( gst::DebugCategory::new(
@ -59,12 +45,13 @@ enum State {
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
struct Settings { struct Settings {
client_address: String, bind_address: String,
client_port: u16, bind_port: u16,
server_address: String, address: String,
server_port: u16, port: u16,
server_name: String, server_name: String,
alpns: Vec<String>, alpns: Vec<String>,
role: QuinnQuicRole,
timeout: u32, timeout: u32,
keep_alive_interval: u64, keep_alive_interval: u64,
secure_conn: bool, secure_conn: bool,
@ -76,12 +63,13 @@ struct Settings {
impl Default for Settings { impl Default for Settings {
fn default() -> Self { fn default() -> Self {
Settings { Settings {
client_address: DEFAULT_CLIENT_ADDR.to_string(), bind_address: DEFAULT_BIND_ADDR.to_string(),
client_port: DEFAULT_CLIENT_PORT, bind_port: DEFAULT_BIND_PORT,
server_address: DEFAULT_SERVER_ADDR.to_string(), address: DEFAULT_ADDR.to_string(),
server_port: DEFAULT_SERVER_PORT, port: DEFAULT_PORT,
server_name: DEFAULT_SERVER_NAME.to_string(), server_name: DEFAULT_SERVER_NAME.to_string(),
alpns: vec![DEFAULT_ALPN.to_string()], alpns: vec![DEFAULT_ALPN.to_string()],
role: DEFAULT_ROLE,
timeout: DEFAULT_TIMEOUT, timeout: DEFAULT_TIMEOUT,
keep_alive_interval: 0, keep_alive_interval: 0,
secure_conn: DEFAULT_SECURE_CONNECTION, secure_conn: DEFAULT_SECURE_CONNECTION,
@ -176,28 +164,28 @@ impl ObjectImpl for QuinnQuicSink {
vec![ vec![
glib::ParamSpecString::builder("server-name") glib::ParamSpecString::builder("server-name")
.nick("QUIC server name") .nick("QUIC server name")
.blurb("Name of the QUIC server which is in server certificate") .blurb("Name of the QUIC server which is in server certificate in case of server role")
.build(), .build(),
glib::ParamSpecString::builder("server-address") glib::ParamSpecString::builder("address")
.nick("QUIC server address") .nick("QUIC server address")
.blurb("Address of the QUIC server to connect to e.g. 127.0.0.1") .blurb("Address of the QUIC server e.g. 127.0.0.1")
.build(), .build(),
glib::ParamSpecUInt::builder("server-port") glib::ParamSpecUInt::builder("port")
.nick("QUIC server port") .nick("QUIC server port")
.blurb("Port of the QUIC server to connect to e.g. 5000") .blurb("Port of the QUIC server e.g. 5000")
.maximum(65535) .maximum(65535)
.default_value(DEFAULT_SERVER_PORT as u32) .default_value(DEFAULT_PORT as u32)
.readwrite() .readwrite()
.build(), .build(),
glib::ParamSpecString::builder("client-address") glib::ParamSpecString::builder("bind-address")
.nick("QUIC client address") .nick("QUIC client bind address")
.blurb("Address to be used by this QUIC client e.g. 127.0.0.1") .blurb("Address to bind QUIC client e.g. 0.0.0.0")
.build(), .build(),
glib::ParamSpecUInt::builder("client-port") glib::ParamSpecUInt::builder("bind-port")
.nick("QUIC client port") .nick("QUIC client port")
.blurb("Port to be used by this QUIC client e.g. 5001") .blurb("Port to bind QUIC client e.g. 5001")
.maximum(65535) .maximum(65535)
.default_value(DEFAULT_CLIENT_PORT as u32) .default_value(DEFAULT_BIND_PORT as u32)
.readwrite() .readwrite()
.build(), .build(),
gst::ParamSpecArray::builder("alpn-protocols") gst::ParamSpecArray::builder("alpn-protocols")
@ -205,6 +193,10 @@ impl ObjectImpl for QuinnQuicSink {
.blurb("QUIC connection Application-Layer Protocol Negotiation (ALPN) values") .blurb("QUIC connection Application-Layer Protocol Negotiation (ALPN) values")
.element_spec(&glib::ParamSpecString::builder("alpn-protocol").build()) .element_spec(&glib::ParamSpecString::builder("alpn-protocol").build())
.build(), .build(),
glib::ParamSpecEnum::builder_with_default("role", DEFAULT_ROLE)
.nick("QUIC role")
.blurb("QUIC connection role to use.")
.build(),
glib::ParamSpecUInt::builder("timeout") glib::ParamSpecUInt::builder("timeout")
.nick("Timeout") .nick("Timeout")
.blurb("Value in seconds to timeout QUIC endpoint requests (0 = No timeout).") .blurb("Value in seconds to timeout QUIC endpoint requests (0 = No timeout).")
@ -249,17 +241,17 @@ impl ObjectImpl for QuinnQuicSink {
"server-name" => { "server-name" => {
settings.server_name = value.get::<String>().expect("type checked upstream"); settings.server_name = value.get::<String>().expect("type checked upstream");
} }
"server-address" => { "address" => {
settings.server_address = value.get::<String>().expect("type checked upstream"); settings.address = value.get::<String>().expect("type checked upstream");
} }
"server-port" => { "port" => {
settings.server_port = value.get::<u32>().expect("type checked upstream") as u16; settings.port = value.get::<u32>().expect("type checked upstream") as u16;
} }
"client-address" => { "bind-address" => {
settings.client_address = value.get::<String>().expect("type checked upstream"); settings.bind_address = value.get::<String>().expect("type checked upstream");
} }
"client-port" => { "bind-port" => {
settings.client_port = value.get::<u32>().expect("type checked upstream") as u16; settings.bind_port = value.get::<u32>().expect("type checked upstream") as u16;
} }
"alpn-protocols" => { "alpn-protocols" => {
settings.alpns = value settings.alpns = value
@ -274,6 +266,9 @@ impl ObjectImpl for QuinnQuicSink {
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
} }
"role" => {
settings.role = value.get::<QuinnQuicRole>().expect("type checked upstream");
}
"timeout" => { "timeout" => {
settings.timeout = value.get().expect("type checked upstream"); settings.timeout = value.get().expect("type checked upstream");
} }
@ -303,20 +298,21 @@ impl ObjectImpl for QuinnQuicSink {
match pspec.name() { match pspec.name() {
"server-name" => settings.server_name.to_value(), "server-name" => settings.server_name.to_value(),
"server-address" => settings.server_address.to_string().to_value(), "address" => settings.address.to_string().to_value(),
"server-port" => { "port" => {
let port = settings.server_port as u32; let port = settings.port as u32;
port.to_value() port.to_value()
} }
"client-address" => settings.client_address.to_string().to_value(), "bind-address" => settings.bind_address.to_string().to_value(),
"client-port" => { "bind-port" => {
let port = settings.client_port as u32; let port = settings.bind_port as u32;
port.to_value() port.to_value()
} }
"alpn-protocols" => { "alpn-protocols" => {
let alpns = settings.alpns.iter().map(|v| v.as_str()); let alpns = settings.alpns.iter().map(|v| v.as_str());
gst::Array::new(alpns).to_value() gst::Array::new(alpns).to_value()
} }
"role" => settings.role.to_value(),
"timeout" => settings.timeout.to_value(), "timeout" => settings.timeout.to_value(),
"keep-alive-interval" => settings.keep_alive_interval.to_value(), "keep-alive-interval" => settings.keep_alive_interval.to_value(),
"secure-connection" => settings.secure_conn.to_value(), "secure-connection" => settings.secure_conn.to_value(),
@ -353,7 +349,7 @@ impl BaseSinkImpl for QuinnQuicSink {
unreachable!("QuicSink is already started"); unreachable!("QuicSink is already started");
} }
match wait(&self.canceller, self.establish_connection(), timeout) { match wait(&self.canceller, self.init_connection(), timeout) {
Ok(Ok((c, s))) => { Ok(Ok((c, s))) => {
*state = State::Started(Started { *state = State::Started(Started {
connection: c, connection: c,
@ -518,11 +514,12 @@ impl QuinnQuicSink {
} }
} }
async fn establish_connection(&self) -> Result<(Connection, Option<SendStream>), WaitError> { async fn init_connection(&self) -> Result<(Connection, Option<SendStream>), WaitError> {
let client_addr; let client_addr;
let server_addr; let server_addr;
let server_name; let server_name;
let alpns; let alpns;
let role;
let use_datagram; let use_datagram;
let keep_alive_interval; let keep_alive_interval;
let secure_conn; let secure_conn;
@ -533,14 +530,15 @@ impl QuinnQuicSink {
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
client_addr = make_socket_addr( client_addr = make_socket_addr(
format!("{}:{}", settings.client_address, settings.client_port).as_str(), format!("{}:{}", settings.bind_address, settings.bind_port).as_str(),
)?;
server_addr = make_socket_addr(
format!("{}:{}", settings.server_address, settings.server_port).as_str(),
)?; )?;
server_addr =
make_socket_addr(format!("{}:{}", settings.address, settings.port).as_str())?;
server_name = settings.server_name.clone(); server_name = settings.server_name.clone();
alpns = settings.alpns.clone(); alpns = settings.alpns.clone();
role = settings.role;
use_datagram = settings.use_datagram; use_datagram = settings.use_datagram;
keep_alive_interval = settings.keep_alive_interval; keep_alive_interval = settings.keep_alive_interval;
secure_conn = settings.secure_conn; secure_conn = settings.secure_conn;
@ -548,6 +546,35 @@ impl QuinnQuicSink {
private_key_file = settings.private_key_file.clone(); private_key_file = settings.private_key_file.clone();
} }
let connection;
match role {
QuinnQuicRole::Server => {
let endpoint = server_endpoint(
server_addr,
&server_name,
secure_conn,
alpns,
cert_file,
private_key_file,
)
.map_err(|err| {
WaitError::FutureError(gst::error_msg!(
gst::ResourceError::Failed,
["Failed to configure endpoint: {}", err]
))
})?;
let incoming_conn = endpoint.accept().await.unwrap();
connection = incoming_conn.await.map_err(|err| {
WaitError::FutureError(gst::error_msg!(
gst::ResourceError::Failed,
["Connection error: {}", err]
))
})?;
}
QuinnQuicRole::Client => {
let endpoint = client_endpoint( let endpoint = client_endpoint(
client_addr, client_addr,
secure_conn, secure_conn,
@ -563,7 +590,7 @@ impl QuinnQuicSink {
)) ))
})?; })?;
let connection = endpoint connection = endpoint
.connect(server_addr, &server_name) .connect(server_addr, &server_name)
.unwrap() .unwrap()
.await .await
@ -573,6 +600,8 @@ impl QuinnQuicSink {
["Connection error: {}", err] ["Connection error: {}", err]
)) ))
})?; })?;
}
}
let stream = if !use_datagram { let stream = if !use_datagram {
let res = connection.open_uni().await.map_err(|err| { let res = connection.open_uni().await.map_err(|err| {

View file

@ -15,8 +15,8 @@
* ```bash * ```bash
* gst-launch-1.0 -v -e audiotestsrc num-buffers=512 ! \ * gst-launch-1.0 -v -e audiotestsrc num-buffers=512 ! \
* audio/x-raw,format=S16LE,rate=48000,channels=2,layout=interleaved ! opusenc ! \ * audio/x-raw,format=S16LE,rate=48000,channels=2,layout=interleaved ! opusenc ! \
* quinnquicsink server-name="quic.net" client-address="127.0.0.1" client-port=6001 \ * quinnquicsink server-name="quic.net" bind-address="127.0.0.1" bind-port=6001 \
* server-address="127.0.0.1" server-port=6000 certificate-file="certificates/fullchain.pem" \ * address="127.0.0.1" port=6000 certificate-file="certificates/fullchain.pem" \
* private-key-file="certificates/privkey.pem" * private-key-file="certificates/privkey.pem"
* ``` * ```
*/ */

View file

@ -7,8 +7,10 @@
// //
// SPDX-License-Identifier: MPL-2.0 // SPDX-License-Identifier: MPL-2.0
use crate::common::*;
use crate::utils::{ use crate::utils::{
make_socket_addr, server_endpoint, wait, WaitError, CONNECTION_CLOSE_CODE, CONNECTION_CLOSE_MSG, client_endpoint, make_socket_addr, server_endpoint, wait, WaitError, CONNECTION_CLOSE_CODE,
CONNECTION_CLOSE_MSG,
}; };
use bytes::Bytes; use bytes::Bytes;
use futures::future; use futures::future;
@ -21,21 +23,7 @@ use quinn::{Connection, ConnectionError, ReadError, RecvStream};
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::Mutex; use std::sync::Mutex;
static DEFAULT_SERVER_NAME: &str = "localhost"; const DEFAULT_ROLE: QuinnQuicRole = QuinnQuicRole::Server;
static DEFAULT_SERVER_ADDR: &str = "127.0.0.1";
static DEFAULT_SERVER_PORT: u16 = 5000;
/*
* For QUIC transport parameters
* <https://datatracker.ietf.org/doc/html/rfc9000#section-7.4>
*
* A HTTP client might specify "http/1.1" and/or "h2" or "h3".
* Other well-known values are listed in the at IANA registry at
* <https://www.iana.org/assignments/tls-extensiontype-values/tls-extensiontype-values.xhtml#alpn-protocol-ids>.
*/
const DEFAULT_ALPN: &str = "gst-quinn";
const DEFAULT_TIMEOUT: u32 = 15;
const DEFAULT_SECURE_CONNECTION: bool = true;
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| { static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
gst::DebugCategory::new( gst::DebugCategory::new(
@ -59,11 +47,15 @@ enum State {
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
struct Settings { struct Settings {
server_address: String, address: String,
server_port: u16, port: u16,
server_name: String, server_name: String,
bind_address: String,
bind_port: u16,
alpns: Vec<String>, alpns: Vec<String>,
role: QuinnQuicRole,
timeout: u32, timeout: u32,
keep_alive_interval: u64,
secure_conn: bool, secure_conn: bool,
caps: gst::Caps, caps: gst::Caps,
use_datagram: bool, use_datagram: bool,
@ -74,11 +66,15 @@ struct Settings {
impl Default for Settings { impl Default for Settings {
fn default() -> Self { fn default() -> Self {
Settings { Settings {
server_address: DEFAULT_SERVER_ADDR.to_string(), address: DEFAULT_ADDR.to_string(),
server_port: DEFAULT_SERVER_PORT, port: DEFAULT_PORT,
server_name: DEFAULT_SERVER_NAME.to_string(), server_name: DEFAULT_SERVER_NAME.to_string(),
bind_address: DEFAULT_BIND_ADDR.to_string(),
bind_port: DEFAULT_BIND_PORT,
alpns: vec![DEFAULT_ALPN.to_string()], alpns: vec![DEFAULT_ALPN.to_string()],
role: DEFAULT_ROLE,
timeout: DEFAULT_TIMEOUT, timeout: DEFAULT_TIMEOUT,
keep_alive_interval: 0,
secure_conn: DEFAULT_SECURE_CONNECTION, secure_conn: DEFAULT_SECURE_CONNECTION,
caps: gst::Caps::new_any(), caps: gst::Caps::new_any(),
use_datagram: false, use_datagram: false,
@ -173,17 +169,28 @@ impl ObjectImpl for QuinnQuicSrc {
vec![ vec![
glib::ParamSpecString::builder("server-name") glib::ParamSpecString::builder("server-name")
.nick("QUIC server name") .nick("QUIC server name")
.blurb("Name of the QUIC server which is in server certificate") .blurb("Name of the QUIC server which is in server certificate in case of server role")
.build(), .build(),
glib::ParamSpecString::builder("server-address") glib::ParamSpecString::builder("address")
.nick("QUIC server address") .nick("QUIC server address")
.blurb("Address of the QUIC server e.g. 127.0.0.1") .blurb("Address of the QUIC server e.g. 127.0.0.1")
.build(), .build(),
glib::ParamSpecUInt::builder("server-port") glib::ParamSpecUInt::builder("port")
.nick("QUIC server port") .nick("QUIC server port")
.blurb("Port of the QUIC server e.g. 5000") .blurb("Port of the QUIC server e.g. 5000")
.maximum(65535) .maximum(65535)
.default_value(DEFAULT_SERVER_PORT as u32) .default_value(DEFAULT_PORT as u32)
.readwrite()
.build(),
glib::ParamSpecString::builder("bind-address")
.nick("QUIC client bind address")
.blurb("Address to bind QUIC client e.g. 0.0.0.0")
.build(),
glib::ParamSpecUInt::builder("bind-port")
.nick("QUIC client port")
.blurb("Port to bind QUIC client e.g. 5001")
.maximum(65535)
.default_value(DEFAULT_BIND_PORT as u32)
.readwrite() .readwrite()
.build(), .build(),
gst::ParamSpecArray::builder("alpn-protocols") gst::ParamSpecArray::builder("alpn-protocols")
@ -191,6 +198,10 @@ impl ObjectImpl for QuinnQuicSrc {
.blurb("QUIC connection Application-Layer Protocol Negotiation (ALPN) values") .blurb("QUIC connection Application-Layer Protocol Negotiation (ALPN) values")
.element_spec(&glib::ParamSpecString::builder("alpn-protocol").build()) .element_spec(&glib::ParamSpecString::builder("alpn-protocol").build())
.build(), .build(),
glib::ParamSpecEnum::builder_with_default("role", DEFAULT_ROLE)
.nick("QUIC role")
.blurb("QUIC connection role to use.")
.build(),
glib::ParamSpecUInt::builder("timeout") glib::ParamSpecUInt::builder("timeout")
.nick("Timeout") .nick("Timeout")
.blurb("Value in seconds to timeout QUIC endpoint requests (0 = No timeout).") .blurb("Value in seconds to timeout QUIC endpoint requests (0 = No timeout).")
@ -198,6 +209,12 @@ impl ObjectImpl for QuinnQuicSrc {
.default_value(DEFAULT_TIMEOUT) .default_value(DEFAULT_TIMEOUT)
.readwrite() .readwrite()
.build(), .build(),
glib::ParamSpecUInt64::builder("keep-alive-interval")
.nick("QUIC connection keep alive interval in ms")
.blurb("Keeps QUIC connection alive by periodically pinging the server. Value set in ms, 0 disables this feature")
.default_value(0)
.readwrite()
.build(),
glib::ParamSpecBoolean::builder("secure-connection") glib::ParamSpecBoolean::builder("secure-connection")
.nick("Use secure connection") .nick("Use secure connection")
.blurb("Use certificates for QUIC connection. False: Insecure connection, True: Secure connection.") .blurb("Use certificates for QUIC connection. False: Insecure connection, True: Secure connection.")
@ -233,11 +250,17 @@ impl ObjectImpl for QuinnQuicSrc {
"server-name" => { "server-name" => {
settings.server_name = value.get::<String>().expect("type checked upstream"); settings.server_name = value.get::<String>().expect("type checked upstream");
} }
"server-address" => { "address" => {
settings.server_address = value.get::<String>().expect("type checked upstream"); settings.address = value.get::<String>().expect("type checked upstream");
} }
"server-port" => { "port" => {
settings.server_port = value.get::<u32>().expect("type checked upstream") as u16; settings.port = value.get::<u32>().expect("type checked upstream") as u16;
}
"bind-address" => {
settings.bind_address = value.get::<String>().expect("type checked upstream");
}
"bind-port" => {
settings.bind_port = value.get::<u32>().expect("type checked upstream") as u16;
} }
"alpn-protocols" => { "alpn-protocols" => {
settings.alpns = value settings.alpns = value
@ -252,6 +275,9 @@ impl ObjectImpl for QuinnQuicSrc {
}) })
.collect::<Vec<String>>() .collect::<Vec<String>>()
} }
"role" => {
settings.role = value.get::<QuinnQuicRole>().expect("type checked upstream");
}
"caps" => { "caps" => {
settings.caps = value settings.caps = value
.get::<Option<gst::Caps>>() .get::<Option<gst::Caps>>()
@ -264,6 +290,9 @@ impl ObjectImpl for QuinnQuicSrc {
"timeout" => { "timeout" => {
settings.timeout = value.get().expect("type checked upstream"); settings.timeout = value.get().expect("type checked upstream");
} }
"keep-alive-interval" => {
settings.keep_alive_interval = value.get().expect("type checked upstream");
}
"secure-connection" => { "secure-connection" => {
settings.secure_conn = value.get().expect("type checked upstream"); settings.secure_conn = value.get().expect("type checked upstream");
} }
@ -287,17 +316,24 @@ impl ObjectImpl for QuinnQuicSrc {
match pspec.name() { match pspec.name() {
"server-name" => settings.server_name.to_value(), "server-name" => settings.server_name.to_value(),
"server-address" => settings.server_address.to_string().to_value(), "address" => settings.address.to_string().to_value(),
"server-port" => { "port" => {
let port = settings.server_port as u32; let port = settings.port as u32;
port.to_value()
}
"bind-address" => settings.bind_address.to_string().to_value(),
"bind-port" => {
let port = settings.bind_port as u32;
port.to_value() port.to_value()
} }
"alpn-protocols" => { "alpn-protocols" => {
let alpns = settings.alpns.iter().map(|v| v.as_str()); let alpns = settings.alpns.iter().map(|v| v.as_str());
gst::Array::new(alpns).to_value() gst::Array::new(alpns).to_value()
} }
"role" => settings.role.to_value(),
"caps" => settings.caps.to_value(), "caps" => settings.caps.to_value(),
"timeout" => settings.timeout.to_value(), "timeout" => settings.timeout.to_value(),
"keep-alive-interval" => settings.keep_alive_interval.to_value(),
"secure-connection" => settings.secure_conn.to_value(), "secure-connection" => settings.secure_conn.to_value(),
"certificate-file" => { "certificate-file" => {
let certfile = settings.certificate_file.as_ref(); let certfile = settings.certificate_file.as_ref();
@ -336,7 +372,7 @@ impl BaseSrcImpl for QuinnQuicSrc {
unreachable!("QuicSrc already started"); unreachable!("QuicSrc already started");
} }
match wait(&self.canceller, self.wait_for_connection(), timeout) { match wait(&self.canceller, self.init_connection(), timeout) {
Ok(Ok((c, s))) => { Ok(Ok((c, s))) => {
*state = State::Started(Started { *state = State::Started(Started {
connection: c, connection: c,
@ -556,11 +592,14 @@ impl QuinnQuicSrc {
}; };
} }
async fn wait_for_connection(&self) -> Result<(Connection, Option<RecvStream>), WaitError> { async fn init_connection(&self) -> Result<(Connection, Option<RecvStream>), WaitError> {
let server_addr; let server_addr;
let server_name; let server_name;
let client_addr;
let alpns; let alpns;
let role;
let use_datagram; let use_datagram;
let keep_alive_interval;
let secure_conn; let secure_conn;
let cert_file; let cert_file;
let private_key_file; let private_key_file;
@ -568,18 +607,27 @@ impl QuinnQuicSrc {
{ {
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
server_addr = make_socket_addr( client_addr = make_socket_addr(
format!("{}:{}", settings.server_address, settings.server_port).as_str(), format!("{}:{}", settings.bind_address, settings.bind_port).as_str(),
)?; )?;
server_addr =
make_socket_addr(format!("{}:{}", settings.address, settings.port).as_str())?;
server_name = settings.server_name.clone(); server_name = settings.server_name.clone();
alpns = settings.alpns.clone(); alpns = settings.alpns.clone();
role = settings.role;
use_datagram = settings.use_datagram; use_datagram = settings.use_datagram;
keep_alive_interval = settings.keep_alive_interval;
secure_conn = settings.secure_conn; secure_conn = settings.secure_conn;
cert_file = settings.certificate_file.clone(); cert_file = settings.certificate_file.clone();
private_key_file = settings.private_key_file.clone(); private_key_file = settings.private_key_file.clone();
} }
let connection;
match role {
QuinnQuicRole::Server => {
let endpoint = server_endpoint( let endpoint = server_endpoint(
server_addr, server_addr,
&server_name, &server_name,
@ -597,12 +645,41 @@ impl QuinnQuicSrc {
let incoming_conn = endpoint.accept().await.unwrap(); let incoming_conn = endpoint.accept().await.unwrap();
let connection = incoming_conn.await.map_err(|err| { connection = incoming_conn.await.map_err(|err| {
WaitError::FutureError(gst::error_msg!( WaitError::FutureError(gst::error_msg!(
gst::ResourceError::Failed, gst::ResourceError::Failed,
["Connection error: {}", err] ["Connection error: {}", err]
)) ))
})?; })?;
}
QuinnQuicRole::Client => {
let endpoint = client_endpoint(
client_addr,
secure_conn,
alpns,
cert_file,
private_key_file,
keep_alive_interval,
)
.map_err(|err| {
WaitError::FutureError(gst::error_msg!(
gst::ResourceError::Failed,
["Failed to configure endpoint: {}", err]
))
})?;
connection = endpoint
.connect(server_addr, &server_name)
.unwrap()
.await
.map_err(|err| {
WaitError::FutureError(gst::error_msg!(
gst::ResourceError::Failed,
["Connection error: {}", err]
))
})?;
}
}
let stream = if !use_datagram { let stream = if !use_datagram {
let res = connection.accept_uni().await.map_err(|err| { let res = connection.accept_uni().await.map_err(|err| {

View file

@ -15,7 +15,7 @@
* ```bash * ```bash
* gst-launch-1.0 -v -e quinnquicsrc caps=audio/x-opus server-name="quic.net" \ * gst-launch-1.0 -v -e quinnquicsrc caps=audio/x-opus server-name="quic.net" \
* certificate-file="certificates/fullchain.pem" private-key-file="certificates/privkey.pem" \ * certificate-file="certificates/fullchain.pem" private-key-file="certificates/privkey.pem" \
* server-address="127.0.0.1" server-port=6000 ! opusparse ! opusdec ! \ * address="127.0.0.1" port=6000 ! opusparse ! opusdec ! \
* audio/x-raw,format=S16LE,rate=48000,channels=2,layout=interleaved ! \ * audio/x-raw,format=S16LE,rate=48000,channels=2,layout=interleaved ! \
* audioconvert ! autoaudiosink * audioconvert ! autoaudiosink
* ``` * ```

View file

@ -79,7 +79,9 @@ fn test_send_receive_with_datagram() {
// in the other test. We get a address already in use error otherwise. // in the other test. We get a address already in use error otherwise.
thread::spawn(move || { thread::spawn(move || {
let mut h1 = gst_check::Harness::new_empty(); let mut h1 = gst_check::Harness::new_empty();
h1.add_parse("quinnquicsrc use-datagram=true server-address=127.0.0.1 server-port=6000 secure-connection=false"); h1.add_parse(
"quinnquicsrc use-datagram=true address=127.0.0.1 port=6000 secure-connection=false",
);
h1.play(); h1.play();
@ -96,7 +98,7 @@ fn test_send_receive_with_datagram() {
}); });
let mut h2 = gst_check::Harness::new_empty(); let mut h2 = gst_check::Harness::new_empty();
h2.add_parse("quinnquicsink use-datagram=true client-address=127.0.0.1 client-port=6001 server-address=127.0.0.1 server-port=6000 secure-connection=false"); h2.add_parse("quinnquicsink use-datagram=true bind-address=127.0.0.1 bind-port=6001 address=127.0.0.1 port=6000 secure-connection=false");
h2.set_src_caps(gst::Caps::builder("text/plain").build()); h2.set_src_caps(gst::Caps::builder("text/plain").build());