net/quic: Use separate property for address and port

While at it, do not duplicate call to settings lock in property
getter and setter for every property.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1036>
This commit is contained in:
Sanchayan Maity 2024-04-30 15:57:09 +05:30
parent befd8d4bd2
commit 8b64c734e7
4 changed files with 116 additions and 131 deletions

View file

@ -16,12 +16,14 @@ use gst::{glib, prelude::*, subclass::prelude::*};
use gst_base::subclass::prelude::*; use gst_base::subclass::prelude::*;
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use quinn::{Connection, SendStream}; use quinn::{Connection, SendStream};
use std::net::SocketAddr;
use std::sync::Mutex; use std::sync::Mutex;
static DEFAULT_SERVER_NAME: &str = "localhost"; static DEFAULT_SERVER_NAME: &str = "localhost";
static DEFAULT_SERVER_ADDR: &str = "127.0.0.1:5000"; static DEFAULT_SERVER_ADDR: &str = "127.0.0.1";
static DEFAULT_CLIENT_ADDR: &str = "127.0.0.1:5001"; static DEFAULT_SERVER_PORT: u16 = 5000;
static DEFAULT_CLIENT_ADDR: &str = "127.0.0.1";
static DEFAULT_CLIENT_PORT: u16 = 5001;
/* /*
* For QUIC transport parameters * For QUIC transport parameters
* <https://datatracker.ietf.org/doc/html/rfc9000#section-7.4> * <https://datatracker.ietf.org/doc/html/rfc9000#section-7.4>
@ -52,8 +54,10 @@ enum State {
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
struct Settings { struct Settings {
client_address: SocketAddr, client_address: String,
server_address: SocketAddr, client_port: u16,
server_address: String,
server_port: u16,
server_name: String, server_name: String,
alpns: Vec<String>, alpns: Vec<String>,
timeout: u32, timeout: u32,
@ -64,8 +68,10 @@ struct Settings {
impl Default for Settings { impl Default for Settings {
fn default() -> Self { fn default() -> Self {
Settings { Settings {
client_address: DEFAULT_CLIENT_ADDR.parse::<SocketAddr>().unwrap(), client_address: DEFAULT_CLIENT_ADDR.to_string(),
server_address: DEFAULT_SERVER_ADDR.parse::<SocketAddr>().unwrap(), client_port: DEFAULT_CLIENT_PORT,
server_address: DEFAULT_SERVER_ADDR.to_string(),
server_port: DEFAULT_SERVER_PORT,
server_name: DEFAULT_SERVER_NAME.to_string(), server_name: DEFAULT_SERVER_NAME.to_string(),
alpns: vec![DEFAULT_ALPN.to_string()], alpns: vec![DEFAULT_ALPN.to_string()],
timeout: DEFAULT_TIMEOUT, timeout: DEFAULT_TIMEOUT,
@ -137,11 +143,25 @@ impl ObjectImpl for QuicSink {
.build(), .build(),
glib::ParamSpecString::builder("server-address") glib::ParamSpecString::builder("server-address")
.nick("QUIC server address") .nick("QUIC server address")
.blurb("Address of the QUIC server to connect to e.g. 127.0.0.1:5000") .blurb("Address of the QUIC server to connect to e.g. 127.0.0.1")
.build(),
glib::ParamSpecUInt::builder("server-port")
.nick("QUIC server port")
.blurb("Port of the QUIC server to connect to e.g. 5000")
.maximum(65535)
.default_value(DEFAULT_SERVER_PORT as u32)
.readwrite()
.build(), .build(),
glib::ParamSpecString::builder("client-address") glib::ParamSpecString::builder("client-address")
.nick("QUIC client address") .nick("QUIC client address")
.blurb("Address to be used by this QUIC client e.g. 127.0.0.1:5001") .blurb("Address to be used by this QUIC client e.g. 127.0.0.1")
.build(),
glib::ParamSpecUInt::builder("client-port")
.nick("QUIC client port")
.blurb("Port to be used by this QUIC client e.g. 5001")
.maximum(65535)
.default_value(DEFAULT_CLIENT_PORT as u32)
.readwrite()
.build(), .build(),
gst::ParamSpecArray::builder("alpn-protocols") gst::ParamSpecArray::builder("alpn-protocols")
.nick("QUIC ALPN values") .nick("QUIC ALPN values")
@ -172,43 +192,25 @@ impl ObjectImpl for QuicSink {
} }
fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
let mut settings = self.settings.lock().unwrap();
match pspec.name() { match pspec.name() {
"server-name" => { "server-name" => {
let mut settings = self.settings.lock().unwrap();
settings.server_name = value.get::<String>().expect("type checked upstream"); settings.server_name = value.get::<String>().expect("type checked upstream");
} }
"server-address" => { "server-address" => {
let addr = value.get::<String>().expect("type checked upstream"); settings.server_address = value.get::<String>().expect("type checked upstream");
let addr = make_socket_addr(&addr); }
match addr { "server-port" => {
Ok(server_address) => { settings.server_port = value.get::<u32>().expect("type checked upstream") as u16;
let mut settings = self.settings.lock().unwrap();
settings.server_address = server_address;
}
Err(e) => gst::element_imp_error!(
self,
gst::ResourceError::Failed,
["Invalid server address: {}", e]
),
}
} }
"client-address" => { "client-address" => {
let addr = value.get::<String>().expect("type checked upstream"); settings.client_address = value.get::<String>().expect("type checked upstream");
let addr = make_socket_addr(&addr); }
match addr { "client-port" => {
Ok(client_address) => { settings.client_port = value.get::<u32>().expect("type checked upstream") as u16;
let mut settings = self.settings.lock().unwrap();
settings.client_address = client_address;
}
Err(e) => gst::element_imp_error!(
self,
gst::ResourceError::Failed,
["Invalid client address: {}", e]
),
}
} }
"alpn-protocols" => { "alpn-protocols" => {
let mut settings = self.settings.lock().unwrap();
settings.alpns = value settings.alpns = value
.get::<gst::ArrayRef>() .get::<gst::ArrayRef>()
.expect("type checked upstream") .expect("type checked upstream")
@ -222,15 +224,12 @@ impl ObjectImpl for QuicSink {
.collect::<Vec<_>>(); .collect::<Vec<_>>();
} }
"timeout" => { "timeout" => {
let mut settings = self.settings.lock().unwrap();
settings.timeout = value.get().expect("type checked upstream"); settings.timeout = value.get().expect("type checked upstream");
} }
"secure-connection" => { "secure-connection" => {
let mut settings = self.settings.lock().unwrap();
settings.secure_conn = value.get().expect("type checked upstream"); settings.secure_conn = value.get().expect("type checked upstream");
} }
"use-datagram" => { "use-datagram" => {
let mut settings = self.settings.lock().unwrap();
settings.use_datagram = value.get().expect("type checked upstream"); settings.use_datagram = value.get().expect("type checked upstream");
} }
_ => unimplemented!(), _ => unimplemented!(),
@ -238,36 +237,27 @@ impl ObjectImpl for QuicSink {
} }
fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
let settings = self.settings.lock().unwrap();
match pspec.name() { match pspec.name() {
"server-name" => { "server-name" => settings.server_name.to_value(),
let settings = self.settings.lock().unwrap(); "server-address" => settings.server_address.to_string().to_value(),
settings.server_name.to_value() "server-port" => {
let port = settings.server_port as u32;
port.to_value()
} }
"server-address" => { "client-address" => settings.client_address.to_string().to_value(),
let settings = self.settings.lock().unwrap(); "client-port" => {
settings.server_address.to_string().to_value() let port = settings.client_port as u32;
} port.to_value()
"client-address" => {
let settings = self.settings.lock().unwrap();
settings.client_address.to_string().to_value()
} }
"alpn-protocols" => { "alpn-protocols" => {
let settings = self.settings.lock().unwrap();
let alpns = settings.alpns.iter().map(|v| v.as_str()); let alpns = settings.alpns.iter().map(|v| v.as_str());
gst::Array::new(alpns).to_value() gst::Array::new(alpns).to_value()
} }
"timeout" => { "timeout" => settings.timeout.to_value(),
let settings = self.settings.lock().unwrap(); "secure-connection" => settings.secure_conn.to_value(),
settings.timeout.to_value() "use-datagram" => settings.use_datagram.to_value(),
}
"secure-connection" => {
let settings = self.settings.lock().unwrap();
settings.secure_conn.to_value()
}
"use-datagram" => {
let settings = self.settings.lock().unwrap();
settings.use_datagram.to_value()
}
_ => unimplemented!(), _ => unimplemented!(),
} }
} }
@ -465,8 +455,14 @@ impl QuicSink {
{ {
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
client_addr = settings.client_address;
server_addr = settings.server_address; client_addr = make_socket_addr(
format!("{}:{}", settings.client_address, settings.client_port).as_str(),
)?;
server_addr = make_socket_addr(
format!("{}:{}", settings.server_address, settings.server_port).as_str(),
)?;
server_name = settings.server_name.clone(); server_name = settings.server_name.clone();
alpns = settings.alpns.clone(); alpns = settings.alpns.clone();
use_datagram = settings.use_datagram; use_datagram = settings.use_datagram;

View file

@ -19,12 +19,13 @@ use gst_base::subclass::base_src::CreateSuccess;
use gst_base::subclass::prelude::*; use gst_base::subclass::prelude::*;
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use quinn::{Connection, ConnectionError, RecvStream}; use quinn::{Connection, ConnectionError, RecvStream};
use std::net::SocketAddr;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::Mutex; use std::sync::Mutex;
static DEFAULT_SERVER_NAME: &str = "localhost"; static DEFAULT_SERVER_NAME: &str = "localhost";
static DEFAULT_SERVER_ADDR: &str = "127.0.0.1:5000"; static DEFAULT_SERVER_ADDR: &str = "127.0.0.1";
static DEFAULT_SERVER_PORT: u16 = 5000;
/* /*
* For QUIC transport parameters * For QUIC transport parameters
* <https://datatracker.ietf.org/doc/html/rfc9000#section-7.4> * <https://datatracker.ietf.org/doc/html/rfc9000#section-7.4>
@ -60,7 +61,8 @@ enum State {
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
struct Settings { struct Settings {
server_address: SocketAddr, server_address: String,
server_port: u16,
server_name: String, server_name: String,
alpns: Vec<String>, alpns: Vec<String>,
timeout: u32, timeout: u32,
@ -74,7 +76,8 @@ struct Settings {
impl Default for Settings { impl Default for Settings {
fn default() -> Self { fn default() -> Self {
Settings { Settings {
server_address: DEFAULT_SERVER_ADDR.parse::<SocketAddr>().unwrap(), server_address: DEFAULT_SERVER_ADDR.to_string(),
server_port: DEFAULT_SERVER_PORT,
server_name: DEFAULT_SERVER_NAME.to_string(), server_name: DEFAULT_SERVER_NAME.to_string(),
alpns: vec![DEFAULT_ALPN.to_string()], alpns: vec![DEFAULT_ALPN.to_string()],
timeout: DEFAULT_TIMEOUT, timeout: DEFAULT_TIMEOUT,
@ -176,7 +179,14 @@ impl ObjectImpl for QuicSrc {
.build(), .build(),
glib::ParamSpecString::builder("server-address") glib::ParamSpecString::builder("server-address")
.nick("QUIC server address") .nick("QUIC server address")
.blurb("Address of the QUIC server to connect to e.g. 127.0.0.1:5000") .blurb("Address of the QUIC server e.g. 127.0.0.1")
.build(),
glib::ParamSpecUInt::builder("server-port")
.nick("QUIC server port")
.blurb("Port of the QUIC server e.g. 5000")
.maximum(65535)
.default_value(DEFAULT_SERVER_PORT as u32)
.readwrite()
.build(), .build(),
gst::ParamSpecArray::builder("alpn-protocols") gst::ParamSpecArray::builder("alpn-protocols")
.nick("QUIC ALPN values") .nick("QUIC ALPN values")
@ -219,28 +229,19 @@ impl ObjectImpl for QuicSrc {
} }
fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
let mut settings = self.settings.lock().unwrap();
match pspec.name() { match pspec.name() {
"server-name" => { "server-name" => {
let mut settings = self.settings.lock().unwrap();
settings.server_name = value.get::<String>().expect("type checked upstream"); settings.server_name = value.get::<String>().expect("type checked upstream");
} }
"server-address" => { "server-address" => {
let addr = value.get::<String>().expect("type checked upstream"); settings.server_address = value.get::<String>().expect("type checked upstream");
let addr = make_socket_addr(&addr); }
match addr { "server-port" => {
Ok(server_address) => { settings.server_port = value.get::<u32>().expect("type checked upstream") as u16;
let mut settings = self.settings.lock().unwrap();
settings.server_address = server_address;
}
Err(e) => gst::element_imp_error!(
self,
gst::ResourceError::Failed,
["Invalid server address: {}", e]
),
}
} }
"alpn-protocols" => { "alpn-protocols" => {
let mut settings = self.settings.lock().unwrap();
settings.alpns = value settings.alpns = value
.get::<gst::ArrayRef>() .get::<gst::ArrayRef>()
.expect("type checked upstream") .expect("type checked upstream")
@ -254,7 +255,6 @@ impl ObjectImpl for QuicSrc {
.collect::<Vec<String>>() .collect::<Vec<String>>()
} }
"caps" => { "caps" => {
let mut settings = self.settings.lock().unwrap();
settings.caps = value settings.caps = value
.get::<Option<gst::Caps>>() .get::<Option<gst::Caps>>()
.expect("type checked upstream") .expect("type checked upstream")
@ -264,24 +264,19 @@ impl ObjectImpl for QuicSrc {
srcpad.mark_reconfigure(); srcpad.mark_reconfigure();
} }
"timeout" => { "timeout" => {
let mut settings = self.settings.lock().unwrap();
settings.timeout = value.get().expect("type checked upstream"); settings.timeout = value.get().expect("type checked upstream");
} }
"secure-connection" => { "secure-connection" => {
let mut settings = self.settings.lock().unwrap();
settings.secure_conn = value.get().expect("type checked upstream"); settings.secure_conn = value.get().expect("type checked upstream");
} }
"certificate-path" => { "certificate-path" => {
let value: String = value.get().unwrap(); let value: String = value.get().unwrap();
let mut settings = self.settings.lock().unwrap();
settings.certificate_path = Some(value.into()); settings.certificate_path = Some(value.into());
} }
"use-datagram" => { "use-datagram" => {
let mut settings = self.settings.lock().unwrap();
settings.use_datagram = value.get().expect("type checked upstream"); settings.use_datagram = value.get().expect("type checked upstream");
} }
"private-key-type" => { "private-key-type" => {
let mut settings = self.settings.lock().unwrap();
settings.private_key_type = value settings.private_key_type = value
.get::<QuicPrivateKeyType>() .get::<QuicPrivateKeyType>()
.expect("type checked upstream"); .expect("type checked upstream");
@ -291,45 +286,28 @@ impl ObjectImpl for QuicSrc {
} }
fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
let settings = self.settings.lock().unwrap();
match pspec.name() { match pspec.name() {
"server-name" => { "server-name" => settings.server_name.to_value(),
let settings = self.settings.lock().unwrap(); "server-address" => settings.server_address.to_string().to_value(),
settings.server_name.to_value() "server-port" => {
} let port = settings.server_port as u32;
"server-address" => { port.to_value()
let settings = self.settings.lock().unwrap();
settings.server_address.to_string().to_value()
} }
"alpn-protocols" => { "alpn-protocols" => {
let settings = self.settings.lock().unwrap();
let alpns = settings.alpns.iter().map(|v| v.as_str()); let alpns = settings.alpns.iter().map(|v| v.as_str());
gst::Array::new(alpns).to_value() gst::Array::new(alpns).to_value()
} }
"caps" => { "caps" => settings.caps.to_value(),
let settings = self.settings.lock().unwrap(); "timeout" => settings.timeout.to_value(),
settings.caps.to_value() "secure-connection" => settings.secure_conn.to_value(),
}
"timeout" => {
let settings = self.settings.lock().unwrap();
settings.timeout.to_value()
}
"secure-connection" => {
let settings = self.settings.lock().unwrap();
settings.secure_conn.to_value()
}
"certificate-path" => { "certificate-path" => {
let settings = self.settings.lock().unwrap();
let certpath = settings.certificate_path.as_ref(); let certpath = settings.certificate_path.as_ref();
certpath.and_then(|file| file.to_str()).to_value() certpath.and_then(|file| file.to_str()).to_value()
} }
"use-datagram" => { "use-datagram" => settings.use_datagram.to_value(),
let settings = self.settings.lock().unwrap(); "private-key-type" => settings.private_key_type.to_value(),
settings.use_datagram.to_value()
}
"private-key-type" => {
let settings = self.settings.lock().unwrap();
settings.private_key_type.to_value()
}
_ => unimplemented!(), _ => unimplemented!(),
} }
} }
@ -563,7 +541,11 @@ impl QuicSrc {
{ {
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
server_addr = settings.server_address;
server_addr = make_socket_addr(
format!("{}:{}", settings.server_address, settings.server_port).as_str(),
)?;
server_name = settings.server_name.clone(); server_name = settings.server_name.clone();
alpns = settings.alpns.clone(); alpns = settings.alpns.clone();
use_datagram = settings.use_datagram; use_datagram = settings.use_datagram;

View file

@ -16,7 +16,7 @@ use quinn::{ClientConfig, Endpoint, ServerConfig};
use std::error::Error; use std::error::Error;
use std::fs::File; use std::fs::File;
use std::io::BufReader; use std::io::BufReader;
use std::net::{AddrParseError, SocketAddr}; use std::net::SocketAddr;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::time::Duration; use std::time::Duration;
@ -102,13 +102,19 @@ where
res res
} }
pub fn make_socket_addr(addr: &str) -> Result<SocketAddr, WaitError> {
match addr.parse::<SocketAddr>() {
Ok(address) => Ok(address),
Err(e) => Err(WaitError::FutureError(gst::error_msg!(
gst::ResourceError::Failed,
["Invalid address: {}", e]
))),
}
}
/* /*
* Following functions are taken from Quinn documentation/repository * Following functions are taken from Quinn documentation/repository
*/ */
pub fn make_socket_addr(addr: &str) -> Result<SocketAddr, AddrParseError> {
addr.parse::<SocketAddr>()
}
struct SkipServerVerification; struct SkipServerVerification;
impl SkipServerVerification { impl SkipServerVerification {

View file

@ -35,7 +35,8 @@ fn test_send_receive_without_datagram() {
let content = "Hello, world!\n".as_bytes(); let content = "Hello, world!\n".as_bytes();
thread::spawn(move || { thread::spawn(move || {
let mut h1 = gst_check::Harness::new("quicsink"); let mut h1 = gst_check::Harness::new_empty();
h1.add_parse("quicsink secure-connection=false");
h1.set_src_caps(gst::Caps::builder("text/plain").build()); h1.set_src_caps(gst::Caps::builder("text/plain").build());
@ -50,7 +51,8 @@ fn test_send_receive_without_datagram() {
drop(h1); drop(h1);
}); });
let mut h2 = gst_check::Harness::new("quicsrc"); let mut h2 = gst_check::Harness::new_empty();
h2.add_parse("quicsrc secure-connection=false");
h2.play(); h2.play();
@ -77,8 +79,8 @@ fn test_send_receive_with_datagram() {
// in the other test. We get a address already in use error otherwise. // in the other test. We get a address already in use error otherwise.
thread::spawn(move || { thread::spawn(move || {
let mut h1 = gst_check::Harness::new_empty(); let mut h1 = gst_check::Harness::new_empty();
h1.add_parse("quicsrc use-datagram=true server-address=127.0.0.1 server-port=6000 secure-connection=false");
h1.add_parse(format!("quicsrc use-datagram=true server-address=127.0.0.1:6000").as_str());
h1.play(); h1.play();
let buf = h1.pull_until_eos().unwrap().unwrap(); let buf = h1.pull_until_eos().unwrap().unwrap();
@ -94,8 +96,7 @@ fn test_send_receive_with_datagram() {
}); });
let mut h2 = gst_check::Harness::new_empty(); let mut h2 = gst_check::Harness::new_empty();
h2.add_parse("quicsink use-datagram=true client-address=127.0.0.1 client-port=6001 server-address=127.0.0.1 server-port=6000 secure-connection=false");
h2.add_parse(format!("quicsink use-datagram=true client-address=127.0.0.1:6001 server-address=127.0.0.1:6000").as_str());
h2.set_src_caps(gst::Caps::builder("text/plain").build()); h2.set_src_caps(gst::Caps::builder("text/plain").build());