webrtc: add raw payload support

This commit adds support for raw payloads such as L24 audio to `webrtcsink` &
`webrtcsrc`.

Most changes take place within the `Codec` helper structure:

* A `Codec` can now advertise a depayloader. This also ensures that a format
  not only can be decoded when necessary, but it can also be depayloaded in the
  first place.
* It is possible to declare raw `Codec`s, meaning that their caps are compatible
  with a payloader and a depayloader without the need for an encoder and decoder.
* Previous accessor `has_decoder` was renamed to `can_be_received` to account
  for codecs which can be handled by an available depayloader with or without
  the need for a decoder.
* New codecs were added for the following formats:
  * L24, L16, L8 audio.
  * RAW video.

The `webrtc-precise-sync` examples were updated to demonstrate streaming of raw
audio or video.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1501>
This commit is contained in:
François Laignel 2024-07-05 17:38:30 +02:00 committed by GStreamer Marge Bot
parent 9c84748fc3
commit 34b791ff5e
9 changed files with 402 additions and 143 deletions

1
Cargo.lock generated
View file

@ -3051,6 +3051,7 @@ dependencies = [
"gstreamer-webrtc",
"http 1.1.0",
"human_bytes",
"itertools 0.12.1",
"livekit-api",
"livekit-protocol",
"once_cell",

View file

@ -10183,7 +10183,7 @@
"kind": "object",
"properties": {
"audio-caps": {
"blurb": "Governs what audio codecs will be proposed",
"blurb": "Governs what audio codecs will be proposed. Valid values: [audio/x-opus; audio/x-raw, format=(string)S24BE, layout=(string)interleaved, rate=(int)[ 1, 2147483647 ], channels=(int)[ 1, 2147483647 ]; audio/x-raw, format=(string)S16BE, layout=(string)interleaved, rate=(int)[ 1, 2147483647 ], channels=(int)[ 1, 2147483647 ]; audio/x-raw, format=(string)U8, layout=(string)interleaved, rate=(int)[ 1, 2147483647 ], channels=(int)[ 1, 2147483647 ]]",
"conditionally-available": false,
"construct": false,
"construct-only": false,
@ -10366,7 +10366,7 @@
"writable": true
},
"video-caps": {
"blurb": "Governs what video codecs will be proposed",
"blurb": "Governs what video codecs will be proposed. Valid values: [video/x-vp8; video/x-h264; video/x-vp9; video/x-h265; video/x-av1; video/x-raw, format=(string){ RGB, RGBA, BGR, BGRA, AYUV, UYVY, I420, Y41B, UYVP }, width=(int)[ 1, 32767 ], height=(int)[ 1, 32767 ]]",
"conditionally-available": false,
"construct": false,
"construct-only": false,
@ -10498,7 +10498,7 @@
"kind": "object",
"properties": {
"audio-codecs": {
"blurb": "Names of audio codecs to be be used during the SDP negotiation. Valid values: [OPUS]",
"blurb": "Names of audio codecs to be be used during the SDP negotiation. Valid values: [OPUS, L24, L16, L8]",
"conditionally-available": false,
"construct": false,
"construct-only": false,
@ -10578,7 +10578,7 @@
"writable": true
},
"video-codecs": {
"blurb": "Names of video codecs to be be used during the SDP negotiation. Valid values: [VP8, H264, VP9, H265, AV1]",
"blurb": "Names of video codecs to be be used during the SDP negotiation. Valid values: [VP8, H264, VP9, H265, AV1, RAW]",
"conditionally-available": false,
"construct": false,
"construct-only": false,

View file

@ -25,6 +25,7 @@ anyhow = "1"
chrono = "0.4"
thiserror = "1"
futures = "0.3"
itertools = "0.12"
tokio = { version = "1", features = ["fs", "macros", "rt-multi-thread", "time"] }
tokio-native-tls = "0.3.0"
tokio-stream = "0.1.11"

View file

@ -34,6 +34,13 @@ mode and an example based on RTSP instead of WebRTC.
The examples can also be used for [RFC 7273] NTP or PTP clock signalling and
synchronization.
Finally, raw payloads (e.g. L24 audio) can be negotiated.
Note: you can have your host act as an NTP server, which can help the examples
with clock synchronization. For `chrony`, this can be configured by editing
`/etc/chrony.conf` and uncommenting / editing the `allow` entry. The examples
can then be launched with `--ntp-server _ip_address_`.
[RFC 6051]: https://datatracker.ietf.org/doc/html/rfc6051
[RFC 7273]: https://datatracker.ietf.org/doc/html/rfc7273
[Instantaneous RTP synchronization...]: https://coaxion.net/blog/2022/05/instantaneous-rtp-synchronization-retrieval-of-absolute-sender-clock-times-with-gstreamer/
@ -44,7 +51,7 @@ The example uses the default WebRTC signaller. Launch it using the following
command:
```shell
cargo run --bin gst-webrtc-signalling-server
cargo run --bin gst-webrtc-signalling-server --no-default-features
```
### Receiver
@ -53,14 +60,14 @@ The receiver awaits new audio & video stream publishers and renders the
streams using auto sink elements. Launch it using the following command:
```shell
cargo r --example webrtc-precise-sync-recv
cargo r --example webrtc-precise-sync-recv --no-default-features
```
The default configuration should work for a local test. For a multi-host setup,
see the available options:
```shell
cargo r --example webrtc-precise-sync-recv -- --help
cargo r --example webrtc-precise-sync-recv --no-default-features -- --help
```
E.g.: the following will force `avdec_h264` over hardware decoders, activate
@ -70,7 +77,8 @@ specified address:
```shell
GST_PLUGIN_FEATURE_RANK=avdec_h264:MAX \
WEBRTC_PRECISE_SYNC_RECV_LOG=debug \
cargo r --example webrtc-precise-sync-recv -- --server 192.168.1.22
cargo r --example webrtc-precise-sync-recv --no-default-features -- \
--server 192.168.1.22
```
### Sender
@ -79,7 +87,7 @@ The sender publishes audio & video test streams. Launch it using the following
command:
```shell
cargo r --example webrtc-precise-sync-send
cargo r --example webrtc-precise-sync-send --no-default-features
```
The default configuration should work for a local test. For a multi-host setup,
@ -87,7 +95,7 @@ to set the number of audio / video streams, to enable rapid synchronization or
to force the video encoder, see the available options:
```shell
cargo r --example webrtc-precise-sync-send -- --help
cargo r --example webrtc-precise-sync-send --no-default-features -- --help
```
E.g.: the following will force H264 and `x264enc` over hardware encoders,
@ -97,7 +105,7 @@ specified address:
```shell
GST_PLUGIN_FEATURE_RANK=x264enc:MAX
WEBRTC_PRECISE_SYNC_SEND_LOG=debug \
cargo r --example webrtc-precise-sync-send -- \
cargo r --example webrtc-precise-sync-send --no-default-features -- \
--server 192.168.1.22 --video-caps video/x-h264
```
@ -120,16 +128,49 @@ commands such as:
#### Receiver
```shell
cargo r --example webrtc-precise-sync-recv -- --expect-clock-signalling
cargo r --example webrtc-precise-sync-recv --no-default-features -- \
--expect-clock-signalling
```
#### Sender
```shell
cargo r --example webrtc-precise-sync-send -- --clock ntp --do-clock-signalling \
cargo r --example webrtc-precise-sync-send --no-default-features -- \
--clock ntp --do-clock-signalling \
--video-streams 0 --audio-streams 2
```
### Raw payload
The sender can be instructed to send raw payloads. Note that raw payloads
are not activated by default and must be selected explicitly.
This command will stream two stereo L24 streams:
```shell
cargo r --example webrtc-precise-sync-send --no-default-features -- \
--video-streams 0 \
--audio-streams 2 --audio-codecs L24
```
Launch the receiver with:
```shell
cargo r --example webrtc-precise-sync-recv --no-default-features -- \
--audio-codecs L24
```
This can be used to stream multiple RAW video streams using specific CAPS for
the streams and allowing fallback to VP8 & OPUS if remote doesn't support raw
payloads:
```shell
cargo r --example webrtc-precise-sync-send --no-default-features -- \
--video-streams 2 --audio-streams 1 \
--video-codecs RAW --video-codecs VP8 --video-caps video/x-raw,format=I420,width=400 \
--audio-codecs L24 --audio-codecs OPUS --audio-caps audio/x-raw,rate=48000,channels=2
```
## Android
### `webrtcsrc` based Android application

View file

@ -43,6 +43,18 @@ struct Args {
#[clap(long, help = "RTP jitterbuffer latency (ms)", default_value = "40")]
pub rtp_latency: u32,
#[clap(
long,
help = "Force accepted audio codecs. See 'webrtcsrc' 'audio-codecs' property (ex. 'OPUS'). Accepts several occurrences."
)]
pub audio_codecs: Vec<String>,
#[clap(
long,
help = "Force accepted video codecs. See 'webrtcsrc' 'video-codecs' property (ex. 'VP8'). Accepts several occurrences."
)]
pub video_codecs: Vec<String>,
#[clap(long, help = "Signalling server host", default_value = "localhost")]
pub server: String,
@ -123,6 +135,14 @@ fn spawn_consumer(
webrtcsrc.set_property("do-retransmission", false);
}
if !args.audio_codecs.is_empty() {
webrtcsrc.set_property("audio-codecs", gst::Array::new(&args.audio_codecs));
}
if !args.video_codecs.is_empty() {
webrtcsrc.set_property("video-codecs", gst::Array::new(&args.video_codecs));
}
bin.add(&webrtcsrc).context("Adding webrtcsrc")?;
let signaller = webrtcsrc.property::<gst::glib::Object>("signaller");

View file

@ -1,7 +1,9 @@
use anyhow::{bail, Context};
use anyhow::{anyhow, bail, Context};
use futures::prelude::*;
use gst::prelude::*;
use gst_rtp::prelude::*;
use gstrswebrtc::utils::Codecs;
use itertools::Itertools;
use tracing::{debug, error, info};
use url::Url;
@ -45,7 +47,28 @@ struct Args {
)]
pub video_streams: usize,
#[clap(long, help = "Force video caps (ex. 'video/x-h264')")]
#[clap(
long,
help = "Audio codecs that will be proposed in the SDP (ex. 'L24', defaults to ['OPUS']). Accepts several occurrences."
)]
pub audio_codecs: Vec<String>,
#[clap(
long,
help = "Use specific audio caps (ex. 'audio/x-raw,rate=48000,channels=2')"
)]
pub audio_caps: Option<String>,
#[clap(
long,
help = "Video codecs that will be proposed in the SDP (ex. 'RAW', defaults to ['VP8', 'H264', 'VP9', 'H265']). Accepts several occurrences."
)]
pub video_codecs: Vec<String>,
#[clap(
long,
help = "Use specific video caps (ex. 'video/x-raw,format=I420,width=400')"
)]
pub video_caps: Option<String>,
#[clap(long, help = "Use RFC 6051 64-bit NTP timestamp RTP header extension.")]
@ -135,6 +158,8 @@ impl App {
}
async fn prepare(&mut self) -> anyhow::Result<()> {
use std::str::FromStr;
debug!("Preparing");
self.pipeline = Some(gst::Pipeline::new());
@ -217,6 +242,26 @@ impl App {
});
}
if !self.args.audio_codecs.is_empty() {
let mut audio_caps = gst::Caps::new_empty();
for codec in self.args.audio_codecs.iter() {
audio_caps.merge(
Codecs::audio_codecs()
.find(|c| &c.name == codec)
.ok_or_else(|| {
anyhow!(
"Unknown audio codec {codec}. Valid values are: {}",
Codecs::audio_codecs().map(|c| c.name.as_str()).join(", ")
)
})?
.caps
.clone(),
);
}
webrtcsink.set_property("audio-caps", audio_caps);
}
for idx in 0..self.args.audio_streams {
let audiosrc = gst::ElementFactory::make("audiotestsrc")
.property("is-live", true)
@ -226,11 +271,65 @@ impl App {
.context("Creating audiotestsrc")?;
self.pipeline().add(&audiosrc).context("Adding audiosrc")?;
audiosrc
.link_pads(None, &webrtcsink, Some("audio_%u"))
.context("Linking audiosrc")?;
if let Some(ref caps) = self.args.audio_caps {
audiosrc
.link_pads_filtered(
None,
&webrtcsink,
Some("audio_%u"),
&gst::Caps::from_str(caps).context("Parsing audio caps")?,
)
.context("Linking audiosrc")?;
} else {
audiosrc
.link_pads(None, &webrtcsink, Some("audio_%u"))
.context("Linking audiosrc")?;
}
}
if !self.args.video_codecs.is_empty() {
let mut video_caps = gst::Caps::new_empty();
for codec in self.args.video_codecs.iter() {
video_caps.merge(
Codecs::video_codecs()
.find(|c| &c.name == codec)
.ok_or_else(|| {
anyhow!(
"Unknown video codec {codec}. Valid values are: {}",
Codecs::video_codecs().map(|c| c.name.as_str()).join(", ")
)
})?
.caps
.clone(),
);
}
webrtcsink.set_property("video-caps", video_caps);
}
let raw_video_caps = {
let mut raw_video_caps = if let Some(ref video_caps) = self.args.video_caps {
gst::Caps::from_str(video_caps).context("Parsing video caps")?
} else {
gst::Caps::new_empty_simple("video/x-raw")
};
// If no width / height are specified, set something big enough
let caps_mut = raw_video_caps.make_mut();
let s = caps_mut.structure_mut(0).expect("created above");
match (s.get::<i32>("width").ok(), s.get::<i32>("height").ok()) {
(None, None) => {
s.set("width", 800i32);
s.set("height", 600i32);
}
(Some(width), None) => s.set("height", 3 * width / 4),
(None, Some(height)) => s.set("width", 4 * height / 3),
_ => (),
}
raw_video_caps
};
for idx in 0..self.args.video_streams {
let videosrc = gst::ElementFactory::make("videotestsrc")
.property("is-live", true)
@ -247,13 +346,7 @@ impl App {
.expect("adding video elements");
videosrc
.link_filtered(
&video_overlay,
&gst::Caps::builder("video/x-raw")
.field("width", 800i32)
.field("height", 600i32)
.build(),
)
.link_filtered(&video_overlay, &raw_video_caps)
.context("Linking videosrc to timeoverlay")?;
video_overlay
@ -261,10 +354,6 @@ impl App {
.context("Linking video overlay")?;
}
if let Some(ref video_caps) = self.args.video_caps {
webrtcsink.set_property("video-caps", &gst::Caps::builder(video_caps).build());
}
Ok(())
}

View file

@ -427,7 +427,7 @@ impl Clone for DecodingInfo {
#[derive(Clone, Debug)]
struct EncodingInfo {
encoder: gst::ElementFactory,
encoder: Option<gst::ElementFactory>,
payloader: gst::ElementFactory,
output_filter: Option<gst::Caps>,
}
@ -437,6 +437,7 @@ pub struct Codec {
pub name: String,
pub caps: gst::Caps,
pub stream_type: gst::StreamType,
pub is_raw: bool,
payload_type: Option<i32>,
decoding_info: Option<DecodingInfo>,
@ -449,16 +450,27 @@ impl Codec {
stream_type: gst::StreamType,
caps: &gst::Caps,
decoders: &glib::List<gst::ElementFactory>,
depayloaders: &glib::List<gst::ElementFactory>,
encoders: &glib::List<gst::ElementFactory>,
payloaders: &glib::List<gst::ElementFactory>,
) -> Self {
let has_decoder = Self::has_decoder_for_caps(caps, decoders);
let has_depayloader = Self::has_depayloader_for_codec(name, depayloaders);
let decoding_info = if has_depayloader && has_decoder {
Some(DecodingInfo {
has_decoder: AtomicBool::new(has_decoder),
})
} else {
None
};
let encoder = Self::get_encoder_for_caps(caps, encoders);
let payloader = Self::get_payloader_for_codec(name, payloaders);
let encoding_info = if let (Some(encoder), Some(payloader)) = (encoder, payloader) {
Some(EncodingInfo {
encoder,
encoder: Some(encoder),
payloader,
output_filter: None,
})
@ -470,12 +482,52 @@ impl Codec {
caps: caps.clone(),
stream_type,
name: name.into(),
is_raw: false,
payload_type: None,
decoding_info,
encoding_info,
}
}
decoding_info: Some(DecodingInfo {
has_decoder: AtomicBool::new(has_decoder),
}),
/// Builds a raw `Codec`, i.e. one whose caps can be payloaded and
/// depayloaded directly, without an encoder or decoder.
///
/// * `name` is the RTP encoding-name of the codec (e.g. "L24", "RAW").
/// * `depayloaders` / `payloaders` are the factory lists the codec is
///   matched against.
///
/// The resulting `Codec` advertises `is_raw == true`. Its `decoding_info`
/// is only set when a depayloader is available (with `has_decoder` forced
/// to `false` since no decoder is involved), and its `encoding_info` is
/// only set when a payloader is available (with no encoder). The codec
/// caps are derived from the payloader's sink pad template; when no
/// payloader is found, the caps are left empty.
pub fn new_raw(
    name: &str,
    stream_type: gst::StreamType,
    depayloaders: &glib::List<gst::ElementFactory>,
    payloaders: &glib::List<gst::ElementFactory>,
) -> Self {
    // Raw codecs never need a decoder: receivability only depends on a
    // depayloader being available.
    let decoding_info = if Self::has_depayloader_for_codec(name, depayloaders) {
        Some(DecodingInfo {
            has_decoder: AtomicBool::new(false),
        })
    } else {
        None
    };

    // Look the payloader up once and reuse it both for the caps and for
    // the encoding info (the original code performed the factory scan
    // twice).
    let payloader = Self::get_payloader_for_codec(name, payloaders);

    // The codec caps are whatever the payloader accepts on its sink pad.
    let caps = payloader.as_ref().and_then(|elem| {
        elem.static_pad_templates()
            .iter()
            .find(|template| template.direction() == gst::PadDirection::Sink)
            .map(|tmpl| tmpl.caps())
    });

    let encoding_info = payloader.map(|payloader| EncodingInfo {
        encoder: None,
        payloader,
        output_filter: None,
    });

    Self {
        caps: caps.unwrap_or_else(gst::Caps::new_empty),
        stream_type,
        name: name.into(),
        is_raw: true,
        payload_type: None,
        decoding_info,
        encoding_info,
    }
}
@ -488,33 +540,15 @@ impl Codec {
self.payload_type = Some(pt);
}
pub fn new_decoding(
name: &str,
stream_type: gst::StreamType,
caps: &gst::Caps,
decoders: &glib::List<gst::ElementFactory>,
) -> Self {
let has_decoder = Self::has_decoder_for_caps(caps, decoders);
Self {
caps: caps.clone(),
stream_type,
name: name.into(),
payload_type: None,
decoding_info: Some(DecodingInfo {
has_decoder: AtomicBool::new(has_decoder),
}),
encoding_info: None,
}
}
pub fn has_decoder(&self) -> bool {
pub fn can_be_received(&self) -> bool {
if self.decoding_info.is_none() {
return false;
}
if self.is_raw {
return true;
}
let decoder_info = self.decoding_info.as_ref().unwrap();
if decoder_info.has_decoder.load(Ordering::SeqCst) {
true
@ -599,6 +633,40 @@ impl Codec {
})
}
/// Returns `true` if one of the `depayloaders` factories accepts the RTP
/// encoding-name `codec` on a sink pad template.
///
/// A depayloader template can declare its `encoding-name` either as a
/// single string or as a `gst::List` of strings; both forms are handled.
fn has_depayloader_for_codec(
    codec: &str,
    depayloaders: &glib::List<gst::ElementFactory>,
) -> bool {
    depayloaders.iter().any(|factory| {
        factory.static_pad_templates().iter().any(|template| {
            let template_caps = template.caps();

            // Only sink pad templates describe what the depayloader accepts.
            if template.direction() != gst::PadDirection::Sink {
                return false;
            }

            template_caps.iter().any(|s| {
                // First try to read "encoding-name" as a list of names;
                // `map_or_else` takes the error (single-string fallback)
                // closure first and the success (list) closure second.
                s.has_field("encoding-name")
                    && s.get::<gst::List>("encoding-name").map_or_else(
                        |_| {
                            // Not a list: fall back to matching a single
                            // string value.
                            if let Ok(encoding_name) = s.get::<&str>("encoding-name") {
                                encoding_name == codec
                            } else {
                                false
                            }
                        },
                        |encoding_names| {
                            // List form: match if any entry equals `codec`.
                            encoding_names.iter().any(|v| {
                                v.get::<&str>()
                                    .map_or(false, |encoding_name| encoding_name == codec)
                            })
                        },
                    )
            })
        })
    })
}
pub fn is_video(&self) -> bool {
matches!(self.stream_type, gst::StreamType::VIDEO)
}
@ -608,11 +676,13 @@ impl Codec {
}
pub fn build_encoder(&self) -> Option<Result<gst::Element, Error>> {
self.encoding_info.as_ref().map(|info| {
info.encoder
.create()
.build()
.with_context(|| format!("Creating encoder {}", info.encoder.name()))
self.encoding_info.as_ref().and_then(|info| {
info.encoder.as_ref().map(|encoder| {
encoder
.create()
.build()
.with_context(|| format!("Creating encoder {}", encoder.name()))
})
})
}
@ -655,13 +725,17 @@ impl Codec {
}
pub fn encoder_factory(&self) -> Option<gst::ElementFactory> {
self.encoding_info.as_ref().map(|info| info.encoder.clone())
self.encoding_info
.as_ref()
.and_then(|info| info.encoder.clone())
}
pub fn encoder_name(&self) -> Option<String> {
self.encoding_info
.as_ref()
.map(|info| info.encoder.name().to_string())
self.encoding_info.as_ref().and_then(|info| {
info.encoder
.as_ref()
.map(|encoder| encoder.name().to_string())
})
}
pub fn set_output_filter(&mut self, caps: gst::Caps) {
@ -753,7 +827,7 @@ impl Codecs {
Self(codecs.values().cloned().collect())
}
pub fn find_for_encoded_caps(&self, caps: &gst::Caps) -> Option<Codec> {
pub fn find_for_payloadable_caps(&self, caps: &gst::Caps) -> Option<Codec> {
self.iter()
.find(|codec| codec.caps.can_intersect(caps) && codec.encoding_info.is_some())
.cloned()
@ -766,6 +840,11 @@ static CODECS: Lazy<Codecs> = Lazy::new(|| {
gst::Rank::MARGINAL,
);
let depayloaders = gst::ElementFactory::factories_with_type(
gst::ElementFactoryType::DEPAYLOADER,
gst::Rank::MARGINAL,
);
let encoders = gst::ElementFactory::factories_with_type(
gst::ElementFactoryType::ENCODER,
gst::Rank::MARGINAL,
@ -782,14 +861,19 @@ static CODECS: Lazy<Codecs> = Lazy::new(|| {
gst::StreamType::AUDIO,
&OPUS_CAPS,
&decoders,
&depayloaders,
&encoders,
&payloaders,
),
Codec::new_raw("L24", gst::StreamType::AUDIO, &depayloaders, &payloaders),
Codec::new_raw("L16", gst::StreamType::AUDIO, &depayloaders, &payloaders),
Codec::new_raw("L8", gst::StreamType::AUDIO, &depayloaders, &payloaders),
Codec::new(
"VP8",
gst::StreamType::VIDEO,
&VP8_CAPS,
&decoders,
&depayloaders,
&encoders,
&payloaders,
),
@ -798,6 +882,7 @@ static CODECS: Lazy<Codecs> = Lazy::new(|| {
gst::StreamType::VIDEO,
&H264_CAPS,
&decoders,
&depayloaders,
&encoders,
&payloaders,
),
@ -806,6 +891,7 @@ static CODECS: Lazy<Codecs> = Lazy::new(|| {
gst::StreamType::VIDEO,
&VP9_CAPS,
&decoders,
&depayloaders,
&encoders,
&payloaders,
),
@ -814,6 +900,7 @@ static CODECS: Lazy<Codecs> = Lazy::new(|| {
gst::StreamType::VIDEO,
&H265_CAPS,
&decoders,
&depayloaders,
&encoders,
&payloaders,
),
@ -822,9 +909,11 @@ static CODECS: Lazy<Codecs> = Lazy::new(|| {
gst::StreamType::VIDEO,
&AV1_CAPS,
&decoders,
&depayloaders,
&encoders,
&payloaders,
),
Codec::new_raw("RAW", gst::StreamType::VIDEO, &depayloaders, &payloaders),
])
});
@ -836,36 +925,16 @@ impl Codecs {
.cloned()
}
pub fn video_codecs() -> Vec<Codec> {
pub fn video_codecs<'a>() -> impl Iterator<Item = &'a Codec> {
CODECS
.iter()
.filter(|codec| codec.stream_type == gst::StreamType::VIDEO)
.cloned()
.collect()
}
pub fn audio_codecs() -> Vec<Codec> {
pub fn audio_codecs<'a>() -> impl Iterator<Item = &'a Codec> {
CODECS
.iter()
.filter(|codec| codec.stream_type == gst::StreamType::AUDIO)
.cloned()
.collect()
}
pub fn video_codec_names() -> Vec<String> {
CODECS
.iter()
.filter(|codec| codec.stream_type == gst::StreamType::VIDEO)
.map(|codec| codec.name.clone())
.collect()
}
pub fn audio_codec_names() -> Vec<String> {
CODECS
.iter()
.filter(|codec| codec.stream_type == gst::StreamType::AUDIO)
.map(|codec| codec.name.clone())
.collect()
}
/// List all codecs that can be used for encoding the given caps and assign
@ -909,9 +978,9 @@ impl Codecs {
}
}
pub fn is_raw_caps(caps: &gst::Caps) -> bool {
assert!(caps.is_fixed());
["video/x-raw", "audio/x-raw"].contains(&caps.structure(0).unwrap().name().as_str())
pub fn has_raw_caps(caps: &gst::Caps) -> bool {
caps.iter()
.any(|s| ["video/x-raw", "audio/x-raw"].contains(&s.name().as_str()))
}
pub fn cleanup_codec_caps(mut caps: gst::Caps) -> gst::Caps {

View file

@ -1,6 +1,8 @@
// SPDX-License-Identifier: MPL-2.0
use crate::utils::{cleanup_codec_caps, is_raw_caps, make_element, Codec, Codecs, NavigationEvent};
use crate::utils::{
cleanup_codec_caps, has_raw_caps, make_element, Codec, Codecs, NavigationEvent,
};
use anyhow::Context;
use gst::glib;
use gst::prelude::*;
@ -13,6 +15,7 @@ use gst_webrtc::{WebRTCDataChannel, WebRTCICETransportPolicy};
use futures::prelude::*;
use anyhow::{anyhow, Error};
use itertools::Itertools;
use once_cell::sync::Lazy;
use std::collections::HashMap;
@ -513,12 +516,12 @@ impl Default for Settings {
Self {
video_caps: Codecs::video_codecs()
.into_iter()
.flat_map(|codec| codec.caps.iter().map(|s| s.to_owned()).collect::<Vec<_>>())
.filter(|codec| !codec.is_raw)
.flat_map(|codec| codec.caps.iter().map(ToOwned::to_owned))
.collect::<gst::Caps>(),
audio_caps: Codecs::audio_codecs()
.into_iter()
.flat_map(|codec| codec.caps.iter().map(|s| s.to_owned()).collect::<Vec<_>>())
.filter(|codec| !codec.is_raw)
.flat_map(|codec| codec.caps.iter().map(ToOwned::to_owned))
.collect::<gst::Caps>(),
stun_server: DEFAULT_STUN_SERVER.map(String::from),
turn_servers: gst::Array::new(Vec::new() as Vec<glib::SendValue>),
@ -884,7 +887,12 @@ impl PayloadChainBuilder {
codec = self.codec,
);
let needs_encoding = is_raw_caps(&self.input_caps);
let needs_encoding = if self.codec.is_raw {
!self.codec.caps.can_intersect(&self.input_caps)
} else {
has_raw_caps(&self.input_caps)
};
let mut elements: Vec<gst::Element> = Vec::new();
let (raw_filter, encoder) = if needs_encoding {
@ -898,14 +906,20 @@ impl PayloadChainBuilder {
let raw_filter = self.codec.raw_converter_filter()?;
elements.push(raw_filter.clone());
let encoder = self
.codec
.build_encoder()
.expect("We should always have an encoder for negotiated codecs")?;
elements.push(encoder.clone());
elements.push(make_element("capsfilter", None)?);
let encoder = if self.codec.is_raw {
None
} else {
let encoder = self
.codec
.build_encoder()
.expect("We should always have an encoder for negotiated codecs")?;
elements.push(encoder.clone());
elements.push(make_element("capsfilter", None)?);
(Some(raw_filter), Some(encoder))
Some(encoder)
};
(Some(raw_filter), encoder)
} else {
(None, None)
};
@ -3431,7 +3445,7 @@ impl BaseWebRTCSink {
) -> Result<gst::Structure, Error> {
let pipe = PipelineWrapper(gst::Pipeline::default());
let has_raw_input = is_raw_caps(&input_caps);
let has_raw_input = has_raw_caps(&input_caps);
let src = discovery_info.create_src();
let mut elements = vec![src.clone().upcast::<gst::Element>()];
let encoding_chain_src = if codec.is_video() && has_raw_input {
@ -3616,29 +3630,7 @@ impl BaseWebRTCSink {
output_caps: gst::Caps,
codecs: &Codecs,
) -> Result<(), Error> {
let futs = if let Some(codec) = codecs.find_for_encoded_caps(&discovery_info.caps) {
let mut caps = discovery_info.caps.clone();
gst::info!(
CAT,
imp = self,
"Stream is already encoded with codec {}, still need to payload it",
codec.name
);
caps = cleanup_codec_caps(caps);
vec![self.run_discovery_pipeline(
&name,
&discovery_info,
codec,
caps,
&output_caps,
ExtensionConfigurationType::Auto,
)]
} else {
let sink_caps = discovery_info.caps.clone();
let futs = if has_raw_caps(&discovery_info.caps) {
if codecs.is_empty() {
return Err(anyhow!(
"No codec available for encoding stream {}, \
@ -3648,10 +3640,12 @@ impl BaseWebRTCSink {
));
}
let sink_caps = discovery_info.caps.clone();
let is_video = match sink_caps.structure(0).unwrap().name().as_str() {
"video/x-raw" => true,
"audio/x-raw" => false,
_ => anyhow::bail!("Unsupported caps: {}", discovery_info.caps),
_ => anyhow::bail!("expected audio or video raw caps: {sink_caps}"),
};
codecs
@ -3668,6 +3662,28 @@ impl BaseWebRTCSink {
)
})
.collect()
} else if let Some(codec) = codecs.find_for_payloadable_caps(&discovery_info.caps) {
let mut caps = discovery_info.caps.clone();
gst::info!(
CAT,
imp = self,
"Stream is already in the {} format, we still need to payload it",
codec.name
);
caps = cleanup_codec_caps(caps);
vec![self.run_discovery_pipeline(
&name,
&discovery_info,
codec,
caps,
&output_caps,
ExtensionConfigurationType::Auto,
)]
} else {
anyhow::bail!("Unsupported caps: {}", discovery_info.caps);
};
let mut payloader_caps = gst::Caps::new_empty();
@ -3949,12 +3965,20 @@ impl ObjectImpl for BaseWebRTCSink {
vec![
glib::ParamSpecBoxed::builder::<gst::Caps>("video-caps")
.nick("Video encoder caps")
.blurb("Governs what video codecs will be proposed")
.blurb(&format!("Governs what video codecs will be proposed. Valid values: [{}]",
Codecs::video_codecs()
.map(|c| c.caps.to_string())
.join("; ")
))
.mutable_ready()
.build(),
glib::ParamSpecBoxed::builder::<gst::Caps>("audio-caps")
.nick("Audio encoder caps")
.blurb("Governs what audio codecs will be proposed")
.blurb(&format!("Governs what audio codecs will be proposed. Valid values: [{}]",
Codecs::audio_codecs()
.map(|c| c.caps.to_string())
.join("; ")
))
.mutable_ready()
.build(),
glib::ParamSpecString::builder("stun-server")
@ -4397,7 +4421,8 @@ impl ElementImpl for BaseWebRTCSink {
gst::CapsFeatures::new([D3D11_MEMORY_FEATURE]),
);
for codec in Codecs::video_codecs() {
// Ignore specific raw caps from Codecs: they are covered by video/x-raw & audio/x-raw
for codec in Codecs::video_codecs().filter(|codec| !codec.is_raw) {
caps_builder = caps_builder.structure(codec.caps.structure(0).unwrap().to_owned());
}
@ -4412,7 +4437,7 @@ impl ElementImpl for BaseWebRTCSink {
let mut caps_builder =
gst::Caps::builder_full().structure(gst::Structure::builder("audio/x-raw").build());
for codec in Codecs::audio_codecs() {
for codec in Codecs::audio_codecs().filter(|codec| !codec.is_raw) {
caps_builder = caps_builder.structure(codec.caps.structure(0).unwrap().to_owned());
}
let audio_pad_template = gst::PadTemplate::with_gtype(

View file

@ -9,6 +9,7 @@ use anyhow::{Context, Error};
use gst::glib;
use gst::subclass::prelude::*;
use gst_webrtc::WebRTCDataChannel;
use itertools::Itertools;
use once_cell::sync::Lazy;
use std::borrow::BorrowMut;
use std::collections::{HashMap, HashSet};
@ -94,14 +95,18 @@ impl ObjectImpl for BaseWebRTCSrc {
gst::ParamSpecArray::builder("video-codecs")
.flags(glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY)
.blurb(&format!("Names of video codecs to be be used during the SDP negotiation. Valid values: [{}]",
Codecs::video_codec_names().into_iter().collect::<Vec<String>>().join(", ")
Codecs::video_codecs()
.map(|c| c.name.as_str())
.join(", ")
))
.element_spec(&glib::ParamSpecString::builder("video-codec-name").build())
.build(),
gst::ParamSpecArray::builder("audio-codecs")
.flags(glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY)
.blurb(&format!("Names of audio codecs to be be used during the SDP negotiation. Valid values: [{}]",
Codecs::audio_codec_names().into_iter().collect::<Vec<String>>().join(", ")
Codecs::audio_codecs()
.map(|c| c.name.as_str())
.join(", ")
))
.element_spec(&glib::ParamSpecString::builder("audio-codec-name").build())
.build(),
@ -271,12 +276,12 @@ impl Default for Settings {
signaller: signaller.upcast(),
meta: Default::default(),
audio_codecs: Codecs::audio_codecs()
.into_iter()
.filter(|codec| codec.has_decoder())
.filter(|codec| !codec.is_raw)
.cloned()
.collect(),
video_codecs: Codecs::video_codecs()
.into_iter()
.filter(|codec| codec.has_decoder())
.filter(|codec| !codec.is_raw)
.cloned()
.collect(),
enable_data_channel_navigation: DEFAULT_ENABLE_DATA_CHANNEL_NAVIGATION,
do_retransmission: DEFAULT_DO_RETRANSMISSION,
@ -630,7 +635,13 @@ impl Session {
}
srcpad.set_target(Some(&ghostpad)).unwrap();
} else {
gst::debug!(CAT, obj = element, "Unused webrtcbin pad {webrtcbin_pad:?}");
gst::debug!(
CAT,
obj = element,
"Unused webrtcbin pad {} {:?}",
webrtcbin_pad.name(),
webrtcbin_pad.current_caps(),
);
}
ghostpad
}
@ -1326,11 +1337,13 @@ impl BaseWebRTCSrc {
impl ElementImpl for BaseWebRTCSrc {
fn pad_templates() -> &'static [gst::PadTemplate] {
static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
// Ignore specific raw caps from Codecs: they are covered by VIDEO_CAPS & AUDIO_CAPS
let mut video_caps_builder = gst::Caps::builder_full()
.structure_with_any_features(VIDEO_CAPS.structure(0).unwrap().to_owned())
.structure(RTP_CAPS.structure(0).unwrap().to_owned());
for codec in Codecs::video_codecs() {
for codec in Codecs::video_codecs().filter(|codec| !codec.is_raw) {
video_caps_builder =
video_caps_builder.structure(codec.caps.structure(0).unwrap().to_owned());
}
@ -1339,7 +1352,7 @@ impl ElementImpl for BaseWebRTCSrc {
.structure_with_any_features(AUDIO_CAPS.structure(0).unwrap().to_owned())
.structure(RTP_CAPS.structure(0).unwrap().to_owned());
for codec in Codecs::audio_codecs() {
for codec in Codecs::audio_codecs().filter(|codec| !codec.is_raw) {
audio_caps_builder =
audio_caps_builder.structure(codec.caps.structure(0).unwrap().to_owned());
}