TOBESQUASHED address review feedback

This commit is contained in:
François Laignel 2024-05-01 21:34:46 +02:00
parent 60f7f4a664
commit 11bea7d896
8 changed files with 154 additions and 120 deletions

1
Cargo.lock generated
View file

@ -2974,6 +2974,7 @@ dependencies = [
"gstreamer-webrtc",
"http 1.1.0",
"human_bytes",
"itertools 0.12.1",
"livekit-api",
"livekit-protocol",
"once_cell",

View file

@ -8040,12 +8040,12 @@
"kind": "object",
"properties": {
"audio-caps": {
"blurb": "Governs what audio codecs will be proposed",
"blurb": "Governs what audio codecs will be proposed. Valid values: [audio/x-opus; audio/x-raw, format=(string)S24BE, layout=(string)interleaved; audio/x-raw, format=(string)S16BE, layout=(string)interleaved; audio/x-raw, format=(string)U8, layout=(string)interleaved]",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "audio/x-opus; audio/x-raw, format=(string)S24BE, layout=(GstAudioLayout)interleaved; audio/x-raw, format=(string)S16BE, layout=(GstAudioLayout)interleaved; audio/x-raw, format=(string)U8, layout=(GstAudioLayout)interleaved",
"default": "audio/x-opus",
"mutable": "ready",
"readable": true,
"type": "GstCaps",
@ -8223,12 +8223,12 @@
"writable": true
},
"video-caps": {
"blurb": "Governs what video codecs will be proposed",
"blurb": "Governs what video codecs will be proposed. Valid values: [video/x-vp8; video/x-h264; video/x-vp9; video/x-h265; video/x-raw, format=(string){ RGB, RGBA, BGR, BGRA, AYUV, UYVY, I420, Y41B, UYVP }]",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "video/x-vp8; video/x-h264; video/x-vp9; video/x-h265; video/x-raw, format=(string){ RGB, RGBA, BGR, BGRA, AYUV, UYVY, I420, Y41B, UYVP }",
"default": "video/x-vp8; video/x-h264; video/x-vp9; video/x-h265",
"mutable": "ready",
"readable": true,
"type": "GstCaps",

View file

@ -25,6 +25,7 @@ anyhow = "1"
chrono = "0.4"
thiserror = "1"
futures = "0.3"
itertools = "0.12"
tokio = { version = "1", features = ["fs", "macros", "rt-multi-thread", "time"] }
tokio-native-tls = "0.3.0"
tokio-stream = "0.1.11"

View file

@ -46,7 +46,7 @@ The example uses the default WebRTC signaller. Launch it using the following
command:
```shell
cargo run --bin gst-webrtc-signalling-server
cargo run --bin gst-webrtc-signalling-server --no-default-features
```
### Receiver
@ -55,14 +55,14 @@ The receiver awaits for new audio & video stream publishers and render the
streams using auto sink elements. Launch it using the following command:
```shell
cargo r --example webrtc-precise-sync-recv
cargo r --example webrtc-precise-sync-recv --no-default-features
```
The default configuration should work for a local test. For a multi-host setup,
see the available options:
```shell
cargo r --example webrtc-precise-sync-recv -- --help
cargo r --example webrtc-precise-sync-recv --no-default-features -- --help
```
E.g.: the following will force `avdec_h264` over hardware decoders, activate
@ -72,7 +72,8 @@ specified address:
```shell
GST_PLUGIN_FEATURE_RANK=avdec_h264:MAX \
WEBRTC_PRECISE_SYNC_RECV_LOG=debug \
cargo r --example webrtc-precise-sync-recv -- --server 192.168.1.22
cargo r --example webrtc-precise-sync-recv --no-default-features -- \
--server 192.168.1.22
```
### Sender
@ -81,7 +82,7 @@ The sender publishes audio & video test streams. Launch it using the following
command:
```shell
cargo r --example webrtc-precise-sync-send
cargo r --example webrtc-precise-sync-send --no-default-features
```
The default configuration should work for a local test. For a multi-host setup,
@ -89,7 +90,7 @@ to set the number of audio / video streams, to enable rapid synchronization or
to force the video encoder, see the available options:
```shell
cargo r --example webrtc-precise-sync-send -- --help
cargo r --example webrtc-precise-sync-send --no-default-features -- --help
```
E.g.: the following will force H264 and `x264enc` over hardware encoders,
@ -99,7 +100,7 @@ specified address:
```shell
GST_PLUGIN_FEATURE_RANK=264enc:MAX \
WEBRTC_PRECISE_SYNC_SEND_LOG=debug \
cargo r --example webrtc-precise-sync-send -- \
cargo r --example webrtc-precise-sync-send --no-default-features -- \
--server 192.168.1.22 --video-caps video/x-h264
```
@ -122,13 +123,15 @@ commands such as:
#### Receiver
```shell
cargo r --example webrtc-precise-sync-recv -- --expect-clock-signalling
cargo r --example webrtc-precise-sync-recv --no-default-features -- \
--expect-clock-signalling
```
#### Sender
```shell
cargo r --example webrtc-precise-sync-send -- --clock ntp --do-clock-signalling \
cargo r --example webrtc-precise-sync-send --no-default-features -- \
--clock ntp --do-clock-signalling \
--video-streams 0 --audio-streams 2
```
@ -139,23 +142,24 @@ The sender can be instructed to send raw payloads.
This command will stream two stereo L24 streams:
```shell
cargo r --example webrtc-precise-sync-send -- \
--video-streams 0 --audio-streams 2 \
--audio-caps 'audio/x-raw,format=S24BE,rate=48000,channels=2'
cargo r --example webrtc-precise-sync-send --no-default-features -- \
--video-streams 0 \
--audio-streams 2 --audio-codecs L24
```
Launch the receiver with:
```shell
cargo r --example webrtc-precise-sync-recv
cargo r --example webrtc-precise-sync-recv --no-default-features
```
This can be used to stream multiple RAW video streams forcing width and
allowing fallback to VP8 & OPUS if remote doesn't support raw payloads:
This can be used to stream multiple RAW video streams using specific CAPS for
the streams and allowing fallback to VP8 & OPUS if remote doesn't support raw
payloads:
```shell
cargo r --example webrtc-precise-sync-send -- \
cargo r --example webrtc-precise-sync-send --no-default-features -- \
--video-streams 2 --audio-streams 1 \
--video-caps 'video/x-raw,format=I420,width=400;video/x-vp8' \
--audio-caps 'audio/x-raw,format=S24BE,rate=48000,channels=2;video/x-opus'
--video-codecs RAW --video-codecs VP8 --video-caps video/x-raw,format=I420,width=400 \
--audio-codecs L24 --audio-codecs OPUS --audio-caps audio/x-raw,rate=48000,channels=2
```

View file

@ -1,7 +1,9 @@
use anyhow::{bail, Context};
use anyhow::{anyhow, bail, Context};
use futures::prelude::*;
use gst::prelude::*;
use gst_rtp::prelude::*;
use gstrswebrtc::utils::Codecs;
use itertools::Itertools;
use tracing::{debug, error, info};
use url::Url;
@ -47,11 +49,26 @@ struct Args {
#[clap(
long,
help = "Force audio caps (ex. for L24 'audio/x-raw,format=S24BE,rate=48000,channels=2')"
help = "Audio codecs that will be proposed in the SDP (ex. 'L24', defaults to ['OPUS']). Accepts several occurrences."
)]
pub audio_codecs: Vec<String>,
#[clap(
long,
help = "Use specific audio caps (ex. 'audio/x-raw,rate=48000,channels=2')"
)]
pub audio_caps: Option<String>,
#[clap(long, help = "Force video caps (ex. 'video/x-h264')")]
#[clap(
long,
help = "Video codecs that will be proposed in the SDP (ex. 'RAW', defaults to ['VP8', 'H264', 'VP9', 'H265']). Accepts several occurrences."
)]
pub video_codecs: Vec<String>,
#[clap(
long,
help = "Use specific video caps (ex. 'video/x-raw,format=I420,width=400')"
)]
pub video_caps: Option<String>,
#[clap(long, help = "Use RFC 6051 64-bit NTP timestamp RTP header extension.")]
@ -225,18 +242,25 @@ impl App {
});
}
let raw_audio_caps = if let Some(ref audio_caps) = self.args.audio_caps {
let caps = gst::Caps::from_str(audio_caps).context("Parsing audio caps")?;
webrtcsink.set_property("audio-caps", &caps);
if !self.args.audio_codecs.is_empty() {
let mut audio_caps = gst::Caps::new_empty();
for codec in self.args.audio_codecs.iter() {
audio_caps.merge(
Codecs::audio_codecs()
.find(|c| &c.name == codec)
.ok_or_else(|| {
anyhow!(
"Unknown audio codec {codec}. Valid values are: {}",
Codecs::audio_codecs().map(|c| c.name.as_str()).join(", ")
)
})?
.caps
.clone(),
);
}
// Reuse the first user defined caps for the raw caps
let mut s = caps.structure(0).expect("parsed above").to_owned();
s.set_name("audio/x-raw");
Some(gst::Caps::from(s))
} else {
None
};
webrtcsink.set_property("audio-caps", audio_caps);
}
for idx in 0..self.args.audio_streams {
let audiosrc = gst::ElementFactory::make("audiotestsrc")
@ -247,9 +271,14 @@ impl App {
.context("Creating audiotestsrc")?;
self.pipeline().add(&audiosrc).context("Adding audiosrc")?;
if let Some(ref raw_caps) = raw_audio_caps {
if let Some(ref caps) = self.args.audio_caps {
audiosrc
.link_pads_filtered(None, &webrtcsink, Some("audio_%u"), raw_caps)
.link_pads_filtered(
None,
&webrtcsink,
Some("audio_%u"),
&gst::Caps::from_str(caps).context("Parsing audio caps")?,
)
.context("Linking audiosrc")?;
} else {
audiosrc
@ -258,16 +287,29 @@ impl App {
}
}
if !self.args.video_codecs.is_empty() {
let mut video_caps = gst::Caps::new_empty();
for codec in self.args.video_codecs.iter() {
video_caps.merge(
Codecs::video_codecs()
.find(|c| &c.name == codec)
.ok_or_else(|| {
anyhow!(
"Unknown video codec {codec}. Valid values are: {}",
Codecs::video_codecs().map(|c| c.name.as_str()).join(", ")
)
})?
.caps
.clone(),
);
}
webrtcsink.set_property("video-caps", video_caps);
}
let raw_video_caps = {
let mut raw_video_caps = if let Some(ref video_caps) = self.args.video_caps {
let caps = gst::Caps::from_str(video_caps).context("Parsing video caps")?;
webrtcsink.set_property("video-caps", &caps);
// Reuse the first user defined caps for the raw caps
let mut s = caps.structure(0).expect("parsed above").to_owned();
s.set_name("video/x-raw");
gst::Caps::from(s)
gst::Caps::from_str(video_caps).context("Parsing video caps")?
} else {
gst::Caps::new_empty_simple("video/x-raw")
};

View file

@ -437,6 +437,7 @@ pub struct Codec {
pub name: String,
pub caps: gst::Caps,
pub stream_type: gst::StreamType,
pub is_raw: bool,
payload_type: Option<i32>,
decoding_info: Option<DecodingInfo>,
@ -481,6 +482,7 @@ impl Codec {
caps: caps.clone(),
stream_type,
name: name.into(),
is_raw: false,
payload_type: None,
decoding_info,
encoding_info,
@ -513,6 +515,7 @@ impl Codec {
caps: caps.clone(),
stream_type,
name: name.into(),
is_raw: true,
payload_type: None,
decoding_info,
encoding_info,
@ -532,11 +535,11 @@ impl Codec {
return false;
}
let decoder_info = self.decoding_info.as_ref().unwrap();
if is_raw_caps(&self.caps) {
if self.is_raw {
return true;
}
let decoder_info = self.decoding_info.as_ref().unwrap();
if decoder_info.has_decoder.load(Ordering::SeqCst) {
true
} else if Self::has_decoder_for_caps(
@ -628,7 +631,7 @@ impl Codec {
factory.static_pad_templates().iter().any(|template| {
let template_caps = template.caps();
if template.direction() != gst::PadDirection::Sink || template_caps.is_any() {
if template.direction() != gst::PadDirection::Sink {
return false;
}
@ -779,7 +782,7 @@ pub static L24_CAPS: Lazy<gst::Caps> = Lazy::new(|| {
.structure(
gst::Structure::builder("audio/x-raw")
.field("format", gst_audio::AudioFormat::S24be.to_str())
.field("layout", glib::gstr!("interleaved"))
.field("layout", "interleaved")
.build(),
)
.build()
@ -789,7 +792,7 @@ pub static L16_CAPS: Lazy<gst::Caps> = Lazy::new(|| {
.structure(
gst::Structure::builder("audio/x-raw")
.field("format", gst_audio::AudioFormat::S16be.to_str())
.field("layout", glib::gstr!("interleaved"))
.field("layout", "interleaved")
.build(),
)
.build()
@ -799,7 +802,7 @@ pub static L8_CAPS: Lazy<gst::Caps> = Lazy::new(|| {
.structure(
gst::Structure::builder("audio/x-raw")
.field("format", gst_audio::AudioFormat::U8.to_str())
.field("layout", glib::gstr!("interleaved"))
.field("layout", "interleaved")
.build(),
)
.build()
@ -980,36 +983,16 @@ impl Codecs {
.cloned()
}
pub fn video_codecs() -> Vec<Codec> {
pub fn video_codecs<'a>() -> impl Iterator<Item = &'a Codec> {
CODECS
.iter()
.filter(|codec| codec.stream_type == gst::StreamType::VIDEO)
.cloned()
.collect()
}
pub fn audio_codecs() -> Vec<Codec> {
pub fn audio_codecs<'a>() -> impl Iterator<Item = &'a Codec> {
CODECS
.iter()
.filter(|codec| codec.stream_type == gst::StreamType::AUDIO)
.cloned()
.collect()
}
pub fn video_codec_names() -> Vec<String> {
CODECS
.iter()
.filter(|codec| codec.stream_type == gst::StreamType::VIDEO)
.map(|codec| codec.name.clone())
.collect()
}
pub fn audio_codec_names() -> Vec<String> {
CODECS
.iter()
.filter(|codec| codec.stream_type == gst::StreamType::AUDIO)
.map(|codec| codec.name.clone())
.collect()
}
/// List all codecs that can be used for encoding the given caps and assign
@ -1053,8 +1036,9 @@ impl Codecs {
}
}
pub fn is_raw_caps(caps: &gst::Caps) -> bool {
["video/x-raw", "audio/x-raw"].contains(&caps.structure(0).unwrap().name().as_str())
pub fn has_raw_caps(caps: &gst::Caps) -> bool {
caps.iter()
.any(|s| ["video/x-raw", "audio/x-raw"].contains(&s.name().as_str()))
}
pub fn cleanup_codec_caps(mut caps: gst::Caps) -> gst::Caps {

View file

@ -1,6 +1,8 @@
// SPDX-License-Identifier: MPL-2.0
use crate::utils::{cleanup_codec_caps, is_raw_caps, make_element, Codec, Codecs, NavigationEvent};
use crate::utils::{
cleanup_codec_caps, has_raw_caps, make_element, Codec, Codecs, NavigationEvent,
};
use anyhow::Context;
use gst::glib;
use gst::prelude::*;
@ -13,6 +15,7 @@ use gst_webrtc::{WebRTCDataChannel, WebRTCICETransportPolicy};
use futures::prelude::*;
use anyhow::{anyhow, Error};
use itertools::Itertools;
use once_cell::sync::Lazy;
use std::collections::HashMap;
@ -498,12 +501,12 @@ impl Default for Settings {
Self {
video_caps: Codecs::video_codecs()
.into_iter()
.flat_map(|codec| codec.caps.iter().map(|s| s.to_owned()).collect::<Vec<_>>())
.filter(|codec| !codec.is_raw)
.flat_map(|codec| codec.caps.iter().map(ToOwned::to_owned))
.collect::<gst::Caps>(),
audio_caps: Codecs::audio_codecs()
.into_iter()
.flat_map(|codec| codec.caps.iter().map(|s| s.to_owned()).collect::<Vec<_>>())
.filter(|codec| !codec.is_raw)
.flat_map(|codec| codec.caps.iter().map(ToOwned::to_owned))
.collect::<gst::Caps>(),
stun_server: DEFAULT_STUN_SERVER.map(String::from),
turn_servers: gst::Array::new(Vec::new() as Vec<glib::SendValue>),
@ -845,7 +848,12 @@ impl PayloadChainBuilder {
codec = self.codec,
);
let needs_encoding = !is_raw_caps(&self.codec.caps) && is_raw_caps(&self.input_caps);
let needs_encoding = if self.codec.is_raw {
!self.codec.caps.can_intersect(&self.input_caps)
} else {
has_raw_caps(&self.input_caps)
};
let mut elements: Vec<gst::Element> = Vec::new();
let (raw_filter, encoder) = if needs_encoding {
@ -859,14 +867,20 @@ impl PayloadChainBuilder {
let raw_filter = self.codec.raw_converter_filter()?;
elements.push(raw_filter.clone());
let encoder = self
.codec
.build_encoder()
.expect("We should always have an encoder for negotiated codecs")?;
elements.push(encoder.clone());
elements.push(make_element("capsfilter", None)?);
let encoder = if self.codec.is_raw {
None
} else {
let encoder = self
.codec
.build_encoder()
.expect("We should always have an encoder for negotiated codecs")?;
elements.push(encoder.clone());
elements.push(make_element("capsfilter", None)?);
(Some(raw_filter), Some(encoder))
Some(encoder)
};
(Some(raw_filter), encoder)
} else {
(None, None)
};
@ -3318,7 +3332,7 @@ impl BaseWebRTCSink {
) -> Result<gst::Structure, Error> {
let pipe = PipelineWrapper(gst::Pipeline::default());
let has_raw_input = is_raw_caps(&input_caps);
let has_raw_input = has_raw_caps(&input_caps);
let src = discovery_info.create_src();
let mut elements = vec![src.clone().upcast::<gst::Element>()];
let encoding_chain_src = if codec.is_video() && has_raw_input {
@ -3503,7 +3517,7 @@ impl BaseWebRTCSink {
output_caps: gst::Caps,
codecs: &Codecs,
) -> Result<(), Error> {
let futs = if is_raw_caps(&discovery_info.caps) {
let futs = if has_raw_caps(&discovery_info.caps) {
let sink_caps = discovery_info.caps.clone();
let is_video = match sink_caps.structure(0).unwrap().name().as_str() {
@ -3514,10 +3528,7 @@ impl BaseWebRTCSink {
codecs
.iter()
.filter(|codec| {
codec.is_video() == is_video
&& (!is_raw_caps(&codec.caps) || codec.caps.can_intersect(&sink_caps))
})
.filter(|codec| codec.is_video() == is_video)
.map(|codec| {
BaseWebRTCSink::run_discovery_pipeline(
element,
@ -3536,7 +3547,7 @@ impl BaseWebRTCSink {
gst::info!(
CAT,
obj: element,
"Stream already conforms to {}, still need to payload it",
"Stream is already in the {} format, we still need to payload it",
codec.name
);
@ -3834,12 +3845,16 @@ impl ObjectImpl for BaseWebRTCSink {
vec![
glib::ParamSpecBoxed::builder::<gst::Caps>("video-caps")
.nick("Video encoder caps")
.blurb("Governs what video codecs will be proposed")
.blurb(&format!("Governs what video codecs will be proposed. Valid values: [{}]",
Codecs::video_codecs().map(|c| c.caps.to_string()).join("; ")
))
.mutable_ready()
.build(),
glib::ParamSpecBoxed::builder::<gst::Caps>("audio-caps")
.nick("Audio encoder caps")
.blurb("Governs what audio codecs will be proposed")
.blurb(&format!("Governs what audio codecs will be proposed. Valid values: [{}]",
Codecs::audio_codecs().map(|c| c.caps.to_string()).join("; ")
))
.mutable_ready()
.build(),
glib::ParamSpecString::builder("stun-server")
@ -4284,10 +4299,7 @@ impl ElementImpl for BaseWebRTCSink {
gst::CapsFeatures::new([D3D11_MEMORY_FEATURE]),
);
for codec in Codecs::video_codecs()
.iter()
.filter(|codec| !is_raw_caps(&codec.caps))
{
for codec in Codecs::video_codecs().filter(|codec| !codec.is_raw) {
caps_builder = caps_builder.structure(codec.caps.structure(0).unwrap().to_owned());
}
@ -4302,10 +4314,7 @@ impl ElementImpl for BaseWebRTCSink {
let mut caps_builder =
gst::Caps::builder_full().structure(gst::Structure::builder("audio/x-raw").build());
for codec in Codecs::audio_codecs()
.iter()
.filter(|codec| !is_raw_caps(&codec.caps))
{
for codec in Codecs::audio_codecs().filter(|codec| !codec.is_raw) {
caps_builder = caps_builder.structure(codec.caps.structure(0).unwrap().to_owned());
}
let audio_pad_template = gst::PadTemplate::with_gtype(

View file

@ -9,6 +9,7 @@ use anyhow::{Context, Error};
use gst::glib;
use gst::subclass::prelude::*;
use gst_webrtc::WebRTCDataChannel;
use itertools::Itertools;
use once_cell::sync::Lazy;
use std::borrow::BorrowMut;
use std::collections::HashSet;
@ -96,14 +97,14 @@ impl ObjectImpl for BaseWebRTCSrc {
gst::ParamSpecArray::builder("video-codecs")
.flags(glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY)
.blurb(&format!("Names of video codecs to be be used during the SDP negotiation. Valid values: [{}]",
Codecs::video_codec_names().into_iter().collect::<Vec<String>>().join(", ")
Codecs::video_codecs().map(|c| c.name.as_str()).join(", ")
))
.element_spec(&glib::ParamSpecString::builder("video-codec-name").build())
.build(),
gst::ParamSpecArray::builder("audio-codecs")
.flags(glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY)
.blurb(&format!("Names of audio codecs to be be used during the SDP negotiation. Valid values: [{}]",
Codecs::audio_codec_names().into_iter().collect::<Vec<String>>().join(", ")
Codecs::audio_codecs().map(|c| c.name.as_str()).join(", ")
))
.element_spec(&glib::ParamSpecString::builder("audio-codec-name").build())
.build(),
@ -273,12 +274,12 @@ impl Default for Settings {
signaller: signaller.upcast(),
meta: Default::default(),
audio_codecs: Codecs::audio_codecs()
.into_iter()
.filter(|codec| codec.can_be_received())
.cloned()
.collect(),
video_codecs: Codecs::video_codecs()
.into_iter()
.filter(|codec| codec.can_be_received())
.cloned()
.collect(),
enable_data_channel_navigation: DEFAULT_ENABLE_DATA_CHANNEL_NAVIGATION,
do_retransmission: DEFAULT_DO_RETRANSMISSION,
@ -1027,18 +1028,13 @@ impl BaseWebRTCSrc {
impl ElementImpl for BaseWebRTCSrc {
fn pad_templates() -> &'static [gst::PadTemplate] {
static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
use crate::utils::is_raw_caps;
// Ignore specific raw caps from Codecs: they are covered by VIDEO_CAPS & AUDIO_CAPS
let mut video_caps_builder = gst::Caps::builder_full()
.structure_with_any_features(VIDEO_CAPS.structure(0).unwrap().to_owned())
.structure(RTP_CAPS.structure(0).unwrap().to_owned());
for codec in Codecs::video_codecs()
.iter()
.filter(|codec| !is_raw_caps(&codec.caps))
{
for codec in Codecs::video_codecs().filter(|codec| !codec.is_raw) {
video_caps_builder =
video_caps_builder.structure(codec.caps.structure(0).unwrap().to_owned());
}
@ -1047,10 +1043,7 @@ impl ElementImpl for BaseWebRTCSrc {
.structure_with_any_features(AUDIO_CAPS.structure(0).unwrap().to_owned())
.structure(RTP_CAPS.structure(0).unwrap().to_owned());
for codec in Codecs::audio_codecs()
.iter()
.filter(|codec| !is_raw_caps(&codec.caps))
{
for codec in Codecs::audio_codecs().filter(|codec| !codec.is_raw) {
audio_caps_builder =
audio_caps_builder.structure(codec.caps.structure(0).unwrap().to_owned());
}