mirror of
https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs.git
synced 2024-06-02 07:09:51 +00:00
Merge branch 'quic' into 'main'
net: Add QUIC source and sink Closes #183 See merge request gstreamer/gst-plugins-rs!1036
This commit is contained in:
commit
335df80a95
510
Cargo.lock
generated
510
Cargo.lock
generated
File diff suppressed because it is too large
Load diff
|
@ -34,6 +34,7 @@ members = [
|
||||||
"net/webrtc",
|
"net/webrtc",
|
||||||
"net/webrtc/protocol",
|
"net/webrtc/protocol",
|
||||||
"net/webrtc/signalling",
|
"net/webrtc/signalling",
|
||||||
|
"net/quinn",
|
||||||
|
|
||||||
"text/ahead",
|
"text/ahead",
|
||||||
"text/json",
|
"text/json",
|
||||||
|
|
|
@ -3889,6 +3889,324 @@
|
||||||
"tracers": {},
|
"tracers": {},
|
||||||
"url": "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs"
|
"url": "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs"
|
||||||
},
|
},
|
||||||
|
"quinn": {
|
||||||
|
"description": "GStreamer Plugin for QUIC",
|
||||||
|
"elements": {
|
||||||
|
"quinnquicsink": {
|
||||||
|
"author": "Sanchayan Maity <sanchayan@asymptotic.io>",
|
||||||
|
"description": "Send data over the network via QUIC",
|
||||||
|
"hierarchy": [
|
||||||
|
"GstQuinnQUICSink",
|
||||||
|
"GstBaseSink",
|
||||||
|
"GstElement",
|
||||||
|
"GstObject",
|
||||||
|
"GInitiallyUnowned",
|
||||||
|
"GObject"
|
||||||
|
],
|
||||||
|
"klass": "Source/Network/QUIC",
|
||||||
|
"pad-templates": {
|
||||||
|
"sink": {
|
||||||
|
"caps": "ANY",
|
||||||
|
"direction": "sink",
|
||||||
|
"presence": "always"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"properties": {
|
||||||
|
"alpn-protocols": {
|
||||||
|
"blurb": "QUIC connection Application-Layer Protocol Negotiation (ALPN) values",
|
||||||
|
"conditionally-available": false,
|
||||||
|
"construct": false,
|
||||||
|
"construct-only": false,
|
||||||
|
"controllable": false,
|
||||||
|
"mutable": "null",
|
||||||
|
"readable": true,
|
||||||
|
"type": "GstValueArray",
|
||||||
|
"writable": true
|
||||||
|
},
|
||||||
|
"certificate-file": {
|
||||||
|
"blurb": "Path to certificate chain in single file",
|
||||||
|
"conditionally-available": false,
|
||||||
|
"construct": false,
|
||||||
|
"construct-only": false,
|
||||||
|
"controllable": false,
|
||||||
|
"default": "NULL",
|
||||||
|
"mutable": "null",
|
||||||
|
"readable": true,
|
||||||
|
"type": "gchararray",
|
||||||
|
"writable": true
|
||||||
|
},
|
||||||
|
"client-address": {
|
||||||
|
"blurb": "Address to be used by this QUIC client e.g. 127.0.0.1",
|
||||||
|
"conditionally-available": false,
|
||||||
|
"construct": false,
|
||||||
|
"construct-only": false,
|
||||||
|
"controllable": false,
|
||||||
|
"default": "127.0.0.1",
|
||||||
|
"mutable": "null",
|
||||||
|
"readable": true,
|
||||||
|
"type": "gchararray",
|
||||||
|
"writable": true
|
||||||
|
},
|
||||||
|
"client-port": {
|
||||||
|
"blurb": "Port to be used by this QUIC client e.g. 5001",
|
||||||
|
"conditionally-available": false,
|
||||||
|
"construct": false,
|
||||||
|
"construct-only": false,
|
||||||
|
"controllable": false,
|
||||||
|
"default": "5001",
|
||||||
|
"max": "65535",
|
||||||
|
"min": "0",
|
||||||
|
"mutable": "null",
|
||||||
|
"readable": true,
|
||||||
|
"type": "guint",
|
||||||
|
"writable": true
|
||||||
|
},
|
||||||
|
"private-key-file": {
|
||||||
|
"blurb": "Path to a PKCS8 or RSA private key file",
|
||||||
|
"conditionally-available": false,
|
||||||
|
"construct": false,
|
||||||
|
"construct-only": false,
|
||||||
|
"controllable": false,
|
||||||
|
"default": "NULL",
|
||||||
|
"mutable": "null",
|
||||||
|
"readable": true,
|
||||||
|
"type": "gchararray",
|
||||||
|
"writable": true
|
||||||
|
},
|
||||||
|
"secure-connection": {
|
||||||
|
"blurb": "Use certificates for QUIC connection. False: Insecure connection, True: Secure connection.",
|
||||||
|
"conditionally-available": false,
|
||||||
|
"construct": false,
|
||||||
|
"construct-only": false,
|
||||||
|
"controllable": false,
|
||||||
|
"default": "true",
|
||||||
|
"mutable": "null",
|
||||||
|
"readable": true,
|
||||||
|
"type": "gboolean",
|
||||||
|
"writable": true
|
||||||
|
},
|
||||||
|
"server-address": {
|
||||||
|
"blurb": "Address of the QUIC server to connect to e.g. 127.0.0.1",
|
||||||
|
"conditionally-available": false,
|
||||||
|
"construct": false,
|
||||||
|
"construct-only": false,
|
||||||
|
"controllable": false,
|
||||||
|
"default": "127.0.0.1",
|
||||||
|
"mutable": "null",
|
||||||
|
"readable": true,
|
||||||
|
"type": "gchararray",
|
||||||
|
"writable": true
|
||||||
|
},
|
||||||
|
"server-name": {
|
||||||
|
"blurb": "Name of the QUIC server which is in server certificate",
|
||||||
|
"conditionally-available": false,
|
||||||
|
"construct": false,
|
||||||
|
"construct-only": false,
|
||||||
|
"controllable": false,
|
||||||
|
"default": "localhost",
|
||||||
|
"mutable": "null",
|
||||||
|
"readable": true,
|
||||||
|
"type": "gchararray",
|
||||||
|
"writable": true
|
||||||
|
},
|
||||||
|
"server-port": {
|
||||||
|
"blurb": "Port of the QUIC server to connect to e.g. 5000",
|
||||||
|
"conditionally-available": false,
|
||||||
|
"construct": false,
|
||||||
|
"construct-only": false,
|
||||||
|
"controllable": false,
|
||||||
|
"default": "5000",
|
||||||
|
"max": "65535",
|
||||||
|
"min": "0",
|
||||||
|
"mutable": "null",
|
||||||
|
"readable": true,
|
||||||
|
"type": "guint",
|
||||||
|
"writable": true
|
||||||
|
},
|
||||||
|
"timeout": {
|
||||||
|
"blurb": "Value in seconds to timeout QUIC endpoint requests (0 = No timeout).",
|
||||||
|
"conditionally-available": false,
|
||||||
|
"construct": false,
|
||||||
|
"construct-only": false,
|
||||||
|
"controllable": false,
|
||||||
|
"default": "15",
|
||||||
|
"max": "3600",
|
||||||
|
"min": "0",
|
||||||
|
"mutable": "null",
|
||||||
|
"readable": true,
|
||||||
|
"type": "guint",
|
||||||
|
"writable": true
|
||||||
|
},
|
||||||
|
"use-datagram": {
|
||||||
|
"blurb": "Use datagram for lower latency, unreliable messaging",
|
||||||
|
"conditionally-available": false,
|
||||||
|
"construct": false,
|
||||||
|
"construct-only": false,
|
||||||
|
"controllable": false,
|
||||||
|
"default": "false",
|
||||||
|
"mutable": "null",
|
||||||
|
"readable": true,
|
||||||
|
"type": "gboolean",
|
||||||
|
"writable": true
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"rank": "marginal"
|
||||||
|
},
|
||||||
|
"quinnquicsrc": {
|
||||||
|
"author": "Sanchayan Maity <sanchayan@asymptotic.io>",
|
||||||
|
"description": "Receive data over the network via QUIC",
|
||||||
|
"hierarchy": [
|
||||||
|
"GstQuinnQUICSrc",
|
||||||
|
"GstBaseSrc",
|
||||||
|
"GstElement",
|
||||||
|
"GstObject",
|
||||||
|
"GInitiallyUnowned",
|
||||||
|
"GObject"
|
||||||
|
],
|
||||||
|
"klass": "Source/Network/QUIC",
|
||||||
|
"pad-templates": {
|
||||||
|
"src": {
|
||||||
|
"caps": "ANY",
|
||||||
|
"direction": "src",
|
||||||
|
"presence": "always"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"properties": {
|
||||||
|
"alpn-protocols": {
|
||||||
|
"blurb": "QUIC connection Application-Layer Protocol Negotiation (ALPN) values",
|
||||||
|
"conditionally-available": false,
|
||||||
|
"construct": false,
|
||||||
|
"construct-only": false,
|
||||||
|
"controllable": false,
|
||||||
|
"mutable": "null",
|
||||||
|
"readable": true,
|
||||||
|
"type": "GstValueArray",
|
||||||
|
"writable": true
|
||||||
|
},
|
||||||
|
"caps": {
|
||||||
|
"blurb": "The caps of the source pad",
|
||||||
|
"conditionally-available": false,
|
||||||
|
"construct": false,
|
||||||
|
"construct-only": false,
|
||||||
|
"controllable": false,
|
||||||
|
"default": "ANY",
|
||||||
|
"mutable": "null",
|
||||||
|
"readable": true,
|
||||||
|
"type": "GstCaps",
|
||||||
|
"writable": true
|
||||||
|
},
|
||||||
|
"certificate-file": {
|
||||||
|
"blurb": "Path to certificate chain in single file",
|
||||||
|
"conditionally-available": false,
|
||||||
|
"construct": false,
|
||||||
|
"construct-only": false,
|
||||||
|
"controllable": false,
|
||||||
|
"default": "NULL",
|
||||||
|
"mutable": "null",
|
||||||
|
"readable": true,
|
||||||
|
"type": "gchararray",
|
||||||
|
"writable": true
|
||||||
|
},
|
||||||
|
"private-key-file": {
|
||||||
|
"blurb": "Path to a PKCS8 or RSA private key file",
|
||||||
|
"conditionally-available": false,
|
||||||
|
"construct": false,
|
||||||
|
"construct-only": false,
|
||||||
|
"controllable": false,
|
||||||
|
"default": "NULL",
|
||||||
|
"mutable": "null",
|
||||||
|
"readable": true,
|
||||||
|
"type": "gchararray",
|
||||||
|
"writable": true
|
||||||
|
},
|
||||||
|
"secure-connection": {
|
||||||
|
"blurb": "Use certificates for QUIC connection. False: Insecure connection, True: Secure connection.",
|
||||||
|
"conditionally-available": false,
|
||||||
|
"construct": false,
|
||||||
|
"construct-only": false,
|
||||||
|
"controllable": false,
|
||||||
|
"default": "true",
|
||||||
|
"mutable": "null",
|
||||||
|
"readable": true,
|
||||||
|
"type": "gboolean",
|
||||||
|
"writable": true
|
||||||
|
},
|
||||||
|
"server-address": {
|
||||||
|
"blurb": "Address of the QUIC server e.g. 127.0.0.1",
|
||||||
|
"conditionally-available": false,
|
||||||
|
"construct": false,
|
||||||
|
"construct-only": false,
|
||||||
|
"controllable": false,
|
||||||
|
"default": "127.0.0.1",
|
||||||
|
"mutable": "null",
|
||||||
|
"readable": true,
|
||||||
|
"type": "gchararray",
|
||||||
|
"writable": true
|
||||||
|
},
|
||||||
|
"server-name": {
|
||||||
|
"blurb": "Name of the QUIC server which is in server certificate",
|
||||||
|
"conditionally-available": false,
|
||||||
|
"construct": false,
|
||||||
|
"construct-only": false,
|
||||||
|
"controllable": false,
|
||||||
|
"default": "localhost",
|
||||||
|
"mutable": "null",
|
||||||
|
"readable": true,
|
||||||
|
"type": "gchararray",
|
||||||
|
"writable": true
|
||||||
|
},
|
||||||
|
"server-port": {
|
||||||
|
"blurb": "Port of the QUIC server e.g. 5000",
|
||||||
|
"conditionally-available": false,
|
||||||
|
"construct": false,
|
||||||
|
"construct-only": false,
|
||||||
|
"controllable": false,
|
||||||
|
"default": "5000",
|
||||||
|
"max": "65535",
|
||||||
|
"min": "0",
|
||||||
|
"mutable": "null",
|
||||||
|
"readable": true,
|
||||||
|
"type": "guint",
|
||||||
|
"writable": true
|
||||||
|
},
|
||||||
|
"timeout": {
|
||||||
|
"blurb": "Value in seconds to timeout QUIC endpoint requests (0 = No timeout).",
|
||||||
|
"conditionally-available": false,
|
||||||
|
"construct": false,
|
||||||
|
"construct-only": false,
|
||||||
|
"controllable": false,
|
||||||
|
"default": "15",
|
||||||
|
"max": "3600",
|
||||||
|
"min": "0",
|
||||||
|
"mutable": "null",
|
||||||
|
"readable": true,
|
||||||
|
"type": "guint",
|
||||||
|
"writable": true
|
||||||
|
},
|
||||||
|
"use-datagram": {
|
||||||
|
"blurb": "Use datagram for lower latency, unreliable messaging",
|
||||||
|
"conditionally-available": false,
|
||||||
|
"construct": false,
|
||||||
|
"construct-only": false,
|
||||||
|
"controllable": false,
|
||||||
|
"default": "false",
|
||||||
|
"mutable": "null",
|
||||||
|
"readable": true,
|
||||||
|
"type": "gboolean",
|
||||||
|
"writable": true
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"rank": "marginal"
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"filename": "gstquinn",
|
||||||
|
"license": "MPL",
|
||||||
|
"other-types": {},
|
||||||
|
"package": "gst-plugin-quinn",
|
||||||
|
"source": "gst-plugin-quinn",
|
||||||
|
"tracers": {},
|
||||||
|
"url": "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs"
|
||||||
|
},
|
||||||
"raptorq": {
|
"raptorq": {
|
||||||
"description": "GStreamer RaptorQ FEC Plugin",
|
"description": "GStreamer RaptorQ FEC Plugin",
|
||||||
"elements": {
|
"elements": {
|
||||||
|
|
|
@ -205,6 +205,7 @@ plugins = {
|
||||||
'extra-deps': {'cairo-gobject': []},
|
'extra-deps': {'cairo-gobject': []},
|
||||||
},
|
},
|
||||||
'gopbuffer': {'library': 'libgstgopbuffer'},
|
'gopbuffer': {'library': 'libgstgopbuffer'},
|
||||||
|
'quinn': {'library': 'libgstquinn'},
|
||||||
}
|
}
|
||||||
|
|
||||||
if get_option('examples').allowed()
|
if get_option('examples').allowed()
|
||||||
|
|
|
@ -34,6 +34,7 @@ option('rtsp', type: 'feature', value: 'auto', description: 'Build rtsp plugin')
|
||||||
option('rtp', type: 'feature', value: 'auto', description: 'Build rtp plugin')
|
option('rtp', type: 'feature', value: 'auto', description: 'Build rtp plugin')
|
||||||
option('webrtc', type: 'feature', value: 'auto', yield: true, description: 'Build webrtc plugin')
|
option('webrtc', type: 'feature', value: 'auto', yield: true, description: 'Build webrtc plugin')
|
||||||
option('webrtchttp', type: 'feature', value: 'auto', description: 'Build webrtchttp plugin')
|
option('webrtchttp', type: 'feature', value: 'auto', description: 'Build webrtchttp plugin')
|
||||||
|
option('quinn', type: 'feature', value: 'auto', description: 'Build quinn plugin')
|
||||||
|
|
||||||
# text
|
# text
|
||||||
option('textahead', type: 'feature', value: 'auto', description: 'Build textahead plugin')
|
option('textahead', type: 'feature', value: 'auto', description: 'Build textahead plugin')
|
||||||
|
|
55
net/quinn/Cargo.toml
Normal file
55
net/quinn/Cargo.toml
Normal file
|
@ -0,0 +1,55 @@
|
||||||
|
[package]
|
||||||
|
name = "gst-plugin-quinn"
|
||||||
|
version.workspace = true
|
||||||
|
authors = ["Sanchayan Maity <sanchayan@asymptotic.io"]
|
||||||
|
repository = "https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs"
|
||||||
|
license = "MPL-2.0"
|
||||||
|
edition = "2021"
|
||||||
|
description = "GStreamer Plugin for QUIC"
|
||||||
|
rust-version = "1.63"
|
||||||
|
|
||||||
|
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
gst = { package = "gstreamer", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
|
||||||
|
gst-base = { package = "gstreamer-base", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" }
|
||||||
|
once_cell = "1.19"
|
||||||
|
tokio = { version = "1.36.0", default-features = false, features = ["time", "rt-multi-thread"] }
|
||||||
|
futures = "0.3.30"
|
||||||
|
quinn = "0.10.2"
|
||||||
|
rustls = { version = "0.21.8", default-features = false, features = ["dangerous_configuration", "quic"] }
|
||||||
|
rustls-pemfile = "1.0.3"
|
||||||
|
rcgen = "0.12.1"
|
||||||
|
bytes = "1.5.0"
|
||||||
|
thiserror = "1"
|
||||||
|
|
||||||
|
[dev-dependencies]
|
||||||
|
gst-check = { package = "gstreamer-check", git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs", features = ["v1_20"] }
|
||||||
|
serial_test = "3"
|
||||||
|
|
||||||
|
[lib]
|
||||||
|
name = "gstquinn"
|
||||||
|
crate-type = ["cdylib", "rlib"]
|
||||||
|
path = "src/lib.rs"
|
||||||
|
required-features = ["tls-rustls"]
|
||||||
|
|
||||||
|
[build-dependencies]
|
||||||
|
gst-plugin-version-helper = { path="../../version-helper" }
|
||||||
|
|
||||||
|
[features]
|
||||||
|
static = []
|
||||||
|
capi = []
|
||||||
|
doc = []
|
||||||
|
|
||||||
|
[package.metadata.capi]
|
||||||
|
min_version = "0.8.0"
|
||||||
|
|
||||||
|
[package.metadata.capi.header]
|
||||||
|
enabled = false
|
||||||
|
|
||||||
|
[package.metadata.capi.library]
|
||||||
|
install_subdir = "gstreamer-1.0"
|
||||||
|
versioning = false
|
||||||
|
|
||||||
|
[package.metadata.capi.pkg_config]
|
||||||
|
requires_private = "gstreamer-1.0, gstreamer-base-1.0, gobject-2.0, glib-2.0"
|
3
net/quinn/build.rs
Normal file
3
net/quinn/build.rs
Normal file
|
@ -0,0 +1,3 @@
|
||||||
|
fn main() {
|
||||||
|
gst_plugin_version_helper::info()
|
||||||
|
}
|
38
net/quinn/src/lib.rs
Normal file
38
net/quinn/src/lib.rs
Normal file
|
@ -0,0 +1,38 @@
|
||||||
|
// Copyright (C) 2024, Asymptotic Inc.
|
||||||
|
// Author: Sanchayan Maity <sanchayan@asymptotic.io>
|
||||||
|
//
|
||||||
|
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
|
||||||
|
// If a copy of the MPL was not distributed with this file, You can obtain one at
|
||||||
|
// <https://mozilla.org/MPL/2.0/>.
|
||||||
|
//
|
||||||
|
// SPDX-License-Identifier: MPL-2.0
|
||||||
|
#![allow(clippy::non_send_fields_in_send_ty, unused_doc_comments)]
|
||||||
|
|
||||||
|
/**
|
||||||
|
* plugin-quinn:
|
||||||
|
*
|
||||||
|
* Since: plugins-rs-0.13.0
|
||||||
|
*/
|
||||||
|
use gst::glib;
|
||||||
|
mod quinnquicsink;
|
||||||
|
mod quinnquicsrc;
|
||||||
|
mod utils;
|
||||||
|
|
||||||
|
fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
|
||||||
|
quinnquicsink::register(plugin)?;
|
||||||
|
quinnquicsrc::register(plugin)?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
gst::plugin_define!(
|
||||||
|
quinn,
|
||||||
|
env!("CARGO_PKG_DESCRIPTION"),
|
||||||
|
plugin_init,
|
||||||
|
concat!(env!("CARGO_PKG_VERSION"), "-", env!("COMMIT_ID")),
|
||||||
|
"MPL",
|
||||||
|
env!("CARGO_PKG_NAME"),
|
||||||
|
env!("CARGO_PKG_NAME"),
|
||||||
|
env!("CARGO_PKG_REPOSITORY"),
|
||||||
|
env!("BUILD_REL_DATE")
|
||||||
|
);
|
571
net/quinn/src/quinnquicsink/imp.rs
Normal file
571
net/quinn/src/quinnquicsink/imp.rs
Normal file
|
@ -0,0 +1,571 @@
|
||||||
|
// Copyright (C) 2024, Asymptotic Inc.
|
||||||
|
// Author: Sanchayan Maity <sanchayan@asymptotic.io>
|
||||||
|
//
|
||||||
|
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
|
||||||
|
// If a copy of the MPL was not distributed with this file, You can obtain one at
|
||||||
|
// <https://mozilla.org/MPL/2.0/>.
|
||||||
|
//
|
||||||
|
// SPDX-License-Identifier: MPL-2.0
|
||||||
|
|
||||||
|
use crate::utils::{
|
||||||
|
client_endpoint, make_socket_addr, wait, WaitError, CONNECTION_CLOSE_CODE, CONNECTION_CLOSE_MSG,
|
||||||
|
};
|
||||||
|
use bytes::Bytes;
|
||||||
|
use futures::future;
|
||||||
|
use gst::{glib, prelude::*, subclass::prelude::*};
|
||||||
|
use gst_base::subclass::prelude::*;
|
||||||
|
use once_cell::sync::Lazy;
|
||||||
|
use quinn::{Connection, SendStream};
|
||||||
|
use std::path::PathBuf;
|
||||||
|
use std::sync::Mutex;
|
||||||
|
|
||||||
|
static DEFAULT_SERVER_NAME: &str = "localhost";
|
||||||
|
static DEFAULT_SERVER_ADDR: &str = "127.0.0.1";
|
||||||
|
static DEFAULT_SERVER_PORT: u16 = 5000;
|
||||||
|
static DEFAULT_CLIENT_ADDR: &str = "127.0.0.1";
|
||||||
|
static DEFAULT_CLIENT_PORT: u16 = 5001;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* For QUIC transport parameters
|
||||||
|
* <https://datatracker.ietf.org/doc/html/rfc9000#section-7.4>
|
||||||
|
*
|
||||||
|
* A HTTP client might specify "http/1.1" and/or "h2" or "h3".
|
||||||
|
* Other well-known values are listed in the at IANA registry at
|
||||||
|
* <https://www.iana.org/assignments/tls-extensiontype-values/tls-extensiontype-values.xhtml#alpn-protocol-ids>.
|
||||||
|
*/
|
||||||
|
const DEFAULT_ALPN: &str = "gst-quinn";
|
||||||
|
const DEFAULT_TIMEOUT: u32 = 15;
|
||||||
|
const DEFAULT_SECURE_CONNECTION: bool = true;
|
||||||
|
|
||||||
|
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
|
||||||
|
gst::DebugCategory::new(
|
||||||
|
"quinnquicsink",
|
||||||
|
gst::DebugColorFlags::empty(),
|
||||||
|
Some("Quinn QUIC Sink"),
|
||||||
|
)
|
||||||
|
});
|
||||||
|
|
||||||
|
struct Started {
|
||||||
|
connection: Connection,
|
||||||
|
stream: Option<SendStream>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Default)]
|
||||||
|
enum State {
|
||||||
|
#[default]
|
||||||
|
Stopped,
|
||||||
|
Started(Started),
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug)]
|
||||||
|
struct Settings {
|
||||||
|
client_address: String,
|
||||||
|
client_port: u16,
|
||||||
|
server_address: String,
|
||||||
|
server_port: u16,
|
||||||
|
server_name: String,
|
||||||
|
alpns: Vec<String>,
|
||||||
|
timeout: u32,
|
||||||
|
secure_conn: bool,
|
||||||
|
use_datagram: bool,
|
||||||
|
certificate_file: Option<PathBuf>,
|
||||||
|
private_key_file: Option<PathBuf>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for Settings {
|
||||||
|
fn default() -> Self {
|
||||||
|
Settings {
|
||||||
|
client_address: DEFAULT_CLIENT_ADDR.to_string(),
|
||||||
|
client_port: DEFAULT_CLIENT_PORT,
|
||||||
|
server_address: DEFAULT_SERVER_ADDR.to_string(),
|
||||||
|
server_port: DEFAULT_SERVER_PORT,
|
||||||
|
server_name: DEFAULT_SERVER_NAME.to_string(),
|
||||||
|
alpns: vec![DEFAULT_ALPN.to_string()],
|
||||||
|
timeout: DEFAULT_TIMEOUT,
|
||||||
|
secure_conn: DEFAULT_SECURE_CONNECTION,
|
||||||
|
use_datagram: false,
|
||||||
|
certificate_file: None,
|
||||||
|
private_key_file: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct QuinnQuicSink {
|
||||||
|
settings: Mutex<Settings>,
|
||||||
|
state: Mutex<State>,
|
||||||
|
canceller: Mutex<Option<future::AbortHandle>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for QuinnQuicSink {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self {
|
||||||
|
settings: Mutex::new(Settings::default()),
|
||||||
|
state: Mutex::new(State::default()),
|
||||||
|
canceller: Mutex::new(None),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl GstObjectImpl for QuinnQuicSink {}
|
||||||
|
|
||||||
|
impl ElementImpl for QuinnQuicSink {
|
||||||
|
fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
|
||||||
|
static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
|
||||||
|
gst::subclass::ElementMetadata::new(
|
||||||
|
"Quinn QUIC Sink",
|
||||||
|
"Source/Network/QUIC",
|
||||||
|
"Send data over the network via QUIC",
|
||||||
|
"Sanchayan Maity <sanchayan@asymptotic.io>",
|
||||||
|
)
|
||||||
|
});
|
||||||
|
Some(&*ELEMENT_METADATA)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn pad_templates() -> &'static [gst::PadTemplate] {
|
||||||
|
static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
|
||||||
|
let sink_pad_template = gst::PadTemplate::new(
|
||||||
|
"sink",
|
||||||
|
gst::PadDirection::Sink,
|
||||||
|
gst::PadPresence::Always,
|
||||||
|
&gst::Caps::new_any(),
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
vec![sink_pad_template]
|
||||||
|
});
|
||||||
|
|
||||||
|
PAD_TEMPLATES.as_ref()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn change_state(
|
||||||
|
&self,
|
||||||
|
transition: gst::StateChange,
|
||||||
|
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
|
||||||
|
if transition == gst::StateChange::NullToReady {
|
||||||
|
let settings = self.settings.lock().unwrap();
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Fail the state change if a secure connection was requested but
|
||||||
|
* no certificate path was provided.
|
||||||
|
*/
|
||||||
|
if settings.secure_conn
|
||||||
|
&& (settings.certificate_file.is_none() || settings.private_key_file.is_none())
|
||||||
|
{
|
||||||
|
gst::error!(
|
||||||
|
CAT,
|
||||||
|
imp: self,
|
||||||
|
"Certificate or private key file not provided for secure connection"
|
||||||
|
);
|
||||||
|
return Err(gst::StateChangeError);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
self.parent_change_state(transition)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ObjectImpl for QuinnQuicSink {
|
||||||
|
fn constructed(&self) {
|
||||||
|
self.parent_constructed();
|
||||||
|
}
|
||||||
|
|
||||||
|
fn properties() -> &'static [glib::ParamSpec] {
|
||||||
|
static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
|
||||||
|
vec![
|
||||||
|
glib::ParamSpecString::builder("server-name")
|
||||||
|
.nick("QUIC server name")
|
||||||
|
.blurb("Name of the QUIC server which is in server certificate")
|
||||||
|
.build(),
|
||||||
|
glib::ParamSpecString::builder("server-address")
|
||||||
|
.nick("QUIC server address")
|
||||||
|
.blurb("Address of the QUIC server to connect to e.g. 127.0.0.1")
|
||||||
|
.build(),
|
||||||
|
glib::ParamSpecUInt::builder("server-port")
|
||||||
|
.nick("QUIC server port")
|
||||||
|
.blurb("Port of the QUIC server to connect to e.g. 5000")
|
||||||
|
.maximum(65535)
|
||||||
|
.default_value(DEFAULT_SERVER_PORT as u32)
|
||||||
|
.readwrite()
|
||||||
|
.build(),
|
||||||
|
glib::ParamSpecString::builder("client-address")
|
||||||
|
.nick("QUIC client address")
|
||||||
|
.blurb("Address to be used by this QUIC client e.g. 127.0.0.1")
|
||||||
|
.build(),
|
||||||
|
glib::ParamSpecUInt::builder("client-port")
|
||||||
|
.nick("QUIC client port")
|
||||||
|
.blurb("Port to be used by this QUIC client e.g. 5001")
|
||||||
|
.maximum(65535)
|
||||||
|
.default_value(DEFAULT_CLIENT_PORT as u32)
|
||||||
|
.readwrite()
|
||||||
|
.build(),
|
||||||
|
gst::ParamSpecArray::builder("alpn-protocols")
|
||||||
|
.nick("QUIC ALPN values")
|
||||||
|
.blurb("QUIC connection Application-Layer Protocol Negotiation (ALPN) values")
|
||||||
|
.element_spec(&glib::ParamSpecString::builder("alpn-protocol").build())
|
||||||
|
.build(),
|
||||||
|
glib::ParamSpecUInt::builder("timeout")
|
||||||
|
.nick("Timeout")
|
||||||
|
.blurb("Value in seconds to timeout QUIC endpoint requests (0 = No timeout).")
|
||||||
|
.maximum(3600)
|
||||||
|
.default_value(DEFAULT_TIMEOUT)
|
||||||
|
.readwrite()
|
||||||
|
.build(),
|
||||||
|
glib::ParamSpecBoolean::builder("secure-connection")
|
||||||
|
.nick("Use secure connection")
|
||||||
|
.blurb("Use certificates for QUIC connection. False: Insecure connection, True: Secure connection.")
|
||||||
|
.default_value(DEFAULT_SECURE_CONNECTION)
|
||||||
|
.build(),
|
||||||
|
glib::ParamSpecString::builder("certificate-file")
|
||||||
|
.nick("Certificate file")
|
||||||
|
.blurb("Path to certificate chain in single file")
|
||||||
|
.build(),
|
||||||
|
glib::ParamSpecString::builder("private-key-file")
|
||||||
|
.nick("Private key file")
|
||||||
|
.blurb("Path to a PKCS8 or RSA private key file")
|
||||||
|
.build(),
|
||||||
|
glib::ParamSpecBoolean::builder("use-datagram")
|
||||||
|
.nick("Use datagram")
|
||||||
|
.blurb("Use datagram for lower latency, unreliable messaging")
|
||||||
|
.default_value(false)
|
||||||
|
.build(),
|
||||||
|
]
|
||||||
|
});
|
||||||
|
|
||||||
|
PROPERTIES.as_ref()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
|
||||||
|
let mut settings = self.settings.lock().unwrap();
|
||||||
|
|
||||||
|
match pspec.name() {
|
||||||
|
"server-name" => {
|
||||||
|
settings.server_name = value.get::<String>().expect("type checked upstream");
|
||||||
|
}
|
||||||
|
"server-address" => {
|
||||||
|
settings.server_address = value.get::<String>().expect("type checked upstream");
|
||||||
|
}
|
||||||
|
"server-port" => {
|
||||||
|
settings.server_port = value.get::<u32>().expect("type checked upstream") as u16;
|
||||||
|
}
|
||||||
|
"client-address" => {
|
||||||
|
settings.client_address = value.get::<String>().expect("type checked upstream");
|
||||||
|
}
|
||||||
|
"client-port" => {
|
||||||
|
settings.client_port = value.get::<u32>().expect("type checked upstream") as u16;
|
||||||
|
}
|
||||||
|
"alpn-protocols" => {
|
||||||
|
settings.alpns = value
|
||||||
|
.get::<gst::ArrayRef>()
|
||||||
|
.expect("type checked upstream")
|
||||||
|
.as_slice()
|
||||||
|
.iter()
|
||||||
|
.map(|alpn| {
|
||||||
|
alpn.get::<&str>()
|
||||||
|
.expect("type checked upstream")
|
||||||
|
.to_string()
|
||||||
|
})
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
}
|
||||||
|
"timeout" => {
|
||||||
|
settings.timeout = value.get().expect("type checked upstream");
|
||||||
|
}
|
||||||
|
"secure-connection" => {
|
||||||
|
settings.secure_conn = value.get().expect("type checked upstream");
|
||||||
|
}
|
||||||
|
"certificate-file" => {
|
||||||
|
let value: String = value.get().unwrap();
|
||||||
|
settings.certificate_file = Some(value.into());
|
||||||
|
}
|
||||||
|
"private-key-file" => {
|
||||||
|
let value: String = value.get().unwrap();
|
||||||
|
settings.private_key_file = Some(value.into());
|
||||||
|
}
|
||||||
|
"use-datagram" => {
|
||||||
|
settings.use_datagram = value.get().expect("type checked upstream");
|
||||||
|
}
|
||||||
|
_ => unimplemented!(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
|
||||||
|
let settings = self.settings.lock().unwrap();
|
||||||
|
|
||||||
|
match pspec.name() {
|
||||||
|
"server-name" => settings.server_name.to_value(),
|
||||||
|
"server-address" => settings.server_address.to_string().to_value(),
|
||||||
|
"server-port" => {
|
||||||
|
let port = settings.server_port as u32;
|
||||||
|
port.to_value()
|
||||||
|
}
|
||||||
|
"client-address" => settings.client_address.to_string().to_value(),
|
||||||
|
"client-port" => {
|
||||||
|
let port = settings.client_port as u32;
|
||||||
|
port.to_value()
|
||||||
|
}
|
||||||
|
"alpn-protocols" => {
|
||||||
|
let alpns = settings.alpns.iter().map(|v| v.as_str());
|
||||||
|
gst::Array::new(alpns).to_value()
|
||||||
|
}
|
||||||
|
"timeout" => settings.timeout.to_value(),
|
||||||
|
"secure-connection" => settings.secure_conn.to_value(),
|
||||||
|
"certificate-file" => {
|
||||||
|
let certfile = settings.certificate_file.as_ref();
|
||||||
|
certfile.and_then(|file| file.to_str()).to_value()
|
||||||
|
}
|
||||||
|
"private-key-file" => {
|
||||||
|
let privkey = settings.private_key_file.as_ref();
|
||||||
|
privkey.and_then(|file| file.to_str()).to_value()
|
||||||
|
}
|
||||||
|
"use-datagram" => settings.use_datagram.to_value(),
|
||||||
|
_ => unimplemented!(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[glib::object_subclass]
|
||||||
|
impl ObjectSubclass for QuinnQuicSink {
|
||||||
|
const NAME: &'static str = "GstQuinnQUICSink";
|
||||||
|
type Type = super::QuinnQuicSink;
|
||||||
|
type ParentType = gst_base::BaseSink;
|
||||||
|
}
|
||||||
|
|
||||||
|
impl BaseSinkImpl for QuinnQuicSink {
|
||||||
|
fn start(&self) -> Result<(), gst::ErrorMessage> {
|
||||||
|
let settings = self.settings.lock().unwrap();
|
||||||
|
let timeout = settings.timeout;
|
||||||
|
drop(settings);
|
||||||
|
|
||||||
|
let mut state = self.state.lock().unwrap();
|
||||||
|
|
||||||
|
if let State::Started { .. } = *state {
|
||||||
|
unreachable!("QuicSink is already started");
|
||||||
|
}
|
||||||
|
|
||||||
|
match wait(&self.canceller, self.establish_connection(), timeout) {
|
||||||
|
Ok(Ok((c, s))) => {
|
||||||
|
*state = State::Started(Started {
|
||||||
|
connection: c,
|
||||||
|
stream: s,
|
||||||
|
});
|
||||||
|
|
||||||
|
gst::info!(CAT, imp: self, "Started");
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
Ok(Err(e)) => match e {
|
||||||
|
WaitError::FutureAborted => {
|
||||||
|
gst::warning!(CAT, imp: self, "Connection aborted");
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
WaitError::FutureError(err) => {
|
||||||
|
gst::error!(CAT, imp: self, "Connection request failed: {}", err);
|
||||||
|
Err(gst::error_msg!(
|
||||||
|
gst::ResourceError::Failed,
|
||||||
|
["Connection request failed: {}", err]
|
||||||
|
))
|
||||||
|
}
|
||||||
|
},
|
||||||
|
Err(e) => {
|
||||||
|
gst::error!(CAT, imp: self, "Failed to establish a connection: {:?}", e);
|
||||||
|
Err(gst::error_msg!(
|
||||||
|
gst::ResourceError::Failed,
|
||||||
|
["Failed to establish a connection: {:?}", e]
|
||||||
|
))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn stop(&self) -> Result<(), gst::ErrorMessage> {
|
||||||
|
let settings = self.settings.lock().unwrap();
|
||||||
|
let timeout = settings.timeout;
|
||||||
|
let use_datagram = settings.use_datagram;
|
||||||
|
drop(settings);
|
||||||
|
|
||||||
|
let mut state = self.state.lock().unwrap();
|
||||||
|
|
||||||
|
if let State::Started(ref mut state) = *state {
|
||||||
|
let connection = &state.connection;
|
||||||
|
let mut close_msg = CONNECTION_CLOSE_MSG.to_string();
|
||||||
|
|
||||||
|
if !use_datagram {
|
||||||
|
let send = &mut state.stream.as_mut().unwrap();
|
||||||
|
|
||||||
|
// Shutdown stream gracefully
|
||||||
|
match wait(&self.canceller, send.finish(), timeout) {
|
||||||
|
Ok(r) => {
|
||||||
|
if let Err(e) = r {
|
||||||
|
close_msg = format!("Stream finish request error: {}", e);
|
||||||
|
gst::error!(CAT, imp: self, "{}", close_msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => match e {
|
||||||
|
WaitError::FutureAborted => {
|
||||||
|
close_msg = "Stream finish request aborted".to_string();
|
||||||
|
gst::warning!(CAT, imp: self, "{}", close_msg);
|
||||||
|
}
|
||||||
|
WaitError::FutureError(e) => {
|
||||||
|
close_msg = format!("Stream finish request future error: {}", e);
|
||||||
|
gst::error!(CAT, imp: self, "{}", close_msg);
|
||||||
|
}
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
connection.close(CONNECTION_CLOSE_CODE.into(), close_msg.as_bytes());
|
||||||
|
}
|
||||||
|
|
||||||
|
*state = State::Stopped;
|
||||||
|
|
||||||
|
gst::info!(CAT, imp: self, "Stopped");
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn render(&self, buffer: &gst::Buffer) -> Result<gst::FlowSuccess, gst::FlowError> {
|
||||||
|
if let State::Stopped = *self.state.lock().unwrap() {
|
||||||
|
gst::element_imp_error!(self, gst::CoreError::Failed, ["Not started yet"]);
|
||||||
|
return Err(gst::FlowError::Error);
|
||||||
|
}
|
||||||
|
|
||||||
|
gst::trace!(CAT, imp: self, "Rendering {:?}", buffer);
|
||||||
|
|
||||||
|
let map = buffer.map_readable().map_err(|_| {
|
||||||
|
gst::element_imp_error!(self, gst::CoreError::Failed, ["Failed to map buffer"]);
|
||||||
|
gst::FlowError::Error
|
||||||
|
})?;
|
||||||
|
|
||||||
|
match self.send_buffer(&map) {
|
||||||
|
Ok(_) => Ok(gst::FlowSuccess::Ok),
|
||||||
|
Err(err) => match err {
|
||||||
|
Some(error_message) => {
|
||||||
|
gst::error!(CAT, imp: self, "Data sending failed: {}", error_message);
|
||||||
|
self.post_error_message(error_message);
|
||||||
|
Err(gst::FlowError::Error)
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
gst::info!(CAT, imp: self, "Send interrupted. Flushing...");
|
||||||
|
Err(gst::FlowError::Flushing)
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl QuinnQuicSink {
|
||||||
|
fn send_buffer(&self, src: &[u8]) -> Result<(), Option<gst::ErrorMessage>> {
|
||||||
|
let settings = self.settings.lock().unwrap();
|
||||||
|
let timeout = settings.timeout;
|
||||||
|
let use_datagram = settings.use_datagram;
|
||||||
|
drop(settings);
|
||||||
|
|
||||||
|
let mut state = self.state.lock().unwrap();
|
||||||
|
|
||||||
|
let (conn, stream) = match *state {
|
||||||
|
State::Started(Started {
|
||||||
|
ref connection,
|
||||||
|
ref mut stream,
|
||||||
|
}) => (connection, stream),
|
||||||
|
State::Stopped => {
|
||||||
|
return Err(Some(gst::error_msg!(
|
||||||
|
gst::LibraryError::Failed,
|
||||||
|
["Cannot send before start()"]
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
if use_datagram {
|
||||||
|
match conn.send_datagram(Bytes::copy_from_slice(src)) {
|
||||||
|
Ok(_) => Ok(()),
|
||||||
|
Err(e) => Err(Some(gst::error_msg!(
|
||||||
|
gst::ResourceError::Failed,
|
||||||
|
["Sending data failed: {}", e]
|
||||||
|
))),
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
let send = &mut stream.as_mut().unwrap();
|
||||||
|
|
||||||
|
match wait(&self.canceller, send.write_all(src), timeout) {
|
||||||
|
Ok(Ok(_)) => Ok(()),
|
||||||
|
Ok(Err(e)) => Err(Some(gst::error_msg!(
|
||||||
|
gst::ResourceError::Failed,
|
||||||
|
["Sending data failed: {}", e]
|
||||||
|
))),
|
||||||
|
Err(e) => match e {
|
||||||
|
WaitError::FutureAborted => {
|
||||||
|
gst::warning!(CAT, imp: self, "Sending aborted");
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
WaitError::FutureError(e) => Err(Some(gst::error_msg!(
|
||||||
|
gst::ResourceError::Failed,
|
||||||
|
["Sending data failed: {}", e]
|
||||||
|
))),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn establish_connection(&self) -> Result<(Connection, Option<SendStream>), WaitError> {
|
||||||
|
let client_addr;
|
||||||
|
let server_addr;
|
||||||
|
let server_name;
|
||||||
|
let alpns;
|
||||||
|
let use_datagram;
|
||||||
|
let secure_conn;
|
||||||
|
let cert_file;
|
||||||
|
let private_key_file;
|
||||||
|
|
||||||
|
{
|
||||||
|
let settings = self.settings.lock().unwrap();
|
||||||
|
|
||||||
|
client_addr = make_socket_addr(
|
||||||
|
format!("{}:{}", settings.client_address, settings.client_port).as_str(),
|
||||||
|
)?;
|
||||||
|
server_addr = make_socket_addr(
|
||||||
|
format!("{}:{}", settings.server_address, settings.server_port).as_str(),
|
||||||
|
)?;
|
||||||
|
|
||||||
|
server_name = settings.server_name.clone();
|
||||||
|
alpns = settings.alpns.clone();
|
||||||
|
use_datagram = settings.use_datagram;
|
||||||
|
secure_conn = settings.secure_conn;
|
||||||
|
cert_file = settings.certificate_file.clone();
|
||||||
|
private_key_file = settings.private_key_file.clone();
|
||||||
|
}
|
||||||
|
|
||||||
|
let endpoint =
|
||||||
|
client_endpoint(client_addr, secure_conn, alpns, cert_file, private_key_file).map_err(
|
||||||
|
|err| {
|
||||||
|
WaitError::FutureError(gst::error_msg!(
|
||||||
|
gst::ResourceError::Failed,
|
||||||
|
["Failed to configure endpoint: {}", err]
|
||||||
|
))
|
||||||
|
},
|
||||||
|
)?;
|
||||||
|
|
||||||
|
let connection = endpoint
|
||||||
|
.connect(server_addr, &server_name)
|
||||||
|
.unwrap()
|
||||||
|
.await
|
||||||
|
.map_err(|err| {
|
||||||
|
WaitError::FutureError(gst::error_msg!(
|
||||||
|
gst::ResourceError::Failed,
|
||||||
|
["Connection error: {}", err]
|
||||||
|
))
|
||||||
|
})?;
|
||||||
|
|
||||||
|
let stream = if !use_datagram {
|
||||||
|
let res = connection.open_uni().await.map_err(|err| {
|
||||||
|
WaitError::FutureError(gst::error_msg!(
|
||||||
|
gst::ResourceError::Failed,
|
||||||
|
["Failed to open stream: {}", err]
|
||||||
|
))
|
||||||
|
})?;
|
||||||
|
|
||||||
|
Some(res)
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok((connection, stream))
|
||||||
|
}
|
||||||
|
}
|
26
net/quinn/src/quinnquicsink/mod.rs
Normal file
26
net/quinn/src/quinnquicsink/mod.rs
Normal file
|
@ -0,0 +1,26 @@
|
||||||
|
// Copyright (C) 2024, Asymptotic Inc.
|
||||||
|
// Author: Sanchayan Maity <sanchayan@asymptotic.io>
|
||||||
|
//G
|
||||||
|
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
|
||||||
|
// If a copy of the MPL was not distributed with this file, You can obtain one at
|
||||||
|
// <https://mozilla.org/MPL/2.0/>.
|
||||||
|
//
|
||||||
|
// SPDX-License-Identifier: MPL-2.0
|
||||||
|
|
||||||
|
use gst::glib;
|
||||||
|
use gst::prelude::*;
|
||||||
|
|
||||||
|
pub mod imp;
|
||||||
|
|
||||||
|
glib::wrapper! {
|
||||||
|
pub struct QuinnQuicSink(ObjectSubclass<imp::QuinnQuicSink>) @extends gst_base::BaseSink, gst::Element, gst::Object;
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
|
||||||
|
gst::Element::register(
|
||||||
|
Some(plugin),
|
||||||
|
"quinnquicsink",
|
||||||
|
gst::Rank::MARGINAL,
|
||||||
|
QuinnQuicSink::static_type(),
|
||||||
|
)
|
||||||
|
}
|
603
net/quinn/src/quinnquicsrc/imp.rs
Normal file
603
net/quinn/src/quinnquicsrc/imp.rs
Normal file
|
@ -0,0 +1,603 @@
|
||||||
|
// Copyright (C) 2024, Asymptotic Inc.
|
||||||
|
// Author: Sanchayan Maity <sanchayan@asymptotic.io>
|
||||||
|
//
|
||||||
|
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
|
||||||
|
// If a copy of the MPL was not distributed with this file, You can obtain one at
|
||||||
|
// <https://mozilla.org/MPL/2.0/>.
|
||||||
|
//
|
||||||
|
// SPDX-License-Identifier: MPL-2.0
|
||||||
|
|
||||||
|
use crate::utils::{
|
||||||
|
make_socket_addr, server_endpoint, wait, WaitError, CONNECTION_CLOSE_CODE, CONNECTION_CLOSE_MSG,
|
||||||
|
};
|
||||||
|
use bytes::Bytes;
|
||||||
|
use futures::future;
|
||||||
|
use gst::{glib, prelude::*, subclass::prelude::*};
|
||||||
|
use gst_base::prelude::*;
|
||||||
|
use gst_base::subclass::base_src::CreateSuccess;
|
||||||
|
use gst_base::subclass::prelude::*;
|
||||||
|
use once_cell::sync::Lazy;
|
||||||
|
use quinn::{Connection, ConnectionError, RecvStream};
|
||||||
|
use std::path::PathBuf;
|
||||||
|
use std::sync::Mutex;
|
||||||
|
|
||||||
|
static DEFAULT_SERVER_NAME: &str = "localhost";
|
||||||
|
static DEFAULT_SERVER_ADDR: &str = "127.0.0.1";
|
||||||
|
static DEFAULT_SERVER_PORT: u16 = 5000;
|
||||||
|
|
||||||
|
/*
|
||||||
|
* For QUIC transport parameters
|
||||||
|
* <https://datatracker.ietf.org/doc/html/rfc9000#section-7.4>
|
||||||
|
*
|
||||||
|
* A HTTP client might specify "http/1.1" and/or "h2" or "h3".
|
||||||
|
* Other well-known values are listed in the at IANA registry at
|
||||||
|
* <https://www.iana.org/assignments/tls-extensiontype-values/tls-extensiontype-values.xhtml#alpn-protocol-ids>.
|
||||||
|
*/
|
||||||
|
const DEFAULT_ALPN: &str = "gst-quinn";
|
||||||
|
const DEFAULT_TIMEOUT: u32 = 15;
|
||||||
|
const DEFAULT_SECURE_CONNECTION: bool = true;
|
||||||
|
|
||||||
|
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
|
||||||
|
gst::DebugCategory::new(
|
||||||
|
"quinnquicsrc",
|
||||||
|
gst::DebugColorFlags::empty(),
|
||||||
|
Some("Quinn QUIC Source"),
|
||||||
|
)
|
||||||
|
});
|
||||||
|
|
||||||
|
struct Started {
|
||||||
|
connection: Connection,
|
||||||
|
stream: Option<RecvStream>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Default)]
|
||||||
|
enum State {
|
||||||
|
#[default]
|
||||||
|
Stopped,
|
||||||
|
Started(Started),
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone, Debug)]
|
||||||
|
struct Settings {
|
||||||
|
server_address: String,
|
||||||
|
server_port: u16,
|
||||||
|
server_name: String,
|
||||||
|
alpns: Vec<String>,
|
||||||
|
timeout: u32,
|
||||||
|
secure_conn: bool,
|
||||||
|
caps: gst::Caps,
|
||||||
|
use_datagram: bool,
|
||||||
|
certificate_file: Option<PathBuf>,
|
||||||
|
private_key_file: Option<PathBuf>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for Settings {
|
||||||
|
fn default() -> Self {
|
||||||
|
Settings {
|
||||||
|
server_address: DEFAULT_SERVER_ADDR.to_string(),
|
||||||
|
server_port: DEFAULT_SERVER_PORT,
|
||||||
|
server_name: DEFAULT_SERVER_NAME.to_string(),
|
||||||
|
alpns: vec![DEFAULT_ALPN.to_string()],
|
||||||
|
timeout: DEFAULT_TIMEOUT,
|
||||||
|
secure_conn: DEFAULT_SECURE_CONNECTION,
|
||||||
|
caps: gst::Caps::new_any(),
|
||||||
|
use_datagram: false,
|
||||||
|
certificate_file: None,
|
||||||
|
private_key_file: None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub struct QuinnQuicSrc {
|
||||||
|
settings: Mutex<Settings>,
|
||||||
|
state: Mutex<State>,
|
||||||
|
canceller: Mutex<Option<future::AbortHandle>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for QuinnQuicSrc {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self {
|
||||||
|
settings: Mutex::new(Settings::default()),
|
||||||
|
state: Mutex::new(State::default()),
|
||||||
|
canceller: Mutex::new(None),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl GstObjectImpl for QuinnQuicSrc {}
|
||||||
|
|
||||||
|
impl ElementImpl for QuinnQuicSrc {
|
||||||
|
fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
|
||||||
|
static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
|
||||||
|
gst::subclass::ElementMetadata::new(
|
||||||
|
"Quinn QUIC Source",
|
||||||
|
"Source/Network/QUIC",
|
||||||
|
"Receive data over the network via QUIC",
|
||||||
|
"Sanchayan Maity <sanchayan@asymptotic.io>",
|
||||||
|
)
|
||||||
|
});
|
||||||
|
Some(&*ELEMENT_METADATA)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn pad_templates() -> &'static [gst::PadTemplate] {
|
||||||
|
static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
|
||||||
|
let sink_pad_template = gst::PadTemplate::new(
|
||||||
|
"src",
|
||||||
|
gst::PadDirection::Src,
|
||||||
|
gst::PadPresence::Always,
|
||||||
|
&gst::Caps::new_any(),
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
vec![sink_pad_template]
|
||||||
|
});
|
||||||
|
|
||||||
|
PAD_TEMPLATES.as_ref()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn change_state(
|
||||||
|
&self,
|
||||||
|
transition: gst::StateChange,
|
||||||
|
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
|
||||||
|
if transition == gst::StateChange::NullToReady {
|
||||||
|
let settings = self.settings.lock().unwrap();
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Fail the state change if a secure connection was requested but
|
||||||
|
* no certificate path was provided.
|
||||||
|
*/
|
||||||
|
if settings.secure_conn
|
||||||
|
&& (settings.certificate_file.is_none() || settings.private_key_file.is_none())
|
||||||
|
{
|
||||||
|
gst::error!(
|
||||||
|
CAT,
|
||||||
|
imp: self,
|
||||||
|
"Certificate or private key file not provided for secure connection"
|
||||||
|
);
|
||||||
|
return Err(gst::StateChangeError);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
self.parent_change_state(transition)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl ObjectImpl for QuinnQuicSrc {
|
||||||
|
fn constructed(&self) {
|
||||||
|
self.parent_constructed();
|
||||||
|
self.obj().set_format(gst::Format::Bytes);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn properties() -> &'static [glib::ParamSpec] {
|
||||||
|
static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
|
||||||
|
vec![
|
||||||
|
glib::ParamSpecString::builder("server-name")
|
||||||
|
.nick("QUIC server name")
|
||||||
|
.blurb("Name of the QUIC server which is in server certificate")
|
||||||
|
.build(),
|
||||||
|
glib::ParamSpecString::builder("server-address")
|
||||||
|
.nick("QUIC server address")
|
||||||
|
.blurb("Address of the QUIC server e.g. 127.0.0.1")
|
||||||
|
.build(),
|
||||||
|
glib::ParamSpecUInt::builder("server-port")
|
||||||
|
.nick("QUIC server port")
|
||||||
|
.blurb("Port of the QUIC server e.g. 5000")
|
||||||
|
.maximum(65535)
|
||||||
|
.default_value(DEFAULT_SERVER_PORT as u32)
|
||||||
|
.readwrite()
|
||||||
|
.build(),
|
||||||
|
gst::ParamSpecArray::builder("alpn-protocols")
|
||||||
|
.nick("QUIC ALPN values")
|
||||||
|
.blurb("QUIC connection Application-Layer Protocol Negotiation (ALPN) values")
|
||||||
|
.element_spec(&glib::ParamSpecString::builder("alpn-protocol").build())
|
||||||
|
.build(),
|
||||||
|
glib::ParamSpecUInt::builder("timeout")
|
||||||
|
.nick("Timeout")
|
||||||
|
.blurb("Value in seconds to timeout QUIC endpoint requests (0 = No timeout).")
|
||||||
|
.maximum(3600)
|
||||||
|
.default_value(DEFAULT_TIMEOUT)
|
||||||
|
.readwrite()
|
||||||
|
.build(),
|
||||||
|
glib::ParamSpecBoolean::builder("secure-connection")
|
||||||
|
.nick("Use secure connection")
|
||||||
|
.blurb("Use certificates for QUIC connection. False: Insecure connection, True: Secure connection.")
|
||||||
|
.default_value(DEFAULT_SECURE_CONNECTION)
|
||||||
|
.build(),
|
||||||
|
glib::ParamSpecString::builder("certificate-file")
|
||||||
|
.nick("Certificate file")
|
||||||
|
.blurb("Path to certificate chain in single file")
|
||||||
|
.build(),
|
||||||
|
glib::ParamSpecString::builder("private-key-file")
|
||||||
|
.nick("Private key file")
|
||||||
|
.blurb("Path to a PKCS8 or RSA private key file")
|
||||||
|
.build(),
|
||||||
|
glib::ParamSpecBoxed::builder::<gst::Caps>("caps")
|
||||||
|
.nick("caps")
|
||||||
|
.blurb("The caps of the source pad")
|
||||||
|
.build(),
|
||||||
|
glib::ParamSpecBoolean::builder("use-datagram")
|
||||||
|
.nick("Use datagram")
|
||||||
|
.blurb("Use datagram for lower latency, unreliable messaging")
|
||||||
|
.default_value(false)
|
||||||
|
.build(),
|
||||||
|
]
|
||||||
|
});
|
||||||
|
|
||||||
|
PROPERTIES.as_ref()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
|
||||||
|
let mut settings = self.settings.lock().unwrap();
|
||||||
|
|
||||||
|
match pspec.name() {
|
||||||
|
"server-name" => {
|
||||||
|
settings.server_name = value.get::<String>().expect("type checked upstream");
|
||||||
|
}
|
||||||
|
"server-address" => {
|
||||||
|
settings.server_address = value.get::<String>().expect("type checked upstream");
|
||||||
|
}
|
||||||
|
"server-port" => {
|
||||||
|
settings.server_port = value.get::<u32>().expect("type checked upstream") as u16;
|
||||||
|
}
|
||||||
|
"alpn-protocols" => {
|
||||||
|
settings.alpns = value
|
||||||
|
.get::<gst::ArrayRef>()
|
||||||
|
.expect("type checked upstream")
|
||||||
|
.as_slice()
|
||||||
|
.iter()
|
||||||
|
.map(|alpn| {
|
||||||
|
alpn.get::<&str>()
|
||||||
|
.expect("type checked upstream")
|
||||||
|
.to_string()
|
||||||
|
})
|
||||||
|
.collect::<Vec<String>>()
|
||||||
|
}
|
||||||
|
"caps" => {
|
||||||
|
settings.caps = value
|
||||||
|
.get::<Option<gst::Caps>>()
|
||||||
|
.expect("type checked upstream")
|
||||||
|
.unwrap_or_else(gst::Caps::new_any);
|
||||||
|
|
||||||
|
let srcpad = self.obj().static_pad("src").expect("source pad expected");
|
||||||
|
srcpad.mark_reconfigure();
|
||||||
|
}
|
||||||
|
"timeout" => {
|
||||||
|
settings.timeout = value.get().expect("type checked upstream");
|
||||||
|
}
|
||||||
|
"secure-connection" => {
|
||||||
|
settings.secure_conn = value.get().expect("type checked upstream");
|
||||||
|
}
|
||||||
|
"certificate-file" => {
|
||||||
|
let value: String = value.get().unwrap();
|
||||||
|
settings.certificate_file = Some(value.into());
|
||||||
|
}
|
||||||
|
"private-key-file" => {
|
||||||
|
let value: String = value.get().unwrap();
|
||||||
|
settings.private_key_file = Some(value.into());
|
||||||
|
}
|
||||||
|
"use-datagram" => {
|
||||||
|
settings.use_datagram = value.get().expect("type checked upstream");
|
||||||
|
}
|
||||||
|
_ => unimplemented!(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
|
||||||
|
let settings = self.settings.lock().unwrap();
|
||||||
|
|
||||||
|
match pspec.name() {
|
||||||
|
"server-name" => settings.server_name.to_value(),
|
||||||
|
"server-address" => settings.server_address.to_string().to_value(),
|
||||||
|
"server-port" => {
|
||||||
|
let port = settings.server_port as u32;
|
||||||
|
port.to_value()
|
||||||
|
}
|
||||||
|
"alpn-protocols" => {
|
||||||
|
let alpns = settings.alpns.iter().map(|v| v.as_str());
|
||||||
|
gst::Array::new(alpns).to_value()
|
||||||
|
}
|
||||||
|
"caps" => settings.caps.to_value(),
|
||||||
|
"timeout" => settings.timeout.to_value(),
|
||||||
|
"secure-connection" => settings.secure_conn.to_value(),
|
||||||
|
"certificate-file" => {
|
||||||
|
let certfile = settings.certificate_file.as_ref();
|
||||||
|
certfile.and_then(|file| file.to_str()).to_value()
|
||||||
|
}
|
||||||
|
"private-key-file" => {
|
||||||
|
let privkey = settings.private_key_file.as_ref();
|
||||||
|
privkey.and_then(|file| file.to_str()).to_value()
|
||||||
|
}
|
||||||
|
"use-datagram" => settings.use_datagram.to_value(),
|
||||||
|
_ => unimplemented!(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[glib::object_subclass]
|
||||||
|
impl ObjectSubclass for QuinnQuicSrc {
|
||||||
|
const NAME: &'static str = "GstQuinnQUICSrc";
|
||||||
|
type Type = super::QuinnQuicSrc;
|
||||||
|
type ParentType = gst_base::BaseSrc;
|
||||||
|
}
|
||||||
|
|
||||||
|
impl BaseSrcImpl for QuinnQuicSrc {
|
||||||
|
fn is_seekable(&self) -> bool {
|
||||||
|
false
|
||||||
|
}
|
||||||
|
|
||||||
|
fn start(&self) -> Result<(), gst::ErrorMessage> {
|
||||||
|
let settings = self.settings.lock().unwrap();
|
||||||
|
let timeout = settings.timeout;
|
||||||
|
drop(settings);
|
||||||
|
|
||||||
|
let mut state = self.state.lock().unwrap();
|
||||||
|
|
||||||
|
if let State::Started { .. } = *state {
|
||||||
|
unreachable!("QuicSrc already started");
|
||||||
|
}
|
||||||
|
|
||||||
|
match wait(&self.canceller, self.wait_for_connection(), timeout) {
|
||||||
|
Ok(Ok((c, s))) => {
|
||||||
|
*state = State::Started(Started {
|
||||||
|
connection: c,
|
||||||
|
stream: s,
|
||||||
|
});
|
||||||
|
|
||||||
|
gst::info!(CAT, imp: self, "Started");
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
Ok(Err(e)) | Err(e) => match e {
|
||||||
|
WaitError::FutureAborted => {
|
||||||
|
gst::warning!(CAT, imp: self, "Connection aborted");
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
WaitError::FutureError(err) => {
|
||||||
|
gst::error!(CAT, imp: self, "Connection request failed: {}", err);
|
||||||
|
Err(gst::error_msg!(
|
||||||
|
gst::ResourceError::Failed,
|
||||||
|
["Connection request failed: {}", err]
|
||||||
|
))
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn stop(&self) -> Result<(), gst::ErrorMessage> {
|
||||||
|
self.cancel();
|
||||||
|
|
||||||
|
let mut state = self.state.lock().unwrap();
|
||||||
|
|
||||||
|
if let State::Started(ref mut state) = *state {
|
||||||
|
let connection = &state.connection;
|
||||||
|
|
||||||
|
connection.close(
|
||||||
|
CONNECTION_CLOSE_CODE.into(),
|
||||||
|
CONNECTION_CLOSE_MSG.as_bytes(),
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
*state = State::Stopped;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn query(&self, query: &mut gst::QueryRef) -> bool {
|
||||||
|
if let gst::QueryViewMut::Scheduling(q) = query.view_mut() {
|
||||||
|
q.set(
|
||||||
|
gst::SchedulingFlags::SEQUENTIAL | gst::SchedulingFlags::BANDWIDTH_LIMITED,
|
||||||
|
1,
|
||||||
|
-1,
|
||||||
|
0,
|
||||||
|
);
|
||||||
|
q.add_scheduling_modes(&[gst::PadMode::Pull, gst::PadMode::Push]);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
BaseSrcImplExt::parent_query(self, query)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn create(
|
||||||
|
&self,
|
||||||
|
offset: u64,
|
||||||
|
buffer: Option<&mut gst::BufferRef>,
|
||||||
|
length: u32,
|
||||||
|
) -> Result<CreateSuccess, gst::FlowError> {
|
||||||
|
let data = self.get(offset, u64::from(length));
|
||||||
|
|
||||||
|
match data {
|
||||||
|
Ok(bytes) => {
|
||||||
|
if bytes.is_empty() {
|
||||||
|
gst::debug!(CAT, imp: self, "End of stream");
|
||||||
|
return Err(gst::FlowError::Eos);
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(buffer) = buffer {
|
||||||
|
if let Err(copied_bytes) = buffer.copy_from_slice(0, bytes.as_ref()) {
|
||||||
|
buffer.set_size(copied_bytes);
|
||||||
|
}
|
||||||
|
Ok(CreateSuccess::FilledBuffer)
|
||||||
|
} else {
|
||||||
|
Ok(CreateSuccess::NewBuffer(gst::Buffer::from_slice(bytes)))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(None) => Err(gst::FlowError::Flushing),
|
||||||
|
Err(Some(err)) => {
|
||||||
|
gst::error!(CAT, imp: self, "Could not GET: {}", err);
|
||||||
|
Err(gst::FlowError::Error)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn unlock(&self) -> Result<(), gst::ErrorMessage> {
|
||||||
|
self.cancel();
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn caps(&self, filter: Option<&gst::Caps>) -> Option<gst::Caps> {
|
||||||
|
let settings = self.settings.lock().unwrap();
|
||||||
|
|
||||||
|
let mut tmp_caps = settings.caps.clone();
|
||||||
|
|
||||||
|
gst::debug!(CAT, imp: self, "Advertising our own caps: {:?}", &tmp_caps);
|
||||||
|
|
||||||
|
if let Some(filter_caps) = filter {
|
||||||
|
gst::debug!(
|
||||||
|
CAT,
|
||||||
|
imp: self,
|
||||||
|
"Intersecting with filter caps: {:?}",
|
||||||
|
&filter_caps
|
||||||
|
);
|
||||||
|
|
||||||
|
tmp_caps = filter_caps.intersect_with_mode(&tmp_caps, gst::CapsIntersectMode::First);
|
||||||
|
};
|
||||||
|
|
||||||
|
gst::debug!(CAT, imp: self, "Returning caps: {:?}", &tmp_caps);
|
||||||
|
|
||||||
|
Some(tmp_caps)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl QuinnQuicSrc {
|
||||||
|
fn get(&self, _offset: u64, length: u64) -> Result<Bytes, Option<gst::ErrorMessage>> {
|
||||||
|
let settings = self.settings.lock().unwrap();
|
||||||
|
let timeout = settings.timeout;
|
||||||
|
let use_datagram = settings.use_datagram;
|
||||||
|
drop(settings);
|
||||||
|
|
||||||
|
let mut state = self.state.lock().unwrap();
|
||||||
|
|
||||||
|
let (conn, stream) = match *state {
|
||||||
|
State::Started(Started {
|
||||||
|
ref connection,
|
||||||
|
ref mut stream,
|
||||||
|
}) => (connection, stream),
|
||||||
|
State::Stopped => {
|
||||||
|
return Err(Some(gst::error_msg!(
|
||||||
|
gst::LibraryError::Failed,
|
||||||
|
["Cannot get data before start"]
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let future = async {
|
||||||
|
if use_datagram {
|
||||||
|
match conn.read_datagram().await {
|
||||||
|
Ok(bytes) => Ok(bytes),
|
||||||
|
Err(err) => match err {
|
||||||
|
ConnectionError::ApplicationClosed(_)
|
||||||
|
| ConnectionError::ConnectionClosed(_) => Ok(Bytes::new()),
|
||||||
|
_ => Err(WaitError::FutureError(gst::error_msg!(
|
||||||
|
gst::ResourceError::Failed,
|
||||||
|
["Datagram read error: {}", err]
|
||||||
|
))),
|
||||||
|
},
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
let recv = stream.as_mut().unwrap();
|
||||||
|
|
||||||
|
match recv.read_chunk(length as usize, true).await {
|
||||||
|
Ok(Some(chunk)) => Ok(chunk.bytes),
|
||||||
|
Ok(None) => Ok(Bytes::new()),
|
||||||
|
Err(err) => Err(WaitError::FutureError(gst::error_msg!(
|
||||||
|
gst::ResourceError::Failed,
|
||||||
|
["Stream read error: {}", err]
|
||||||
|
))),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
match wait(&self.canceller, future, timeout) {
|
||||||
|
Ok(Ok(bytes)) => Ok(bytes),
|
||||||
|
Ok(Err(e)) | Err(e) => match e {
|
||||||
|
WaitError::FutureAborted => {
|
||||||
|
gst::warning!(CAT, imp: self, "Read from stream request aborted");
|
||||||
|
Err(None)
|
||||||
|
}
|
||||||
|
WaitError::FutureError(e) => {
|
||||||
|
gst::error!(CAT, imp: self, "Failed to read from stream: {}", e);
|
||||||
|
Err(Some(e))
|
||||||
|
}
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn cancel(&self) {
|
||||||
|
let mut canceller = self.canceller.lock().unwrap();
|
||||||
|
|
||||||
|
if let Some(c) = canceller.take() {
|
||||||
|
c.abort()
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn wait_for_connection(&self) -> Result<(Connection, Option<RecvStream>), WaitError> {
|
||||||
|
let server_addr;
|
||||||
|
let server_name;
|
||||||
|
let alpns;
|
||||||
|
let use_datagram;
|
||||||
|
let secure_conn;
|
||||||
|
let cert_file;
|
||||||
|
let private_key_file;
|
||||||
|
|
||||||
|
{
|
||||||
|
let settings = self.settings.lock().unwrap();
|
||||||
|
|
||||||
|
server_addr = make_socket_addr(
|
||||||
|
format!("{}:{}", settings.server_address, settings.server_port).as_str(),
|
||||||
|
)?;
|
||||||
|
|
||||||
|
server_name = settings.server_name.clone();
|
||||||
|
alpns = settings.alpns.clone();
|
||||||
|
use_datagram = settings.use_datagram;
|
||||||
|
secure_conn = settings.secure_conn;
|
||||||
|
cert_file = settings.certificate_file.clone();
|
||||||
|
private_key_file = settings.private_key_file.clone();
|
||||||
|
}
|
||||||
|
|
||||||
|
let endpoint = server_endpoint(
|
||||||
|
server_addr,
|
||||||
|
&server_name,
|
||||||
|
secure_conn,
|
||||||
|
alpns,
|
||||||
|
cert_file,
|
||||||
|
private_key_file,
|
||||||
|
)
|
||||||
|
.map_err(|err| {
|
||||||
|
WaitError::FutureError(gst::error_msg!(
|
||||||
|
gst::ResourceError::Failed,
|
||||||
|
["Failed to configure endpoint: {}", err]
|
||||||
|
))
|
||||||
|
})?;
|
||||||
|
|
||||||
|
let incoming_conn = endpoint.accept().await.unwrap();
|
||||||
|
|
||||||
|
let connection = incoming_conn.await.map_err(|err| {
|
||||||
|
WaitError::FutureError(gst::error_msg!(
|
||||||
|
gst::ResourceError::Failed,
|
||||||
|
["Connection error: {}", err]
|
||||||
|
))
|
||||||
|
})?;
|
||||||
|
|
||||||
|
let stream = if !use_datagram {
|
||||||
|
let res = connection.accept_uni().await.map_err(|err| {
|
||||||
|
WaitError::FutureError(gst::error_msg!(
|
||||||
|
gst::ResourceError::Failed,
|
||||||
|
["Failed to open stream: {}", err]
|
||||||
|
))
|
||||||
|
})?;
|
||||||
|
|
||||||
|
Some(res)
|
||||||
|
} else {
|
||||||
|
None
|
||||||
|
};
|
||||||
|
|
||||||
|
gst::info!(
|
||||||
|
CAT,
|
||||||
|
imp: self,
|
||||||
|
"Remote connection accepted: {}",
|
||||||
|
connection.remote_address()
|
||||||
|
);
|
||||||
|
|
||||||
|
Ok((connection, stream))
|
||||||
|
}
|
||||||
|
}
|
26
net/quinn/src/quinnquicsrc/mod.rs
Normal file
26
net/quinn/src/quinnquicsrc/mod.rs
Normal file
|
@ -0,0 +1,26 @@
|
||||||
|
// Copyright (C) 2024, Asymptotic Inc.
|
||||||
|
// Author: Sanchayan Maity <sanchayan@asymptotic.io>
|
||||||
|
//
|
||||||
|
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
|
||||||
|
// If a copy of the MPL was not distributed with this file, You can obtain one at
|
||||||
|
// <https://mozilla.org/MPL/2.0/>.
|
||||||
|
//
|
||||||
|
// SPDX-License-Identifier: MPL-2.0
|
||||||
|
|
||||||
|
use gst::glib;
|
||||||
|
use gst::prelude::*;
|
||||||
|
|
||||||
|
mod imp;
|
||||||
|
|
||||||
|
glib::wrapper! {
|
||||||
|
pub struct QuinnQuicSrc(ObjectSubclass<imp::QuinnQuicSrc>) @extends gst_base::BaseSrc, gst::Element, gst::Object;
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
|
||||||
|
gst::Element::register(
|
||||||
|
Some(plugin),
|
||||||
|
"quinnquicsrc",
|
||||||
|
gst::Rank::MARGINAL,
|
||||||
|
QuinnQuicSrc::static_type(),
|
||||||
|
)
|
||||||
|
}
|
310
net/quinn/src/utils.rs
Normal file
310
net/quinn/src/utils.rs
Normal file
|
@ -0,0 +1,310 @@
|
||||||
|
// Copyright (C) 2024, Asymptotic Inc.
|
||||||
|
// Author: Sanchayan Maity <sanchayan@asymptotic.io>
|
||||||
|
//G
|
||||||
|
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
|
||||||
|
// If a copy of the MPL was not distributed with this file, You can obtain one at
|
||||||
|
// <https://mozilla.org/MPL/2.0/>.
|
||||||
|
//
|
||||||
|
// SPDX-License-Identifier: MPL-2.0
|
||||||
|
|
||||||
|
use futures::future;
|
||||||
|
use futures::prelude::*;
|
||||||
|
use gst::ErrorMessage;
|
||||||
|
use once_cell::sync::Lazy;
|
||||||
|
use quinn::{ClientConfig, Endpoint, ServerConfig};
|
||||||
|
use std::error::Error;
|
||||||
|
use std::fs::File;
|
||||||
|
use std::io::BufReader;
|
||||||
|
use std::net::SocketAddr;
|
||||||
|
use std::path::PathBuf;
|
||||||
|
use std::sync::{Arc, Mutex};
|
||||||
|
use std::time::Duration;
|
||||||
|
use thiserror::Error;
|
||||||
|
use tokio::runtime;
|
||||||
|
|
||||||
|
pub const CONNECTION_CLOSE_CODE: u32 = 0;
|
||||||
|
pub const CONNECTION_CLOSE_MSG: &str = "Stopped";
|
||||||
|
|
||||||
|
#[derive(Error, Debug)]
|
||||||
|
pub enum WaitError {
|
||||||
|
#[error("Future aborted")]
|
||||||
|
FutureAborted,
|
||||||
|
#[error("Future returned an error: {0}")]
|
||||||
|
FutureError(ErrorMessage),
|
||||||
|
}
|
||||||
|
|
||||||
|
pub static RUNTIME: Lazy<runtime::Runtime> = Lazy::new(|| {
|
||||||
|
runtime::Builder::new_multi_thread()
|
||||||
|
.enable_all()
|
||||||
|
.worker_threads(1)
|
||||||
|
.thread_name("gst-quic-runtime")
|
||||||
|
.build()
|
||||||
|
.unwrap()
|
||||||
|
});
|
||||||
|
|
||||||
|
pub fn wait<F, T>(
|
||||||
|
canceller: &Mutex<Option<future::AbortHandle>>,
|
||||||
|
future: F,
|
||||||
|
timeout: u32,
|
||||||
|
) -> Result<T, WaitError>
|
||||||
|
where
|
||||||
|
F: Send + Future<Output = T>,
|
||||||
|
T: Send + 'static,
|
||||||
|
{
|
||||||
|
let mut canceller_guard = canceller.lock().unwrap();
|
||||||
|
let (abort_handle, abort_registration) = future::AbortHandle::new_pair();
|
||||||
|
|
||||||
|
if canceller_guard.is_some() {
|
||||||
|
return Err(WaitError::FutureError(gst::error_msg!(
|
||||||
|
gst::ResourceError::Failed,
|
||||||
|
["Old Canceller should not exist"]
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
|
||||||
|
canceller_guard.replace(abort_handle);
|
||||||
|
drop(canceller_guard);
|
||||||
|
|
||||||
|
let future = async {
|
||||||
|
if timeout == 0 {
|
||||||
|
Ok(future.await)
|
||||||
|
} else {
|
||||||
|
let res = tokio::time::timeout(Duration::from_secs(timeout.into()), future).await;
|
||||||
|
|
||||||
|
match res {
|
||||||
|
Ok(r) => Ok(r),
|
||||||
|
Err(e) => Err(gst::error_msg!(
|
||||||
|
gst::ResourceError::Read,
|
||||||
|
["Request timeout, elapsed: {}", e.to_string()]
|
||||||
|
)),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let future = async {
|
||||||
|
match future::Abortable::new(future, abort_registration).await {
|
||||||
|
Ok(Ok(res)) => Ok(res),
|
||||||
|
|
||||||
|
Ok(Err(err)) => Err(WaitError::FutureError(gst::error_msg!(
|
||||||
|
gst::ResourceError::Failed,
|
||||||
|
["Future resolved with an error {:?}", err]
|
||||||
|
))),
|
||||||
|
|
||||||
|
Err(future::Aborted) => Err(WaitError::FutureAborted),
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let res = RUNTIME.block_on(future);
|
||||||
|
|
||||||
|
canceller_guard = canceller.lock().unwrap();
|
||||||
|
*canceller_guard = None;
|
||||||
|
|
||||||
|
res
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn make_socket_addr(addr: &str) -> Result<SocketAddr, WaitError> {
|
||||||
|
match addr.parse::<SocketAddr>() {
|
||||||
|
Ok(address) => Ok(address),
|
||||||
|
Err(e) => Err(WaitError::FutureError(gst::error_msg!(
|
||||||
|
gst::ResourceError::Failed,
|
||||||
|
["Invalid address: {}", e]
|
||||||
|
))),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Following functions are taken from Quinn documentation/repository
|
||||||
|
*/
|
||||||
|
struct SkipServerVerification;
|
||||||
|
|
||||||
|
impl SkipServerVerification {
|
||||||
|
pub fn new() -> Arc<Self> {
|
||||||
|
Arc::new(Self)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl rustls::client::ServerCertVerifier for SkipServerVerification {
|
||||||
|
fn verify_server_cert(
|
||||||
|
&self,
|
||||||
|
_end_entity: &rustls::Certificate,
|
||||||
|
_intermediates: &[rustls::Certificate],
|
||||||
|
_server_name: &rustls::ServerName,
|
||||||
|
_scts: &mut dyn Iterator<Item = &[u8]>,
|
||||||
|
_ocsp_response: &[u8],
|
||||||
|
_now: std::time::SystemTime,
|
||||||
|
) -> Result<rustls::client::ServerCertVerified, rustls::Error> {
|
||||||
|
Ok(rustls::client::ServerCertVerified::assertion())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn configure_client(
|
||||||
|
secure_conn: bool,
|
||||||
|
certificate_file: Option<PathBuf>,
|
||||||
|
private_key_file: Option<PathBuf>,
|
||||||
|
alpns: Vec<String>,
|
||||||
|
) -> Result<ClientConfig, Box<dyn Error>> {
|
||||||
|
let mut crypto = if secure_conn {
|
||||||
|
let (certs, key) = read_certs_from_file(certificate_file, private_key_file)?;
|
||||||
|
let mut cert_store = rustls::RootCertStore::empty();
|
||||||
|
|
||||||
|
for cert in &certs {
|
||||||
|
cert_store.add(cert)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
rustls::ClientConfig::builder()
|
||||||
|
.with_safe_defaults()
|
||||||
|
.with_root_certificates(Arc::new(cert_store))
|
||||||
|
.with_client_auth_cert(certs, key)?
|
||||||
|
} else {
|
||||||
|
rustls::ClientConfig::builder()
|
||||||
|
.with_safe_defaults()
|
||||||
|
.with_custom_certificate_verifier(SkipServerVerification::new())
|
||||||
|
.with_no_client_auth()
|
||||||
|
};
|
||||||
|
|
||||||
|
let alpn_protocols: Vec<Vec<u8>> = alpns
|
||||||
|
.iter()
|
||||||
|
.map(|x| x.as_bytes().to_vec())
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
crypto.alpn_protocols = alpn_protocols;
|
||||||
|
crypto.key_log = Arc::new(rustls::KeyLogFile::new());
|
||||||
|
|
||||||
|
Ok(ClientConfig::new(Arc::new(crypto)))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn read_certs_from_file(
|
||||||
|
certificate_file: Option<PathBuf>,
|
||||||
|
private_key_file: Option<PathBuf>,
|
||||||
|
) -> Result<(Vec<rustls::Certificate>, rustls::PrivateKey), Box<dyn Error>> {
|
||||||
|
/*
|
||||||
|
* NOTE:
|
||||||
|
*
|
||||||
|
* Certificate file here should correspond to fullchain.pem where
|
||||||
|
* fullchain.pem = cert.pem + chain.pem.
|
||||||
|
* fullchain.pem DOES NOT include a CA's Root Certificates.
|
||||||
|
*
|
||||||
|
* One typically uses chain.pem (or the first certificate in it) when asked
|
||||||
|
* for a CA bundle or CA certificate.
|
||||||
|
*
|
||||||
|
* One typically uses fullchain.pem when asked for the entire certificate
|
||||||
|
* chain in a single file. For example, this is the case of modern day
|
||||||
|
* Apache and nginx.
|
||||||
|
*/
|
||||||
|
let cert_file = certificate_file
|
||||||
|
.clone()
|
||||||
|
.expect("Expected path to certificates be valid");
|
||||||
|
let key_file = private_key_file.expect("Expected path to certificates be valid");
|
||||||
|
|
||||||
|
let certs: Vec<rustls::Certificate> = {
|
||||||
|
let cert_file = File::open(cert_file.as_path())?;
|
||||||
|
let mut cert_file_rdr = BufReader::new(cert_file);
|
||||||
|
let cert_vec = rustls_pemfile::certs(&mut cert_file_rdr)?;
|
||||||
|
cert_vec.into_iter().map(rustls::Certificate).collect()
|
||||||
|
};
|
||||||
|
|
||||||
|
let key: rustls::PrivateKey = {
|
||||||
|
let key_file = File::open(key_file.as_path())?;
|
||||||
|
let mut key_file_rdr = BufReader::new(key_file);
|
||||||
|
|
||||||
|
let keys_iter = rustls_pemfile::read_all(&mut key_file_rdr)?;
|
||||||
|
let key_item = keys_iter
|
||||||
|
.into_iter()
|
||||||
|
.next()
|
||||||
|
.ok_or("Certificate should have at least one private key")?;
|
||||||
|
|
||||||
|
match key_item {
|
||||||
|
rustls_pemfile::Item::RSAKey(key) => rustls::PrivateKey(key),
|
||||||
|
rustls_pemfile::Item::PKCS8Key(key) => rustls::PrivateKey(key),
|
||||||
|
_ => unimplemented!(),
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok((certs, key))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn configure_server(
|
||||||
|
server_name: &str,
|
||||||
|
secure_conn: bool,
|
||||||
|
certificate_file: Option<PathBuf>,
|
||||||
|
private_key_file: Option<PathBuf>,
|
||||||
|
alpns: Vec<String>,
|
||||||
|
) -> Result<(ServerConfig, Vec<rustls::Certificate>), Box<dyn Error>> {
|
||||||
|
let (certs, key) = if secure_conn {
|
||||||
|
read_certs_from_file(certificate_file, private_key_file)?
|
||||||
|
} else {
|
||||||
|
let cert = rcgen::generate_simple_self_signed(vec![server_name.into()]).unwrap();
|
||||||
|
let cert_der = cert.serialize_der().unwrap();
|
||||||
|
let priv_key = cert.serialize_private_key_der();
|
||||||
|
let priv_key = rustls::PrivateKey(priv_key);
|
||||||
|
let cert_chain = vec![rustls::Certificate(cert_der)];
|
||||||
|
|
||||||
|
(cert_chain, priv_key)
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut crypto = if secure_conn {
|
||||||
|
let mut cert_store = rustls::RootCertStore::empty();
|
||||||
|
for cert in &certs {
|
||||||
|
cert_store.add(cert)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
let auth_client = rustls::server::AllowAnyAuthenticatedClient::new(cert_store);
|
||||||
|
rustls::ServerConfig::builder()
|
||||||
|
.with_safe_defaults()
|
||||||
|
.with_client_cert_verifier(Arc::new(auth_client))
|
||||||
|
.with_single_cert(certs.clone(), key)
|
||||||
|
} else {
|
||||||
|
rustls::ServerConfig::builder()
|
||||||
|
.with_safe_defaults()
|
||||||
|
.with_no_client_auth()
|
||||||
|
.with_single_cert(certs.clone(), key)
|
||||||
|
}?;
|
||||||
|
|
||||||
|
let alpn_protocols: Vec<Vec<u8>> = alpns
|
||||||
|
.iter()
|
||||||
|
.map(|x| x.as_bytes().to_vec())
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
crypto.alpn_protocols = alpn_protocols;
|
||||||
|
crypto.key_log = Arc::new(rustls::KeyLogFile::new());
|
||||||
|
let mut server_config = ServerConfig::with_crypto(Arc::new(crypto));
|
||||||
|
|
||||||
|
Arc::get_mut(&mut server_config.transport)
|
||||||
|
.unwrap()
|
||||||
|
.max_concurrent_bidi_streams(0_u8.into())
|
||||||
|
.max_concurrent_uni_streams(1_u8.into());
|
||||||
|
|
||||||
|
Ok((server_config, certs))
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn server_endpoint(
|
||||||
|
server_addr: SocketAddr,
|
||||||
|
server_name: &str,
|
||||||
|
secure_conn: bool,
|
||||||
|
alpns: Vec<String>,
|
||||||
|
certificate_file: Option<PathBuf>,
|
||||||
|
private_key_file: Option<PathBuf>,
|
||||||
|
) -> Result<Endpoint, Box<dyn Error>> {
|
||||||
|
let (server_config, _) = configure_server(
|
||||||
|
server_name,
|
||||||
|
secure_conn,
|
||||||
|
certificate_file,
|
||||||
|
private_key_file,
|
||||||
|
alpns,
|
||||||
|
)?;
|
||||||
|
let endpoint = Endpoint::server(server_config, server_addr)?;
|
||||||
|
|
||||||
|
Ok(endpoint)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn client_endpoint(
|
||||||
|
client_addr: SocketAddr,
|
||||||
|
secure_conn: bool,
|
||||||
|
alpns: Vec<String>,
|
||||||
|
certificate_file: Option<PathBuf>,
|
||||||
|
private_key_file: Option<PathBuf>,
|
||||||
|
) -> Result<Endpoint, Box<dyn Error>> {
|
||||||
|
let client_cfg = configure_client(secure_conn, certificate_file, private_key_file, alpns)?;
|
||||||
|
let mut endpoint = Endpoint::client(client_addr)?;
|
||||||
|
|
||||||
|
endpoint.set_default_client_config(client_cfg);
|
||||||
|
|
||||||
|
Ok(endpoint)
|
||||||
|
}
|
112
net/quinn/tests/quinnquic.rs
Normal file
112
net/quinn/tests/quinnquic.rs
Normal file
|
@ -0,0 +1,112 @@
|
||||||
|
// Copyright (C) 2024, Asymptotic Inc.
|
||||||
|
// Author: Sanchayan Maity <sanchayan@asymptotic.io>
|
||||||
|
//
|
||||||
|
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
|
||||||
|
// If a copy of the MPL was not distributed with this file, You can obtain one at
|
||||||
|
// <https://mozilla.org/MPL/2.0/>.
|
||||||
|
//
|
||||||
|
// SPDX-License-Identifier: MPL-2.0
|
||||||
|
|
||||||
|
use gst::prelude::*;
|
||||||
|
use serial_test::serial;
|
||||||
|
use std::thread;
|
||||||
|
|
||||||
|
fn init() {
|
||||||
|
use std::sync::Once;
|
||||||
|
static INIT: Once = Once::new();
|
||||||
|
|
||||||
|
INIT.call_once(|| {
|
||||||
|
gst::init().unwrap();
|
||||||
|
gstquinn::plugin_register_static().expect("QUIC source sink send receive tests");
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
fn make_buffer(content: &[u8]) -> gst::Buffer {
|
||||||
|
let mut buf = gst::Buffer::from_slice(content.to_owned());
|
||||||
|
buf.make_mut().set_pts(gst::ClockTime::from_mseconds(200));
|
||||||
|
buf
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
#[serial]
|
||||||
|
fn test_send_receive_without_datagram() {
|
||||||
|
init();
|
||||||
|
|
||||||
|
let content = "Hello, world!\n".as_bytes();
|
||||||
|
|
||||||
|
thread::spawn(move || {
|
||||||
|
let mut h1 = gst_check::Harness::new_empty();
|
||||||
|
h1.add_parse("quinnquicsink secure-connection=false");
|
||||||
|
|
||||||
|
h1.set_src_caps(gst::Caps::builder("text/plain").build());
|
||||||
|
|
||||||
|
h1.play();
|
||||||
|
|
||||||
|
assert!(h1.push(make_buffer(content)) == Ok(gst::FlowSuccess::Ok));
|
||||||
|
|
||||||
|
h1.push_event(gst::event::Eos::new());
|
||||||
|
|
||||||
|
h1.element().unwrap().set_state(gst::State::Null).unwrap();
|
||||||
|
|
||||||
|
drop(h1);
|
||||||
|
});
|
||||||
|
|
||||||
|
let mut h2 = gst_check::Harness::new_empty();
|
||||||
|
h2.add_parse("quinnquicsrc secure-connection=false");
|
||||||
|
|
||||||
|
h2.play();
|
||||||
|
|
||||||
|
let buf = h2.pull_until_eos().unwrap().unwrap();
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
content,
|
||||||
|
buf.into_mapped_buffer_readable().unwrap().as_slice()
|
||||||
|
);
|
||||||
|
|
||||||
|
h2.element().unwrap().set_state(gst::State::Null).unwrap();
|
||||||
|
|
||||||
|
drop(h2);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
#[serial]
|
||||||
|
fn test_send_receive_with_datagram() {
|
||||||
|
init();
|
||||||
|
|
||||||
|
let content = "Hello, world!\n".as_bytes();
|
||||||
|
|
||||||
|
// Use a different port address compared to the default that will be used
|
||||||
|
// in the other test. We get a address already in use error otherwise.
|
||||||
|
thread::spawn(move || {
|
||||||
|
let mut h1 = gst_check::Harness::new_empty();
|
||||||
|
h1.add_parse("quinnquicsrc use-datagram=true server-address=127.0.0.1 server-port=6000 secure-connection=false");
|
||||||
|
|
||||||
|
h1.play();
|
||||||
|
|
||||||
|
let buf = h1.pull_until_eos().unwrap().unwrap();
|
||||||
|
|
||||||
|
assert_eq!(
|
||||||
|
content,
|
||||||
|
buf.into_mapped_buffer_readable().unwrap().as_slice()
|
||||||
|
);
|
||||||
|
|
||||||
|
h1.element().unwrap().set_state(gst::State::Null).unwrap();
|
||||||
|
|
||||||
|
drop(h1);
|
||||||
|
});
|
||||||
|
|
||||||
|
let mut h2 = gst_check::Harness::new_empty();
|
||||||
|
h2.add_parse("quinnquicsink use-datagram=true client-address=127.0.0.1 client-port=6001 server-address=127.0.0.1 server-port=6000 secure-connection=false");
|
||||||
|
|
||||||
|
h2.set_src_caps(gst::Caps::builder("text/plain").build());
|
||||||
|
|
||||||
|
h2.play();
|
||||||
|
|
||||||
|
assert!(h2.push(make_buffer(content)) == Ok(gst::FlowSuccess::Ok));
|
||||||
|
|
||||||
|
h2.push_event(gst::event::Eos::new());
|
||||||
|
|
||||||
|
h2.element().unwrap().set_state(gst::State::Null).unwrap();
|
||||||
|
|
||||||
|
drop(h2);
|
||||||
|
}
|
Loading…
Reference in a new issue