cea708mux: expose "discarded-services" property on sink pads

This can be useful when muxing in an original caption stream with
a newly-created one (eg transcription / translation), in which case one
might wish to discard select services from the original stream in order
to avoid garbled captions.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/2389>
This commit is contained in:
Mathieu Duponchelle 2025-07-23 00:45:24 +02:00 committed by GStreamer Marge Bot
parent 486a373dcb
commit 9451c821ae
3 changed files with 178 additions and 6 deletions

View file

@ -9103,6 +9103,9 @@
"GInitiallyUnowned",
"GObject"
],
"interfaces": [
"GstChildProxy"
],
"klass": "Muxer",
"pad-templates": {
"sink_%%u": {
@ -10065,7 +10068,20 @@
"GInitiallyUnowned",
"GObject"
],
"kind": "object"
"kind": "object",
"properties": {
"discarded-services": {
"blurb": "List of services to discard",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"mutable": "playing",
"readable": true,
"type": "GstValueArray",
"writable": true
}
}
},
"GstSt2038AncMuxSinkPad": {
"hierarchy": [

View file

@ -6,7 +6,7 @@
//
// SPDX-License-Identifier: MPL-2.0
use std::collections::{HashMap, VecDeque};
use std::collections::{HashMap, HashSet, VecDeque};
use std::sync::Mutex;
use std::time::Duration;
@ -240,9 +240,43 @@ impl AggregatorImpl for Cea708Mux {
cc_data.extend(mapped.iter());
pad_state.ccp_parser.push(&cc_data).unwrap();
if let Some(cea608) = pad_state.ccp_parser.cea608() {
let cea608 = pad_state
.ccp_parser
.cea608()
.map(|cea608| cea608.to_owned());
if let Some(cea608) = cea608 {
for pair in cea608 {
state.writer.push_cea608(*pair);
if !pad_state.discarded_services.is_empty() {
if let (Ok(decoded), field) = match pair {
cea708_types::Cea608::Field1(a, b) => (
pad_state.cea608_parsers[0].decode([a, b]),
cea608_types::tables::Field::ONE,
),
cea708_types::Cea608::Field2(a, b) => (
pad_state.cea608_parsers[1].decode([a, b]),
cea608_types::tables::Field::TWO,
),
} {
if let Some(channel) = decoded.map(|d| d.channel()) {
let channel_id =
match cea608_types::Id::from_caption_field_channel(
field, channel,
) {
cea608_types::Id::CC1 => -1,
cea608_types::Id::CC2 => -2,
cea608_types::Id::CC3 => -3,
cea608_types::Id::CC4 => -4,
};
if pad_state.discarded_services.contains(&channel_id) {
continue;
}
}
}
}
state.writer.push_cea608(pair);
}
}
}
@ -307,6 +341,13 @@ impl AggregatorImpl for Cea708Mux {
continue;
}
if pad_state
.discarded_services
.contains(&(service.number() as i32))
{
continue;
}
let mut overflowed = false;
for code in service.codes() {
gst::trace!(
@ -656,6 +697,22 @@ impl ElementImpl for Cea708Mux {
Ok(ret)
}
fn request_new_pad(
&self,
templ: &gst::PadTemplate,
name: Option<&str>,
caps: Option<&gst::Caps>,
) -> Option<gst::Pad> {
let ret = self.parent_request_new_pad(templ, name, caps);
if let Some(ref ret) = ret {
self.obj()
.child_added(ret.upcast_ref::<gst::Object>(), &ret.name());
}
ret
}
}
impl GstObjectImpl for Cea708Mux {}
@ -709,17 +766,41 @@ impl ObjectImpl for Cea708Mux {
}
}
impl ChildProxyImpl for Cea708Mux {
fn child_by_index(&self, index: u32) -> Option<glib::Object> {
self.obj()
.pads()
.into_iter()
.nth(index as usize)
.map(|p| p.upcast())
}
fn children_count(&self) -> u32 {
let object = self.obj();
object.num_pads() as u32
}
fn child_by_name(&self, name: &str) -> Option<glib::Object> {
let ret = self.obj().static_pad(name).map(|pad| pad.upcast());
ret
}
}
#[glib::object_subclass]
impl ObjectSubclass for Cea708Mux {
const NAME: &'static str = "GstCea708Mux";
type Type = super::Cea708Mux;
type ParentType = gst_base::Aggregator;
type Interfaces = (gst::ChildProxy,);
}
struct PadState {
format: CeaFormat,
ccp_parser: CCDataParser,
pending_buffer: Option<gst::Buffer>,
discarded_services: HashSet<i32>,
cea608_parsers: [cea608_types::Cea608State; 2],
}
impl Default for PadState {
@ -730,13 +811,24 @@ impl Default for PadState {
format: CeaFormat::default(),
ccp_parser,
pending_buffer: None,
discarded_services: HashSet::new(),
cea608_parsers: [
cea608_types::Cea608State::default(),
cea608_types::Cea608State::default(),
],
}
}
}
#[derive(Default)]
struct PadSettings {
discarded_services: gst::Array,
}
#[derive(Default)]
pub struct Cea708MuxSinkPad {
pad_state: Mutex<PadState>,
pad_settings: Mutex<PadSettings>,
}
impl Cea708MuxSinkPad {}
@ -756,7 +848,71 @@ impl PadImpl for Cea708MuxSinkPad {}
impl GstObjectImpl for Cea708MuxSinkPad {}
impl ObjectImpl for Cea708MuxSinkPad {}
impl ObjectImpl for Cea708MuxSinkPad {
fn properties() -> &'static [glib::ParamSpec] {
static PROPERTIES: LazyLock<Vec<glib::ParamSpec>> = LazyLock::new(|| {
vec![
/**
* GstCea708MuxSinkPad:discarded-services:
*
* Configure a list of services for discarding.
*
* Each value in the array is an integer with the following meaning:
*
* Negative values in the range [-4, -1] correspond to CEA-608 caption service
* numbers.
*
* Positive values in the range [1, 63] correspond to CEA-708 caption service
* numbers.
*/
gst::ParamSpecArray::builder("discarded-services")
.nick("Discarded Services")
.blurb("List of services to discard")
.element_spec(&glib::ParamSpecInt::builder("service").minimum(-4).maximum(63).build())
.mutable_playing()
.build()]
});
PROPERTIES.as_ref()
}
fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
match pspec.name() {
"discarded-services" => {
let s: gst::Array = value.get().expect("type checked upstream");
let mut discarded_services = HashSet::new();
for entry in s.iter() {
let Ok(integer) = entry.get::<i32>() else {
gst::warning!(CAT, "list member is not an integer");
continue;
};
if (1..=63).contains(&integer) || (-4..0).contains(&integer) {
discarded_services.insert(integer);
} else {
gst::warning!(CAT, "invalid service / channel {integer} id provided");
}
}
let mut state = self.pad_state.lock().unwrap();
let mut settings = self.pad_settings.lock().unwrap();
settings.discarded_services = s;
state.discarded_services = discarded_services;
}
_ => unimplemented!(),
}
}
fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
match pspec.name() {
"discarded-services" => {
let settings = self.pad_settings.lock().unwrap();
settings.discarded_services.to_value()
}
_ => unimplemented!(),
}
}
}
#[glib::object_subclass]
impl ObjectSubclass for Cea708MuxSinkPad {

View file

@ -12,7 +12,7 @@ use gst::prelude::*;
mod imp;
glib::wrapper! {
pub struct Cea708Mux(ObjectSubclass<imp::Cea708Mux>) @extends gst_base::Aggregator, gst::Element, gst::Object;
pub struct Cea708Mux(ObjectSubclass<imp::Cea708Mux>) @extends gst_base::Aggregator, gst::Element, gst::Object, @implements gst::ChildProxy;
}
glib::wrapper! {