diff --git a/net/quic/src/quicsink/imp.rs b/net/quic/src/quicsink/imp.rs index 95eb2cf0..6024ec36 100644 --- a/net/quic/src/quicsink/imp.rs +++ b/net/quic/src/quicsink/imp.rs @@ -16,12 +16,14 @@ use gst::{glib, prelude::*, subclass::prelude::*}; use gst_base::subclass::prelude::*; use once_cell::sync::Lazy; use quinn::{Connection, SendStream}; -use std::net::SocketAddr; use std::sync::Mutex; static DEFAULT_SERVER_NAME: &str = "localhost"; -static DEFAULT_SERVER_ADDR: &str = "127.0.0.1:5000"; -static DEFAULT_CLIENT_ADDR: &str = "127.0.0.1:5001"; +static DEFAULT_SERVER_ADDR: &str = "127.0.0.1"; +static DEFAULT_SERVER_PORT: u16 = 5000; +static DEFAULT_CLIENT_ADDR: &str = "127.0.0.1"; +static DEFAULT_CLIENT_PORT: u16 = 5001; + /* * For QUIC transport parameters * @@ -52,8 +54,10 @@ enum State { #[derive(Clone, Debug)] struct Settings { - client_address: SocketAddr, - server_address: SocketAddr, + client_address: String, + client_port: u16, + server_address: String, + server_port: u16, server_name: String, alpns: Vec, timeout: u32, @@ -64,8 +68,10 @@ struct Settings { impl Default for Settings { fn default() -> Self { Settings { - client_address: DEFAULT_CLIENT_ADDR.parse::().unwrap(), - server_address: DEFAULT_SERVER_ADDR.parse::().unwrap(), + client_address: DEFAULT_CLIENT_ADDR.to_string(), + client_port: DEFAULT_CLIENT_PORT, + server_address: DEFAULT_SERVER_ADDR.to_string(), + server_port: DEFAULT_SERVER_PORT, server_name: DEFAULT_SERVER_NAME.to_string(), alpns: vec![DEFAULT_ALPN.to_string()], timeout: DEFAULT_TIMEOUT, @@ -137,11 +143,25 @@ impl ObjectImpl for QuicSink { .build(), glib::ParamSpecString::builder("server-address") .nick("QUIC server address") - .blurb("Address of the QUIC server to connect to e.g. 127.0.0.1:5000") + .blurb("Address of the QUIC server to connect to e.g. 127.0.0.1") + .build(), + glib::ParamSpecUInt::builder("server-port") + .nick("QUIC server port") + .blurb("Port of the QUIC server to connect to e.g. 5000") + .maximum(65535) + .default_value(DEFAULT_SERVER_PORT as u32) + .readwrite() .build(), glib::ParamSpecString::builder("client-address") .nick("QUIC client address") - .blurb("Address to be used by this QUIC client e.g. 127.0.0.1:5001") + .blurb("Address to be used by this QUIC client e.g. 127.0.0.1") + .build(), + glib::ParamSpecUInt::builder("client-port") + .nick("QUIC client port") + .blurb("Port to be used by this QUIC client e.g. 5001") + .maximum(65535) + .default_value(DEFAULT_CLIENT_PORT as u32) + .readwrite() .build(), gst::ParamSpecArray::builder("alpn-protocols") .nick("QUIC ALPN values") @@ -172,43 +192,25 @@ impl ObjectImpl for QuicSink { } fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { + let mut settings = self.settings.lock().unwrap(); + match pspec.name() { "server-name" => { - let mut settings = self.settings.lock().unwrap(); settings.server_name = value.get::().expect("type checked upstream"); } "server-address" => { - let addr = value.get::().expect("type checked upstream"); - let addr = make_socket_addr(&addr); - match addr { - Ok(server_address) => { - let mut settings = self.settings.lock().unwrap(); - settings.server_address = server_address; - } - Err(e) => gst::element_imp_error!( - self, - gst::ResourceError::Failed, - ["Invalid server address: {}", e] - ), - } + settings.server_address = value.get::().expect("type checked upstream"); + } + "server-port" => { + settings.server_port = value.get::().expect("type checked upstream") as u16; } "client-address" => { - let addr = value.get::().expect("type checked upstream"); - let addr = make_socket_addr(&addr); - match addr { - Ok(client_address) => { - let mut settings = self.settings.lock().unwrap(); - settings.client_address = client_address; - } - Err(e) => gst::element_imp_error!( - self, - gst::ResourceError::Failed, - ["Invalid client address: {}", e] - ), - } + settings.client_address = value.get::().expect("type checked upstream"); + } + "client-port" => { + settings.client_port = value.get::().expect("type checked upstream") as u16; } "alpn-protocols" => { - let mut settings = self.settings.lock().unwrap(); settings.alpns = value .get::() .expect("type checked upstream") @@ -222,15 +224,12 @@ impl ObjectImpl for QuicSink { .collect::>(); } "timeout" => { - let mut settings = self.settings.lock().unwrap(); settings.timeout = value.get().expect("type checked upstream"); } "secure-connection" => { - let mut settings = self.settings.lock().unwrap(); settings.secure_conn = value.get().expect("type checked upstream"); } "use-datagram" => { - let mut settings = self.settings.lock().unwrap(); settings.use_datagram = value.get().expect("type checked upstream"); } _ => unimplemented!(), @@ -238,36 +237,27 @@ impl ObjectImpl for QuicSink { } fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { + let settings = self.settings.lock().unwrap(); + match pspec.name() { - "server-name" => { - let settings = self.settings.lock().unwrap(); - settings.server_name.to_value() + "server-name" => settings.server_name.to_value(), + "server-address" => settings.server_address.to_string().to_value(), + "server-port" => { + let port = settings.server_port as u32; + port.to_value() } - "server-address" => { - let settings = self.settings.lock().unwrap(); - settings.server_address.to_string().to_value() - } - "client-address" => { - let settings = self.settings.lock().unwrap(); - settings.client_address.to_string().to_value() + "client-address" => settings.client_address.to_string().to_value(), + "client-port" => { + let port = settings.client_port as u32; + port.to_value() } "alpn-protocols" => { - let settings = self.settings.lock().unwrap(); let alpns = settings.alpns.iter().map(|v| v.as_str()); gst::Array::new(alpns).to_value() } - "timeout" => { - let settings = self.settings.lock().unwrap(); - settings.timeout.to_value() - } - "secure-connection" => { - let settings = self.settings.lock().unwrap(); - settings.secure_conn.to_value() - } - "use-datagram" => { - let settings = self.settings.lock().unwrap(); - settings.use_datagram.to_value() - } + "timeout" => settings.timeout.to_value(), + "secure-connection" => settings.secure_conn.to_value(), + "use-datagram" => settings.use_datagram.to_value(), _ => unimplemented!(), } } @@ -465,8 +455,14 @@ impl QuicSink { { let settings = self.settings.lock().unwrap(); - client_addr = settings.client_address; - server_addr = settings.server_address; + + client_addr = make_socket_addr( + format!("{}:{}", settings.client_address, settings.client_port).as_str(), + )?; + server_addr = make_socket_addr( + format!("{}:{}", settings.server_address, settings.server_port).as_str(), + )?; + server_name = settings.server_name.clone(); alpns = settings.alpns.clone(); use_datagram = settings.use_datagram; diff --git a/net/quic/src/quicsrc/imp.rs b/net/quic/src/quicsrc/imp.rs index 8fc48803..c45a21ea 100644 --- a/net/quic/src/quicsrc/imp.rs +++ b/net/quic/src/quicsrc/imp.rs @@ -19,12 +19,13 @@ use gst_base::subclass::base_src::CreateSuccess; use gst_base::subclass::prelude::*; use once_cell::sync::Lazy; use quinn::{Connection, ConnectionError, RecvStream}; -use std::net::SocketAddr; use std::path::PathBuf; use std::sync::Mutex; static DEFAULT_SERVER_NAME: &str = "localhost"; -static DEFAULT_SERVER_ADDR: &str = "127.0.0.1:5000"; +static DEFAULT_SERVER_ADDR: &str = "127.0.0.1"; +static DEFAULT_SERVER_PORT: u16 = 5000; + /* * For QUIC transport parameters * @@ -60,7 +61,8 @@ enum State { #[derive(Clone, Debug)] struct Settings { - server_address: SocketAddr, + server_address: String, + server_port: u16, server_name: String, alpns: Vec, timeout: u32, @@ -74,7 +76,8 @@ struct Settings { impl Default for Settings { fn default() -> Self { Settings { - server_address: DEFAULT_SERVER_ADDR.parse::().unwrap(), + server_address: DEFAULT_SERVER_ADDR.to_string(), + server_port: DEFAULT_SERVER_PORT, server_name: DEFAULT_SERVER_NAME.to_string(), alpns: vec![DEFAULT_ALPN.to_string()], timeout: DEFAULT_TIMEOUT, @@ -176,7 +179,14 @@ impl ObjectImpl for QuicSrc { .build(), glib::ParamSpecString::builder("server-address") .nick("QUIC server address") - .blurb("Address of the QUIC server to connect to e.g. 127.0.0.1:5000") + .blurb("Address of the QUIC server e.g. 127.0.0.1") + .build(), + glib::ParamSpecUInt::builder("server-port") + .nick("QUIC server port") + .blurb("Port of the QUIC server e.g. 5000") + .maximum(65535) + .default_value(DEFAULT_SERVER_PORT as u32) + .readwrite() .build(), gst::ParamSpecArray::builder("alpn-protocols") .nick("QUIC ALPN values") @@ -219,28 +229,19 @@ impl ObjectImpl for QuicSrc { } fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { + let mut settings = self.settings.lock().unwrap(); + match pspec.name() { "server-name" => { - let mut settings = self.settings.lock().unwrap(); settings.server_name = value.get::().expect("type checked upstream"); } "server-address" => { - let addr = value.get::().expect("type checked upstream"); - let addr = make_socket_addr(&addr); - match addr { - Ok(server_address) => { - let mut settings = self.settings.lock().unwrap(); - settings.server_address = server_address; - } - Err(e) => gst::element_imp_error!( - self, - gst::ResourceError::Failed, - ["Invalid server address: {}", e] - ), - } + settings.server_address = value.get::().expect("type checked upstream"); + } + "server-port" => { + settings.server_port = value.get::().expect("type checked upstream") as u16; } "alpn-protocols" => { - let mut settings = self.settings.lock().unwrap(); settings.alpns = value .get::() .expect("type checked upstream") @@ -254,7 +255,6 @@ impl ObjectImpl for QuicSrc { .collect::>() } "caps" => { - let mut settings = self.settings.lock().unwrap(); settings.caps = value .get::>() .expect("type checked upstream") @@ -264,24 +264,19 @@ impl ObjectImpl for QuicSrc { srcpad.mark_reconfigure(); } "timeout" => { - let mut settings = self.settings.lock().unwrap(); settings.timeout = value.get().expect("type checked upstream"); } "secure-connection" => { - let mut settings = self.settings.lock().unwrap(); settings.secure_conn = value.get().expect("type checked upstream"); } "certificate-path" => { let value: String = value.get().unwrap(); - let mut settings = self.settings.lock().unwrap(); settings.certificate_path = Some(value.into()); } "use-datagram" => { - let mut settings = self.settings.lock().unwrap(); settings.use_datagram = value.get().expect("type checked upstream"); } "private-key-type" => { - let mut settings = self.settings.lock().unwrap(); settings.private_key_type = value .get::() .expect("type checked upstream"); @@ -291,45 +286,28 @@ impl ObjectImpl for QuicSrc { } fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { + let settings = self.settings.lock().unwrap(); + match pspec.name() { - "server-name" => { - let settings = self.settings.lock().unwrap(); - settings.server_name.to_value() - } - "server-address" => { - let settings = self.settings.lock().unwrap(); - settings.server_address.to_string().to_value() + "server-name" => settings.server_name.to_value(), + "server-address" => settings.server_address.to_string().to_value(), + "server-port" => { + let port = settings.server_port as u32; + port.to_value() } "alpn-protocols" => { - let settings = self.settings.lock().unwrap(); let alpns = settings.alpns.iter().map(|v| v.as_str()); gst::Array::new(alpns).to_value() } - "caps" => { - let settings = self.settings.lock().unwrap(); - settings.caps.to_value() - } - "timeout" => { - let settings = self.settings.lock().unwrap(); - settings.timeout.to_value() - } - "secure-connection" => { - let settings = self.settings.lock().unwrap(); - settings.secure_conn.to_value() - } + "caps" => settings.caps.to_value(), + "timeout" => settings.timeout.to_value(), + "secure-connection" => settings.secure_conn.to_value(), "certificate-path" => { - let settings = self.settings.lock().unwrap(); let certpath = settings.certificate_path.as_ref(); certpath.and_then(|file| file.to_str()).to_value() } - "use-datagram" => { - let settings = self.settings.lock().unwrap(); - settings.use_datagram.to_value() - } - "private-key-type" => { - let settings = self.settings.lock().unwrap(); - settings.private_key_type.to_value() - } + "use-datagram" => settings.use_datagram.to_value(), + "private-key-type" => settings.private_key_type.to_value(), _ => unimplemented!(), } } @@ -563,7 +541,11 @@ impl QuicSrc { { let settings = self.settings.lock().unwrap(); - server_addr = settings.server_address; + + server_addr = make_socket_addr( + format!("{}:{}", settings.server_address, settings.server_port).as_str(), + )?; + server_name = settings.server_name.clone(); alpns = settings.alpns.clone(); use_datagram = settings.use_datagram; diff --git a/net/quic/src/utils.rs b/net/quic/src/utils.rs index 1d2ab185..04de1187 100644 --- a/net/quic/src/utils.rs +++ b/net/quic/src/utils.rs @@ -16,7 +16,7 @@ use quinn::{ClientConfig, Endpoint, ServerConfig}; use std::error::Error; use std::fs::File; use std::io::BufReader; -use std::net::{AddrParseError, SocketAddr}; +use std::net::SocketAddr; use std::path::PathBuf; use std::sync::{Arc, Mutex}; use std::time::Duration; @@ -102,13 +102,19 @@ where res } +pub fn make_socket_addr(addr: &str) -> Result { + match addr.parse::() { + Ok(address) => Ok(address), + Err(e) => Err(WaitError::FutureError(gst::error_msg!( + gst::ResourceError::Failed, + ["Invalid address: {}", e] + ))), + } +} + /* * Following functions are taken from Quinn documentation/repository */ -pub fn make_socket_addr(addr: &str) -> Result { - addr.parse::() -} - struct SkipServerVerification; impl SkipServerVerification { diff --git a/net/quic/tests/quic.rs b/net/quic/tests/quic.rs index 11be17ac..80c7cc35 100644 --- a/net/quic/tests/quic.rs +++ b/net/quic/tests/quic.rs @@ -35,7 +35,8 @@ fn test_send_receive_without_datagram() { let content = "Hello, world!\n".as_bytes(); thread::spawn(move || { - let mut h1 = gst_check::Harness::new("quicsink"); + let mut h1 = gst_check::Harness::new_empty(); + h1.add_parse("quicsink secure-connection=false"); h1.set_src_caps(gst::Caps::builder("text/plain").build()); @@ -50,7 +51,8 @@ fn test_send_receive_without_datagram() { drop(h1); }); - let mut h2 = gst_check::Harness::new("quicsrc"); + let mut h2 = gst_check::Harness::new_empty(); + h2.add_parse("quicsrc secure-connection=false"); h2.play(); @@ -77,8 +79,8 @@ fn test_send_receive_with_datagram() { // in the other test. We get a address already in use error otherwise. thread::spawn(move || { let mut h1 = gst_check::Harness::new_empty(); + h1.add_parse("quicsrc use-datagram=true server-address=127.0.0.1 server-port=6000 secure-connection=false"); - h1.add_parse(format!("quicsrc use-datagram=true server-address=127.0.0.1:6000").as_str()); h1.play(); let buf = h1.pull_until_eos().unwrap().unwrap(); @@ -94,8 +96,7 @@ fn test_send_receive_with_datagram() { }); let mut h2 = gst_check::Harness::new_empty(); - - h2.add_parse(format!("quicsink use-datagram=true client-address=127.0.0.1:6001 server-address=127.0.0.1:6000").as_str()); + h2.add_parse("quicsink use-datagram=true client-address=127.0.0.1 client-port=6001 server-address=127.0.0.1 server-port=6000 secure-connection=false"); h2.set_src_caps(gst::Caps::builder("text/plain").build());