quinn: add a new WebTransport client element

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1867>
This commit is contained in:
Andoni Morales Alastruey 2024-10-13 01:30:02 -04:00 committed by GStreamer Marge Bot
parent 62e49b3ed5
commit ef21a6aa3b
9 changed files with 808 additions and 26 deletions

107
Cargo.lock generated
View file

@ -1056,6 +1056,12 @@ dependencies = [
"thiserror 2.0.3",
]
[[package]]
name = "cesu8"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d43a04d8753f35258c91f8ec639f792891f748a1edbd759cf1dcea3382ad83c"
[[package]]
name = "cexpr"
version = "0.6.0"
@ -1215,6 +1221,16 @@ version = "1.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b63caa9aa9397e2d9480a9b13673856c78d8ac123288526c37d7839f2a86990"
[[package]]
name = "combine"
version = "4.6.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba5a308b75df32fe02788e748662718f03fde005016435c444eea572398219fd"
dependencies = [
"bytes",
"memchr",
]
[[package]]
name = "concurrent-queue"
version = "2.5.0"
@ -2874,6 +2890,7 @@ dependencies = [
"gstreamer-base",
"gstreamer-check",
"itertools 0.12.1",
"once_cell",
"quinn",
"quinn-proto",
"rcgen",
@ -2883,6 +2900,8 @@ dependencies = [
"serial_test",
"thiserror 2.0.3",
"tokio",
"url",
"web-transport-quinn",
]
[[package]]
@ -4549,6 +4568,26 @@ version = "1.0.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d75a2a4b1b190afb6f5425f10f6a8f959d2ea0b9c2b1d79553551850539e4674"
[[package]]
name = "jni"
version = "0.19.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c6df18c2e3db7e453d3c6ac5b3e9d5182664d28788126d39b91f2d1e22b017ec"
dependencies = [
"cesu8",
"combine",
"jni-sys",
"log",
"thiserror 1.0.69",
"walkdir",
]
[[package]]
name = "jni-sys"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8eaf4bc02d17cbdd7ff4c7438cafcdf7fb9a4613313ad11b4f8fefe7d3fa0130"
[[package]]
name = "jobserver"
version = "0.1.32"
@ -5979,6 +6018,7 @@ dependencies = [
"rustc-hash 2.0.0",
"rustls 0.23.19",
"rustls-pki-types",
"rustls-platform-verifier",
"slab",
"thiserror 2.0.3",
"tinyvec",
@ -6568,6 +6608,33 @@ dependencies = [
"web-time",
]
[[package]]
name = "rustls-platform-verifier"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4c7dc240fec5517e6c4eab3310438636cfe6391dfc345ba013109909a90d136"
dependencies = [
"core-foundation 0.9.4",
"core-foundation-sys",
"jni",
"log",
"once_cell",
"rustls 0.23.17",
"rustls-native-certs 0.7.3",
"rustls-platform-verifier-android",
"rustls-webpki 0.102.8",
"security-framework 2.11.1",
"security-framework-sys",
"webpki-root-certs",
"windows-sys 0.52.0",
]
[[package]]
name = "rustls-platform-verifier-android"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f87165f0995f63a9fbeea62b64d10b4d9d8e78ec6d7d51fb2125fda7bb36788f"
[[package]]
name = "rustls-webpki"
version = "0.101.7"
@ -6691,6 +6758,7 @@ dependencies = [
"core-foundation 0.9.4",
"core-foundation-sys",
"libc",
"num-bigint",
"security-framework-sys",
]
@ -8146,6 +8214,36 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "web-transport-proto"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a3806ea43df5817f0d90618c842d28db5946bc18a5db0659b2275c2be48d472"
dependencies = [
"bytes",
"http 1.1.0",
"thiserror 1.0.69",
"url",
]
[[package]]
name = "web-transport-quinn"
version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3020b51cda10472a365e42d9a701916d4f04d74cc743de08246ef6a421c2d137"
dependencies = [
"bytes",
"futures",
"http 1.1.0",
"log",
"quinn",
"quinn-proto",
"thiserror 1.0.69",
"tokio",
"url",
"web-transport-proto",
]
[[package]]
name = "webm-iterable"
version = "0.6.3"
@ -8165,6 +8263,15 @@ dependencies = [
"untrusted",
]
[[package]]
name = "webpki-root-certs"
version = "0.26.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9cd5da49bdf1f30054cfe0b8ce2958b8fbeb67c4d82c8967a598af481bef255c"
dependencies = [
"rustls-pki-types",
]
[[package]]
name = "webpki-roots"
version = "0.25.4"

View file

@ -27,6 +27,9 @@ thiserror = "2"
async-channel = "2.3"
itertools = "0.12"
env_logger = "0.11"
web-transport-quinn = "0.3.3"
url = "2.5.2"
once_cell = "1.20.2"
[dev-dependencies]
gst-check = { workspace = true, features = ["v1_20"] }

View file

@ -24,6 +24,7 @@ pub(crate) static DEFAULT_MIN_UDP_PAYLOAD_SIZE: u16 = 1200;
pub(crate) static DEFAULT_MAX_UDP_PAYLOAD_SIZE: u16 = 65527;
pub(crate) static DEFAULT_DROP_BUFFER_FOR_DATAGRAM: bool = false;
pub(crate) static DEFAULT_MAX_CONCURRENT_UNI_STREAMS: VarInt = VarInt::from_u32(32);
pub(crate) static DEFAULT_USE_DATAGRAM: bool = false;
/*
* For QUIC transport parameters
@ -36,6 +37,7 @@ pub(crate) static DEFAULT_MAX_CONCURRENT_UNI_STREAMS: VarInt = VarInt::from_u32(
pub(crate) const DEFAULT_ALPN: &str = "gst-quinn";
pub(crate) const DEFAULT_TIMEOUT: u32 = 15;
pub(crate) const DEFAULT_SECURE_CONNECTION: bool = true;
pub(crate) const HTTP3_ALPN: &str = "h3";
#[derive(Debug, Eq, PartialEq, Ord, PartialOrd, Hash, Clone, Copy, glib::Enum)]
#[repr(u32)]
@ -56,6 +58,7 @@ pub struct QuinnQuicTransportConfig {
pub max_udp_payload_size: u16,
pub min_mtu: u16,
pub upper_bound_mtu: u16,
pub max_concurrent_bidi_streams: VarInt,
pub max_concurrent_uni_streams: VarInt,
pub send_window: u64,
pub stream_receive_window: VarInt,
@ -78,6 +81,7 @@ impl Default for QuinnQuicTransportConfig {
max_udp_payload_size: DEFAULT_MAX_UDP_PAYLOAD_SIZE,
min_mtu: DEFAULT_MINIMUM_MTU,
upper_bound_mtu: DEFAULT_UPPER_BOUND_MTU,
max_concurrent_bidi_streams: VarInt::from(0u32),
max_concurrent_uni_streams: DEFAULT_MAX_CONCURRENT_UNI_STREAMS,
send_window: (8 * STREAM_RWND).into(),
stream_receive_window: STREAM_RWND.into(),

View file

@ -28,6 +28,7 @@ mod quinnquicsink;
mod quinnquicsrc;
mod quinnroqdemux;
mod quinnroqmux;
mod quinnwtclientsrc;
mod utils;
fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
@ -42,6 +43,7 @@ fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
quinnroqdemux::register(plugin)?;
quinnquicsink::register(plugin)?;
quinnquicsrc::register(plugin)?;
quinnwtclientsrc::register(plugin)?;
Ok(())
}

View file

@ -446,7 +446,7 @@ impl ObjectImpl for QuinnQuicSink {
match *state {
State::Started(ref state) => {
let connection = state.connection.clone();
get_stats(Some(connection)).to_value()
get_stats(Some(connection.stats())).to_value()
}
State::Stopped => get_stats(None).to_value(),
}
@ -721,13 +721,14 @@ impl QuinnQuicSink {
QuinnQuicEndpointConfig {
server_addr,
server_name,
client_addr,
client_addr: Some(client_addr),
secure_conn,
alpns,
certificate_file,
private_key_file,
keep_alive_interval,
transport_config,
with_client_auth: true,
},
)
};

View file

@ -483,7 +483,7 @@ impl ObjectImpl for QuinnQuicSrc {
match *state {
State::Started(ref state) => {
let connection = state.connection.clone();
get_stats(Some(connection)).to_value()
get_stats(Some(connection.stats())).to_value()
}
State::Stopped => get_stats(None).to_value(),
}
@ -918,13 +918,14 @@ impl QuinnQuicSrc {
QuinnQuicEndpointConfig {
server_addr,
server_name,
client_addr,
client_addr: Some(client_addr),
secure_conn,
alpns,
certificate_file,
private_key_file,
keep_alive_interval,
transport_config,
with_client_auth: false,
},
)
};

View file

@ -0,0 +1,610 @@
// Copyright (C) 2024, Fluendo S.A.
// Author: Andoni Morales Alastruey <amorales@fluendo.com>
//
// Copyright (C) 2024, Asymptotic Inc.
// Author: Sanchayan Maity <sanchayan@asymptotic.io>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
use crate::utils::{
client_endpoint, get_stats, make_socket_addr, server_endpoint, wait, Canceller,
QuinnQuicEndpointConfig, WaitError, CONNECTION_CLOSE_CODE, CONNECTION_CLOSE_MSG,
};
use crate::{common::*, utils};
use bytes::{buf, Bytes};
use futures::future;
use gst::{glib, prelude::*, subclass::prelude::*};
use gst_base::prelude::*;
use gst_base::subclass::base_src::CreateSuccess;
use gst_base::subclass::prelude::*;
use once_cell::sync::Lazy;
use quinn::{Connection, ConnectionError, TransportConfig};
use rustls::server;
use std::borrow::Borrow;
use std::fmt::Error;
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::path::PathBuf;
use std::sync::Mutex;
use tokio::net::lookup_host;
use web_transport_quinn::{ReadError, RecvStream, Session, SessionError, ALPN};
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
gst::DebugCategory::new(
"quinnwtclientsrc",
gst::DebugColorFlags::empty(),
Some("Quinn WebTransport client source"),
)
});
struct Started {
session: Session,
stream: Option<RecvStream>,
}
#[derive(Default)]
enum State {
#[default]
Stopped,
Started(Started),
}
#[derive(Debug)]
struct Settings {
bind_address: String,
bind_port: u16,
caps: gst::Caps,
certificate_file: Option<PathBuf>,
keep_alive_interval: u64,
timeout: u32,
transport_config: QuinnQuicTransportConfig,
url: String,
use_datagram: bool,
}
impl Default for Settings {
fn default() -> Self {
let mut transport_config = QuinnQuicTransportConfig::default();
// Required for the WebTransport handshake
transport_config.max_concurrent_bidi_streams = 2u32.into();
transport_config.max_concurrent_uni_streams = 1u32.into();
Settings {
caps: gst::Caps::new_any(),
bind_address: DEFAULT_BIND_ADDR.to_string(),
bind_port: DEFAULT_BIND_PORT,
certificate_file: None,
keep_alive_interval: 0,
timeout: DEFAULT_TIMEOUT,
transport_config,
url: DEFAULT_ADDR.to_string(),
use_datagram: DEFAULT_USE_DATAGRAM,
}
}
}
pub struct QuinnWebTransportClientSrc {
settings: Mutex<Settings>,
state: Mutex<State>,
canceller: Mutex<utils::Canceller>,
}
impl Default for QuinnWebTransportClientSrc {
fn default() -> Self {
Self {
settings: Mutex::new(Settings::default()),
state: Mutex::new(State::default()),
canceller: Mutex::new(utils::Canceller::default()),
}
}
}
impl GstObjectImpl for QuinnWebTransportClientSrc {}
impl ElementImpl for QuinnWebTransportClientSrc {
fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
gst::subclass::ElementMetadata::new(
"Quinn WebTransport Client Source",
"Source/Network/QUIC",
"Receive data over the network via WebTransport",
"Andoni Morales Alastruey <amorales@fluendo.com>",
)
});
Some(&*ELEMENT_METADATA)
}
fn pad_templates() -> &'static [gst::PadTemplate] {
static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
let src_pad_template = gst::PadTemplate::new(
"src",
gst::PadDirection::Src,
gst::PadPresence::Always,
&gst::Caps::new_any(),
)
.unwrap();
vec![src_pad_template]
});
PAD_TEMPLATES.as_ref()
}
}
impl ObjectImpl for QuinnWebTransportClientSrc {
fn constructed(&self) {
self.parent_constructed();
self.obj().set_format(gst::Format::Time);
self.obj().set_live(true);
self.obj().set_do_timestamp(true);
}
fn properties() -> &'static [glib::ParamSpec] {
static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
vec![
glib::ParamSpecBoxed::builder::<gst::Caps>("caps")
.nick("caps")
.blurb("The caps of the source pad")
.build(),
glib::ParamSpecString::builder("certificate-file")
.nick("Certificate file")
.blurb("Path to certificate chain in single file")
.build(),
glib::ParamSpecUInt64::builder("keep-alive-interval")
.nick("QUIC connection keep alive interval in ms")
.blurb("Keeps QUIC connection alive by periodically pinging the server. Value set in ms, 0 disables this feature")
.default_value(0)
.readwrite()
.build(),
glib::ParamSpecUInt::builder("timeout")
.nick("Timeout")
.blurb("Value in seconds to timeout WebTransport endpoint requests (0 = No timeout).")
.maximum(3600)
.default_value(DEFAULT_TIMEOUT)
.readwrite()
.build(),
glib::ParamSpecString::builder("url")
.nick("Server URL")
.blurb("URL of the HTTP/3 server to connect to.")
.build(),
glib::ParamSpecBoolean::builder("use-datagram")
.nick("Use datagram")
.blurb("Use datagram for lower latency, unreliable messaging")
.default_value(false)
.build(),
glib::ParamSpecBoxed::builder::<gst::Structure>("stats")
.nick("Connection statistics")
.blurb("Connection statistics")
.read_only()
.build(),
]
});
PROPERTIES.as_ref()
}
fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
let mut settings = self.settings.lock().unwrap();
match pspec.name() {
"caps" => {
settings.caps = value
.get::<Option<gst::Caps>>()
.expect("type checked upstream")
.unwrap_or_else(gst::Caps::new_any);
let srcpad = self.obj().static_pad("src").expect("source pad expected");
srcpad.mark_reconfigure();
}
"certificate-file" => {
let value: String = value.get().unwrap();
settings.certificate_file = Some(value.into());
}
"keep-alive-interval" => {
settings.keep_alive_interval = value.get().expect("type checked upstream");
}
"timeout" => {
settings.timeout = value.get().expect("type checked upstream");
}
"url" => {
settings.url = value.get::<String>().expect("type checked upstream");
}
"use-datagram" => {
settings.use_datagram = value.get::<bool>().expect("type checked upstream");
}
_ => unimplemented!(),
}
}
fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
let settings = self.settings.lock().unwrap();
match pspec.name() {
"caps" => settings.caps.to_value(),
"certificate-file" => {
let certfile = settings.certificate_file.as_ref();
certfile.and_then(|file| file.to_str()).to_value()
}
"keep-alive-interval" => settings.keep_alive_interval.to_value(),
"timeout" => settings.timeout.to_value(),
"url" => settings.url.to_value(),
"use-datagram" => settings.use_datagram.to_value(),
"stats" => {
let state = self.state.lock().unwrap();
match *state {
State::Started(ref state) => get_stats(Some(state.session.stats())).to_value(),
State::Stopped => get_stats(None).to_value(),
}
}
_ => unimplemented!(),
}
}
}
#[glib::object_subclass]
impl ObjectSubclass for QuinnWebTransportClientSrc {
const NAME: &'static str = "GstQuinnWebTransportClientSrc";
type Type = super::QuinnWebTransportClientSrc;
type ParentType = gst_base::BaseSrc;
}
impl BaseSrcImpl for QuinnWebTransportClientSrc {
fn is_seekable(&self) -> bool {
false
}
fn start(&self) -> Result<(), gst::ErrorMessage> {
let settings = self.settings.lock().unwrap();
let timeout = settings.timeout;
drop(settings);
let mut state = self.state.lock().unwrap();
if let State::Started { .. } = *state {
unreachable!("QuinnWebTransportClientSrc already started");
}
match wait(&self.canceller, self.init_session(), timeout) {
Ok(Ok((c, s))) => {
*state = State::Started(Started {
session: c,
stream: s,
});
gst::info!(CAT, imp = self, "Started");
Ok(())
}
Ok(Err(e)) | Err(e) => match e {
WaitError::FutureAborted => {
gst::warning!(CAT, imp = self, "Connection aborted");
Ok(())
}
WaitError::FutureError(err) => {
gst::error!(CAT, imp = self, "Connection request failed: {}", err);
Err(gst::error_msg!(
gst::ResourceError::Failed,
["Connection request failed: {}", err]
))
}
},
}
}
fn stop(&self) -> Result<(), gst::ErrorMessage> {
let mut state = self.state.lock().unwrap();
if let State::Started(ref mut state) = *state {
let session = &state.session;
session.close(
CONNECTION_CLOSE_CODE.into(),
CONNECTION_CLOSE_MSG.as_bytes(),
);
}
*state = State::Stopped;
Ok(())
}
fn query(&self, query: &mut gst::QueryRef) -> bool {
if let gst::QueryViewMut::Scheduling(q) = query.view_mut() {
q.set(
gst::SchedulingFlags::SEQUENTIAL | gst::SchedulingFlags::BANDWIDTH_LIMITED,
1,
-1,
0,
);
q.add_scheduling_modes(&[gst::PadMode::Pull, gst::PadMode::Push]);
return true;
}
BaseSrcImplExt::parent_query(self, query)
}
fn create(
&self,
offset: u64,
buffer: Option<&mut gst::BufferRef>,
length: u32,
) -> Result<CreateSuccess, gst::FlowError> {
let data = self.get(offset, u64::from(length));
match data {
Ok(bytes) => {
if bytes.is_empty() {
gst::debug!(CAT, imp = self, "End of stream");
return Err(gst::FlowError::Eos);
}
if let Some(buffer) = buffer {
if let Err(copied_bytes) = buffer.copy_from_slice(0, bytes.as_ref()) {
buffer.set_size(copied_bytes);
}
Ok(CreateSuccess::FilledBuffer)
} else {
Ok(CreateSuccess::NewBuffer(gst::Buffer::from_slice(bytes)))
}
}
Err(None) => Err(gst::FlowError::Flushing),
Err(Some(err)) => {
gst::error!(CAT, imp = self, "Could not GET: {}", err);
Err(gst::FlowError::Error)
}
}
}
fn unlock(&self) -> Result<(), gst::ErrorMessage> {
let mut canceller = self.canceller.lock().unwrap();
canceller.abort();
Ok(())
}
fn unlock_stop(&self) -> Result<(), gst::ErrorMessage> {
let mut canceller = self.canceller.lock().unwrap();
if matches!(&*canceller, Canceller::Cancelled) {
*canceller = Canceller::None;
}
Ok(())
}
fn caps(&self, filter: Option<&gst::Caps>) -> Option<gst::Caps> {
let settings = self.settings.lock().unwrap();
let mut tmp_caps = settings.caps.clone();
gst::debug!(CAT, imp = self, "Advertising our own caps: {:?}", &tmp_caps);
if let Some(filter_caps) = filter {
gst::debug!(
CAT,
imp = self,
"Intersecting with filter caps: {:?}",
&filter_caps
);
tmp_caps = filter_caps.intersect_with_mode(&tmp_caps, gst::CapsIntersectMode::First);
};
gst::debug!(CAT, imp = self, "Returning caps: {:?}", &tmp_caps);
Some(tmp_caps)
}
}
impl QuinnWebTransportClientSrc {
async fn read_stream(
&self,
stream: &mut RecvStream,
length: usize,
) -> Result<Bytes, WaitError> {
match stream.read_chunk(length, true).await {
Ok(Some(chunk)) => Ok(chunk.bytes),
Ok(None) => Ok(Bytes::new()),
Err(err) => match err {
ReadError::SessionError(conn_err) => match conn_err {
SessionError::ConnectionError(ce) => {
gst::info!(CAT, imp = self, "Connection error, {}", ce);
Ok(Bytes::new())
}
SessionError::SendDatagramError(sde) => {
gst::info!(CAT, imp = self, "Send datagram error, {}", sde);
Ok(Bytes::new())
}
SessionError::WebTransportError(wte) => {
gst::info!(CAT, imp = self, "WebTransport error, {}", wte);
Ok(Bytes::new())
}
},
ReadError::ClosedStream => {
gst::info!(CAT, imp = self, "Stream closed");
Ok(Bytes::new())
}
ReadError::Reset(r) => {
gst::info!(CAT, imp = self, "Reset, {}", r);
Ok(Bytes::new())
}
ReadError::InvalidReset(ir) => {
gst::info!(CAT, imp = self, "Invalid Reset, {}", ir);
Ok(Bytes::new())
}
_ => Err(WaitError::FutureError(gst::error_msg!(
gst::ResourceError::Failed,
["Stream read error: {}", err]
))),
},
}
}
async fn read_datagram(&self, session: &Session) -> Result<Bytes, WaitError> {
match session.read_datagram().await {
Ok(bytes) => Ok(bytes),
Err(err) => match err {
SessionError::ConnectionError(ce) => {
gst::info!(CAT, imp = self, "Connection error, {}", ce);
Ok(Bytes::new())
}
SessionError::SendDatagramError(de) => {
gst::info!(CAT, imp = self, "Error sending datagram, {}", de);
Ok(Bytes::new())
}
SessionError::WebTransportError(we) => {
gst::info!(CAT, imp = self, "WebTransport error, {}", we);
Ok(Bytes::new())
}
},
}
}
fn get(&self, _offset: u64, length: u64) -> Result<Bytes, Option<gst::ErrorMessage>> {
let settings = self.settings.lock().unwrap();
let timeout = settings.timeout;
let use_datagram = settings.use_datagram;
drop(settings);
let mut state = self.state.lock().unwrap();
let (session, stream) = match *state {
State::Started(Started {
ref session,
ref mut stream,
}) => (session, stream),
State::Stopped => {
return Err(Some(gst::error_msg!(
gst::LibraryError::Failed,
["Cannot get data before start"]
)));
}
};
let future = async {
if use_datagram {
self.read_datagram(session).await
} else {
let recv = stream.as_mut().unwrap();
self.read_stream(recv, length as usize).await
}
};
match wait(&self.canceller, future, timeout) {
Ok(Ok(bytes)) => Ok(bytes),
Ok(Err(e)) | Err(e) => match e {
WaitError::FutureAborted => {
gst::warning!(CAT, imp = self, "Read from stream request aborted");
Err(None)
}
WaitError::FutureError(e) => {
gst::error!(CAT, imp = self, "Failed to read from stream: {}", e);
Err(Some(e))
}
},
}
}
async fn init_session(&self) -> Result<(Session, Option<RecvStream>), WaitError> {
let (use_datagram, url, mut endpoint_config) = {
let settings = self.settings.lock().unwrap();
let client_addr = make_socket_addr(
format!("{}:{}", settings.bind_address, settings.bind_port).as_str(),
)?;
let url = url::Url::parse(&settings.url).map_err(|err| {
WaitError::FutureError(gst::error_msg!(
gst::ResourceError::Failed,
["Failed to parse URL: {}", err]
))
})?;
(
settings.use_datagram,
url.clone(),
QuinnQuicEndpointConfig {
server_addr: SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 4443), // This will be filled in correctly later
server_name: DEFAULT_SERVER_NAME.to_string(),
client_addr: Some(client_addr),
secure_conn: true,
alpns: vec![HTTP3_ALPN.to_string()],
certificate_file: settings.certificate_file.clone(),
private_key_file: None,
keep_alive_interval: settings.keep_alive_interval,
transport_config: settings.transport_config,
with_client_auth: false,
},
)
};
let server_port = url.port().unwrap_or(443);
let host = url.host_str().ok_or_else(|| {
WaitError::FutureError(gst::error_msg!(
gst::ResourceError::Failed,
["Cannot parse host for URL: {}", url.as_str()]
))
})?;
// Look up the DNS entry.
let mut remotes = lookup_host((host, server_port)).await.map_err(|_| {
WaitError::FutureError(gst::error_msg!(
gst::ResourceError::Failed,
["Cannot resolve host name for URL: {}", url.as_str()]
))
})?;
// Use the first entry.
endpoint_config.server_addr = match remotes.next() {
Some(remote) => Ok(remote),
None => Err(WaitError::FutureError(gst::error_msg!(
gst::ResourceError::Failed,
["Cannot resolve host name for URL: {}", url.as_str()]
))),
}?;
let client = client_endpoint(&endpoint_config).map_err(|err| {
WaitError::FutureError(gst::error_msg!(
gst::ResourceError::Failed,
["Failed to configure endpoint: {}", err]
))
})?;
let session = web_transport_quinn::connect(&client, &url)
.await
.map_err(|err| {
WaitError::FutureError(gst::error_msg!(
gst::ResourceError::Failed,
["Failed to connect to server: {}", err]
))
})?;
let stream = if !use_datagram {
let (_, stream) = session.accept_bi().await.map_err(|err| {
WaitError::FutureError(gst::error_msg!(
gst::ResourceError::Failed,
["Failed to open stream: {}", err]
))
})?;
Some(stream)
} else {
let max_datagram_size = session.max_datagram_size();
gst::info!(
CAT,
imp = self,
"Datagram size reported by peer: {max_datagram_size}"
);
None
};
gst::info!(
CAT,
imp = self,
"Remote connection accepted: {}",
session.remote_address()
);
Ok((session, stream))
}
}

View file

@ -0,0 +1,39 @@
// Copyright (C) 2024, Fluendo S.A.
// Author: Andoni Morales Alastruey <amorales@fluendo.com>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
/**
* element-quinnwtclientsrc:
* @short-description: [WebTransport](https://www.w3.org/TR/webtransport/) client that receives
* data over the network connecting to a WebTransport server
*
* ## Example receiver pipeline
* ```bash
* gst-launch-1.0 -v -e quinnwtclientsrc url="http://localhost:4443/" \
* certificate-file="certificates/fullchain.pem" caps=audio/x-opus ! \
* ! opusparse ! opusdec ! audio/x-raw,format=S16LE,rate=48000,channels=2,layout=interleaved ! \
* audioconvert ! autoaudiosink
* ```
*/
use gst::glib;
use gst::prelude::*;
mod imp;
glib::wrapper! {
pub struct QuinnWebTransportClientSrc(ObjectSubclass<imp::QuinnWebTransportClientSrc>) @extends gst_base::BaseSrc, gst::Element, gst::Object;
}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
gst::Element::register(
Some(plugin),
"quinnwtclientsrc",
gst::Rank::MARGINAL,
QuinnWebTransportClientSrc::static_type(),
)
}

View file

@ -1,6 +1,8 @@
// Copyright (C) 2024, Asymptotic Inc.
// Author: Sanchayan Maity <sanchayan@asymptotic.io>
//G
// Copyright (C) 2024, Fluendo S.A.
// Author: Andoni Morales Alastruey <amorales@fluendo.com>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
@ -34,13 +36,14 @@ pub const CONNECTION_CLOSE_MSG: &str = "Stopped";
pub struct QuinnQuicEndpointConfig {
pub server_addr: SocketAddr,
pub server_name: String,
pub client_addr: SocketAddr,
pub client_addr: Option<SocketAddr>,
pub secure_conn: bool,
pub alpns: Vec<String>,
pub certificate_file: Option<PathBuf>,
pub private_key_file: Option<PathBuf>,
pub keep_alive_interval: u64,
pub transport_config: QuinnQuicTransportConfig,
pub with_client_auth: bool,
}
#[derive(Error, Debug)]
@ -227,8 +230,14 @@ fn create_transport_config(
));
transport_config
.datagram_send_buffer_size(ep_config.transport_config.datagram_send_buffer_size);
transport_config.max_concurrent_bidi_streams(0u32.into());
transport_config.max_concurrent_uni_streams(1u32.into());
transport_config.max_concurrent_bidi_streams(
ep_config
.transport_config
.max_concurrent_bidi_streams
.into(),
);
transport_config
.max_concurrent_uni_streams(ep_config.transport_config.max_concurrent_uni_streams.into());
transport_config.mtu_discovery_config(Some(mtu_config));
transport_config
@ -312,7 +321,7 @@ fn read_certs_from_file(
fn read_private_key_from_file(
private_key_file: Option<PathBuf>,
) -> Result<rustls_pki_types::PrivateKeyDer<'static>, Box<dyn Error>> {
let key_file = private_key_file.expect("Expected path to certificates be valid");
let key_file = private_key_file.expect("Expected path to private key to be valid");
let key: rustls_pki_types::PrivateKeyDer<'static> = {
let key_file = File::open(key_file.as_path())?;
@ -359,18 +368,25 @@ fn configure_server(
let mut cert_store = rustls::RootCertStore::empty();
cert_store.add_parsable_certificates(certs.clone());
let auth_client = rustls::server::WebPkiClientVerifier::builder_with_provider(
Arc::new(cert_store),
ring_provider.clone().into(),
)
.build()
.unwrap();
rustls::ServerConfig::builder_with_provider(ring_provider.into())
.with_protocol_versions(&[&rustls::version::TLS13])
.unwrap()
.with_client_cert_verifier(auth_client)
.with_single_cert(certs.clone(), key)
let config_builder =
rustls::ServerConfig::builder_with_provider(ring_provider.clone().into())
.with_protocol_versions(&[&rustls::version::TLS13])
.unwrap();
if ep_config.with_client_auth {
let auth_client = rustls::server::WebPkiClientVerifier::builder_with_provider(
Arc::new(cert_store),
ring_provider.into(),
)
.build()
.unwrap();
config_builder
.with_client_cert_verifier(auth_client)
.with_single_cert(certs.clone(), key)
} else {
config_builder
.with_no_client_auth()
.with_single_cert(certs.clone(), key)
}
} else {
rustls::ServerConfig::builder_with_provider(ring_provider.into())
.with_protocol_versions(&[&rustls::version::TLS13])
@ -414,18 +430,17 @@ pub fn server_endpoint(ep_config: &QuinnQuicEndpointConfig) -> Result<Endpoint,
pub fn client_endpoint(ep_config: &QuinnQuicEndpointConfig) -> Result<Endpoint, Box<dyn Error>> {
let client_cfg = configure_client(ep_config)?;
let mut endpoint = Endpoint::client(ep_config.client_addr)?;
let mut endpoint = Endpoint::client(ep_config.client_addr.expect("client_addr not set"))?;
endpoint.set_default_client_config(client_cfg);
Ok(endpoint)
}
pub fn get_stats(connection: Option<Connection>) -> gst::Structure {
match connection {
Some(conn) => {
pub fn get_stats(stats: Option<ConnectionStats>) -> gst::Structure {
match stats {
Some(stats) => {
// See quinn_proto::ConnectionStats
let stats = conn.stats();
let udp_stats = |udp: UdpStats, name: String| -> gst::Structure {
gst::Structure::builder(name)
.field("datagrams", udp.datagrams)