rtp: gccbwe: don't break downstream assumptions pushing buffer lists

Some elements in the RTP stack assume all buffers in a `gst::BufferList`
correspond to the same timestamp. See in [`rtpsession`] for instance.
This also had the effect that `rtpsession` did not create correct RTCP as it
only saw some of the SSRCs in the stream.

`rtpgccbwe` formed a packet group by gathering buffers in a `gst::BufferList`,
regardless of whether they corresponded to the same timestamp, which broke
synchronization under certain circonstances.

This commit makes `rtpgccbwe` push the buffers as they were received: one by one.

[`rtpsession`]: bc858976db/subprojects/gst-plugins-good/gst/rtpmanager/gstrtpsession.c (L2462)

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1502>
This commit is contained in:
François Laignel 2024-03-20 16:23:26 +01:00 committed by GStreamer Marge Bot
parent 2b9272c7eb
commit cc7b7d508d

View file

@ -19,6 +19,7 @@
*/ */
use gst::{glib, prelude::*, subclass::prelude::*}; use gst::{glib, prelude::*, subclass::prelude::*};
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use smallvec::SmallVec;
use std::{ use std::{
collections::{BTreeMap, VecDeque}, collections::{BTreeMap, VecDeque},
fmt, fmt,
@ -29,6 +30,7 @@ use std::{
use time::Duration; use time::Duration;
type Bitrate = u32; type Bitrate = u32;
type BufferList = SmallVec<[gst::Buffer; 10]>;
const DEFAULT_MIN_BITRATE: Bitrate = 1000; const DEFAULT_MIN_BITRATE: Bitrate = 1000;
const DEFAULT_ESTIMATED_BITRATE: Bitrate = 2_048_000; const DEFAULT_ESTIMATED_BITRATE: Bitrate = 2_048_000;
@ -748,7 +750,7 @@ impl Default for State {
impl State { impl State {
// 4. sending engine implementing a "leaky bucket" // 4. sending engine implementing a "leaky bucket"
fn create_buffer_list(&mut self, bwe: &super::BandwidthEstimator) -> gst::BufferList { fn create_buffer_list(&mut self, bwe: &super::BandwidthEstimator) -> BufferList {
let now = time::Instant::now(); let now = time::Instant::now();
let elapsed = now - self.last_push; let elapsed = now - self.last_push;
let mut budget = (elapsed.whole_nanoseconds() as i64) let mut budget = (elapsed.whole_nanoseconds() as i64)
@ -762,8 +764,8 @@ impl State {
let mut remaining = self.buffers.iter().map(|b| b.size() as f64).sum::<f64>() * 8.; let mut remaining = self.buffers.iter().map(|b| b.size() as f64).sum::<f64>() * 8.;
let total_size = remaining; let total_size = remaining;
let mut list = gst::BufferList::new(); let mut list_size = 0;
let mutlist = list.get_mut().unwrap(); let mut list = BufferList::new();
// Leak the bucket so it can hold at most 30ms of data // Leak the bucket so it can hold at most 30ms of data
let maximum_remaining_bits = 30. * self.estimated_bitrate as f64 / 1000.; let maximum_remaining_bits = 30. * self.estimated_bitrate as f64 / 1000.;
@ -773,7 +775,8 @@ impl State {
let n_bits = buf.size() * 8; let n_bits = buf.size() * 8;
leaked = budget <= 0 && remaining > maximum_remaining_bits; leaked = budget <= 0 && remaining > maximum_remaining_bits;
mutlist.add(buf); list_size += buf.size();
list.push(buf);
budget -= n_bits as i64; budget -= n_bits as i64;
remaining -= n_bits as f64; remaining -= n_bits as f64;
} }
@ -786,7 +789,7 @@ impl State {
human_kbits(self.estimated_bitrate), human_kbits(self.estimated_bitrate),
human_kbits(budget as f64), human_kbits(budget as f64),
human_kbits(total_budget as f64), human_kbits(total_budget as f64),
human_kbits(list.calculate_size() as f64 * 8.), human_kbits(list_size as f64 * 8.),
human_kbits(remaining), human_kbits(remaining),
human_kbits(total_size) human_kbits(total_size)
); );
@ -1026,8 +1029,14 @@ pub struct BandwidthEstimator {
} }
impl BandwidthEstimator { impl BandwidthEstimator {
fn push_list(&self, list: gst::BufferList) -> Result<gst::FlowSuccess, gst::FlowError> { fn push_list(&self, list: BufferList) -> Result<gst::FlowSuccess, gst::FlowError> {
let res = self.srcpad.push_list(list); let mut res = Ok(gst::FlowSuccess::Ok);
for buf in list {
res = self.srcpad.push(buf);
if res.is_err() {
break;
}
}
self.state.lock().unwrap().flow_return = res; self.state.lock().unwrap().flow_return = res;