webrtcsink: add payloader-setup signal

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1389>
This commit is contained in:
Maksym Khomenko 2023-11-08 02:23:30 +02:00 committed by GStreamer Marge Bot
parent 1ef47cb48e
commit 17f0b61576
3 changed files with 280 additions and 87 deletions

View file

@ -7038,6 +7038,24 @@
"return-type": "GStrv",
"when": "last"
},
"payloader-setup": {
"args": [
{
"name": "arg0",
"type": "gchararray"
},
{
"name": "arg1",
"type": "gchararray"
},
{
"name": "arg2",
"type": "GstElement"
}
],
"return-type": "gboolean",
"when": "last"
},
"request-encoded-filter": {
"args": [
{

View file

@ -1,5 +1,5 @@
use std::{
collections::{BTreeMap, HashMap},
collections::{BTreeMap, HashMap, HashSet},
ops::Deref,
sync::atomic::{AtomicBool, Ordering},
};
@ -611,28 +611,10 @@ impl Codec {
})
}
pub fn build_payloader(&self, pt: u32) -> Option<gst::Element> {
self.encoding_info.as_ref().map(|info| {
let mut res = info
.payloader
.create()
.property("mtu", 1200_u32)
.property("pt", pt);
match info.payloader.name().as_str() {
"rtpvp8pay" | "rtpvp9pay" => {
res = res.property_from_str("picture-id-mode", "15-bit");
}
"rtph264pay" | "rtph265pay" => {
res = res
.property_from_str("aggregate-mode", "zero-latency")
.property("config-interval", -1i32);
}
_ => (),
}
res.build().unwrap()
})
pub fn create_payloader(&self) -> Option<gst::Element> {
self.encoding_info
.as_ref()
.map(|info| info.payloader.create().build().unwrap())
}
pub fn raw_converter_filter(&self) -> Result<gst::Element, Error> {
@ -938,3 +920,39 @@ pub struct NavigationEvent {
#[serde(flatten)]
pub event: gst_video::NavigationEvent,
}
pub fn find_smallest_available_ext_id(ids: impl IntoIterator<Item = u32>) -> u32 {
let used_numbers: HashSet<_> = ids.into_iter().collect();
(1..).find(|&num| !used_numbers.contains(&num)).unwrap()
}
#[cfg(test)]
mod tests {
use super::*;
fn test_find_smallest_available_ext_id_case(
ids: impl IntoIterator<Item = u32>,
expected: u32,
) -> Result<(), String> {
let actual = find_smallest_available_ext_id(ids);
if actual != expected {
return Err(format!("Expected {}, got {}", expected, actual));
}
Ok(())
}
#[test]
fn test_find_smallest_available_ext_id() -> Result<(), String> {
[
(vec![], 1u32),
(vec![2u32, 3u32, 4u32], 1u32),
(vec![1u32, 3u32, 4u32], 2u32),
(vec![4u32, 1u32, 3u32], 2u32),
(vec![1u32, 2u32, 3u32], 4u32),
]
.into_iter()
.try_for_each(|(input, expected)| test_find_smallest_available_ext_id_case(input, expected))
}
}

View file

@ -25,7 +25,7 @@ use crate::aws_kvs_signaller::AwsKvsSignaller;
use crate::livekit_signaller::LiveKitSignaller;
use crate::signaller::{prelude::*, Signallable, Signaller, WebRTCSignallerRole};
use crate::whip_signaller::WhipClientSignaller;
use crate::RUNTIME;
use crate::{utils, RUNTIME};
use std::collections::{BTreeMap, HashSet};
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
@ -749,6 +749,36 @@ fn configure_encoder(enc: &gst::Element, start_bitrate: u32) {
}
}
/// Default configuration for known payloaders, can be disabled
/// by returning True from an payloader-setup handler.
fn configure_payloader(pay: &gst::Element) {
pay.set_property("mtu", 1200_u32);
match pay.factory().unwrap().name().as_str() {
"rtpvp8pay" | "rtpvp9pay" => {
pay.set_property_from_str("picture-id-mode", "15-bit");
}
"rtph264pay" | "rtph265pay" => {
pay.set_property_from_str("aggregate-mode", "zero-latency");
pay.set_property("config-interval", -1i32);
}
_ => (),
}
}
fn setup_signal_accumulator(
_hint: &glib::subclass::SignalInvocationHint,
ret: &mut glib::Value,
value: &glib::Value,
) -> bool {
let is_configured = value.get::<bool>().unwrap();
let continue_emission = !is_configured;
*ret = value.clone();
continue_emission
}
/// Set of elements used in an EncodingChain
struct EncodingChain {
raw_filter: Option<gst::Element>,
@ -756,22 +786,24 @@ struct EncodingChain {
pay_filter: gst::Element,
}
struct EncodingChainBuilder {
/// A set of elements that transform raw data into RTP packets
struct PayloadChain {
encoding_chain: EncodingChain,
payloader: gst::Element,
}
struct PayloadChainBuilder {
/// Caps of the input chain
input_caps: gst::Caps,
//// Caps expected after the payloader
/// Caps expected after the payloader
output_caps: gst::Caps,
/// The Codec representing wanted encoding
codec: Codec,
/// The SSRC to use for the RTP stream if any
/// Filter element between the encoder and the payloader.
encoded_filter: Option<gst::Element>,
ssrc: Option<u32>,
/// The TWCC ID to use for payloaded stream
twcc: Option<u32>,
}
impl EncodingChainBuilder {
impl PayloadChainBuilder {
fn new(
input_caps: &gst::Caps,
output_caps: &gst::Caps,
@ -783,31 +815,18 @@ impl EncodingChainBuilder {
output_caps: output_caps.clone(),
codec: codec.clone(),
encoded_filter,
ssrc: None,
twcc: None,
}
}
fn ssrc(mut self, ssrc: u32) -> Self {
self.ssrc = Some(ssrc);
self
}
fn twcc(mut self, twcc: u32) -> Self {
self.twcc = Some(twcc);
self
}
fn build(self, pipeline: &gst::Pipeline, src: &gst::Element) -> Result<EncodingChain, Error> {
fn build(self, pipeline: &gst::Pipeline, src: &gst::Element) -> Result<PayloadChain, Error> {
gst::trace!(
CAT,
obj: pipeline,
"Setting up encoding, input caps: {input_caps}, \
output caps: {output_caps}, codec: {codec:?}, twcc: {twcc:?}",
output caps: {output_caps}, codec: {codec:?}",
input_caps = self.input_caps,
output_caps = self.output_caps,
codec = self.codec,
twcc = self.twcc,
);
let needs_encoding = is_raw_caps(&self.input_caps);
@ -856,33 +875,10 @@ impl EncodingChainBuilder {
let pay = self
.codec
.build_payloader(
self.codec
.payload()
.expect("Negotiated codec should always have pt set") as u32,
)
.create_payloader()
.expect("Payloaders should always have been set in the CodecInfo we handle");
if let Some(ssrc) = self.ssrc {
pay.set_property("ssrc", ssrc);
}
/* We only enforce TWCC in the offer caps, once a remote description
* has been set it will get automatically negotiated. This is necessary
* because the implementor in Firefox had apparently not understood the
* concept of *transport-wide* congestion control, and firefox doesn't
* provide feedback for audio packets.
*/
if let Some(idx) = self.twcc {
if let Some(twcc_extension) = gst_rtp::RTPHeaderExtension::create_from_uri(RTP_TWCC_URI)
{
twcc_extension.set_id(idx);
pay.emit_by_name::<()>("add-extension", &[&twcc_extension]);
} else {
anyhow::bail!("Failed to add TWCC extension, make sure 'gst-plugins-good:rtpmanager' is installed");
}
}
elements.push(pay);
elements.push(pay.clone());
let pay_filter = gst::ElementFactory::make("capsfilter")
.property("caps", self.output_caps)
@ -898,10 +894,13 @@ impl EncodingChainBuilder {
gst::Element::link_many(elements.iter().collect::<Vec<&gst::Element>>().as_slice())
.with_context(|| "Linking encoding elements")?;
Ok(EncodingChain {
raw_filter,
encoder,
pay_filter,
Ok(PayloadChain {
encoding_chain: EncodingChain {
raw_filter,
encoder,
pay_filter,
},
payloader: pay,
})
}
}
@ -1236,7 +1235,10 @@ impl Session {
let output_caps = codec.output_filter().unwrap_or_else(gst::Caps::new_any);
let encoding_chain = EncodingChainBuilder::new(
let PayloadChain {
payloader,
encoding_chain,
} = PayloadChainBuilder::new(
&webrtc_pad.in_caps,
&output_caps,
&codec,
@ -1245,13 +1247,21 @@ impl Session {
&[&Some(&self.peer_id), &stream_name, &codec.caps],
),
)
.ssrc(webrtc_pad.ssrc)
.build(&self.pipeline, &appsrc)?;
if let Some(ref enc) = encoding_chain.encoder {
element.emit_by_name::<bool>("encoder-setup", &[&self.peer_id, &stream_name, &enc]);
}
element.imp().configure_payloader(
&self.peer_id,
stream_name,
&payloader,
&codec,
Some(webrtc_pad.ssrc),
ExtensionConfigurationType::Skip,
)?;
// At this point, the peer has provided its answer, and we want to
// let the payloader / encoder perform negotiation according to that.
//
@ -1448,7 +1458,103 @@ impl NavigationEventHandler {
}
}
/// How to configure RTP extensions for payloaders, if at all
enum ExtensionConfigurationType {
/// Skip configuration, do not add any extensions
Skip,
/// Configure extensions and assign IDs automatically, based on already enabled extensions
Auto,
/// Configure extensions, use specific ids that were provided
Apply { twcc_id: u32 },
}
impl BaseWebRTCSink {
fn configure_congestion_control(
&self,
payloader: &gst::Element,
extension_configuration_type: ExtensionConfigurationType,
) -> Result<(), Error> {
if let ExtensionConfigurationType::Skip = extension_configuration_type {
return Ok(());
}
let settings = self.settings.lock().unwrap();
if settings.cc_info.heuristic == WebRTCSinkCongestionControl::Disabled {
return Ok(());
}
let enabled_extensions: gst::Array = payloader.property("extensions");
let twcc = enabled_extensions
.iter()
.find(|value| {
let value = value.get::<gst_rtp::RTPHeaderExtension>().unwrap();
match value.uri() {
Some(v) => v == RTP_TWCC_URI,
None => false,
}
})
.map(|value| value.get::<gst_rtp::RTPHeaderExtension>().unwrap());
if let Some(ext) = twcc {
gst::debug!(CAT, obj: payloader, "TWCC extension is already mapped to id {} by application", ext.id());
return Ok(());
}
let twcc_id = match extension_configuration_type {
ExtensionConfigurationType::Auto => utils::find_smallest_available_ext_id(
enabled_extensions
.iter()
.map(|value| value.get::<gst_rtp::RTPHeaderExtension>().unwrap().id()),
),
ExtensionConfigurationType::Apply { twcc_id } => twcc_id,
ExtensionConfigurationType::Skip => unreachable!(),
};
gst::debug!(CAT, obj: payloader, "Mapping TWCC extension to ID {}", twcc_id);
/* We only enforce TWCC in the offer caps, once a remote description
* has been set it will get automatically negotiated. This is necessary
* because the implementor in Firefox had apparently not understood the
* concept of *transport-wide* congestion control, and firefox doesn't
* provide feedback for audio packets.
*/
if let Some(twcc_extension) = gst_rtp::RTPHeaderExtension::create_from_uri(RTP_TWCC_URI) {
twcc_extension.set_id(twcc_id);
payloader.emit_by_name::<()>("add-extension", &[&twcc_extension]);
} else {
anyhow::bail!("Failed to add TWCC extension, make sure 'gst-plugins-good:rtpmanager' is installed");
}
Ok(())
}
fn configure_payloader(
&self,
peer_id: &str,
stream_name: &str,
payloader: &gst::Element,
codec: &Codec,
ssrc: Option<u32>,
extension_configuration_type: ExtensionConfigurationType,
) -> Result<(), Error> {
self.obj()
.emit_by_name::<bool>("payloader-setup", &[&peer_id, &stream_name, &payloader]);
payloader.set_property(
"pt",
codec
.payload()
.expect("Negotiated codec should always have pt set") as u32,
);
if let Some(ssrc) = ssrc {
payloader.set_property("ssrc", ssrc);
}
self.configure_congestion_control(payloader, extension_configuration_type)
}
fn generate_ssrc(
element: &super::BaseWebRTCSink,
webrtc_pads: &HashMap<u32, WebRTCPad>,
@ -2026,6 +2132,10 @@ impl BaseWebRTCSink {
.iter()
.flat_map(|(_, codecs_and_caps)| codecs_and_caps)
.map(|(codec, caps)| async move {
let extension_configuration_type = twcc_idx
.map(|twcc_id| ExtensionConfigurationType::Apply { twcc_id })
.unwrap_or(ExtensionConfigurationType::Skip);
BaseWebRTCSink::run_discovery_pipeline(
element,
stream_name,
@ -2033,7 +2143,7 @@ impl BaseWebRTCSink {
codec.clone(),
in_caps.clone(),
caps,
twcc_idx,
extension_configuration_type,
)
.await
.map(|s| {
@ -2999,7 +3109,7 @@ impl BaseWebRTCSink {
codec: Codec,
input_caps: gst::Caps,
output_caps: &gst::Caps,
twcc: Option<u32>,
extension_configuration_type: ExtensionConfigurationType,
) -> Result<gst::Structure, Error> {
let pipe = PipelineWrapper(gst::Pipeline::default());
@ -3029,7 +3139,7 @@ impl BaseWebRTCSink {
gst::Element::link_many(elements_slice)
.with_context(|| format!("Running discovery pipeline for caps {input_caps}"))?;
let mut encoding_chain_builder = EncodingChainBuilder::new(
let payload_chain_builder = PayloadChainBuilder::new(
&src.caps()
.expect("Caps should always be set when starting discovery"),
output_caps,
@ -3039,10 +3149,11 @@ impl BaseWebRTCSink {
&[&Option::<String>::None, &stream_name, &codec.caps],
),
);
if let Some(twcc) = twcc {
encoding_chain_builder = encoding_chain_builder.twcc(twcc)
}
let encoding_chain = encoding_chain_builder.build(&pipe.0, &encoding_chain_src)?;
let PayloadChain {
payloader,
encoding_chain,
} = payload_chain_builder.build(&pipe.0, &encoding_chain_src)?;
if let Some(ref enc) = encoding_chain.encoder {
element.emit_by_name::<bool>(
@ -3051,6 +3162,15 @@ impl BaseWebRTCSink {
);
}
element.imp().configure_payloader(
"discovery",
stream_name,
&payloader,
&codec,
None,
extension_configuration_type,
)?;
let sink = gst_app::AppSink::builder()
.callbacks(
gst_app::AppSinkCallbacks::builder()
@ -3196,7 +3316,7 @@ impl BaseWebRTCSink {
codec,
caps,
&output_caps,
Some(1),
ExtensionConfigurationType::Auto,
)]
} else {
let sink_caps = discovery_info.caps.clone();
@ -3218,7 +3338,7 @@ impl BaseWebRTCSink {
codec.clone(),
sink_caps.clone(),
&output_caps,
Some(1),
ExtensionConfigurationType::Auto,
)
})
.collect()
@ -3775,7 +3895,7 @@ impl ObjectImpl for BaseWebRTCSink {
gst::Element::static_type(),
])
.return_type::<bool>()
.accumulator(|_hint, _ret, value| !value.get::<bool>().unwrap())
.accumulator(setup_signal_accumulator)
.class_handler(|_, args| {
let element = args[0].get::<super::BaseWebRTCSink>().expect("signal arg");
let enc = args[3].get::<gst::Element>().unwrap();
@ -3795,6 +3915,43 @@ impl ObjectImpl for BaseWebRTCSink {
Some(false.to_value())
})
.build(),
/**
* RsBaseWebRTCSink::payloader-setup:
* @consumer_id: Identifier of the consumer, or "discovery"
* when the payloader is used in a discovery pipeline.
* @pad_name: The name of the corresponding input pad
* @payloader: The constructed payloader for selected codec
*
* This signal can be used to tweak @payloader properties, in particular, adding
* additional extensions.
*
* Note that payload type and ssrc settings are managed by webrtcsink element and
* trying to change them from an application handler will have no effect.
*
* Returns: True if the encoder is entirely configured,
* False to let other handlers run. Note that unless your intent is to enforce
* your custom settings, it's recommended to let the default handler run
* (by returning true), which would apply the optimal settings.
*/
glib::subclass::Signal::builder("payloader-setup")
.param_types([
String::static_type(),
String::static_type(),
gst::Element::static_type(),
])
.return_type::<bool>()
.accumulator(setup_signal_accumulator)
.class_handler(|_, args| {
let pay = args[3].get::<gst::Element>().unwrap();
configure_payloader(&pay);
// The default handler is no-op: the whole configuration logic happens
// in BaseWebRTCSink::configure_payloader, which is where this signal
// is invoked from
Some(false.to_value())
})
.build(),
/**
* RsWebRTCSink::request-encoded-filter:
* @consumer_id: Identifier of the consumer