From 9451c821aeaf0d796e0e1d6b3e558713da16eeaf Mon Sep 17 00:00:00 2001 From: Mathieu Duponchelle Date: Wed, 23 Jul 2025 00:45:24 +0200 Subject: [PATCH] cea708mux: expose "discarded-services" property on sink pads This can be useful when muxing in an original caption stream with a newly-created one (eg transcription / translation), in which case one might wish to discard select services from the original stream in order to avoid garbled captions. Part-of: --- docs/plugins/gst_plugins_cache.json | 18 ++- video/closedcaption/src/cea708mux/imp.rs | 164 ++++++++++++++++++++++- video/closedcaption/src/cea708mux/mod.rs | 2 +- 3 files changed, 178 insertions(+), 6 deletions(-) diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index 35541a484..a0f69b7f5 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -9103,6 +9103,9 @@ "GInitiallyUnowned", "GObject" ], + "interfaces": [ + "GstChildProxy" + ], "klass": "Muxer", "pad-templates": { "sink_%%u": { @@ -10065,7 +10068,20 @@ "GInitiallyUnowned", "GObject" ], - "kind": "object" + "kind": "object", + "properties": { + "discarded-services": { + "blurb": "List of services to discard", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "mutable": "playing", + "readable": true, + "type": "GstValueArray", + "writable": true + } + } }, "GstSt2038AncMuxSinkPad": { "hierarchy": [ diff --git a/video/closedcaption/src/cea708mux/imp.rs b/video/closedcaption/src/cea708mux/imp.rs index 9591a4ea2..3efa98599 100644 --- a/video/closedcaption/src/cea708mux/imp.rs +++ b/video/closedcaption/src/cea708mux/imp.rs @@ -6,7 +6,7 @@ // // SPDX-License-Identifier: MPL-2.0 -use std::collections::{HashMap, VecDeque}; +use std::collections::{HashMap, HashSet, VecDeque}; use std::sync::Mutex; use std::time::Duration; @@ -240,9 +240,43 @@ impl AggregatorImpl for Cea708Mux { cc_data.extend(mapped.iter()); pad_state.ccp_parser.push(&cc_data).unwrap(); - if let Some(cea608) = pad_state.ccp_parser.cea608() { + let cea608 = pad_state + .ccp_parser + .cea608() + .map(|cea608| cea608.to_owned()); + + if let Some(cea608) = cea608 { for pair in cea608 { - state.writer.push_cea608(*pair); + if !pad_state.discarded_services.is_empty() { + if let (Ok(decoded), field) = match pair { + cea708_types::Cea608::Field1(a, b) => ( + pad_state.cea608_parsers[0].decode([a, b]), + cea608_types::tables::Field::ONE, + ), + cea708_types::Cea608::Field2(a, b) => ( + pad_state.cea608_parsers[1].decode([a, b]), + cea608_types::tables::Field::TWO, + ), + } { + if let Some(channel) = decoded.map(|d| d.channel()) { + let channel_id = + match cea608_types::Id::from_caption_field_channel( + field, channel, + ) { + cea608_types::Id::CC1 => -1, + cea608_types::Id::CC2 => -2, + cea608_types::Id::CC3 => -3, + cea608_types::Id::CC4 => -4, + }; + + if pad_state.discarded_services.contains(&channel_id) { + continue; + } + } + } + } + + state.writer.push_cea608(pair); } } } @@ -307,6 +341,13 @@ impl AggregatorImpl for Cea708Mux { continue; } + if pad_state + .discarded_services + .contains(&(service.number() as i32)) + { + continue; + } + let mut overflowed = false; for code in service.codes() { gst::trace!( @@ -656,6 +697,22 @@ impl ElementImpl for Cea708Mux { Ok(ret) } + + fn request_new_pad( + &self, + templ: &gst::PadTemplate, + name: Option<&str>, + caps: Option<&gst::Caps>, + ) -> Option { + let ret = self.parent_request_new_pad(templ, name, caps); + + if let Some(ref ret) = ret { + self.obj() + .child_added(ret.upcast_ref::(), &ret.name()); + } + + ret + } } impl GstObjectImpl for Cea708Mux {} @@ -709,17 +766,41 @@ impl ObjectImpl for Cea708Mux { } } +impl ChildProxyImpl for Cea708Mux { + fn child_by_index(&self, index: u32) -> Option { + self.obj() + .pads() + .into_iter() + .nth(index as usize) + .map(|p| p.upcast()) + } + + fn children_count(&self) -> u32 { + let object = self.obj(); + object.num_pads() as u32 + } + + fn child_by_name(&self, name: &str) -> Option { + let ret = self.obj().static_pad(name).map(|pad| pad.upcast()); + + ret + } +} + #[glib::object_subclass] impl ObjectSubclass for Cea708Mux { const NAME: &'static str = "GstCea708Mux"; type Type = super::Cea708Mux; type ParentType = gst_base::Aggregator; + type Interfaces = (gst::ChildProxy,); } struct PadState { format: CeaFormat, ccp_parser: CCDataParser, pending_buffer: Option, + discarded_services: HashSet, + cea608_parsers: [cea608_types::Cea608State; 2], } impl Default for PadState { @@ -730,13 +811,24 @@ impl Default for PadState { format: CeaFormat::default(), ccp_parser, pending_buffer: None, + discarded_services: HashSet::new(), + cea608_parsers: [ + cea608_types::Cea608State::default(), + cea608_types::Cea608State::default(), + ], } } } +#[derive(Default)] +struct PadSettings { + discarded_services: gst::Array, +} + #[derive(Default)] pub struct Cea708MuxSinkPad { pad_state: Mutex, + pad_settings: Mutex, } impl Cea708MuxSinkPad {} @@ -756,7 +848,71 @@ impl PadImpl for Cea708MuxSinkPad {} impl GstObjectImpl for Cea708MuxSinkPad {} -impl ObjectImpl for Cea708MuxSinkPad {} +impl ObjectImpl for Cea708MuxSinkPad { + fn properties() -> &'static [glib::ParamSpec] { + static PROPERTIES: LazyLock> = LazyLock::new(|| { + vec![ + /** + * GstCea708MuxSinkPad:discarded-services: + * + * Configure a list of services for discarding. + * + * Each value in the array is an integer with the following meaning: + * + * Negative values in the range [-4, -1] correspond to CEA-608 caption service + * numbers. + * + * Positive values in the range [1, 63] correspond to CEA-708 caption service + * numbers. + */ + gst::ParamSpecArray::builder("discarded-services") + .nick("Discarded Services") + .blurb("List of services to discard") + .element_spec(&glib::ParamSpecInt::builder("service").minimum(-4).maximum(63).build()) + .mutable_playing() + .build()] + }); + + PROPERTIES.as_ref() + } + + fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { + match pspec.name() { + "discarded-services" => { + let s: gst::Array = value.get().expect("type checked upstream"); + let mut discarded_services = HashSet::new(); + for entry in s.iter() { + let Ok(integer) = entry.get::() else { + gst::warning!(CAT, "list member is not an integer"); + continue; + }; + + if (1..=63).contains(&integer) || (-4..0).contains(&integer) { + discarded_services.insert(integer); + } else { + gst::warning!(CAT, "invalid service / channel {integer} id provided"); + } + } + + let mut state = self.pad_state.lock().unwrap(); + let mut settings = self.pad_settings.lock().unwrap(); + settings.discarded_services = s; + state.discarded_services = discarded_services; + } + _ => unimplemented!(), + } + } + + fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { + match pspec.name() { + "discarded-services" => { + let settings = self.pad_settings.lock().unwrap(); + settings.discarded_services.to_value() + } + _ => unimplemented!(), + } + } +} #[glib::object_subclass] impl ObjectSubclass for Cea708MuxSinkPad { diff --git a/video/closedcaption/src/cea708mux/mod.rs b/video/closedcaption/src/cea708mux/mod.rs index 317ff5d17..0701d1b15 100644 --- a/video/closedcaption/src/cea708mux/mod.rs +++ b/video/closedcaption/src/cea708mux/mod.rs @@ -12,7 +12,7 @@ use gst::prelude::*; mod imp; glib::wrapper! { - pub struct Cea708Mux(ObjectSubclass) @extends gst_base::Aggregator, gst::Element, gst::Object; + pub struct Cea708Mux(ObjectSubclass) @extends gst_base::Aggregator, gst::Element, gst::Object, @implements gst::ChildProxy; } glib::wrapper! {