cea708mux: support configuring a limit to how much data will be pending

Can prevent a build up of data and reduce the amount of delayed captions.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/2311>
This commit is contained in:
Matthew Waters 2025-06-20 20:04:29 +10:00 committed by GStreamer Marge Bot
parent 6ae07945ac
commit d80f91da51
3 changed files with 245 additions and 101 deletions

View file

@ -8715,6 +8715,20 @@
"readable": true,
"type": "gboolean",
"writable": true
},
"max-time": {
"blurb": "Maximum amount of time that captions can be stored before output",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "18446744073709551615",
"max": "18446744073709551615",
"min": "0",
"mutable": "null",
"readable": true,
"type": "guint64",
"writable": true
}
},
"rank": "none"

View file

@ -109,9 +109,15 @@ impl Default for State {
}
}
#[derive(Clone, Debug, Default)]
struct Settings {
max_time: Option<gst::ClockTime>,
}
#[derive(Default)]
pub struct Cea708Mux {
state: Mutex<State>,
settings: Mutex<Settings>,
}
pub(crate) static CAT: LazyLock<gst::DebugCategory> = LazyLock::new(|| {
@ -351,6 +357,49 @@ impl AggregatorImpl for Cea708Mux {
state.writer.push_packet(output);
let _ = state.writer.write(fps, &mut data);
state.n_frames += 1;
let settings = self.settings.lock().unwrap().clone();
if let Some(max_time) = settings.max_time {
let written_buffer_time = gst::ClockTime::from_nseconds(
state
.writer
.buffered_cea608_field1_duration()
.max(state.writer.buffered_cea608_field2_duration())
.max(state.writer.buffered_packet_duration())
.as_nanos() as u64,
);
let max_pending_code_bytes = state
.pending_services
.values()
.map(|codes| codes.iter().map(|code| code.byte_len()).sum())
.max()
.unwrap_or(0);
let max_pending_code_time = gst::ClockTime::from_useconds(
(max_pending_code_bytes.div_ceil(2) as u64)
.mul_div_ceil(2 * 1001 * 1_000_000, 9_600_000 / 8)
.unwrap_or(0),
);
if written_buffer_time + max_pending_code_time > max_time {
gst::warning!(
CAT,
imp = self,
"Stored data of {} has overrun the configured limit of {}, flushing",
written_buffer_time.display(),
max_time.display()
);
state.writer.flush();
state.pending_services.clear();
for pad in sinkpads.iter().map(|pad| {
pad.downcast_ref::<super::Cea708MuxSinkPad>()
.expect("Not a Cea708MuxSinkPad?!")
}) {
let mut pad_state = pad.imp().pad_state.lock().unwrap();
pad_state.ccp_parser.flush();
}
}
}
drop(state);
// remove 2 byte header that our cc_data format does not use
@ -613,12 +662,21 @@ impl GstObjectImpl for Cea708Mux {}
impl ObjectImpl for Cea708Mux {
fn properties() -> &'static [glib::ParamSpec] {
static PROPERTIES: LazyLock<Vec<glib::ParamSpec>> = LazyLock::new(|| {
vec![glib::ParamSpecBoolean::builder("force-live")
.nick("Force live")
.blurb("Always operate in live mode and aggregate on timeout")
.default_value(DEFAULT_FORCE_LIVE)
.construct_only()
.build()]
vec![
glib::ParamSpecBoolean::builder("force-live")
.nick("Force live")
.blurb("Always operate in live mode and aggregate on timeout")
.default_value(DEFAULT_FORCE_LIVE)
.construct_only()
.build(),
glib::ParamSpecUInt64::builder("max-time")
.nick("Max Time")
.blurb("Maximum amount of time that captions can be stored before output")
.minimum(0)
.maximum(u64::MAX)
.default_value(u64::MAX)
.build(),
]
});
PROPERTIES.as_ref()
@ -630,6 +688,10 @@ impl ObjectImpl for Cea708Mux {
self.obj()
.set_force_live(value.get().expect("type checked upstream"));
}
"max-time" => {
let mut settings = self.settings.lock().unwrap();
settings.max_time = value.get().expect("Type checked upstream");
}
_ => unimplemented!(),
}
}
@ -637,6 +699,10 @@ impl ObjectImpl for Cea708Mux {
fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
match pspec.name() {
"force-live" => self.obj().is_force_live().to_value(),
"max-time" => {
let settings = self.settings.lock().unwrap();
settings.max_time.to_value()
}
_ => unimplemented!(),
}
}

View file

@ -23,8 +23,23 @@ fn init() {
gstrsclosedcaption::plugin_register_static().unwrap();
});
}
fn gen_cc_data(seq: u8, service_no: u8, codes: &[Code]) -> gst::Buffer {
gen_cc_data_with_times(
seq,
service_no,
codes,
0.nseconds(),
gst::ClockTime::from_mseconds(400),
)
}
fn gen_cc_data_with_times(
seq: u8,
service_no: u8,
codes: &[Code],
pts: gst::ClockTime,
duration: gst::ClockTime,
) -> gst::Buffer {
assert!(seq < 4);
assert!(service_no < 64);
@ -44,8 +59,8 @@ fn gen_cc_data(seq: u8, service_no: u8, codes: &[Code]) -> gst::Buffer {
let mut buf = gst::Buffer::from_mut_slice(data);
{
let buf = buf.get_mut().unwrap();
buf.set_pts(0.nseconds());
buf.set_duration(gst::ClockTime::from_mseconds(400));
buf.set_pts(pts);
buf.set_duration(duration);
}
buf
}
@ -133,53 +148,59 @@ fn test_cea708mux_2pads_cc_data() {
}
}
static CODES: [Code; 46] = [
Code::LatinLowerA,
Code::LatinLowerB,
Code::LatinLowerC,
Code::LatinLowerD,
Code::LatinLowerE,
Code::LatinLowerF,
Code::LatinLowerG,
Code::LatinLowerH,
Code::LatinLowerI,
Code::LatinLowerJ,
Code::LatinLowerK,
Code::LatinLowerL,
Code::LatinLowerM,
Code::LatinLowerN,
Code::LatinLowerO,
Code::LatinLowerP,
Code::LatinLowerQ,
Code::LatinLowerR,
Code::LatinLowerS,
Code::LatinLowerT,
Code::LatinLowerU,
Code::LatinLowerV,
Code::LatinLowerW,
Code::LatinLowerX,
Code::LatinLowerY,
Code::LatinLowerZ,
Code::LatinCapitalA,
Code::LatinCapitalB,
Code::LatinCapitalC,
Code::LatinCapitalD,
Code::LatinCapitalE,
Code::LatinCapitalF,
Code::LatinCapitalG,
Code::LatinCapitalH,
Code::LatinCapitalI,
Code::LatinCapitalJ,
Code::LatinCapitalK,
Code::LatinCapitalL,
Code::LatinCapitalM,
Code::LatinCapitalN,
Code::LatinCapitalO,
Code::LatinCapitalP,
Code::LatinCapitalQ,
Code::LatinCapitalR,
Code::LatinCapitalS,
Code::LatinCapitalT,
];
#[test]
fn test_cea708mux_inputs_overflow_output() {
init();
static CODES: [Code; 40] = [
Code::LatinLowerA,
Code::LatinLowerB,
Code::LatinLowerC,
Code::LatinLowerD,
Code::LatinLowerE,
Code::LatinLowerF,
Code::LatinLowerG,
Code::LatinLowerH,
Code::LatinLowerI,
Code::LatinLowerJ,
Code::LatinLowerK,
Code::LatinLowerL,
Code::LatinLowerM,
Code::LatinLowerN,
Code::LatinLowerO,
Code::LatinLowerP,
Code::LatinLowerQ,
Code::LatinLowerR,
Code::LatinLowerS,
Code::LatinLowerT,
Code::LatinLowerU,
Code::LatinLowerV,
Code::LatinLowerW,
Code::LatinLowerX,
Code::LatinLowerY,
Code::LatinLowerZ,
Code::LatinCapitalA,
Code::LatinCapitalB,
Code::LatinCapitalC,
Code::LatinCapitalD,
Code::LatinCapitalE,
Code::LatinCapitalF,
Code::LatinCapitalG,
Code::LatinCapitalH,
Code::LatinCapitalI,
Code::LatinCapitalJ,
Code::LatinCapitalK,
Code::LatinCapitalL,
Code::LatinCapitalM,
Code::LatinCapitalN,
];
let mut h = gst_check::Harness::with_padnames("cea708mux", None, Some("src"));
let mut sinks = (0..10)
.map(|idx| {
@ -232,59 +253,11 @@ fn test_cea708mux_inputs_overflow_output() {
assert_eq!(&codes[..30], &CODES[no as usize..no as usize + 30]);
}
}
#[test]
fn test_cea708mux_inputs_overflow_output_new_service() {
init();
static CODES: [Code; 46] = [
Code::LatinLowerA,
Code::LatinLowerB,
Code::LatinLowerC,
Code::LatinLowerD,
Code::LatinLowerE,
Code::LatinLowerF,
Code::LatinLowerG,
Code::LatinLowerH,
Code::LatinLowerI,
Code::LatinLowerJ,
Code::LatinLowerK,
Code::LatinLowerL,
Code::LatinLowerM,
Code::LatinLowerN,
Code::LatinLowerO,
Code::LatinLowerP,
Code::LatinLowerQ,
Code::LatinLowerR,
Code::LatinLowerS,
Code::LatinLowerT,
Code::LatinLowerU,
Code::LatinLowerV,
Code::LatinLowerW,
Code::LatinLowerX,
Code::LatinLowerY,
Code::LatinLowerZ,
Code::LatinCapitalA,
Code::LatinCapitalB,
Code::LatinCapitalC,
Code::LatinCapitalD,
Code::LatinCapitalE,
Code::LatinCapitalF,
Code::LatinCapitalG,
Code::LatinCapitalH,
Code::LatinCapitalI,
Code::LatinCapitalJ,
Code::LatinCapitalK,
Code::LatinCapitalL,
Code::LatinCapitalM,
Code::LatinCapitalN,
Code::LatinCapitalO,
Code::LatinCapitalP,
Code::LatinCapitalQ,
Code::LatinCapitalR,
Code::LatinCapitalS,
Code::LatinCapitalT,
];
let mut h = gst_check::Harness::with_padnames("cea708mux", None, Some("src"));
let mut sinks = (0..6)
.map(|idx| {
@ -366,3 +339,94 @@ fn test_cea708mux_inputs_overflow_output_new_service() {
assert_eq!(&codes[30..60], &CODES[offset + 6..offset + 36]);
}
}
#[test]
fn test_cea708mux_output_overflow_max_time() {
init();
let mut h = gst_check::Harness::with_padnames("cea708mux", None, Some("src"));
h.element()
.unwrap()
.set_property("max-time", 100_000_000.nseconds());
let mut sinks = (0..6)
.map(|idx| {
let mut sink = gst_check::Harness::with_element(
&h.element().unwrap(),
Some(&format!("sink_{idx}")),
None,
);
sink.set_src_caps_str("closedcaption/x-cea-708,format=cc_data,framerate=60/1");
sink
})
.collect::<Vec<_>>();
let eos = gst::event::Eos::new();
for (i, sink) in sinks.iter_mut().enumerate() {
let buf = gen_cc_data(0, i as u8 + 1, &CODES[i..i + 30]);
sink.push(buf).unwrap();
}
for (i, sink) in sinks.iter_mut().enumerate() {
let buf = gen_cc_data_with_times(
0,
i as u8 + 1,
&CODES[i..i + 10],
gst::ClockTime::from_mseconds(400),
gst::ClockTime::from_mseconds(400),
);
sink.push(buf).unwrap();
sink.push_event(eos.clone());
}
let mut parser = CCDataParser::new();
let mut seen_services: HashMap<u8, Vec<Code>, _> = HashMap::new();
let mut parsed_packet = None;
while parsed_packet.is_none() {
let out = h.pull().unwrap();
let readable = out.map_readable().unwrap();
let mut cc_data = vec![0; 2];
cc_data[0] = 0x80 | 0x40 | ((readable.len() / 3) & 0x1f) as u8;
cc_data[1] = 0xFF;
cc_data.extend(readable.iter());
println!("pushed {cc_data:x?}");
parser.push(&cc_data).unwrap();
println!("parser: {parser:x?}");
parsed_packet = parser.pop_packet();
}
let parsed_packet = parsed_packet.unwrap();
println!("parsed: {parsed_packet:?}");
// the first (0) packet will be dropped
assert_eq!(parsed_packet.sequence_no(), 1);
let services = parsed_packet.services();
// TODO: deterministic service ordering?
for service in services {
assert!((1..=6).contains(&service.number()));
seen_services
.entry(service.number())
.and_modify(|entry| entry.extend(service.codes().iter().cloned()))
.or_insert(service.codes().to_vec());
}
for (service_no, codes) in seen_services.iter() {
println!(
"seen service: {service_no}, codes (len: {}): {codes:?}",
codes.len()
);
}
while let Ok(event) = h.pull_event() {
println!("have event: {event:?}");
if event.type_() == gst::EventType::Eos {
break;
}
}
let mut service_numbers = seen_services.keys().copied().collect::<Vec<_>>();
service_numbers.sort();
assert_eq!(service_numbers, (1..=6).collect::<Vec<_>>());
for (&no, codes) in seen_services.iter() {
println!("service {no}: {:?}", codes);
let offset = no as usize - 1;
assert_eq!(&codes[..10], &CODES[offset..offset + 10]);
}
}