webrtcsink: add define-encoder-bitrates signal

When congestion control is used for a session with multiple encoders,
the default implementation simply divides the overall bitrate equally
between encoders.

This is not always desirable, and this patch exposes a new signal
that users can register to, with two arguments:

* The overall bitrate to allocate
* A structure with an encoder.stream_name -> bitrate mapping

Handlers should return a similar structure with a custom mapping.

An example is also provided.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1792>
This commit is contained in:
Mathieu Duponchelle 2024-09-20 15:45:38 +02:00 committed by GStreamer Marge Bot
parent f532d523b2
commit 87c6719e1d
4 changed files with 190 additions and 3 deletions

View file

@ -10766,6 +10766,24 @@
"return-type": "void",
"when": "last"
},
"define-encoder-bitrates": {
"args": [
{
"name": "arg0",
"type": "gchararray"
},
{
"name": "arg1",
"type": "gint"
},
{
"name": "arg2",
"type": "GstStructure"
}
],
"return-type": "GstStructure",
"when": "last"
},
"encoder-setup": {
"args": [
{

View file

@ -127,3 +127,6 @@ name = "webrtc-precise-sync-recv"
[[example]]
name = "whipserver"
required-features = [ "whip" ]
[[example]]
name = "webrtcsink-define-encoder-bitrates"

View file

@ -0,0 +1,85 @@
// The goal of this example is to demonstrate how to affect bitrate allocation
// when congestion control is happening in a session with multiple encoders
use anyhow::Error;
use gst::glib;
use gst::prelude::*;
fn main() -> Result<(), Error> {
gst::init()?;
// Create a very simple webrtc producer, offering a single video stream
let pipeline = gst::Pipeline::builder().build();
let videotestsrc = gst::ElementFactory::make("videotestsrc").build()?;
let queue = gst::ElementFactory::make("queue").build()?;
let webrtcsink = gst::ElementFactory::make("webrtcsink").build()?;
webrtcsink.connect_closure(
"define-encoder-bitrates",
false,
glib::closure!(|_webrtcsink: &gst::Element,
_consumer_id: &str,
overall: i32,
_in_structure: gst::Structure| {
let out_s = gst::Structure::builder("webrtcsink/encoder-bitrates")
.field(
"video_0",
overall.mul_div_round(75, 100).expect("should be scalable"),
)
.field(
"video_1",
overall.mul_div_round(25, 100).expect("should be scalable"),
)
.build();
Some(out_s)
}),
);
webrtcsink.set_property("run-signalling-server", true);
webrtcsink.set_property("run-web-server", true);
pipeline.add_many([&videotestsrc, &queue, &webrtcsink])?;
gst::Element::link_many([&videotestsrc, &queue, &webrtcsink])?;
let videotestsrc = gst::ElementFactory::make("videotestsrc").build()?;
let queue = gst::ElementFactory::make("queue").build()?;
pipeline.add_many([&videotestsrc, &queue])?;
gst::Element::link_many([&videotestsrc, &queue, &webrtcsink])?;
// Now we simply run the pipeline to completion
pipeline.set_state(gst::State::Playing)?;
let bus = pipeline.bus().expect("Pipeline should have a bus");
for msg in bus.iter_timed(gst::ClockTime::NONE) {
use gst::MessageView;
match msg.view() {
MessageView::Eos(..) => {
println!("EOS");
break;
}
MessageView::Error(err) => {
pipeline.set_state(gst::State::Null)?;
eprintln!(
"Got error from {}: {} ({})",
msg.src()
.map(|s| String::from(s.path_string()))
.unwrap_or_else(|| "None".into()),
err.error(),
err.debug().unwrap_or_else(|| "".into()),
);
break;
}
_ => (),
}
}
pipeline.set_state(gst::State::Null)?;
Ok(())
}

View file

@ -3622,15 +3622,53 @@ impl BaseWebRTCSink {
};
let fec_percentage = fec_ratio * 50f64;
let encoders_bitrate =
((bitrate as f64) / (1. + (fec_percentage / 100.)) / (n_encoders as f64)) as i32;
let encoders_bitrate = (bitrate as f64) / (1. + (fec_percentage / 100.));
let encoder_bitrate = (encoders_bitrate / (n_encoders as f64)) as i32;
if let Some(rtpxsend) = session.rtprtxsend.as_ref() {
rtpxsend.set_property("stuffing-kbps", (bitrate as f64 / 1000.) as i32);
}
let mut s_builder = gst::Structure::builder("webrtcsink/encoder-bitrates");
for encoder in session.encoders.iter() {
s_builder = s_builder.field(&encoder.stream_name, encoder_bitrate);
}
let s = s_builder.build();
let updated_bitrates = self.obj().emit_by_name::<gst::Structure>(
"define-encoder-bitrates",
&[&session.peer_id, &(encoders_bitrate as i32), &s],
);
for encoder in session.encoders.iter_mut() {
if encoder.set_bitrate(&self.obj(), encoders_bitrate).is_ok() {
let defined_encoder_bitrate =
match updated_bitrates.get::<i32>(&encoder.stream_name) {
Ok(bitrate) => {
gst::log!(
CAT,
imp = self,
"using defined bitrate {bitrate} for encoder {}",
encoder.stream_name
);
bitrate
}
Err(e) => {
gst::log!(
CAT,
imp = self,
"Error in defined bitrate: {e}, falling back to default bitrate \
{encoder_bitrate} for encoder {}",
encoder.stream_name
);
encoder_bitrate
}
};
if encoder
.set_bitrate(&self.obj(), defined_encoder_bitrate)
.is_ok()
{
encoder
.transceiver
.set_property("fec-percentage", (fec_percentage as u32).min(100));
@ -5059,6 +5097,49 @@ impl ObjectImpl for BaseWebRTCSink {
])
.return_type::<gst::Element>()
.build(),
/**
* GstBaseWebRTCSink::define-encoder-bitrates:
* @consumer_id: Identifier of the consumer
* @overall_bitrate: The total bitrate to allocate
* @structure: A structure describing the default per-encoder bitrates
*
* When a session carries multiple video streams, the congestion
* control mechanism will simply divide the overall allocated bitrate
* by the number of encoders and set the result as the bitrate for each
* individual encoder.
*
* With this signal, the application can affect how the overall bitrate
* gets allocated.
*
* The structure is named "webrtcsink/encoder-bitrates" and
* contains one gchararray to gint32 mapping per video stream
* name, for instance:
*
* "video_1234": 5000i32
* "video_5678": 10000i32
*
* The total of the bitrates in the returned structure should match
* the overall bitrate, as it does in the input structure.
*
* Returns: the updated encoder bitrates.
* Since: plugins-rs-0.14.0
*/
glib::subclass::Signal::builder("define-encoder-bitrates")
.param_types([
String::static_type(),
i32::static_type(),
gst::Structure::static_type(),
])
.return_type::<gst::Structure>()
.run_last()
.class_handler(|_token, args| {
Some(args[3usize].get::<gst::Structure>().expect("wrong argument").to_value())
})
.accumulator(move |_hint, output, input| {
*output = input.clone();
false
})
.build(),
]
});