From ef21a6aa3b6ae40977980707da716586ce6e165b Mon Sep 17 00:00:00 2001 From: Andoni Morales Alastruey Date: Sun, 13 Oct 2024 01:30:02 -0400 Subject: [PATCH] quinn: add a new WebTransport client element Part-of: --- Cargo.lock | 107 +++++ net/quinn/Cargo.toml | 3 + net/quinn/src/common.rs | 4 + net/quinn/src/lib.rs | 2 + net/quinn/src/quinnquicsink/imp.rs | 5 +- net/quinn/src/quinnquicsrc/imp.rs | 5 +- net/quinn/src/quinnwtclientsrc/imp.rs | 610 ++++++++++++++++++++++++++ net/quinn/src/quinnwtclientsrc/mod.rs | 39 ++ net/quinn/src/utils.rs | 59 ++- 9 files changed, 808 insertions(+), 26 deletions(-) create mode 100644 net/quinn/src/quinnwtclientsrc/imp.rs create mode 100644 net/quinn/src/quinnwtclientsrc/mod.rs diff --git a/Cargo.lock b/Cargo.lock index b009cd78..c66bfbdc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1056,6 +1056,12 @@ dependencies = [ "thiserror 2.0.3", ] +[[package]] +name = "cesu8" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d43a04d8753f35258c91f8ec639f792891f748a1edbd759cf1dcea3382ad83c" + [[package]] name = "cexpr" version = "0.6.0" @@ -1215,6 +1221,16 @@ version = "1.0.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b63caa9aa9397e2d9480a9b13673856c78d8ac123288526c37d7839f2a86990" +[[package]] +name = "combine" +version = "4.6.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba5a308b75df32fe02788e748662718f03fde005016435c444eea572398219fd" +dependencies = [ + "bytes", + "memchr", +] + [[package]] name = "concurrent-queue" version = "2.5.0" @@ -2874,6 +2890,7 @@ dependencies = [ "gstreamer-base", "gstreamer-check", "itertools 0.12.1", + "once_cell", "quinn", "quinn-proto", "rcgen", @@ -2883,6 +2900,8 @@ dependencies = [ "serial_test", "thiserror 2.0.3", "tokio", + "url", + "web-transport-quinn", ] [[package]] @@ -4549,6 +4568,26 @@ version = "1.0.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d75a2a4b1b190afb6f5425f10f6a8f959d2ea0b9c2b1d79553551850539e4674" +[[package]] +name = "jni" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c6df18c2e3db7e453d3c6ac5b3e9d5182664d28788126d39b91f2d1e22b017ec" +dependencies = [ + "cesu8", + "combine", + "jni-sys", + "log", + "thiserror 1.0.69", + "walkdir", +] + +[[package]] +name = "jni-sys" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8eaf4bc02d17cbdd7ff4c7438cafcdf7fb9a4613313ad11b4f8fefe7d3fa0130" + [[package]] name = "jobserver" version = "0.1.32" @@ -5979,6 +6018,7 @@ dependencies = [ "rustc-hash 2.0.0", "rustls 0.23.19", "rustls-pki-types", + "rustls-platform-verifier", "slab", "thiserror 2.0.3", "tinyvec", @@ -6568,6 +6608,33 @@ dependencies = [ "web-time", ] +[[package]] +name = "rustls-platform-verifier" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a4c7dc240fec5517e6c4eab3310438636cfe6391dfc345ba013109909a90d136" +dependencies = [ + "core-foundation 0.9.4", + "core-foundation-sys", + "jni", + "log", + "once_cell", + "rustls 0.23.17", + "rustls-native-certs 0.7.3", + "rustls-platform-verifier-android", + "rustls-webpki 0.102.8", + "security-framework 2.11.1", + "security-framework-sys", + "webpki-root-certs", + "windows-sys 0.52.0", +] + +[[package]] +name = "rustls-platform-verifier-android" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f87165f0995f63a9fbeea62b64d10b4d9d8e78ec6d7d51fb2125fda7bb36788f" + [[package]] name = "rustls-webpki" version = "0.101.7" @@ -6691,6 +6758,7 @@ dependencies = [ "core-foundation 0.9.4", "core-foundation-sys", "libc", + "num-bigint", "security-framework-sys", ] @@ -8146,6 +8214,36 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "web-transport-proto" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a3806ea43df5817f0d90618c842d28db5946bc18a5db0659b2275c2be48d472" +dependencies = [ + "bytes", + "http 1.1.0", + "thiserror 1.0.69", + "url", +] + +[[package]] +name = "web-transport-quinn" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3020b51cda10472a365e42d9a701916d4f04d74cc743de08246ef6a421c2d137" +dependencies = [ + "bytes", + "futures", + "http 1.1.0", + "log", + "quinn", + "quinn-proto", + "thiserror 1.0.69", + "tokio", + "url", + "web-transport-proto", +] + [[package]] name = "webm-iterable" version = "0.6.3" @@ -8165,6 +8263,15 @@ dependencies = [ "untrusted", ] +[[package]] +name = "webpki-root-certs" +version = "0.26.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9cd5da49bdf1f30054cfe0b8ce2958b8fbeb67c4d82c8967a598af481bef255c" +dependencies = [ + "rustls-pki-types", +] + [[package]] name = "webpki-roots" version = "0.25.4" diff --git a/net/quinn/Cargo.toml b/net/quinn/Cargo.toml index e77f6ded..98137fa4 100644 --- a/net/quinn/Cargo.toml +++ b/net/quinn/Cargo.toml @@ -27,6 +27,9 @@ thiserror = "2" async-channel = "2.3" itertools = "0.12" env_logger = "0.11" +web-transport-quinn = "0.3.3" +url = "2.5.2" +once_cell = "1.20.2" [dev-dependencies] gst-check = { workspace = true, features = ["v1_20"] } diff --git a/net/quinn/src/common.rs b/net/quinn/src/common.rs index 877e557f..9468482a 100644 --- a/net/quinn/src/common.rs +++ b/net/quinn/src/common.rs @@ -24,6 +24,7 @@ pub(crate) static DEFAULT_MIN_UDP_PAYLOAD_SIZE: u16 = 1200; pub(crate) static DEFAULT_MAX_UDP_PAYLOAD_SIZE: u16 = 65527; pub(crate) static DEFAULT_DROP_BUFFER_FOR_DATAGRAM: bool = false; pub(crate) static DEFAULT_MAX_CONCURRENT_UNI_STREAMS: VarInt = VarInt::from_u32(32); +pub(crate) static DEFAULT_USE_DATAGRAM: bool = false; /* * For QUIC transport parameters @@ -36,6 +37,7 @@ pub(crate) static DEFAULT_MAX_CONCURRENT_UNI_STREAMS: VarInt = VarInt::from_u32( pub(crate) const DEFAULT_ALPN: &str = "gst-quinn"; pub(crate) const DEFAULT_TIMEOUT: u32 = 15; pub(crate) const DEFAULT_SECURE_CONNECTION: bool = true; +pub(crate) const HTTP3_ALPN: &str = "h3"; #[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy, glib::Enum)] #[repr(u32)] @@ -56,6 +58,7 @@ pub struct QuinnQuicTransportConfig { pub max_udp_payload_size: u16, pub min_mtu: u16, pub upper_bound_mtu: u16, + pub max_concurrent_bidi_streams: VarInt, pub max_concurrent_uni_streams: VarInt, pub send_window: u64, pub stream_receive_window: VarInt, @@ -78,6 +81,7 @@ impl Default for QuinnQuicTransportConfig { max_udp_payload_size: DEFAULT_MAX_UDP_PAYLOAD_SIZE, min_mtu: DEFAULT_MINIMUM_MTU, upper_bound_mtu: DEFAULT_UPPER_BOUND_MTU, + max_concurrent_bidi_streams: VarInt::from(0u32), max_concurrent_uni_streams: DEFAULT_MAX_CONCURRENT_UNI_STREAMS, send_window: (8 * STREAM_RWND).into(), stream_receive_window: STREAM_RWND.into(), diff --git a/net/quinn/src/lib.rs b/net/quinn/src/lib.rs index 5ee69b31..0142ffa7 100644 --- a/net/quinn/src/lib.rs +++ b/net/quinn/src/lib.rs @@ -28,6 +28,7 @@ mod quinnquicsink; mod quinnquicsrc; mod quinnroqdemux; mod quinnroqmux; +mod quinnwtclientsrc; mod utils; fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { @@ -42,6 +43,7 @@ fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { quinnroqdemux::register(plugin)?; quinnquicsink::register(plugin)?; quinnquicsrc::register(plugin)?; + quinnwtclientsrc::register(plugin)?; Ok(()) } diff --git a/net/quinn/src/quinnquicsink/imp.rs b/net/quinn/src/quinnquicsink/imp.rs index 627ffe3a..678e7d45 100644 --- a/net/quinn/src/quinnquicsink/imp.rs +++ b/net/quinn/src/quinnquicsink/imp.rs @@ -446,7 +446,7 @@ impl ObjectImpl for QuinnQuicSink { match *state { State::Started(ref state) => { let connection = state.connection.clone(); - get_stats(Some(connection)).to_value() + get_stats(Some(connection.stats())).to_value() } State::Stopped => get_stats(None).to_value(), } @@ -721,13 +721,14 @@ impl QuinnQuicSink { QuinnQuicEndpointConfig { server_addr, server_name, - client_addr, + client_addr: Some(client_addr), secure_conn, alpns, certificate_file, private_key_file, keep_alive_interval, transport_config, + with_client_auth: true, }, ) }; diff --git a/net/quinn/src/quinnquicsrc/imp.rs b/net/quinn/src/quinnquicsrc/imp.rs index 2e970f2d..cfe5e847 100644 --- a/net/quinn/src/quinnquicsrc/imp.rs +++ b/net/quinn/src/quinnquicsrc/imp.rs @@ -483,7 +483,7 @@ impl ObjectImpl for QuinnQuicSrc { match *state { State::Started(ref state) => { let connection = state.connection.clone(); - get_stats(Some(connection)).to_value() + get_stats(Some(connection.stats())).to_value() } State::Stopped => get_stats(None).to_value(), } @@ -918,13 +918,14 @@ impl QuinnQuicSrc { QuinnQuicEndpointConfig { server_addr, server_name, - client_addr, + client_addr: Some(client_addr), secure_conn, alpns, certificate_file, private_key_file, keep_alive_interval, transport_config, + with_client_auth: false, }, ) }; diff --git a/net/quinn/src/quinnwtclientsrc/imp.rs b/net/quinn/src/quinnwtclientsrc/imp.rs new file mode 100644 index 00000000..4c32984e --- /dev/null +++ b/net/quinn/src/quinnwtclientsrc/imp.rs @@ -0,0 +1,610 @@ +// Copyright (C) 2024, Fluendo S.A. +// Author: Andoni Morales Alastruey +// +// Copyright (C) 2024, Asymptotic Inc. +// Author: Sanchayan Maity +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +use crate::utils::{ + client_endpoint, get_stats, make_socket_addr, server_endpoint, wait, Canceller, + QuinnQuicEndpointConfig, WaitError, CONNECTION_CLOSE_CODE, CONNECTION_CLOSE_MSG, +}; +use crate::{common::*, utils}; +use bytes::{buf, Bytes}; +use futures::future; +use gst::{glib, prelude::*, subclass::prelude::*}; +use gst_base::prelude::*; +use gst_base::subclass::base_src::CreateSuccess; +use gst_base::subclass::prelude::*; +use once_cell::sync::Lazy; +use quinn::{Connection, ConnectionError, TransportConfig}; +use rustls::server; +use std::borrow::Borrow; +use std::fmt::Error; +use std::net::{IpAddr, Ipv4Addr, SocketAddr}; +use std::path::PathBuf; +use std::sync::Mutex; +use tokio::net::lookup_host; +use web_transport_quinn::{ReadError, RecvStream, Session, SessionError, ALPN}; + +static CAT: Lazy = Lazy::new(|| { + gst::DebugCategory::new( + "quinnwtclientsrc", + gst::DebugColorFlags::empty(), + Some("Quinn WebTransport client source"), + ) +}); + +struct Started { + session: Session, + stream: Option, +} + +#[derive(Default)] +enum State { + #[default] + Stopped, + Started(Started), +} + +#[derive(Debug)] +struct Settings { + bind_address: String, + bind_port: u16, + caps: gst::Caps, + certificate_file: Option, + keep_alive_interval: u64, + timeout: u32, + transport_config: QuinnQuicTransportConfig, + url: String, + use_datagram: bool, +} + +impl Default for Settings { + fn default() -> Self { + let mut transport_config = QuinnQuicTransportConfig::default(); + // Required for the WebTransport handshake + transport_config.max_concurrent_bidi_streams = 2u32.into(); + transport_config.max_concurrent_uni_streams = 1u32.into(); + + Settings { + caps: gst::Caps::new_any(), + bind_address: DEFAULT_BIND_ADDR.to_string(), + bind_port: DEFAULT_BIND_PORT, + certificate_file: None, + keep_alive_interval: 0, + timeout: DEFAULT_TIMEOUT, + transport_config, + url: DEFAULT_ADDR.to_string(), + use_datagram: DEFAULT_USE_DATAGRAM, + } + } +} + +pub struct QuinnWebTransportClientSrc { + settings: Mutex, + state: Mutex, + canceller: Mutex, +} + +impl Default for QuinnWebTransportClientSrc { + fn default() -> Self { + Self { + settings: Mutex::new(Settings::default()), + state: Mutex::new(State::default()), + canceller: Mutex::new(utils::Canceller::default()), + } + } +} + +impl GstObjectImpl for QuinnWebTransportClientSrc {} + +impl ElementImpl for QuinnWebTransportClientSrc { + fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { + static ELEMENT_METADATA: Lazy = Lazy::new(|| { + gst::subclass::ElementMetadata::new( + "Quinn WebTransport Client Source", + "Source/Network/QUIC", + "Receive data over the network via WebTransport", + "Andoni Morales Alastruey ", + ) + }); + Some(&*ELEMENT_METADATA) + } + + fn pad_templates() -> &'static [gst::PadTemplate] { + static PAD_TEMPLATES: Lazy> = Lazy::new(|| { + let src_pad_template = gst::PadTemplate::new( + "src", + gst::PadDirection::Src, + gst::PadPresence::Always, + &gst::Caps::new_any(), + ) + .unwrap(); + + vec![src_pad_template] + }); + + PAD_TEMPLATES.as_ref() + } +} + +impl ObjectImpl for QuinnWebTransportClientSrc { + fn constructed(&self) { + self.parent_constructed(); + self.obj().set_format(gst::Format::Time); + self.obj().set_live(true); + self.obj().set_do_timestamp(true); + } + + fn properties() -> &'static [glib::ParamSpec] { + static PROPERTIES: Lazy> = Lazy::new(|| { + vec![ + glib::ParamSpecBoxed::builder::("caps") + .nick("caps") + .blurb("The caps of the source pad") + .build(), + glib::ParamSpecString::builder("certificate-file") + .nick("Certificate file") + .blurb("Path to certificate chain in single file") + .build(), + glib::ParamSpecUInt64::builder("keep-alive-interval") + .nick("QUIC connection keep alive interval in ms") + .blurb("Keeps QUIC connection alive by periodically pinging the server. Value set in ms, 0 disables this feature") + .default_value(0) + .readwrite() + .build(), + glib::ParamSpecUInt::builder("timeout") + .nick("Timeout") + .blurb("Value in seconds to timeout WebTransport endpoint requests (0 = No timeout).") + .maximum(3600) + .default_value(DEFAULT_TIMEOUT) + .readwrite() + .build(), + glib::ParamSpecString::builder("url") + .nick("Server URL") + .blurb("URL of the HTTP/3 server to connect to.") + .build(), + glib::ParamSpecBoolean::builder("use-datagram") + .nick("Use datagram") + .blurb("Use datagram for lower latency, unreliable messaging") + .default_value(false) + .build(), + glib::ParamSpecBoxed::builder::("stats") + .nick("Connection statistics") + .blurb("Connection statistics") + .read_only() + .build(), + ] + }); + + PROPERTIES.as_ref() + } + + fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { + let mut settings = self.settings.lock().unwrap(); + + match pspec.name() { + "caps" => { + settings.caps = value + .get::>() + .expect("type checked upstream") + .unwrap_or_else(gst::Caps::new_any); + + let srcpad = self.obj().static_pad("src").expect("source pad expected"); + srcpad.mark_reconfigure(); + } + "certificate-file" => { + let value: String = value.get().unwrap(); + settings.certificate_file = Some(value.into()); + } + "keep-alive-interval" => { + settings.keep_alive_interval = value.get().expect("type checked upstream"); + } + "timeout" => { + settings.timeout = value.get().expect("type checked upstream"); + } + "url" => { + settings.url = value.get::().expect("type checked upstream"); + } + "use-datagram" => { + settings.use_datagram = value.get::().expect("type checked upstream"); + } + _ => unimplemented!(), + } + } + + fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { + let settings = self.settings.lock().unwrap(); + + match pspec.name() { + "caps" => settings.caps.to_value(), + "certificate-file" => { + let certfile = settings.certificate_file.as_ref(); + certfile.and_then(|file| file.to_str()).to_value() + } + "keep-alive-interval" => settings.keep_alive_interval.to_value(), + "timeout" => settings.timeout.to_value(), + "url" => settings.url.to_value(), + "use-datagram" => settings.use_datagram.to_value(), + "stats" => { + let state = self.state.lock().unwrap(); + match *state { + State::Started(ref state) => get_stats(Some(state.session.stats())).to_value(), + State::Stopped => get_stats(None).to_value(), + } + } + _ => unimplemented!(), + } + } +} + +#[glib::object_subclass] +impl ObjectSubclass for QuinnWebTransportClientSrc { + const NAME: &'static str = "GstQuinnWebTransportClientSrc"; + type Type = super::QuinnWebTransportClientSrc; + type ParentType = gst_base::BaseSrc; +} + +impl BaseSrcImpl for QuinnWebTransportClientSrc { + fn is_seekable(&self) -> bool { + false + } + + fn start(&self) -> Result<(), gst::ErrorMessage> { + let settings = self.settings.lock().unwrap(); + let timeout = settings.timeout; + drop(settings); + + let mut state = self.state.lock().unwrap(); + + if let State::Started { .. } = *state { + unreachable!("QuinnWebTransportClientSrc already started"); + } + + match wait(&self.canceller, self.init_session(), timeout) { + Ok(Ok((c, s))) => { + *state = State::Started(Started { + session: c, + stream: s, + }); + + gst::info!(CAT, imp = self, "Started"); + + Ok(()) + } + Ok(Err(e)) | Err(e) => match e { + WaitError::FutureAborted => { + gst::warning!(CAT, imp = self, "Connection aborted"); + Ok(()) + } + WaitError::FutureError(err) => { + gst::error!(CAT, imp = self, "Connection request failed: {}", err); + Err(gst::error_msg!( + gst::ResourceError::Failed, + ["Connection request failed: {}", err] + )) + } + }, + } + } + + fn stop(&self) -> Result<(), gst::ErrorMessage> { + let mut state = self.state.lock().unwrap(); + + if let State::Started(ref mut state) = *state { + let session = &state.session; + + session.close( + CONNECTION_CLOSE_CODE.into(), + CONNECTION_CLOSE_MSG.as_bytes(), + ); + } + + *state = State::Stopped; + + Ok(()) + } + + fn query(&self, query: &mut gst::QueryRef) -> bool { + if let gst::QueryViewMut::Scheduling(q) = query.view_mut() { + q.set( + gst::SchedulingFlags::SEQUENTIAL | gst::SchedulingFlags::BANDWIDTH_LIMITED, + 1, + -1, + 0, + ); + q.add_scheduling_modes(&[gst::PadMode::Pull, gst::PadMode::Push]); + return true; + } + + BaseSrcImplExt::parent_query(self, query) + } + + fn create( + &self, + offset: u64, + buffer: Option<&mut gst::BufferRef>, + length: u32, + ) -> Result { + let data = self.get(offset, u64::from(length)); + + match data { + Ok(bytes) => { + if bytes.is_empty() { + gst::debug!(CAT, imp = self, "End of stream"); + return Err(gst::FlowError::Eos); + } + + if let Some(buffer) = buffer { + if let Err(copied_bytes) = buffer.copy_from_slice(0, bytes.as_ref()) { + buffer.set_size(copied_bytes); + } + Ok(CreateSuccess::FilledBuffer) + } else { + Ok(CreateSuccess::NewBuffer(gst::Buffer::from_slice(bytes))) + } + } + Err(None) => Err(gst::FlowError::Flushing), + Err(Some(err)) => { + gst::error!(CAT, imp = self, "Could not GET: {}", err); + Err(gst::FlowError::Error) + } + } + } + + fn unlock(&self) -> Result<(), gst::ErrorMessage> { + let mut canceller = self.canceller.lock().unwrap(); + canceller.abort(); + Ok(()) + } + + fn unlock_stop(&self) -> Result<(), gst::ErrorMessage> { + let mut canceller = self.canceller.lock().unwrap(); + if matches!(&*canceller, Canceller::Cancelled) { + *canceller = Canceller::None; + } + Ok(()) + } + + fn caps(&self, filter: Option<&gst::Caps>) -> Option { + let settings = self.settings.lock().unwrap(); + + let mut tmp_caps = settings.caps.clone(); + + gst::debug!(CAT, imp = self, "Advertising our own caps: {:?}", &tmp_caps); + + if let Some(filter_caps) = filter { + gst::debug!( + CAT, + imp = self, + "Intersecting with filter caps: {:?}", + &filter_caps + ); + + tmp_caps = filter_caps.intersect_with_mode(&tmp_caps, gst::CapsIntersectMode::First); + }; + + gst::debug!(CAT, imp = self, "Returning caps: {:?}", &tmp_caps); + + Some(tmp_caps) + } +} + +impl QuinnWebTransportClientSrc { + async fn read_stream( + &self, + stream: &mut RecvStream, + length: usize, + ) -> Result { + match stream.read_chunk(length, true).await { + Ok(Some(chunk)) => Ok(chunk.bytes), + Ok(None) => Ok(Bytes::new()), + Err(err) => match err { + ReadError::SessionError(conn_err) => match conn_err { + SessionError::ConnectionError(ce) => { + gst::info!(CAT, imp = self, "Connection error, {}", ce); + Ok(Bytes::new()) + } + SessionError::SendDatagramError(sde) => { + gst::info!(CAT, imp = self, "Send datagram error, {}", sde); + Ok(Bytes::new()) + } + SessionError::WebTransportError(wte) => { + gst::info!(CAT, imp = self, "WebTransport error, {}", wte); + Ok(Bytes::new()) + } + }, + ReadError::ClosedStream => { + gst::info!(CAT, imp = self, "Stream closed"); + Ok(Bytes::new()) + } + ReadError::Reset(r) => { + gst::info!(CAT, imp = self, "Reset, {}", r); + Ok(Bytes::new()) + } + ReadError::InvalidReset(ir) => { + gst::info!(CAT, imp = self, "Invalid Reset, {}", ir); + Ok(Bytes::new()) + } + _ => Err(WaitError::FutureError(gst::error_msg!( + gst::ResourceError::Failed, + ["Stream read error: {}", err] + ))), + }, + } + } + + async fn read_datagram(&self, session: &Session) -> Result { + match session.read_datagram().await { + Ok(bytes) => Ok(bytes), + Err(err) => match err { + SessionError::ConnectionError(ce) => { + gst::info!(CAT, imp = self, "Connection error, {}", ce); + Ok(Bytes::new()) + } + SessionError::SendDatagramError(de) => { + gst::info!(CAT, imp = self, "Error sending datagram, {}", de); + Ok(Bytes::new()) + } + SessionError::WebTransportError(we) => { + gst::info!(CAT, imp = self, "WebTransport error, {}", we); + Ok(Bytes::new()) + } + }, + } + } + + fn get(&self, _offset: u64, length: u64) -> Result> { + let settings = self.settings.lock().unwrap(); + let timeout = settings.timeout; + let use_datagram = settings.use_datagram; + drop(settings); + + let mut state = self.state.lock().unwrap(); + + let (session, stream) = match *state { + State::Started(Started { + ref session, + ref mut stream, + }) => (session, stream), + State::Stopped => { + return Err(Some(gst::error_msg!( + gst::LibraryError::Failed, + ["Cannot get data before start"] + ))); + } + }; + + let future = async { + if use_datagram { + self.read_datagram(session).await + } else { + let recv = stream.as_mut().unwrap(); + self.read_stream(recv, length as usize).await + } + }; + + match wait(&self.canceller, future, timeout) { + Ok(Ok(bytes)) => Ok(bytes), + Ok(Err(e)) | Err(e) => match e { + WaitError::FutureAborted => { + gst::warning!(CAT, imp = self, "Read from stream request aborted"); + Err(None) + } + WaitError::FutureError(e) => { + gst::error!(CAT, imp = self, "Failed to read from stream: {}", e); + Err(Some(e)) + } + }, + } + } + + async fn init_session(&self) -> Result<(Session, Option), WaitError> { + let (use_datagram, url, mut endpoint_config) = { + let settings = self.settings.lock().unwrap(); + + let client_addr = make_socket_addr( + format!("{}:{}", settings.bind_address, settings.bind_port).as_str(), + )?; + + let url = url::Url::parse(&settings.url).map_err(|err| { + WaitError::FutureError(gst::error_msg!( + gst::ResourceError::Failed, + ["Failed to parse URL: {}", err] + )) + })?; + + ( + settings.use_datagram, + url.clone(), + QuinnQuicEndpointConfig { + server_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 4443), // This will be filled in correctly later + server_name: DEFAULT_SERVER_NAME.to_string(), + client_addr: Some(client_addr), + secure_conn: true, + alpns: vec![HTTP3_ALPN.to_string()], + certificate_file: settings.certificate_file.clone(), + private_key_file: None, + keep_alive_interval: settings.keep_alive_interval, + transport_config: settings.transport_config, + with_client_auth: false, + }, + ) + }; + + let server_port = url.port().unwrap_or(443); + + let host = url.host_str().ok_or_else(|| { + WaitError::FutureError(gst::error_msg!( + gst::ResourceError::Failed, + ["Cannot parse host for URL: {}", url.as_str()] + )) + })?; + + // Look up the DNS entry. + let mut remotes = lookup_host((host, server_port)).await.map_err(|_| { + WaitError::FutureError(gst::error_msg!( + gst::ResourceError::Failed, + ["Cannot resolve host name for URL: {}", url.as_str()] + )) + })?; + + // Use the first entry. + endpoint_config.server_addr = match remotes.next() { + Some(remote) => Ok(remote), + None => Err(WaitError::FutureError(gst::error_msg!( + gst::ResourceError::Failed, + ["Cannot resolve host name for URL: {}", url.as_str()] + ))), + }?; + + let client = client_endpoint(&endpoint_config).map_err(|err| { + WaitError::FutureError(gst::error_msg!( + gst::ResourceError::Failed, + ["Failed to configure endpoint: {}", err] + )) + })?; + + let session = web_transport_quinn::connect(&client, &url) + .await + .map_err(|err| { + WaitError::FutureError(gst::error_msg!( + gst::ResourceError::Failed, + ["Failed to connect to server: {}", err] + )) + })?; + + let stream = if !use_datagram { + let (_, stream) = session.accept_bi().await.map_err(|err| { + WaitError::FutureError(gst::error_msg!( + gst::ResourceError::Failed, + ["Failed to open stream: {}", err] + )) + })?; + Some(stream) + } else { + let max_datagram_size = session.max_datagram_size(); + gst::info!( + CAT, + imp = self, + "Datagram size reported by peer: {max_datagram_size}" + ); + None + }; + + gst::info!( + CAT, + imp = self, + "Remote connection accepted: {}", + session.remote_address() + ); + + Ok((session, stream)) + } +} diff --git a/net/quinn/src/quinnwtclientsrc/mod.rs b/net/quinn/src/quinnwtclientsrc/mod.rs new file mode 100644 index 00000000..c8481af0 --- /dev/null +++ b/net/quinn/src/quinnwtclientsrc/mod.rs @@ -0,0 +1,39 @@ +// Copyright (C) 2024, Fluendo S.A. +// Author: Andoni Morales Alastruey +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +/** + * element-quinnwtclientsrc: + * @short-description: [WebTransport](https://www.w3.org/TR/webtransport/) client that receives + * data over the network connecting to a WebTransport server + * + * ## Example receiver pipeline + * ```bash + * gst-launch-1.0 -v -e quinnwtclientsrc url="http://localhost:4443/" \ + * certificate-file="certificates/fullchain.pem" caps=audio/x-opus ! \ + * ! opusparse ! opusdec ! audio/x-raw,format=S16LE,rate=48000,channels=2,layout=interleaved ! \ + * audioconvert ! autoaudiosink + * ``` + */ +use gst::glib; +use gst::prelude::*; + +mod imp; + +glib::wrapper! { + pub struct QuinnWebTransportClientSrc(ObjectSubclass) @extends gst_base::BaseSrc, gst::Element, gst::Object; +} + +pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + gst::Element::register( + Some(plugin), + "quinnwtclientsrc", + gst::Rank::MARGINAL, + QuinnWebTransportClientSrc::static_type(), + ) +} diff --git a/net/quinn/src/utils.rs b/net/quinn/src/utils.rs index 6729613a..8a20c34d 100644 --- a/net/quinn/src/utils.rs +++ b/net/quinn/src/utils.rs @@ -1,6 +1,8 @@ // Copyright (C) 2024, Asymptotic Inc. // Author: Sanchayan Maity -//G +// Copyright (C) 2024, Fluendo S.A. +// Author: Andoni Morales Alastruey +// // This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. // If a copy of the MPL was not distributed with this file, You can obtain one at // . @@ -34,13 +36,14 @@ pub const CONNECTION_CLOSE_MSG: &str = "Stopped"; pub struct QuinnQuicEndpointConfig { pub server_addr: SocketAddr, pub server_name: String, - pub client_addr: SocketAddr, + pub client_addr: Option, pub secure_conn: bool, pub alpns: Vec, pub certificate_file: Option, pub private_key_file: Option, pub keep_alive_interval: u64, pub transport_config: QuinnQuicTransportConfig, + pub with_client_auth: bool, } #[derive(Error, Debug)] @@ -227,8 +230,14 @@ fn create_transport_config( )); transport_config .datagram_send_buffer_size(ep_config.transport_config.datagram_send_buffer_size); - transport_config.max_concurrent_bidi_streams(0u32.into()); - transport_config.max_concurrent_uni_streams(1u32.into()); + transport_config.max_concurrent_bidi_streams( + ep_config + .transport_config + .max_concurrent_bidi_streams + .into(), + ); + transport_config + .max_concurrent_uni_streams(ep_config.transport_config.max_concurrent_uni_streams.into()); transport_config.mtu_discovery_config(Some(mtu_config)); transport_config @@ -312,7 +321,7 @@ fn read_certs_from_file( fn read_private_key_from_file( private_key_file: Option, ) -> Result, Box> { - let key_file = private_key_file.expect("Expected path to certificates be valid"); + let key_file = private_key_file.expect("Expected path to private key to be valid"); let key: rustls_pki_types::PrivateKeyDer<'static> = { let key_file = File::open(key_file.as_path())?; @@ -359,18 +368,25 @@ fn configure_server( let mut cert_store = rustls::RootCertStore::empty(); cert_store.add_parsable_certificates(certs.clone()); - let auth_client = rustls::server::WebPkiClientVerifier::builder_with_provider( - Arc::new(cert_store), - ring_provider.clone().into(), - ) - .build() - .unwrap(); - - rustls::ServerConfig::builder_with_provider(ring_provider.into()) - .with_protocol_versions(&[&rustls::version::TLS13]) - .unwrap() - .with_client_cert_verifier(auth_client) - .with_single_cert(certs.clone(), key) + let config_builder = + rustls::ServerConfig::builder_with_provider(ring_provider.clone().into()) + .with_protocol_versions(&[&rustls::version::TLS13]) + .unwrap(); + if ep_config.with_client_auth { + let auth_client = rustls::server::WebPkiClientVerifier::builder_with_provider( + Arc::new(cert_store), + ring_provider.into(), + ) + .build() + .unwrap(); + config_builder + .with_client_cert_verifier(auth_client) + .with_single_cert(certs.clone(), key) + } else { + config_builder + .with_no_client_auth() + .with_single_cert(certs.clone(), key) + } } else { rustls::ServerConfig::builder_with_provider(ring_provider.into()) .with_protocol_versions(&[&rustls::version::TLS13]) @@ -414,18 +430,17 @@ pub fn server_endpoint(ep_config: &QuinnQuicEndpointConfig) -> Result Result> { let client_cfg = configure_client(ep_config)?; - let mut endpoint = Endpoint::client(ep_config.client_addr)?; + let mut endpoint = Endpoint::client(ep_config.client_addr.expect("client_addr not set"))?; endpoint.set_default_client_config(client_cfg); Ok(endpoint) } -pub fn get_stats(connection: Option) -> gst::Structure { - match connection { - Some(conn) => { +pub fn get_stats(stats: Option) -> gst::Structure { + match stats { + Some(stats) => { // See quinn_proto::ConnectionStats - let stats = conn.stats(); let udp_stats = |udp: UdpStats, name: String| -> gst::Structure { gst::Structure::builder(name) .field("datagrams", udp.datagrams)