diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index 07f62fa47..7d37daed6 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -9669,7 +9669,7 @@ "construct-only": false, "controllable": false, "default": "fr-FR", - "mutable": "ready", + "mutable": "null", "readable": true, "type": "gchararray", "writable": true diff --git a/video/closedcaption/src/transcriberbin/imp.rs b/video/closedcaption/src/transcriberbin/imp.rs index 86944b2ca..d89a09f37 100644 --- a/video/closedcaption/src/transcriberbin/imp.rs +++ b/video/closedcaption/src/transcriberbin/imp.rs @@ -12,7 +12,7 @@ use anyhow::{anyhow, Error}; use gst::glib; use gst::prelude::*; use gst::subclass::prelude::*; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::sync::Mutex; use std::sync::LazyLock; @@ -53,17 +53,82 @@ enum PassthroughState { Disabled, } +#[derive(Debug)] +enum CaptionChannelUpdate { + Upsert(HashSet), + Remove, +} + +#[derive(Debug)] +enum CustomChannelUpdate { + Upsert(String), + Remove, +} + +fn parse_language_pair( + mux_method: MuxMethod, + key: &str, + value: &gst::glib::Value, +) -> Result<(String, HashSet), Error> { + let lowercased_key = key.to_lowercase(); + Ok(match mux_method { + MuxMethod::Cea608 => { + if ["cc1", "cc3"].contains(&lowercased_key.as_str()) { + ( + value.get::()?, + HashSet::from([lowercased_key.to_string()]), + ) + } else if let Ok(caption_stream) = value.get::() { + if !["cc1", "cc3"].contains(&caption_stream.as_str()) { + anyhow::bail!( + "Unknown 608 channel {}, valid values are cc1, cc3", + caption_stream + ); + } + (key.to_string(), HashSet::from([caption_stream])) + } else { + anyhow::bail!("Unknown 608 channel/language {}", key); + } + } + MuxMethod::Cea708 => { + if let Ok(caption_stream) = value.get::() { + (key.to_string(), HashSet::from([caption_stream])) + } else if let Ok(caption_streams) = value.get::() { + let mut streams = HashSet::new(); + for s in caption_streams.iter() { + let service = s.get::()?; + if ["cc1", "cc3"].contains(&service.as_str()) || service.starts_with("708_") { + streams.insert(service); + } else { + anyhow::bail!( + "Unknown 708 service {}, valid values are cc1, cc3 or 708_*", + key + ); + } + } + (key.to_string(), streams) + } else { + anyhow::bail!("Unknown 708 translation language field {}", key); + } + } + }) +} + /* One per language, including original */ -struct TranscriptionChannel { +#[derive(Clone)] +struct CaptionChannel { bin: gst::Bin, textwrap: gst::Element, tttoceax08: gst::Element, language: String, ccmux_pad_name: String, cccapsfilter: gst::Element, + caption_streams: HashSet, } +#[derive(Clone)] struct CustomOutputChannel { + bin_description: String, bin: gst::Bin, language: String, latency: gst::ClockTime, @@ -237,7 +302,7 @@ impl TranscriberBin { ) -> Result<(), Error> { for (language, bin) in itertools::chain!( pad_state - .transcription_channels + .caption_channels .values() .map(|c| (c.language.as_str(), c.bin.clone())), pad_state @@ -261,7 +326,7 @@ impl TranscriberBin { pad_state: &TranscriberSinkPadState, pad_settings: &TranscriberSinkPadSettings, ) -> Result<(), Error> { - for channel in pad_state.transcription_channels.values() { + for channel in pad_state.caption_channels.values() { pad_state.link_transcription_channel(channel, state, pad_settings.passthrough)?; } @@ -269,7 +334,13 @@ impl TranscriberBin { pad_state.synthesis_channels.values(), pad_state.subtitle_channels.values() ) { - pad_state.expose_custom_output_pads(&self.obj(), state, channel)?; + self.expose_custom_output_pads( + &pad_state.transcription_bin, + &state.transcription_bin, + &state.internal_bin, + pad_state.serial, + channel, + )?; } Ok(()) @@ -310,6 +381,7 @@ impl TranscriberBin { bin.add_pad(&srcpad)?; Ok(CustomOutputChannel { + bin_description: bin_description.to_string(), bin, language: String::from(lang), latency, @@ -346,6 +418,7 @@ impl TranscriberBin { bin.add_pad(&srcpad)?; Ok(CustomOutputChannel { + bin_description: bin_description.to_string(), bin, language: String::from(lang), latency, @@ -357,8 +430,8 @@ impl TranscriberBin { &self, lang: &str, mux_method: MuxMethod, - caption_streams: Vec, - ) -> Result { + caption_streams: HashSet, + ) -> Result { let bin = gst::Bin::new(); let queue = gst::ElementFactory::make("queue").build()?; let textwrap = gst::ElementFactory::make("textwrap").build()?; @@ -369,7 +442,7 @@ impl TranscriberBin { } ( gst::ElementFactory::make("tttocea608").build()?, - caption_streams[0].clone(), + caption_streams.iter().next().unwrap().clone(), ) } MuxMethod::Cea708 => { @@ -447,13 +520,14 @@ impl TranscriberBin { bin.add_pad(&sinkpad)?; bin.add_pad(&srcpad)?; - Ok(TranscriptionChannel { + Ok(CaptionChannel { bin, textwrap, tttoceax08, language: String::from(lang), ccmux_pad_name, cccapsfilter, + caption_streams, }) } @@ -712,7 +786,7 @@ impl TranscriberBin { for pad in state.audio_sink_pads.values() { let ps = pad.imp().state.lock().unwrap(); if let Ok(pad_state) = ps.as_ref() { - for channel in pad_state.transcription_channels.values() { + for channel in pad_state.caption_channels.values() { channel.cccapsfilter.set_property("caps", &caps); } } @@ -834,7 +908,7 @@ impl TranscriberBin { pad_state.audio_tee.release_request_pad(&audio_tee_pad); } - for channel in pad_state.transcription_channels.values() { + for channel in pad_state.caption_channels.values() { let srcpad = pad_state .transcription_bin .static_pad(&format!("src_{}", channel.language)) @@ -869,7 +943,7 @@ impl TranscriberBin { gst::debug!(CAT, imp = sinkpad, "enabling transcription bin"); - for channel in pad_state.transcription_channels.values() { + for channel in pad_state.caption_channels.values() { let srcpad = pad_state .transcription_bin .static_pad(&format!("src_{}", channel.language)) @@ -1014,7 +1088,7 @@ impl TranscriberBin { pad ); - for channel in pad_state.transcription_channels.values() { + for channel in pad_state.caption_channels.values() { match mux_method { MuxMethod::Cea608 => channel.tttoceax08.set_property("mode", mode), MuxMethod::Cea708 => match mode { @@ -1081,7 +1155,7 @@ impl TranscriberBin { pad_state: &mut TranscriberSinkPadState, pad_settings: &TranscriberSinkPadSettings, ) -> Result<(), Error> { - self.construct_transcription_channels(pad_settings, mux_method, pad_state) + self.construct_caption_channels(pad_settings, mux_method, pad_state) .unwrap(); self.construct_synthesis_channels(accumulate_time, pad_settings, pad_state) @@ -1091,7 +1165,7 @@ impl TranscriberBin { .unwrap(); for k in pad_state - .transcription_channels + .caption_channels .keys() .chain(pad_state.synthesis_channels.keys()) .chain(pad_state.subtitle_channels.keys()) @@ -1145,11 +1219,11 @@ impl TranscriberBin { ) -> Result<(), Error> { pad_state.unlink_language_tees(self.obj().as_ref(), state, pad_state); - self.tear_down_transcription_channels(state, pad_state)?; + self.tear_down_caption_channels(state, pad_state)?; - self.tear_down_synthesis_channels(&self.obj(), state, pad_state)?; + self.tear_down_synthesis_channels(state, pad_state)?; - self.tear_down_subtitle_channels(&self.obj(), state, pad_state)?; + self.tear_down_subtitle_channels(state, pad_state)?; for (language, tee) in pad_state.language_tees.drain() { if let Some(filter) = pad_state.language_filters.remove(&language) { @@ -1168,41 +1242,102 @@ impl TranscriberBin { fn tear_down_custom_output_channel( &self, - topbin: &super::TranscriberBin, channel: CustomOutputChannel, - state: &State, - pad_state: &mut TranscriberSinkPadState, + pad_transcription_bin: &gst::Bin, + transcription_bin: &gst::Bin, + internal_bin: &gst::Bin, + serial: Option, ) -> Result<(), Error> { let mut pad_name = format!("src_{}_{}", channel.suffix, channel.language); - let srcpad = pad_state.transcription_bin.static_pad(&pad_name).unwrap(); + let srcpad = pad_transcription_bin.static_pad(&pad_name).unwrap(); - let _ = pad_state.transcription_bin.remove_pad(&srcpad); + let _ = pad_transcription_bin.remove_pad(&srcpad); - pad_state.transcription_bin.remove(&channel.bin)?; + let channel_sinkpad = channel.bin.static_pad("sink").unwrap(); - if let Some(serial) = pad_state.serial { + if let Some(tee_srcpad) = channel_sinkpad.peer() { + let _ = tee_srcpad.unlink(&channel_sinkpad); + if let Some(tee) = tee_srcpad + .parent() + .and_then(|p| p.downcast::().ok()) + { + let _ = tee.remove_pad(&tee_srcpad); + } + } + + pad_transcription_bin.remove(&channel.bin)?; + + let _ = channel.bin.set_state(gst::State::Null); + + if let Some(serial) = serial { pad_name = format!("{}_{}", pad_name, serial); } - let srcpad = state.transcription_bin.static_pad(&pad_name).unwrap(); - let _ = state.transcription_bin.remove_pad(&srcpad); + let srcpad = transcription_bin.static_pad(&pad_name).unwrap(); + let _ = transcription_bin.remove_pad(&srcpad); - let srcpad = state.internal_bin.static_pad(&pad_name).unwrap(); - let _ = state.internal_bin.remove_pad(&srcpad); - let srcpad = topbin.static_pad(&pad_name).unwrap(); - let _ = topbin.remove_pad(&srcpad); + let srcpad = internal_bin.static_pad(&pad_name).unwrap(); + let _ = internal_bin.remove_pad(&srcpad); + let srcpad = self.obj().static_pad(&pad_name).unwrap(); + let _ = self.obj().remove_pad(&srcpad); + + Ok(()) + } + + fn tear_down_caption_channel( + &self, + channel: CaptionChannel, + pad_transcription_bin: &gst::Bin, + ccmux: &gst::Element, + ) -> Result<(), Error> { + let channel_sinkpad = channel.bin.static_pad("sink").unwrap(); + + if let Some(tee_srcpad) = channel_sinkpad.peer() { + let _ = tee_srcpad.unlink(&channel_sinkpad); + if let Some(tee) = tee_srcpad + .parent() + .and_then(|p| p.downcast::().ok()) + { + let _ = tee.remove_pad(&tee_srcpad); + } + } + + let srcpad = pad_transcription_bin + .static_pad(&format!("src_{}", channel.language)) + .unwrap(); + + if let Some(peer) = srcpad.peer() { + // The source pad might not have been linked to the muxer initially, for + // instance in case of a collision with another source pad's + // translation-languages mapping + if peer.parent().and_downcast_ref::() == Some(ccmux) { + srcpad.unlink(&peer)?; + ccmux.release_request_pad(&peer); + } + } + + let _ = pad_transcription_bin.remove_pad(&srcpad); + + pad_transcription_bin.remove(&channel.bin)?; + + let _ = channel.bin.set_state(gst::State::Null); Ok(()) } fn tear_down_synthesis_channels( &self, - topbin: &super::TranscriberBin, state: &State, pad_state: &mut TranscriberSinkPadState, ) -> Result<(), Error> { let channels: Vec<_> = pad_state.synthesis_channels.drain().collect(); for (_, channel) in channels { - self.tear_down_custom_output_channel(topbin, channel, state, pad_state)?; + self.tear_down_custom_output_channel( + channel, + &pad_state.transcription_bin, + &state.transcription_bin, + &state.internal_bin, + pad_state.serial, + )?; } Ok(()) @@ -1210,46 +1345,34 @@ impl TranscriberBin { fn tear_down_subtitle_channels( &self, - topbin: &super::TranscriberBin, state: &State, pad_state: &mut TranscriberSinkPadState, ) -> Result<(), Error> { let channels: Vec<_> = pad_state.subtitle_channels.drain().collect(); for (_, channel) in channels { - self.tear_down_custom_output_channel(topbin, channel, state, pad_state)?; + self.tear_down_custom_output_channel( + channel, + &pad_state.transcription_bin, + &state.transcription_bin, + &state.internal_bin, + pad_state.serial, + )?; } Ok(()) } - fn tear_down_transcription_channels( + fn tear_down_caption_channels( &self, state: &State, pad_state: &mut TranscriberSinkPadState, ) -> Result<(), Error> { - for channel in pad_state.transcription_channels.values() { - let srcpad = pad_state - .transcription_bin - .static_pad(&format!("src_{}", channel.language)) - .unwrap(); + let channels: Vec<_> = pad_state.caption_channels.drain().collect(); - if let Some(peer) = srcpad.peer() { - // The source pad might not have been linked to the muxer initially, for - // instance in case of a collision with another source pad's - // translation-languages mapping - if peer.parent().and_downcast_ref::() == Some(state.ccmux.as_ref()) { - srcpad.unlink(&peer)?; - state.ccmux.release_request_pad(&peer); - } - } - - let _ = pad_state.transcription_bin.remove_pad(&srcpad); - - pad_state.transcription_bin.remove(&channel.bin)?; + for (_, channel) in channels { + self.tear_down_caption_channel(channel, &pad_state.transcription_bin, &state.ccmux)?; } - pad_state.transcription_channels.clear(); - Ok(()) } @@ -1298,7 +1421,7 @@ impl TranscriberBin { Ok(()) } - fn construct_transcription_channels( + fn construct_caption_channels( &self, settings: &TranscriberSinkPadSettings, mux_method: MuxMethod, @@ -1306,46 +1429,9 @@ impl TranscriberBin { ) -> Result<(), Error> { if let Some(ref map) = settings.translation_languages { for (key, value) in map.iter() { - let lowercased_key = key.to_lowercase(); - let (language_code, caption_streams) = match mux_method { - MuxMethod::Cea608 => { - if ["cc1", "cc3"].contains(&lowercased_key.as_str()) { - (value.get::()?, vec![lowercased_key.to_string()]) - } else if let Ok(caption_stream) = value.get::() { - if !["cc1", "cc3"].contains(&caption_stream.as_str()) { - anyhow::bail!( - "Unknown 608 channel {}, valid values are cc1, cc3", - caption_stream - ); - } - (key.to_string(), vec![caption_stream]) - } else { - anyhow::bail!("Unknown 608 channel/language {}", key); - } - } - MuxMethod::Cea708 => { - if let Ok(caption_stream) = value.get::() { - (key.to_string(), vec![caption_stream]) - } else if let Ok(caption_streams) = value.get::() { - let mut streams = vec![]; - for s in caption_streams.iter() { - let service = s.get::()?; - if ["cc1", "cc3"].contains(&service.as_str()) - || service.starts_with("708_") - { - streams.push(service); - } else { - anyhow::bail!("Unknown 708 service {}, valid values are cc1, cc3 or 708_*", key); - } - } - (key.to_string(), streams) - } else { - anyhow::bail!("Unknown 708 translation language field {}", key); - } - } - }; + let (language_code, caption_streams) = parse_language_pair(mux_method, key, value)?; - pad_state.transcription_channels.insert( + pad_state.caption_channels.insert( language_code.to_owned(), self.construct_transcription_channel( &language_code, @@ -1356,22 +1442,596 @@ impl TranscriberBin { } } else { let caption_streams = match mux_method { - MuxMethod::Cea608 => vec!["cc1".to_string()], - MuxMethod::Cea708 => vec!["cc1".to_string(), "708_1".to_string()], + MuxMethod::Cea608 => HashSet::from(["cc1".to_string()]), + MuxMethod::Cea708 => HashSet::from(["cc1".to_string(), "708_1".to_string()]), }; - pad_state.transcription_channels.insert( + pad_state.caption_channels.insert( "transcript".to_string(), self.construct_transcription_channel("transcript", mux_method, caption_streams)?, ); } - for channel in pad_state.transcription_channels.values() { + for channel in pad_state.caption_channels.values() { pad_state.transcription_bin.add(&channel.bin)?; } Ok(()) } + fn prepare_caption_channel_updates( + &self, + state: &State, + pad_state: &mut TranscriberSinkPadState, + settings: &TranscriberSinkPadSettings, + ) -> Result<(Vec, Vec), Error> { + let mut updates = HashMap::new(); + let mut old_languages: HashSet = + pad_state.caption_channels.keys().cloned().collect(); + + let translation_languages = + settings + .translation_languages + .clone() + .unwrap_or(match state.mux_method { + MuxMethod::Cea608 => gst::Structure::builder("languages") + .field("transcript", "cc1") + .build(), + MuxMethod::Cea708 => gst::Structure::builder("languages") + .field( + "transcript", + gst::List::from_values([ + "cc1".to_string().to_send_value(), + "708_1".to_string().to_send_value(), + ]), + ) + .build(), + }); + + for (key, value) in translation_languages.iter() { + let (language_code, caption_streams) = + parse_language_pair(state.mux_method, key, value)?; + + old_languages.remove(&language_code); + + if let Some(channel) = pad_state.caption_channels.get(&language_code) { + if channel.caption_streams != caption_streams { + updates.insert(language_code, CaptionChannelUpdate::Upsert(caption_streams)); + } + } else { + updates.insert(language_code, CaptionChannelUpdate::Upsert(caption_streams)); + } + } + + for language_code in old_languages.drain() { + updates.insert(language_code, CaptionChannelUpdate::Remove); + } + + let mut channels_to_remove = vec![]; + let mut channels_to_add = vec![]; + + for (language_code, update) in updates { + match update { + CaptionChannelUpdate::Remove => { + let transcription_channel = + pad_state.caption_channels.remove(&language_code).unwrap(); + + channels_to_remove.push(transcription_channel); + + gst::debug!( + CAT, + imp = self, + "caption channel {language_code} will be removed" + ); + } + CaptionChannelUpdate::Upsert(caption_streams) => { + if let Some(transcription_channel) = + pad_state.caption_channels.remove(&language_code) + { + channels_to_remove.push(transcription_channel); + + gst::debug!( + CAT, + imp = self, + "caption channel {language_code} will be removed" + ); + } + + let transcription_channel = self.construct_transcription_channel( + &language_code, + state.mux_method, + caption_streams, + )?; + channels_to_add.push(transcription_channel.clone()); + pad_state + .caption_channels + .insert(language_code.clone(), transcription_channel); + + gst::debug!( + CAT, + imp = self, + "caption channel {language_code} will be added" + ); + } + } + } + + Ok((channels_to_remove, channels_to_add)) + } + + fn prepare_custom_output_channel_updates( + &self, + settings: &Settings, + languages: &Option, + channels_map: &mut HashMap, + channel_type_logname: &str, + ) -> Result<(Vec, Vec), Error> { + let mut updates = HashMap::new(); + let mut old_languages: HashSet = channels_map.keys().cloned().collect(); + + if let Some(ref map) = languages { + for (key, value) in map.iter() { + let language_code = key.to_string(); + + old_languages.remove(&language_code); + + let bin_description = value.get::()?; + if let Some(channel) = channels_map.get(&language_code) { + if channel.bin_description != bin_description { + updates.insert(language_code, CustomChannelUpdate::Upsert(bin_description)); + } + } else { + updates.insert(language_code, CustomChannelUpdate::Upsert(bin_description)); + } + } + } + + for language_code in old_languages.drain() { + updates.insert(language_code, CustomChannelUpdate::Remove); + } + + let mut channels_to_remove = vec![]; + let mut channels_to_add = vec![]; + + for (language_code, update) in updates { + match update { + CustomChannelUpdate::Remove => { + let synthesis_channel = channels_map.remove(&language_code).unwrap(); + + channels_to_remove.push(synthesis_channel); + + gst::debug!( + CAT, + imp = self, + "{channel_type_logname} channel {language_code} will be removed" + ); + } + CustomChannelUpdate::Upsert(bin_description) => { + if let Some(synthesis_channel) = channels_map.remove(&language_code) { + channels_to_remove.push(synthesis_channel); + + gst::debug!( + CAT, + imp = self, + "{channel_type_logname} channel {language_code} will be removed" + ); + } + + let synthesis_channel = self.construct_synthesis_channel( + &language_code, + settings.accumulate_time, + &bin_description, + )?; + channels_to_add.push(synthesis_channel.clone()); + channels_map.insert(language_code.clone(), synthesis_channel); + + gst::debug!( + CAT, + imp = self, + "{channel_type_logname} channel {language_code} will be added" + ); + } + } + } + + Ok((channels_to_remove, channels_to_add)) + } + + #[allow(clippy::type_complexity)] + fn prepare_language_updates( + &self, + pad_state: &mut TranscriberSinkPadState, + pad_settings: &TranscriberSinkPadSettings, + mut old_languages: HashSet, + ) -> Result< + ( + Vec<(gst::Element, Option)>, + HashMap)>, + ), + Error, + > { + let mut languages_to_add: HashMap)> = + HashMap::new(); + + for k in pad_state + .caption_channels + .keys() + .chain(pad_state.synthesis_channels.keys()) + .chain(pad_state.subtitle_channels.keys()) + { + use std::collections::hash_map::Entry::*; + if let Vacant(e) = pad_state.language_tees.entry(k.clone()) { + let tee = gst::ElementFactory::make("tee") + .name(format!("tee-{}", k)) + .property("allow-not-linked", true) + .build()?; + + if let Some(val) = pad_settings + .language_filters + .as_ref() + .and_then(|f| f.value(k).ok()) + { + let filter = if val.is::() { + let bin_description = val.get::().unwrap(); + gst::parse::bin_from_description_full( + &bin_description, + true, + None, + gst::ParseFlags::NO_SINGLE_ELEMENT_BINS, + )? + } else if val.is::() { + val.get::().unwrap() + } else { + return Err(anyhow!( + "Value for language filter map must be string or element" + )); + }; + + languages_to_add.insert(k.clone(), (tee.clone(), Some(filter.clone()))); + pad_state.language_filters.insert(k.clone(), filter); + + gst::debug!(CAT, imp = self, "language {k} will be added"); + } else { + languages_to_add.insert(k.clone(), (tee.clone(), None)); + + gst::debug!(CAT, imp = self, "language {k} will be added"); + } + + e.insert(tee); + }; + + old_languages.remove(k); + } + + let mut languages_to_remove: Vec<(gst::Element, Option)> = vec![]; + + for language in old_languages.drain() { + if let Some(tee) = pad_state.language_tees.remove(&language) { + let filter = pad_state.language_filters.remove(&language); + languages_to_remove.push((tee, filter)); + + gst::debug!(CAT, imp = self, "language {language} will be removed"); + } + } + + Ok((languages_to_remove, languages_to_add)) + } + + fn remove_caption_channels( + &self, + pad_transcription_bin: &gst::Bin, + ccmux: &gst::Element, + mut channels_to_remove: Vec, + ) -> Result<(), Error> { + for channel in channels_to_remove.drain(..) { + self.tear_down_caption_channel(channel, pad_transcription_bin, ccmux)?; + } + + Ok(()) + } + + fn remove_custom_output_channels( + &self, + pad_transcription_bin: &gst::Bin, + transcription_bin: &gst::Bin, + internal_bin: &gst::Bin, + serial: Option, + mut channels_to_remove: Vec, + ) -> Result<(), Error> { + for channel in channels_to_remove.drain(..) { + self.tear_down_custom_output_channel( + channel, + pad_transcription_bin, + transcription_bin, + internal_bin, + serial, + )?; + } + + Ok(()) + } + + fn remove_languages( + &self, + pad_transcription_bin: &gst::Bin, + mut languages_to_remove: Vec<(gst::Element, Option)>, + ) { + for (tee, filter) in languages_to_remove.drain(..) { + let sinkpad = if let Some(ref filter) = filter { + filter.static_pad("sink").unwrap() + } else { + tee.static_pad("sink").unwrap() + }; + + if let Some(srcpad) = sinkpad.peer() { + let _ = srcpad.unlink(&sinkpad); + + if srcpad.name() != "src" { + srcpad + .parent() + .unwrap() + .downcast::() + .unwrap() + .release_request_pad(&srcpad); + } + } + + let _ = pad_transcription_bin.remove(&tee); + let _ = tee.set_state(gst::State::Null); + if let Some(filter) = filter { + let _ = pad_transcription_bin.remove(&filter); + let _ = filter.set_state(gst::State::Null); + } + } + } + + fn add_languages( + &self, + transcriber: &gst::Element, + pad_transcription_bin: &gst::Bin, + mut languages_to_add: HashMap)>, + ) -> Result<(), Error> { + for (language, (tee, filter)) in languages_to_add.drain() { + let transcriber_srcpad = match language.as_str() { + "transcript" => transcriber + .static_pad("src") + .ok_or(anyhow!("Failed to retrieve transcription source pad"))?, + language => { + let pad = transcriber + .request_pad_simple("translate_src_%u") + .ok_or(anyhow!("Failed to request translation source pad"))?; + pad.set_property("language-code", language); + pad + } + }; + + pad_transcription_bin.add(&tee)?; + tee.sync_state_with_parent()?; + + if let Some(filter) = filter { + pad_transcription_bin.add(&filter)?; + filter.link(&tee)?; + filter.sync_state_with_parent()?; + transcriber_srcpad.link(&filter.static_pad("sink").unwrap())?; + } else { + transcriber_srcpad.link(&tee.static_pad("sink").unwrap())?; + } + } + + Ok(()) + } + + fn add_caption_channels( + &self, + pad_transcription_bin: &gst::Bin, + ccmux: &gst::Element, + language_tees: &HashMap, + passthrough: bool, + mut channels_to_add: Vec, + ) -> Result<(), Error> { + for channel in channels_to_add.drain(..) { + pad_transcription_bin.add(&channel.bin)?; + + let srcpad = + gst::GhostPad::builder_with_target(&channel.bin.static_pad("src").unwrap()) + .unwrap() + .name(format!("src_{}", channel.language)) + .build(); + + pad_transcription_bin.add_pad(&srcpad)?; + + if !passthrough { + let sinkpad = ccmux + .static_pad(&channel.ccmux_pad_name) + .unwrap_or_else(|| ccmux.request_pad_simple(&channel.ccmux_pad_name).unwrap()); + + srcpad.link(&sinkpad)?; + } + + channel.bin.sync_state_with_parent()?; + + let tee = language_tees.get(&channel.language).unwrap(); + + let tee_srcpad = tee.request_pad_simple("src_%u").unwrap(); + + gst::debug!(CAT, obj = tee_srcpad, "Linking language tee to channel"); + + tee_srcpad.link(&channel.bin.static_pad("sink").unwrap())?; + } + + Ok(()) + } + + fn expose_custom_output_pads( + &self, + pad_transcription_bin: &gst::Bin, + transcription_bin: &gst::Bin, + internal_bin: &gst::Bin, + serial: Option, + channel: &CustomOutputChannel, + ) -> Result<(), Error> { + let mut pad_name = format!("src_{}_{}", channel.suffix, channel.language); + let srcpad = gst::GhostPad::builder_with_target(&channel.bin.static_pad("src").unwrap()) + .unwrap() + .name(&pad_name) + .build(); + + pad_transcription_bin.add_pad(&srcpad)?; + + if let Some(serial) = serial { + pad_name = format!("{}_{}", pad_name, serial); + } + + let srcpad = gst::GhostPad::builder_with_target(&srcpad) + .unwrap() + .name(pad_name.clone()) + .build(); + transcription_bin.add_pad(&srcpad)?; + + let srcpad = gst::GhostPad::with_target(&srcpad).unwrap(); + internal_bin.add_pad(&srcpad)?; + + let srcpad = gst::GhostPad::with_target(&srcpad).unwrap(); + self.obj().add_pad(&srcpad)?; + + Ok(()) + } + + fn add_custom_output_channels( + &self, + pad_transcription_bin: &gst::Bin, + transcription_bin: &gst::Bin, + internal_bin: &gst::Bin, + serial: Option, + language_tees: &HashMap, + mut channels_to_add: Vec, + ) -> Result<(), Error> { + for channel in channels_to_add.drain(..) { + pad_transcription_bin.add(&channel.bin)?; + + self.expose_custom_output_pads( + pad_transcription_bin, + transcription_bin, + internal_bin, + serial, + &channel, + )?; + + channel.bin.sync_state_with_parent()?; + + let tee = language_tees.get(&channel.language).unwrap(); + + let tee_srcpad = tee.request_pad_simple("src_%u").unwrap(); + + gst::debug!(CAT, obj = tee_srcpad, "Linking language tee to channel"); + + tee_srcpad.link(&channel.bin.static_pad("sink").unwrap())?; + } + + Ok(()) + } + + fn reconfigure_transcription_bin_dynamic(&self, pad: &TranscriberSinkPad) -> Result<(), Error> { + let mut s = self.state.lock().unwrap(); + let settings = self.settings.lock().unwrap(); + + gst::info!(CAT, imp = self, "Reconfiguring transcription bin"); + + if let Some(ref mut state) = s.as_mut() { + let mut ps = pad.state.lock().unwrap(); + let pad_state = ps.as_mut().unwrap(); + let pad_settings = pad.settings.lock().unwrap(); + + let old_languages: HashSet = pad_state + .caption_channels + .keys() + .chain(pad_state.synthesis_channels.keys()) + .chain(pad_state.subtitle_channels.keys()) + .cloned() + .collect(); + + gst::debug!(CAT, imp = self, "old languages: {old_languages:?}"); + + let (caption_channels_to_remove, caption_channels_to_add) = + self.prepare_caption_channel_updates(state, pad_state, &pad_settings)?; + + let (synthesis_channels_to_remove, synthesis_channels_to_add) = self + .prepare_custom_output_channel_updates( + &settings, + &pad_settings.synthesis_languages, + &mut pad_state.synthesis_channels, + "synthesis", + )?; + + let (subtitle_channels_to_remove, subtitle_channels_to_add) = self + .prepare_custom_output_channel_updates( + &settings, + &pad_settings.subtitle_languages, + &mut pad_state.subtitle_channels, + "subtitle", + )?; + + let (languages_to_remove, languages_to_add) = + self.prepare_language_updates(pad_state, &pad_settings, old_languages)?; + + let ccmux = state.ccmux.clone(); + let pad_transcription_bin = pad_state.transcription_bin.clone(); + let transcriber = pad_state.transcriber.clone().unwrap(); + let language_tees = pad_state.language_tees.clone(); + let passthrough = pad_settings.passthrough; + let serial = pad_state.serial; + let transcription_bin = state.transcription_bin.clone(); + let internal_bin = state.internal_bin.clone(); + + drop(s); + drop(settings); + drop(pad_settings); + drop(ps); + + gst::debug!( + CAT, + imp = self, + "update prepared, releasing locks and applying" + ); + + self.remove_caption_channels( + &pad_transcription_bin, + &ccmux, + caption_channels_to_remove, + )?; + + self.remove_custom_output_channels( + &pad_transcription_bin, + &transcription_bin, + &internal_bin, + serial, + [synthesis_channels_to_remove, subtitle_channels_to_remove].concat(), + )?; + + self.remove_languages(&pad_transcription_bin, languages_to_remove); + + self.add_languages(&transcriber, &pad_transcription_bin, languages_to_add)?; + + self.add_caption_channels( + &pad_transcription_bin, + &ccmux, + &language_tees, + passthrough, + caption_channels_to_add, + )?; + + self.add_custom_output_channels( + &pad_transcription_bin, + &transcription_bin, + &internal_bin, + serial, + &language_tees, + [synthesis_channels_to_add, subtitle_channels_to_add].concat(), + )?; + } + + Ok(()) + } + fn reconfigure_transcription_bin( &self, pad: &TranscriberSinkPad, @@ -2239,7 +2899,7 @@ impl ElementImpl for TranscriberBin { let _ = pad_state.transcription_bin.set_state(gst::State::Null); if let Some(ref mut state) = s.as_mut() { - for channel in pad_state.transcription_channels.values() { + for channel in pad_state.caption_channels.values() { if let Some(srcpad) = pad_state .transcription_bin .static_pad(&format!("src_{}", channel.language)) @@ -2515,7 +3175,7 @@ struct TranscriberSinkPadState { transcriber_resample: gst::Element, transcriber: Option, queue_passthrough: gst::Element, - transcription_channels: HashMap, + caption_channels: HashMap, synthesis_channels: HashMap, subtitle_channels: HashMap, srcpad_name: Option, @@ -2526,6 +3186,20 @@ struct TranscriberSinkPadState { language_filters: HashMap, } +impl TranscriberSinkPad { + fn uses_translation_bin(&self) -> bool { + let s = self.state.lock().unwrap(); + let state = s.as_ref().unwrap(); + + state + .transcriber + .as_ref() + .and_then(|t| t.factory()) + .map(|f| f.name() == "translationbin") + .unwrap_or(false) + } +} + impl TranscriberSinkPadState { fn try_new() -> Result { Ok(Self { @@ -2549,7 +3223,7 @@ impl TranscriberSinkPadState { .build() .ok(), queue_passthrough: gst::ElementFactory::make("queue").build()?, - transcription_channels: HashMap::new(), + caption_channels: HashMap::new(), synthesis_channels: HashMap::new(), subtitle_channels: HashMap::new(), srcpad_name: None, @@ -2691,7 +3365,7 @@ impl TranscriberSinkPadState { fn link_transcription_channel( &self, - channel: &TranscriptionChannel, + channel: &CaptionChannel, state: &State, passthrough: bool, ) -> Result<(), Error> { @@ -2712,39 +3386,6 @@ impl TranscriberSinkPadState { Ok(()) } - - fn expose_custom_output_pads( - &self, - topbin: &super::TranscriberBin, - state: &State, - channel: &CustomOutputChannel, - ) -> Result<(), Error> { - let mut pad_name = format!("src_{}_{}", channel.suffix, channel.language); - let srcpad = gst::GhostPad::builder_with_target(&channel.bin.static_pad("src").unwrap()) - .unwrap() - .name(&pad_name) - .build(); - - self.transcription_bin.add_pad(&srcpad)?; - - if let Some(serial) = self.serial { - pad_name = format!("{}_{}", pad_name, serial); - } - - let srcpad = gst::GhostPad::builder_with_target(&srcpad) - .unwrap() - .name(pad_name.clone()) - .build(); - state.transcription_bin.add_pad(&srcpad)?; - - let srcpad = gst::GhostPad::with_target(&srcpad).unwrap(); - state.internal_bin.add_pad(&srcpad)?; - - let srcpad = gst::GhostPad::with_target(&srcpad).unwrap(); - topbin.add_pad(&srcpad)?; - - Ok(()) - } } pub struct TranscriberSinkPad { @@ -2885,7 +3526,19 @@ impl ObjectImpl for TranscriberSinkPad { drop(settings); if let Some(this) = self.obj().parent().and_downcast::() { - this.imp().update_languages(&self.obj(), false); + if self.uses_translation_bin() { + if let Err(e) = this.imp().reconfigure_transcription_bin_dynamic(self) { + gst::error!(CAT, "Couldn't reconfigure caption channels: {e}"); + gst::element_imp_error!( + this.imp(), + gst::StreamError::Failed, + ["Couldn't reconfigure caption channels: {}", e] + ); + *this.imp().state.lock().unwrap() = None; + } + } else { + this.imp().update_languages(&self.obj(), false) + } } } "synthesis-languages" => { @@ -2903,7 +3556,19 @@ impl ObjectImpl for TranscriberSinkPad { drop(settings); if let Some(this) = self.obj().parent().and_downcast::() { - this.imp().update_languages(&self.obj(), false); + if self.uses_translation_bin() { + if let Err(e) = this.imp().reconfigure_transcription_bin_dynamic(self) { + gst::error!(CAT, "Couldn't reconfigure synthesis channels: {e}"); + gst::element_imp_error!( + this.imp(), + gst::StreamError::Failed, + ["Couldn't reconfigure synthesis channels: {}", e] + ); + *this.imp().state.lock().unwrap() = None; + } + } else { + this.imp().update_languages(&self.obj(), false) + } } } "subtitle-languages" => { @@ -2921,7 +3586,19 @@ impl ObjectImpl for TranscriberSinkPad { drop(settings); if let Some(this) = self.obj().parent().and_downcast::() { - this.imp().update_languages(&self.obj(), false); + if self.uses_translation_bin() { + if let Err(e) = this.imp().reconfigure_transcription_bin_dynamic(self) { + gst::error!(CAT, "Couldn't reconfigure subtitle channels: {e}"); + gst::element_imp_error!( + this.imp(), + gst::StreamError::Failed, + ["Couldn't reconfigure subtitle channels: {}", e] + ); + *this.imp().state.lock().unwrap() = None; + } + } else { + this.imp().update_languages(&self.obj(), false) + } } } "language-filters" => { diff --git a/video/closedcaption/src/translationbin/imp.rs b/video/closedcaption/src/translationbin/imp.rs index 034756c95..c46c75b20 100644 --- a/video/closedcaption/src/translationbin/imp.rs +++ b/video/closedcaption/src/translationbin/imp.rs @@ -64,6 +64,74 @@ pub struct TranslationBin { } impl TranslationBin { + fn prepare_translation_srcpad( + &self, + elem: &super::TranslationBin, + srcpad: &super::TranslationSrcPad, + input_language_code: &str, + translate_latency_ms: u32, + tee: &gst::Element, + ) -> Result<(), Error> { + let output_language_code = srcpad.imp().settings.lock().unwrap().language_code.clone(); + + let queue = gst::ElementFactory::make("queue").build()?; + let translator = gst::ElementFactory::make("awstranslate") + .property("input-language-code", input_language_code) + .property("output-language-code", output_language_code) + .build()?; + + if translator.has_property_with_type("latency", u32::static_type()) { + translator.set_property("latency", translate_latency_ms); + } + + elem.add_many([&queue, &translator])?; + queue.sync_state_with_parent()?; + translator.sync_state_with_parent()?; + + tee.link(&queue)?; + queue.link(&translator)?; + + srcpad.set_target(Some( + &translator + .static_pad("src") + .ok_or(anyhow!("No pad named src on translator"))?, + ))?; + + let mut pad_state = srcpad.imp().state.lock().unwrap(); + + pad_state.queue = Some(queue); + pad_state.translator = Some(translator); + + Ok(()) + } + + fn unprepare_translation_srcpad( + &self, + elem: &super::TranslationBin, + tee: &gst::Element, + srcpad: &super::TranslationSrcPad, + ) -> Result<(), Error> { + let (queue, translator) = { + let mut pad_state = srcpad.imp().state.lock().unwrap(); + + ( + pad_state.queue.take().unwrap(), + pad_state.translator.take().unwrap(), + ) + }; + + tee.unlink(&queue); + + elem.remove_many([&queue, &translator])?; + + srcpad.set_target(None::<&gst::Pad>)?; + + let _ = queue.set_state(gst::State::Null); + let _ = translator.set_state(gst::State::Null); + + Ok(()) + } + fn prepare(&self) -> Result<(), Error> { let (transcriber, srcpads) = { let state = self.state.lock().unwrap(); @@ -124,35 +192,13 @@ impl TranslationBin { .set_target(Some(&queue.static_pad("src").unwrap()))?; for srcpad in srcpads { - let output_language_code = srcpad.imp().settings.lock().unwrap().language_code.clone(); - - let queue = gst::ElementFactory::make("queue").build()?; - let translator = gst::ElementFactory::make("awstranslate") - .property("input-language-code", &language_code) - .property("output-language-code", output_language_code) - .build()?; - - if translator.has_property_with_type("latency", u32::static_type()) { - translator.set_property("latency", translate_latency_ms); - } - - obj.add_many([&queue, &translator])?; - queue.sync_state_with_parent()?; - translator.sync_state_with_parent()?; - - tee.link(&queue)?; - queue.link(&translator)?; - - srcpad.set_target(Some( - &translator - .static_pad("src") - .ok_or(anyhow!("No pad named src on translator"))?, - ))?; - - let mut pad_state = srcpad.imp().state.lock().unwrap(); - - pad_state.queue = Some(queue); - pad_state.translator = Some(translator); + self.prepare_translation_srcpad( + &obj, + &srcpad, + &language_code, + translate_latency_ms, + &tee, + )?; } let mut state = self.state.lock().unwrap(); @@ -176,6 +222,12 @@ impl TranslationBin { }; let obj = self.obj(); + let srcpads = self.state.lock().unwrap().srcpads.clone(); + + for srcpad in srcpads { + self.unprepare_translation_srcpad(&obj, &tee, &srcpad)?; + } + transcriber.unlink(&tee); obj.remove_many([&transcriber, &tee, &queue])?; @@ -183,26 +235,6 @@ impl TranslationBin { self.audio_sinkpad.set_target(None::<&gst::Pad>)?; self.transcript_srcpad.set_target(None::<&gst::Pad>)?; - let srcpads = self.state.lock().unwrap().srcpads.clone(); - - for srcpad in srcpads { - let (queue, translator) = { - let mut pad_state = srcpad.imp().state.lock().unwrap(); - - ( - pad_state.queue.take().unwrap(), - pad_state.translator.take().unwrap(), - ) - }; - - obj.remove_many([&queue, &translator])?; - - srcpad.set_target(None::<&gst::Pad>)?; - - let _ = queue.set_state(gst::State::Null); - let _ = translator.set_state(gst::State::Null); - } - let _ = transcriber.set_state(gst::State::Null); let _ = tee.set_state(gst::State::Null); let _ = queue.set_state(gst::State::Null); @@ -370,11 +402,40 @@ impl ElementImpl for TranslationBin { self.state.lock().unwrap().srcpads.insert(pad.clone()); + if let Some(tee) = self.state.lock().unwrap().tee.clone() { + let Settings { + translate_latency, + language_code, + .. + } = self.settings.lock().unwrap().clone(); + + let translate_latency_ms = translate_latency.mseconds() as u32; + + if let Err(err) = self.prepare_translation_srcpad( + &self.obj(), + &pad, + &language_code, + translate_latency_ms, + &tee, + ) { + gst::error!(CAT, "Failed to prepare translation source pad: {err:?}"); + return None; + } + } + Some(pad.upcast()) } fn release_pad(&self, pad: &gst::Pad) { - let _ = self.state.lock().unwrap().srcpads.remove(pad); + let srcpad = self.state.lock().unwrap().srcpads.take(pad); + + if let Some(srcpad) = srcpad { + if let Some(tee) = self.state.lock().unwrap().tee.clone() { + if let Err(err) = self.unprepare_translation_srcpad(&self.obj(), &tee, &srcpad) { + gst::warning!(CAT, "Failed to unprepare translation source pad: {err:?}"); + } + } + } let _ = self.obj().remove_pad(pad); } @@ -511,7 +572,6 @@ impl ObjectImpl for TranslationSrcPad { .nick("Language Code") .blurb("The language of the output stream") .default_value(Some(DEFAULT_OUTPUT_LANG_CODE)) - .mutable_ready() .build()] }); @@ -523,7 +583,11 @@ impl ObjectImpl for TranslationSrcPad { "language-code" => { let language_code: String = value.get().expect("type checked upstream"); let mut settings = self.settings.lock().unwrap(); - settings.language_code = language_code; + settings.language_code = language_code.clone(); + + if let Some(translator) = self.state.lock().unwrap().translator.as_ref() { + translator.set_property("output-language-code", language_code); + } } _ => unimplemented!(), }