diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index 6a82fb578..ae7e4c91d 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -8715,6 +8715,20 @@ "readable": true, "type": "gboolean", "writable": true + }, + "max-time": { + "blurb": "Maximum amount of time that captions can be stored before output", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "18446744073709551615", + "max": "18446744073709551615", + "min": "0", + "mutable": "null", + "readable": true, + "type": "guint64", + "writable": true } }, "rank": "none" diff --git a/video/closedcaption/src/cea708mux/imp.rs b/video/closedcaption/src/cea708mux/imp.rs index ecd76329f..7da7c738d 100644 --- a/video/closedcaption/src/cea708mux/imp.rs +++ b/video/closedcaption/src/cea708mux/imp.rs @@ -109,9 +109,15 @@ impl Default for State { } } +#[derive(Clone, Debug, Default)] +struct Settings { + max_time: Option, +} + #[derive(Default)] pub struct Cea708Mux { state: Mutex, + settings: Mutex, } pub(crate) static CAT: LazyLock = LazyLock::new(|| { @@ -351,6 +357,49 @@ impl AggregatorImpl for Cea708Mux { state.writer.push_packet(output); let _ = state.writer.write(fps, &mut data); state.n_frames += 1; + + let settings = self.settings.lock().unwrap().clone(); + if let Some(max_time) = settings.max_time { + let written_buffer_time = gst::ClockTime::from_nseconds( + state + .writer + .buffered_cea608_field1_duration() + .max(state.writer.buffered_cea608_field2_duration()) + .max(state.writer.buffered_packet_duration()) + .as_nanos() as u64, + ); + let max_pending_code_bytes = state + .pending_services + .values() + .map(|codes| codes.iter().map(|code| code.byte_len()).sum()) + .max() + .unwrap_or(0); + let max_pending_code_time = gst::ClockTime::from_useconds( + (max_pending_code_bytes.div_ceil(2) as u64) + .mul_div_ceil(2 * 1001 * 1_000_000, 9_600_000 / 8) + .unwrap_or(0), + ); + + if written_buffer_time + max_pending_code_time > max_time { + gst::warning!( + CAT, + imp = self, + "Stored data of {} has overrun the configured limit of {}, flushing", + written_buffer_time.display(), + max_time.display() + ); + state.writer.flush(); + state.pending_services.clear(); + + for pad in sinkpads.iter().map(|pad| { + pad.downcast_ref::() + .expect("Not a Cea708MuxSinkPad?!") + }) { + let mut pad_state = pad.imp().pad_state.lock().unwrap(); + pad_state.ccp_parser.flush(); + } + } + } drop(state); // remove 2 byte header that our cc_data format does not use @@ -613,12 +662,21 @@ impl GstObjectImpl for Cea708Mux {} impl ObjectImpl for Cea708Mux { fn properties() -> &'static [glib::ParamSpec] { static PROPERTIES: LazyLock> = LazyLock::new(|| { - vec![glib::ParamSpecBoolean::builder("force-live") - .nick("Force live") - .blurb("Always operate in live mode and aggregate on timeout") - .default_value(DEFAULT_FORCE_LIVE) - .construct_only() - .build()] + vec![ + glib::ParamSpecBoolean::builder("force-live") + .nick("Force live") + .blurb("Always operate in live mode and aggregate on timeout") + .default_value(DEFAULT_FORCE_LIVE) + .construct_only() + .build(), + glib::ParamSpecUInt64::builder("max-time") + .nick("Max Time") + .blurb("Maximum amount of time that captions can be stored before output") + .minimum(0) + .maximum(u64::MAX) + .default_value(u64::MAX) + .build(), + ] }); PROPERTIES.as_ref() @@ -630,6 +688,10 @@ impl ObjectImpl for Cea708Mux { self.obj() .set_force_live(value.get().expect("type checked upstream")); } + "max-time" => { + let mut settings = self.settings.lock().unwrap(); + settings.max_time = value.get().expect("Type checked upstream"); + } _ => unimplemented!(), } } @@ -637,6 +699,10 @@ impl ObjectImpl for Cea708Mux { fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { match pspec.name() { "force-live" => self.obj().is_force_live().to_value(), + "max-time" => { + let settings = self.settings.lock().unwrap(); + settings.max_time.to_value() + } _ => unimplemented!(), } } diff --git a/video/closedcaption/tests/cea708mux.rs b/video/closedcaption/tests/cea708mux.rs index a73bb0a12..8b048ffea 100644 --- a/video/closedcaption/tests/cea708mux.rs +++ b/video/closedcaption/tests/cea708mux.rs @@ -23,8 +23,23 @@ fn init() { gstrsclosedcaption::plugin_register_static().unwrap(); }); } - fn gen_cc_data(seq: u8, service_no: u8, codes: &[Code]) -> gst::Buffer { + gen_cc_data_with_times( + seq, + service_no, + codes, + 0.nseconds(), + gst::ClockTime::from_mseconds(400), + ) +} + +fn gen_cc_data_with_times( + seq: u8, + service_no: u8, + codes: &[Code], + pts: gst::ClockTime, + duration: gst::ClockTime, +) -> gst::Buffer { assert!(seq < 4); assert!(service_no < 64); @@ -44,8 +59,8 @@ fn gen_cc_data(seq: u8, service_no: u8, codes: &[Code]) -> gst::Buffer { let mut buf = gst::Buffer::from_mut_slice(data); { let buf = buf.get_mut().unwrap(); - buf.set_pts(0.nseconds()); - buf.set_duration(gst::ClockTime::from_mseconds(400)); + buf.set_pts(pts); + buf.set_duration(duration); } buf } @@ -133,53 +148,59 @@ fn test_cea708mux_2pads_cc_data() { } } +static CODES: [Code; 46] = [ + Code::LatinLowerA, + Code::LatinLowerB, + Code::LatinLowerC, + Code::LatinLowerD, + Code::LatinLowerE, + Code::LatinLowerF, + Code::LatinLowerG, + Code::LatinLowerH, + Code::LatinLowerI, + Code::LatinLowerJ, + Code::LatinLowerK, + Code::LatinLowerL, + Code::LatinLowerM, + Code::LatinLowerN, + Code::LatinLowerO, + Code::LatinLowerP, + Code::LatinLowerQ, + Code::LatinLowerR, + Code::LatinLowerS, + Code::LatinLowerT, + Code::LatinLowerU, + Code::LatinLowerV, + Code::LatinLowerW, + Code::LatinLowerX, + Code::LatinLowerY, + Code::LatinLowerZ, + Code::LatinCapitalA, + Code::LatinCapitalB, + Code::LatinCapitalC, + Code::LatinCapitalD, + Code::LatinCapitalE, + Code::LatinCapitalF, + Code::LatinCapitalG, + Code::LatinCapitalH, + Code::LatinCapitalI, + Code::LatinCapitalJ, + Code::LatinCapitalK, + Code::LatinCapitalL, + Code::LatinCapitalM, + Code::LatinCapitalN, + Code::LatinCapitalO, + Code::LatinCapitalP, + Code::LatinCapitalQ, + Code::LatinCapitalR, + Code::LatinCapitalS, + Code::LatinCapitalT, +]; + #[test] fn test_cea708mux_inputs_overflow_output() { init(); - static CODES: [Code; 40] = [ - Code::LatinLowerA, - Code::LatinLowerB, - Code::LatinLowerC, - Code::LatinLowerD, - Code::LatinLowerE, - Code::LatinLowerF, - Code::LatinLowerG, - Code::LatinLowerH, - Code::LatinLowerI, - Code::LatinLowerJ, - Code::LatinLowerK, - Code::LatinLowerL, - Code::LatinLowerM, - Code::LatinLowerN, - Code::LatinLowerO, - Code::LatinLowerP, - Code::LatinLowerQ, - Code::LatinLowerR, - Code::LatinLowerS, - Code::LatinLowerT, - Code::LatinLowerU, - Code::LatinLowerV, - Code::LatinLowerW, - Code::LatinLowerX, - Code::LatinLowerY, - Code::LatinLowerZ, - Code::LatinCapitalA, - Code::LatinCapitalB, - Code::LatinCapitalC, - Code::LatinCapitalD, - Code::LatinCapitalE, - Code::LatinCapitalF, - Code::LatinCapitalG, - Code::LatinCapitalH, - Code::LatinCapitalI, - Code::LatinCapitalJ, - Code::LatinCapitalK, - Code::LatinCapitalL, - Code::LatinCapitalM, - Code::LatinCapitalN, - ]; - let mut h = gst_check::Harness::with_padnames("cea708mux", None, Some("src")); let mut sinks = (0..10) .map(|idx| { @@ -232,59 +253,11 @@ fn test_cea708mux_inputs_overflow_output() { assert_eq!(&codes[..30], &CODES[no as usize..no as usize + 30]); } } + #[test] fn test_cea708mux_inputs_overflow_output_new_service() { init(); - static CODES: [Code; 46] = [ - Code::LatinLowerA, - Code::LatinLowerB, - Code::LatinLowerC, - Code::LatinLowerD, - Code::LatinLowerE, - Code::LatinLowerF, - Code::LatinLowerG, - Code::LatinLowerH, - Code::LatinLowerI, - Code::LatinLowerJ, - Code::LatinLowerK, - Code::LatinLowerL, - Code::LatinLowerM, - Code::LatinLowerN, - Code::LatinLowerO, - Code::LatinLowerP, - Code::LatinLowerQ, - Code::LatinLowerR, - Code::LatinLowerS, - Code::LatinLowerT, - Code::LatinLowerU, - Code::LatinLowerV, - Code::LatinLowerW, - Code::LatinLowerX, - Code::LatinLowerY, - Code::LatinLowerZ, - Code::LatinCapitalA, - Code::LatinCapitalB, - Code::LatinCapitalC, - Code::LatinCapitalD, - Code::LatinCapitalE, - Code::LatinCapitalF, - Code::LatinCapitalG, - Code::LatinCapitalH, - Code::LatinCapitalI, - Code::LatinCapitalJ, - Code::LatinCapitalK, - Code::LatinCapitalL, - Code::LatinCapitalM, - Code::LatinCapitalN, - Code::LatinCapitalO, - Code::LatinCapitalP, - Code::LatinCapitalQ, - Code::LatinCapitalR, - Code::LatinCapitalS, - Code::LatinCapitalT, - ]; - let mut h = gst_check::Harness::with_padnames("cea708mux", None, Some("src")); let mut sinks = (0..6) .map(|idx| { @@ -366,3 +339,94 @@ fn test_cea708mux_inputs_overflow_output_new_service() { assert_eq!(&codes[30..60], &CODES[offset + 6..offset + 36]); } } + +#[test] +fn test_cea708mux_output_overflow_max_time() { + init(); + + let mut h = gst_check::Harness::with_padnames("cea708mux", None, Some("src")); + h.element() + .unwrap() + .set_property("max-time", 100_000_000.nseconds()); + let mut sinks = (0..6) + .map(|idx| { + let mut sink = gst_check::Harness::with_element( + &h.element().unwrap(), + Some(&format!("sink_{idx}")), + None, + ); + sink.set_src_caps_str("closedcaption/x-cea-708,format=cc_data,framerate=60/1"); + sink + }) + .collect::>(); + + let eos = gst::event::Eos::new(); + + for (i, sink) in sinks.iter_mut().enumerate() { + let buf = gen_cc_data(0, i as u8 + 1, &CODES[i..i + 30]); + sink.push(buf).unwrap(); + } + + for (i, sink) in sinks.iter_mut().enumerate() { + let buf = gen_cc_data_with_times( + 0, + i as u8 + 1, + &CODES[i..i + 10], + gst::ClockTime::from_mseconds(400), + gst::ClockTime::from_mseconds(400), + ); + sink.push(buf).unwrap(); + sink.push_event(eos.clone()); + } + + let mut parser = CCDataParser::new(); + let mut seen_services: HashMap, _> = HashMap::new(); + let mut parsed_packet = None; + while parsed_packet.is_none() { + let out = h.pull().unwrap(); + let readable = out.map_readable().unwrap(); + let mut cc_data = vec![0; 2]; + cc_data[0] = 0x80 | 0x40 | ((readable.len() / 3) & 0x1f) as u8; + cc_data[1] = 0xFF; + cc_data.extend(readable.iter()); + println!("pushed {cc_data:x?}"); + parser.push(&cc_data).unwrap(); + println!("parser: {parser:x?}"); + parsed_packet = parser.pop_packet(); + } + let parsed_packet = parsed_packet.unwrap(); + println!("parsed: {parsed_packet:?}"); + // the first (0) packet will be dropped + assert_eq!(parsed_packet.sequence_no(), 1); + let services = parsed_packet.services(); + // TODO: deterministic service ordering? + for service in services { + assert!((1..=6).contains(&service.number())); + seen_services + .entry(service.number()) + .and_modify(|entry| entry.extend(service.codes().iter().cloned())) + .or_insert(service.codes().to_vec()); + } + for (service_no, codes) in seen_services.iter() { + println!( + "seen service: {service_no}, codes (len: {}): {codes:?}", + codes.len() + ); + } + + while let Ok(event) = h.pull_event() { + println!("have event: {event:?}"); + if event.type_() == gst::EventType::Eos { + break; + } + } + + let mut service_numbers = seen_services.keys().copied().collect::>(); + service_numbers.sort(); + assert_eq!(service_numbers, (1..=6).collect::>()); + for (&no, codes) in seen_services.iter() { + println!("service {no}: {:?}", codes); + let offset = no as usize - 1; + assert_eq!(&codes[..10], &CODES[offset..offset + 10]); + } +}