diff --git a/Cargo.lock b/Cargo.lock index 5ba52f5b..a9d16b4a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3051,6 +3051,7 @@ dependencies = [ "gstreamer-webrtc", "http 1.1.0", "human_bytes", + "itertools 0.12.1", "livekit-api", "livekit-protocol", "once_cell", diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index a8e855f5..f03a6f48 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -10183,7 +10183,7 @@ "kind": "object", "properties": { "audio-caps": { - "blurb": "Governs what audio codecs will be proposed", + "blurb": "Governs what audio codecs will be proposed. Valid values: [audio/x-opus; audio/x-raw, format=(string)S24BE, layout=(string)interleaved, rate=(int)[ 1, 2147483647 ], channels=(int)[ 1, 2147483647 ]; audio/x-raw, format=(string)S16BE, layout=(string)interleaved, rate=(int)[ 1, 2147483647 ], channels=(int)[ 1, 2147483647 ]; audio/x-raw, format=(string)U8, layout=(string)interleaved, rate=(int)[ 1, 2147483647 ], channels=(int)[ 1, 2147483647 ]]", "conditionally-available": false, "construct": false, "construct-only": false, @@ -10366,7 +10366,7 @@ "writable": true }, "video-caps": { - "blurb": "Governs what video codecs will be proposed", + "blurb": "Governs what video codecs will be proposed. Valid values: [video/x-vp8; video/x-h264; video/x-vp9; video/x-h265; video/x-av1; video/x-raw, format=(string){ RGB, RGBA, BGR, BGRA, AYUV, UYVY, I420, Y41B, UYVP }, width=(int)[ 1, 32767 ], height=(int)[ 1, 32767 ]]", "conditionally-available": false, "construct": false, "construct-only": false, @@ -10498,7 +10498,7 @@ "kind": "object", "properties": { "audio-codecs": { - "blurb": "Names of audio codecs to be be used during the SDP negotiation. Valid values: [OPUS]", + "blurb": "Names of audio codecs to be be used during the SDP negotiation. 
Valid values: [OPUS, L24, L16, L8]", "conditionally-available": false, "construct": false, "construct-only": false, @@ -10578,7 +10578,7 @@ "writable": true }, "video-codecs": { - "blurb": "Names of video codecs to be be used during the SDP negotiation. Valid values: [VP8, H264, VP9, H265, AV1]", + "blurb": "Names of video codecs to be be used during the SDP negotiation. Valid values: [VP8, H264, VP9, H265, AV1, RAW]", "conditionally-available": false, "construct": false, "construct-only": false, diff --git a/net/webrtc/Cargo.toml b/net/webrtc/Cargo.toml index 375524f6..2b6e2bca 100644 --- a/net/webrtc/Cargo.toml +++ b/net/webrtc/Cargo.toml @@ -25,6 +25,7 @@ anyhow = "1" chrono = "0.4" thiserror = "1" futures = "0.3" +itertools = "0.12" tokio = { version = "1", features = ["fs", "macros", "rt-multi-thread", "time"] } tokio-native-tls = "0.3.0" tokio-stream = "0.1.11" diff --git a/net/webrtc/examples/README.md b/net/webrtc/examples/README.md index 9142dadf..589809c1 100644 --- a/net/webrtc/examples/README.md +++ b/net/webrtc/examples/README.md @@ -34,6 +34,13 @@ mode and an example based on RTSP instead of WebRTC. The examples can also be used for [RFC 7273] NTP or PTP clock signalling and synchronization. +Finally, raw payloads (e.g. L24 audio) can be negotiated. + +Note: you can have your host act as an NTP server, which can help the examples +with clock synchronization. For `chrony`, this can be configured by editing +`/etc/chrony.conf` and uncommenting / editing the `allow` entry. The examples +can then be launched with `--ntp-server _ip_address_`. + [RFC 6051]: https://datatracker.ietf.org/doc/html/rfc6051 [RFC 7273]: https://datatracker.ietf.org/doc/html/rfc7273 [Instantaneous RTP synchronization...]: https://coaxion.net/blog/2022/05/instantaneous-rtp-synchronization-retrieval-of-absolute-sender-clock-times-with-gstreamer/ @@ -44,7 +51,7 @@ The example uses the default WebRTC signaller. 
Launch it using the following command: ```shell -cargo run --bin gst-webrtc-signalling-server +cargo run --bin gst-webrtc-signalling-server --no-default-features ``` ### Receiver @@ -53,14 +60,14 @@ The receiver awaits for new audio & video stream publishers and render the streams using auto sink elements. Launch it using the following command: ```shell -cargo r --example webrtc-precise-sync-recv +cargo r --example webrtc-precise-sync-recv --no-default-features ``` The default configuration should work for a local test. For a multi-host setup, see the available options: ```shell -cargo r --example webrtc-precise-sync-recv -- --help +cargo r --example webrtc-precise-sync-recv --no-default-features -- --help ``` E.g.: the following will force `avdec_h264` over hardware decoders, activate @@ -70,7 +77,8 @@ specified address: ```shell GST_PLUGIN_FEATURE_RANK=avdec_h264:MAX \ WEBRTC_PRECISE_SYNC_RECV_LOG=debug \ -cargo r --example webrtc-precise-sync-recv -- --server 192.168.1.22 +cargo r --example webrtc-precise-sync-recv --no-default-features -- \ + --server 192.168.1.22 ``` ### Sender @@ -79,7 +87,7 @@ The sender publishes audio & video test streams. Launch it using the following command: ```shell -cargo r --example webrtc-precise-sync-send +cargo r --example webrtc-precise-sync-send --no-default-features ``` The default configuration should work for a local test. 
For a multi-host setup, @@ -87,7 +95,7 @@ to set the number of audio / video streams, to enable rapid synchronization or to force the video encoder, see the available options: ```shell -cargo r --example webrtc-precise-sync-send -- --help +cargo r --example webrtc-precise-sync-send --no-default-features -- --help ``` E.g.: the following will force H264 and `x264enc` over hardware encoders, @@ -97,7 +105,7 @@ specified address: ```shell GST_PLUGIN_FEATURE_RANK=264enc:MAX \ WEBRTC_PRECISE_SYNC_SEND_LOG=debug \ -cargo r --example webrtc-precise-sync-send -- \ +cargo r --example webrtc-precise-sync-send --no-default-features -- \ --server 192.168.1.22 --video-caps video/x-h264 ``` @@ -120,16 +128,49 @@ commands such as: #### Receiver ```shell -cargo r --example webrtc-precise-sync-recv -- --expect-clock-signalling +cargo r --example webrtc-precise-sync-recv --no-default-features -- \ + --expect-clock-signalling ``` #### Sender ```shell -cargo r --example webrtc-precise-sync-send -- --clock ntp --do-clock-signalling \ +cargo r --example webrtc-precise-sync-send --no-default-features -- \ + --clock ntp --do-clock-signalling \ --video-streams 0 --audio-streams 2 ``` +### Raw payload + +The sender can be instructed to send raw payloads. Note that raw payloads +are not activated by default and must be selected explicitly. 
+ +This command will stream two stereo L24 streams: + +```shell +cargo r --example webrtc-precise-sync-send --no-default-features -- \ + --video-streams 0 \ + --audio-streams 2 --audio-codecs L24 +``` + +Launch the receiver with: + +```shell +cargo r --example webrtc-precise-sync-recv --no-default-features -- \ + --audio-codecs L24 +``` + +This can be used to stream multiple RAW video streams using specific CAPS for +the streams and allowing fallback to VP8 & OPUS if remote doesn't support raw +payloads: + +```shell +cargo r --example webrtc-precise-sync-send --no-default-features -- \ + --video-streams 2 --audio-streams 1 \ + --video-codecs RAW --video-codecs VP8 --video-caps video/x-raw,format=I420,width=400 \ + --audio-codecs L24 --audio-codecs OPUS --audio-caps audio/x-raw,rate=48000,channels=2 +``` + ## Android ### `webrtcsrc` based Android application diff --git a/net/webrtc/examples/webrtc-precise-sync-recv.rs b/net/webrtc/examples/webrtc-precise-sync-recv.rs index d787dbfe..7b08201c 100644 --- a/net/webrtc/examples/webrtc-precise-sync-recv.rs +++ b/net/webrtc/examples/webrtc-precise-sync-recv.rs @@ -43,6 +43,18 @@ struct Args { #[clap(long, help = "RTP jitterbuffer latency (ms)", default_value = "40")] pub rtp_latency: u32, + #[clap( + long, + help = "Force accepted audio codecs. See 'webrtcsrc' 'audio-codecs' property (ex. 'OPUS'). Accepts several occurrences." + )] + pub audio_codecs: Vec, + + #[clap( + long, + help = "Force accepted video codecs. See 'webrtcsrc' 'video-codecs' property (ex. 'VP8'). Accepts several occurrences." 
+ )] + pub video_codecs: Vec, + #[clap(long, help = "Signalling server host", default_value = "localhost")] pub server: String, @@ -123,6 +135,14 @@ fn spawn_consumer( webrtcsrc.set_property("do-retransmission", false); } + if !args.audio_codecs.is_empty() { + webrtcsrc.set_property("audio-codecs", gst::Array::new(&args.audio_codecs)); + } + + if !args.video_codecs.is_empty() { + webrtcsrc.set_property("video-codecs", gst::Array::new(&args.video_codecs)); + } + bin.add(&webrtcsrc).context("Adding webrtcsrc")?; let signaller = webrtcsrc.property::("signaller"); diff --git a/net/webrtc/examples/webrtc-precise-sync-send.rs b/net/webrtc/examples/webrtc-precise-sync-send.rs index 863980ab..06d046ac 100644 --- a/net/webrtc/examples/webrtc-precise-sync-send.rs +++ b/net/webrtc/examples/webrtc-precise-sync-send.rs @@ -1,7 +1,9 @@ -use anyhow::{bail, Context}; +use anyhow::{anyhow, bail, Context}; use futures::prelude::*; use gst::prelude::*; use gst_rtp::prelude::*; +use gstrswebrtc::utils::Codecs; +use itertools::Itertools; use tracing::{debug, error, info}; use url::Url; @@ -45,7 +47,28 @@ struct Args { )] pub video_streams: usize, - #[clap(long, help = "Force video caps (ex. 'video/x-h264')")] + #[clap( + long, + help = "Audio codecs that will be proposed in the SDP (ex. 'L24', defaults to ['OPUS']). Accepts several occurrences." + )] + pub audio_codecs: Vec, + + #[clap( + long, + help = "Use specific audio caps (ex. 'audio/x-raw,rate=48000,channels=2')" + )] + pub audio_caps: Option, + + #[clap( + long, + help = "Video codecs that will be proposed in the SDP (ex. 'RAW', defaults to ['VP8', 'H264', 'VP9', 'H265']). Accepts several occurrences." + )] + pub video_codecs: Vec, + + #[clap( + long, + help = "Use specific video caps (ex. 
'video/x-raw,format=I420,width=400')" + )] pub video_caps: Option, #[clap(long, help = "Use RFC 6051 64-bit NTP timestamp RTP header extension.")] @@ -135,6 +158,8 @@ impl App { } async fn prepare(&mut self) -> anyhow::Result<()> { + use std::str::FromStr; + debug!("Preparing"); self.pipeline = Some(gst::Pipeline::new()); @@ -217,6 +242,26 @@ impl App { }); } + if !self.args.audio_codecs.is_empty() { + let mut audio_caps = gst::Caps::new_empty(); + for codec in self.args.audio_codecs.iter() { + audio_caps.merge( + Codecs::audio_codecs() + .find(|c| &c.name == codec) + .ok_or_else(|| { + anyhow!( + "Unknown audio codec {codec}. Valid values are: {}", + Codecs::audio_codecs().map(|c| c.name.as_str()).join(", ") + ) + })? + .caps + .clone(), + ); + } + + webrtcsink.set_property("audio-caps", audio_caps); + } + for idx in 0..self.args.audio_streams { let audiosrc = gst::ElementFactory::make("audiotestsrc") .property("is-live", true) @@ -226,11 +271,65 @@ impl App { .context("Creating audiotestsrc")?; self.pipeline().add(&audiosrc).context("Adding audiosrc")?; - audiosrc - .link_pads(None, &webrtcsink, Some("audio_%u")) - .context("Linking audiosrc")?; + if let Some(ref caps) = self.args.audio_caps { + audiosrc + .link_pads_filtered( + None, + &webrtcsink, + Some("audio_%u"), + &gst::Caps::from_str(caps).context("Parsing audio caps")?, + ) + .context("Linking audiosrc")?; + } else { + audiosrc + .link_pads(None, &webrtcsink, Some("audio_%u")) + .context("Linking audiosrc")?; + } } + if !self.args.video_codecs.is_empty() { + let mut video_caps = gst::Caps::new_empty(); + for codec in self.args.video_codecs.iter() { + video_caps.merge( + Codecs::video_codecs() + .find(|c| &c.name == codec) + .ok_or_else(|| { + anyhow!( + "Unknown video codec {codec}. Valid values are: {}", + Codecs::video_codecs().map(|c| c.name.as_str()).join(", ") + ) + })? 
+ .caps + .clone(), + ); + } + + webrtcsink.set_property("video-caps", video_caps); + } + + let raw_video_caps = { + let mut raw_video_caps = if let Some(ref video_caps) = self.args.video_caps { + gst::Caps::from_str(video_caps).context("Parsing video caps")? + } else { + gst::Caps::new_empty_simple("video/x-raw") + }; + + // If no width / height are specified, set something big enough + let caps_mut = raw_video_caps.make_mut(); + let s = caps_mut.structure_mut(0).expect("created above"); + match (s.get::("width").ok(), s.get::("height").ok()) { + (None, None) => { + s.set("width", 800i32); + s.set("height", 600i32); + } + (Some(width), None) => s.set("height", 3 * width / 4), + (None, Some(height)) => s.set("width", 4 * height / 3), + _ => (), + } + + raw_video_caps + }; + for idx in 0..self.args.video_streams { let videosrc = gst::ElementFactory::make("videotestsrc") .property("is-live", true) @@ -247,13 +346,7 @@ impl App { .expect("adding video elements"); videosrc - .link_filtered( - &video_overlay, - &gst::Caps::builder("video/x-raw") - .field("width", 800i32) - .field("height", 600i32) - .build(), - ) + .link_filtered(&video_overlay, &raw_video_caps) .context("Linking videosrc to timeoverlay")?; video_overlay @@ -261,10 +354,6 @@ impl App { .context("Linking video overlay")?; } - if let Some(ref video_caps) = self.args.video_caps { - webrtcsink.set_property("video-caps", &gst::Caps::builder(video_caps).build()); - } - Ok(()) } diff --git a/net/webrtc/src/utils.rs b/net/webrtc/src/utils.rs index ff1cbe79..e95a0929 100644 --- a/net/webrtc/src/utils.rs +++ b/net/webrtc/src/utils.rs @@ -427,7 +427,7 @@ impl Clone for DecodingInfo { #[derive(Clone, Debug)] struct EncodingInfo { - encoder: gst::ElementFactory, + encoder: Option, payloader: gst::ElementFactory, output_filter: Option, } @@ -437,6 +437,7 @@ pub struct Codec { pub name: String, pub caps: gst::Caps, pub stream_type: gst::StreamType, + pub is_raw: bool, payload_type: Option, decoding_info: Option, @@ 
-449,16 +450,27 @@ impl Codec { stream_type: gst::StreamType, caps: &gst::Caps, decoders: &glib::List, + depayloaders: &glib::List, encoders: &glib::List, payloaders: &glib::List, ) -> Self { let has_decoder = Self::has_decoder_for_caps(caps, decoders); + let has_depayloader = Self::has_depayloader_for_codec(name, depayloaders); + + let decoding_info = if has_depayloader && has_decoder { + Some(DecodingInfo { + has_decoder: AtomicBool::new(has_decoder), + }) + } else { + None + }; + let encoder = Self::get_encoder_for_caps(caps, encoders); let payloader = Self::get_payloader_for_codec(name, payloaders); let encoding_info = if let (Some(encoder), Some(payloader)) = (encoder, payloader) { Some(EncodingInfo { - encoder, + encoder: Some(encoder), payloader, output_filter: None, }) @@ -470,12 +482,52 @@ impl Codec { caps: caps.clone(), stream_type, name: name.into(), + is_raw: false, payload_type: None, + decoding_info, + encoding_info, + } + } - decoding_info: Some(DecodingInfo { - has_decoder: AtomicBool::new(has_decoder), - }), + pub fn new_raw( + name: &str, + stream_type: gst::StreamType, + depayloaders: &glib::List, + payloaders: &glib::List, + ) -> Self { + let decoding_info = if Self::has_depayloader_for_codec(name, depayloaders) { + Some(DecodingInfo { + has_decoder: AtomicBool::new(false), + }) + } else { + None + }; + let payloader = Self::get_payloader_for_codec(name, payloaders); + let encoding_info = payloader.map(|payloader| EncodingInfo { + encoder: None, + payloader, + output_filter: None, + }); + + let mut caps = None; + if let Some(elem) = Codec::get_payloader_for_codec(name, payloaders) { + if let Some(tmpl) = elem + .static_pad_templates() + .iter() + .find(|template| template.direction() == gst::PadDirection::Sink) + { + caps = Some(tmpl.caps()); + } + } + + Self { + caps: caps.unwrap_or_else(gst::Caps::new_empty), + stream_type, + name: name.into(), + is_raw: true, + payload_type: None, + decoding_info, encoding_info, } } @@ -488,33 +540,15 @@ 
impl Codec { self.payload_type = Some(pt); } - pub fn new_decoding( - name: &str, - stream_type: gst::StreamType, - caps: &gst::Caps, - decoders: &glib::List, - ) -> Self { - let has_decoder = Self::has_decoder_for_caps(caps, decoders); - - Self { - caps: caps.clone(), - stream_type, - name: name.into(), - payload_type: None, - - decoding_info: Some(DecodingInfo { - has_decoder: AtomicBool::new(has_decoder), - }), - - encoding_info: None, - } - } - - pub fn has_decoder(&self) -> bool { + pub fn can_be_received(&self) -> bool { if self.decoding_info.is_none() { return false; } + if self.is_raw { + return true; + } + let decoder_info = self.decoding_info.as_ref().unwrap(); if decoder_info.has_decoder.load(Ordering::SeqCst) { true @@ -599,6 +633,40 @@ impl Codec { }) } + fn has_depayloader_for_codec( + codec: &str, + depayloaders: &glib::List, + ) -> bool { + depayloaders.iter().any(|factory| { + factory.static_pad_templates().iter().any(|template| { + let template_caps = template.caps(); + + if template.direction() != gst::PadDirection::Sink { + return false; + } + + template_caps.iter().any(|s| { + s.has_field("encoding-name") + && s.get::("encoding-name").map_or_else( + |_| { + if let Ok(encoding_name) = s.get::<&str>("encoding-name") { + encoding_name == codec + } else { + false + } + }, + |encoding_names| { + encoding_names.iter().any(|v| { + v.get::<&str>() + .map_or(false, |encoding_name| encoding_name == codec) + }) + }, + ) + }) + }) + }) + } + pub fn is_video(&self) -> bool { matches!(self.stream_type, gst::StreamType::VIDEO) } @@ -608,11 +676,13 @@ impl Codec { } pub fn build_encoder(&self) -> Option> { - self.encoding_info.as_ref().map(|info| { - info.encoder - .create() - .build() - .with_context(|| format!("Creating encoder {}", info.encoder.name())) + self.encoding_info.as_ref().and_then(|info| { + info.encoder.as_ref().map(|encoder| { + encoder + .create() + .build() + .with_context(|| format!("Creating encoder {}", encoder.name())) + }) }) } @@ 
-655,13 +725,17 @@ impl Codec { } pub fn encoder_factory(&self) -> Option { - self.encoding_info.as_ref().map(|info| info.encoder.clone()) + self.encoding_info + .as_ref() + .and_then(|info| info.encoder.clone()) } pub fn encoder_name(&self) -> Option { - self.encoding_info - .as_ref() - .map(|info| info.encoder.name().to_string()) + self.encoding_info.as_ref().and_then(|info| { + info.encoder + .as_ref() + .map(|encoder| encoder.name().to_string()) + }) } pub fn set_output_filter(&mut self, caps: gst::Caps) { @@ -753,7 +827,7 @@ impl Codecs { Self(codecs.values().cloned().collect()) } - pub fn find_for_encoded_caps(&self, caps: &gst::Caps) -> Option { + pub fn find_for_payloadable_caps(&self, caps: &gst::Caps) -> Option { self.iter() .find(|codec| codec.caps.can_intersect(caps) && codec.encoding_info.is_some()) .cloned() @@ -766,6 +840,11 @@ static CODECS: Lazy = Lazy::new(|| { gst::Rank::MARGINAL, ); + let depayloaders = gst::ElementFactory::factories_with_type( + gst::ElementFactoryType::DEPAYLOADER, + gst::Rank::MARGINAL, + ); + let encoders = gst::ElementFactory::factories_with_type( gst::ElementFactoryType::ENCODER, gst::Rank::MARGINAL, @@ -782,14 +861,19 @@ static CODECS: Lazy = Lazy::new(|| { gst::StreamType::AUDIO, &OPUS_CAPS, &decoders, + &depayloaders, &encoders, &payloaders, ), + Codec::new_raw("L24", gst::StreamType::AUDIO, &depayloaders, &payloaders), + Codec::new_raw("L16", gst::StreamType::AUDIO, &depayloaders, &payloaders), + Codec::new_raw("L8", gst::StreamType::AUDIO, &depayloaders, &payloaders), Codec::new( "VP8", gst::StreamType::VIDEO, &VP8_CAPS, &decoders, + &depayloaders, &encoders, &payloaders, ), @@ -798,6 +882,7 @@ static CODECS: Lazy = Lazy::new(|| { gst::StreamType::VIDEO, &H264_CAPS, &decoders, + &depayloaders, &encoders, &payloaders, ), @@ -806,6 +891,7 @@ static CODECS: Lazy = Lazy::new(|| { gst::StreamType::VIDEO, &VP9_CAPS, &decoders, + &depayloaders, &encoders, &payloaders, ), @@ -814,6 +900,7 @@ static CODECS: Lazy = Lazy::new(|| 
{ gst::StreamType::VIDEO, &H265_CAPS, &decoders, + &depayloaders, &encoders, &payloaders, ), @@ -822,9 +909,11 @@ static CODECS: Lazy = Lazy::new(|| { gst::StreamType::VIDEO, &AV1_CAPS, &decoders, + &depayloaders, &encoders, &payloaders, ), + Codec::new_raw("RAW", gst::StreamType::VIDEO, &depayloaders, &payloaders), ]) }); @@ -836,36 +925,16 @@ impl Codecs { .cloned() } - pub fn video_codecs() -> Vec { + pub fn video_codecs<'a>() -> impl Iterator { CODECS .iter() .filter(|codec| codec.stream_type == gst::StreamType::VIDEO) - .cloned() - .collect() } - pub fn audio_codecs() -> Vec { + pub fn audio_codecs<'a>() -> impl Iterator { CODECS .iter() .filter(|codec| codec.stream_type == gst::StreamType::AUDIO) - .cloned() - .collect() - } - - pub fn video_codec_names() -> Vec { - CODECS - .iter() - .filter(|codec| codec.stream_type == gst::StreamType::VIDEO) - .map(|codec| codec.name.clone()) - .collect() - } - - pub fn audio_codec_names() -> Vec { - CODECS - .iter() - .filter(|codec| codec.stream_type == gst::StreamType::AUDIO) - .map(|codec| codec.name.clone()) - .collect() } /// List all codecs that can be used for encoding the given caps and assign @@ -909,9 +978,9 @@ impl Codecs { } } -pub fn is_raw_caps(caps: &gst::Caps) -> bool { - assert!(caps.is_fixed()); - ["video/x-raw", "audio/x-raw"].contains(&caps.structure(0).unwrap().name().as_str()) +pub fn has_raw_caps(caps: &gst::Caps) -> bool { + caps.iter() + .any(|s| ["video/x-raw", "audio/x-raw"].contains(&s.name().as_str())) } pub fn cleanup_codec_caps(mut caps: gst::Caps) -> gst::Caps { diff --git a/net/webrtc/src/webrtcsink/imp.rs b/net/webrtc/src/webrtcsink/imp.rs index 3dd0d5cc..4dbf71e9 100644 --- a/net/webrtc/src/webrtcsink/imp.rs +++ b/net/webrtc/src/webrtcsink/imp.rs @@ -1,6 +1,8 @@ // SPDX-License-Identifier: MPL-2.0 -use crate::utils::{cleanup_codec_caps, is_raw_caps, make_element, Codec, Codecs, NavigationEvent}; +use crate::utils::{ + cleanup_codec_caps, has_raw_caps, make_element, Codec, Codecs, 
NavigationEvent, +}; use anyhow::Context; use gst::glib; use gst::prelude::*; @@ -13,6 +15,7 @@ use gst_webrtc::{WebRTCDataChannel, WebRTCICETransportPolicy}; use futures::prelude::*; use anyhow::{anyhow, Error}; +use itertools::Itertools; use once_cell::sync::Lazy; use std::collections::HashMap; @@ -513,12 +516,12 @@ impl Default for Settings { Self { video_caps: Codecs::video_codecs() - .into_iter() - .flat_map(|codec| codec.caps.iter().map(|s| s.to_owned()).collect::>()) + .filter(|codec| !codec.is_raw) + .flat_map(|codec| codec.caps.iter().map(ToOwned::to_owned)) .collect::(), audio_caps: Codecs::audio_codecs() - .into_iter() - .flat_map(|codec| codec.caps.iter().map(|s| s.to_owned()).collect::>()) + .filter(|codec| !codec.is_raw) + .flat_map(|codec| codec.caps.iter().map(ToOwned::to_owned)) .collect::(), stun_server: DEFAULT_STUN_SERVER.map(String::from), turn_servers: gst::Array::new(Vec::new() as Vec), @@ -884,7 +887,12 @@ impl PayloadChainBuilder { codec = self.codec, ); - let needs_encoding = is_raw_caps(&self.input_caps); + let needs_encoding = if self.codec.is_raw { + !self.codec.caps.can_intersect(&self.input_caps) + } else { + has_raw_caps(&self.input_caps) + }; + let mut elements: Vec = Vec::new(); let (raw_filter, encoder) = if needs_encoding { @@ -898,14 +906,20 @@ impl PayloadChainBuilder { let raw_filter = self.codec.raw_converter_filter()?; elements.push(raw_filter.clone()); - let encoder = self - .codec - .build_encoder() - .expect("We should always have an encoder for negotiated codecs")?; - elements.push(encoder.clone()); - elements.push(make_element("capsfilter", None)?); + let encoder = if self.codec.is_raw { + None + } else { + let encoder = self + .codec + .build_encoder() + .expect("We should always have an encoder for negotiated codecs")?; + elements.push(encoder.clone()); + elements.push(make_element("capsfilter", None)?); - (Some(raw_filter), Some(encoder)) + Some(encoder) + }; + + (Some(raw_filter), encoder) } else { (None, None) }; 
@@ -3431,7 +3445,7 @@ impl BaseWebRTCSink { ) -> Result { let pipe = PipelineWrapper(gst::Pipeline::default()); - let has_raw_input = is_raw_caps(&input_caps); + let has_raw_input = has_raw_caps(&input_caps); let src = discovery_info.create_src(); let mut elements = vec![src.clone().upcast::()]; let encoding_chain_src = if codec.is_video() && has_raw_input { @@ -3616,29 +3630,7 @@ impl BaseWebRTCSink { output_caps: gst::Caps, codecs: &Codecs, ) -> Result<(), Error> { - let futs = if let Some(codec) = codecs.find_for_encoded_caps(&discovery_info.caps) { - let mut caps = discovery_info.caps.clone(); - - gst::info!( - CAT, - imp = self, - "Stream is already encoded with codec {}, still need to payload it", - codec.name - ); - - caps = cleanup_codec_caps(caps); - - vec![self.run_discovery_pipeline( - &name, - &discovery_info, - codec, - caps, - &output_caps, - ExtensionConfigurationType::Auto, - )] - } else { - let sink_caps = discovery_info.caps.clone(); - + let futs = if has_raw_caps(&discovery_info.caps) { if codecs.is_empty() { return Err(anyhow!( "No codec available for encoding stream {}, \ @@ -3648,10 +3640,12 @@ impl BaseWebRTCSink { )); } + let sink_caps = discovery_info.caps.clone(); + let is_video = match sink_caps.structure(0).unwrap().name().as_str() { "video/x-raw" => true, "audio/x-raw" => false, - _ => anyhow::bail!("Unsupported caps: {}", discovery_info.caps), + _ => anyhow::bail!("expected audio or video raw caps: {sink_caps}"), }; codecs @@ -3668,6 +3662,28 @@ impl BaseWebRTCSink { ) }) .collect() + } else if let Some(codec) = codecs.find_for_payloadable_caps(&discovery_info.caps) { + let mut caps = discovery_info.caps.clone(); + + gst::info!( + CAT, + imp = self, + "Stream is already in the {} format, we still need to payload it", + codec.name + ); + + caps = cleanup_codec_caps(caps); + + vec![self.run_discovery_pipeline( + &name, + &discovery_info, + codec, + caps, + &output_caps, + ExtensionConfigurationType::Auto, + )] + } else { + 
anyhow::bail!("Unsupported caps: {}", discovery_info.caps); }; let mut payloader_caps = gst::Caps::new_empty(); @@ -3949,12 +3965,20 @@ impl ObjectImpl for BaseWebRTCSink { vec![ glib::ParamSpecBoxed::builder::("video-caps") .nick("Video encoder caps") - .blurb("Governs what video codecs will be proposed") + .blurb(&format!("Governs what video codecs will be proposed. Valid values: [{}]", + Codecs::video_codecs() + .map(|c| c.caps.to_string()) + .join("; ") + )) .mutable_ready() .build(), glib::ParamSpecBoxed::builder::("audio-caps") .nick("Audio encoder caps") - .blurb("Governs what audio codecs will be proposed") + .blurb(&format!("Governs what audio codecs will be proposed. Valid values: [{}]", + Codecs::audio_codecs() + .map(|c| c.caps.to_string()) + .join("; ") + )) .mutable_ready() .build(), glib::ParamSpecString::builder("stun-server") @@ -4397,7 +4421,8 @@ impl ElementImpl for BaseWebRTCSink { gst::CapsFeatures::new([D3D11_MEMORY_FEATURE]), ); - for codec in Codecs::video_codecs() { + // Ignore specific raw caps from Codecs: they are covered by video/x-raw & audio/x-raw + for codec in Codecs::video_codecs().filter(|codec| !codec.is_raw) { caps_builder = caps_builder.structure(codec.caps.structure(0).unwrap().to_owned()); } @@ -4412,7 +4437,7 @@ impl ElementImpl for BaseWebRTCSink { let mut caps_builder = gst::Caps::builder_full().structure(gst::Structure::builder("audio/x-raw").build()); - for codec in Codecs::audio_codecs() { + for codec in Codecs::audio_codecs().filter(|codec| !codec.is_raw) { caps_builder = caps_builder.structure(codec.caps.structure(0).unwrap().to_owned()); } let audio_pad_template = gst::PadTemplate::with_gtype( diff --git a/net/webrtc/src/webrtcsrc/imp.rs b/net/webrtc/src/webrtcsrc/imp.rs index 419b9b77..a28354c4 100644 --- a/net/webrtc/src/webrtcsrc/imp.rs +++ b/net/webrtc/src/webrtcsrc/imp.rs @@ -9,6 +9,7 @@ use anyhow::{Context, Error}; use gst::glib; use gst::subclass::prelude::*; use gst_webrtc::WebRTCDataChannel; +use 
itertools::Itertools; use once_cell::sync::Lazy; use std::borrow::BorrowMut; use std::collections::{HashMap, HashSet}; @@ -94,14 +95,18 @@ impl ObjectImpl for BaseWebRTCSrc { gst::ParamSpecArray::builder("video-codecs") .flags(glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY) .blurb(&format!("Names of video codecs to be be used during the SDP negotiation. Valid values: [{}]", - Codecs::video_codec_names().into_iter().collect::>().join(", ") + Codecs::video_codecs() + .map(|c| c.name.as_str()) + .join(", ") )) .element_spec(&glib::ParamSpecString::builder("video-codec-name").build()) .build(), gst::ParamSpecArray::builder("audio-codecs") .flags(glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY) .blurb(&format!("Names of audio codecs to be be used during the SDP negotiation. Valid values: [{}]", - Codecs::audio_codec_names().into_iter().collect::>().join(", ") + Codecs::audio_codecs() + .map(|c| c.name.as_str()) + .join(", ") )) .element_spec(&glib::ParamSpecString::builder("audio-codec-name").build()) .build(), @@ -271,12 +276,12 @@ impl Default for Settings { signaller: signaller.upcast(), meta: Default::default(), audio_codecs: Codecs::audio_codecs() - .into_iter() - .filter(|codec| codec.has_decoder()) + .filter(|codec| !codec.is_raw) + .cloned() .collect(), video_codecs: Codecs::video_codecs() - .into_iter() - .filter(|codec| codec.has_decoder()) + .filter(|codec| !codec.is_raw) + .cloned() .collect(), enable_data_channel_navigation: DEFAULT_ENABLE_DATA_CHANNEL_NAVIGATION, do_retransmission: DEFAULT_DO_RETRANSMISSION, @@ -630,7 +635,13 @@ impl Session { } srcpad.set_target(Some(&ghostpad)).unwrap(); } else { - gst::debug!(CAT, obj = element, "Unused webrtcbin pad {webrtcbin_pad:?}"); + gst::debug!( + CAT, + obj = element, + "Unused webrtcbin pad {} {:?}", + webrtcbin_pad.name(), + webrtcbin_pad.current_caps(), + ); } ghostpad } @@ -1326,11 +1337,13 @@ impl BaseWebRTCSrc { impl ElementImpl for BaseWebRTCSrc { fn pad_templates() -> &'static 
[gst::PadTemplate] { static PAD_TEMPLATES: Lazy> = Lazy::new(|| { + // Ignore specific raw caps from Codecs: they are covered by VIDEO_CAPS & AUDIO_CAPS + let mut video_caps_builder = gst::Caps::builder_full() .structure_with_any_features(VIDEO_CAPS.structure(0).unwrap().to_owned()) .structure(RTP_CAPS.structure(0).unwrap().to_owned()); - for codec in Codecs::video_codecs() { + for codec in Codecs::video_codecs().filter(|codec| !codec.is_raw) { video_caps_builder = video_caps_builder.structure(codec.caps.structure(0).unwrap().to_owned()); } @@ -1339,7 +1352,7 @@ impl ElementImpl for BaseWebRTCSrc { .structure_with_any_features(AUDIO_CAPS.structure(0).unwrap().to_owned()) .structure(RTP_CAPS.structure(0).unwrap().to_owned()); - for codec in Codecs::audio_codecs() { + for codec in Codecs::audio_codecs().filter(|codec| !codec.is_raw) { audio_caps_builder = audio_caps_builder.structure(codec.caps.structure(0).unwrap().to_owned()); }