From d0db66d61e335ad14b34842ec45fe5f75f1a732a Mon Sep 17 00:00:00 2001 From: Mathieu Duponchelle Date: Wed, 12 Feb 2025 19:01:53 +0100 Subject: [PATCH] transcriberbin: implement dynamic language update Previously, transcriberbin only supported updating translation languages while playing by resetting the state of the transcriber to NULL beforehand, as for instance the speechmatics transcriber needs to reestablish a connection to request new languages. Now that translationbin exists, we can request new languages without restarting the transcriber (this commit also implements support for this in translationbin). There is some code duplication as the old method still needs to be supported, and not all code was trivially factorizable, but after some refactoring most of the code for updating languages is shared nevertheless. Part-of: --- docs/plugins/gst_plugins_cache.json | 2 +- video/closedcaption/src/transcriberbin/imp.rs | 957 +++++++++++++++--- video/closedcaption/src/translationbin/imp.rs | 168 ++- 3 files changed, 934 insertions(+), 193 deletions(-) diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index 07f62fa47..7d37daed6 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -9669,7 +9669,7 @@ "construct-only": false, "controllable": false, "default": "fr-FR", - "mutable": "ready", + "mutable": "null", "readable": true, "type": "gchararray", "writable": true diff --git a/video/closedcaption/src/transcriberbin/imp.rs b/video/closedcaption/src/transcriberbin/imp.rs index 86944b2ca..d89a09f37 100644 --- a/video/closedcaption/src/transcriberbin/imp.rs +++ b/video/closedcaption/src/transcriberbin/imp.rs @@ -12,7 +12,7 @@ use anyhow::{anyhow, Error}; use gst::glib; use gst::prelude::*; use gst::subclass::prelude::*; -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; use std::sync::Mutex; use std::sync::LazyLock; @@ -53,17 +53,82 @@ enum PassthroughState { Disabled, } +#[derive(Debug)] +enum CaptionChannelUpdate { + Upsert(HashSet), + Remove, +} + +#[derive(Debug)] +enum CustomChannelUpdate { + Upsert(String), + Remove, +} + +fn parse_language_pair( + mux_method: MuxMethod, + key: &str, + value: &gst::glib::Value, +) -> Result<(String, HashSet), Error> { + let lowercased_key = key.to_lowercase(); + Ok(match mux_method { + MuxMethod::Cea608 => { + if ["cc1", "cc3"].contains(&lowercased_key.as_str()) { + ( + value.get::()?, + HashSet::from([lowercased_key.to_string()]), + ) + } else if let Ok(caption_stream) = value.get::() { + if !["cc1", "cc3"].contains(&caption_stream.as_str()) { + anyhow::bail!( + "Unknown 608 channel {}, valid values are cc1, cc3", + caption_stream + ); + } + (key.to_string(), HashSet::from([caption_stream])) + } else { + anyhow::bail!("Unknown 608 channel/language {}", key); + } + } + MuxMethod::Cea708 => { + if let Ok(caption_stream) = value.get::() { + (key.to_string(), HashSet::from([caption_stream])) + } else if let Ok(caption_streams) = value.get::() { + let mut streams = HashSet::new(); + for s in caption_streams.iter() { + let service = s.get::()?; + if ["cc1", "cc3"].contains(&service.as_str()) || service.starts_with("708_") { + streams.insert(service); + } else { + anyhow::bail!( + "Unknown 708 service {}, valid values are cc1, cc3 or 708_*", + key + ); + } + } + (key.to_string(), streams) + } else { + anyhow::bail!("Unknown 708 translation language field {}", key); + } + } + }) +} + /* One per language, including original */ -struct TranscriptionChannel { +#[derive(Clone)] +struct CaptionChannel { bin: gst::Bin, textwrap: gst::Element, tttoceax08: gst::Element, language: String, ccmux_pad_name: String, cccapsfilter: gst::Element, + caption_streams: HashSet, } +#[derive(Clone)] struct CustomOutputChannel { + bin_description: String, bin: gst::Bin, language: String, latency: gst::ClockTime, @@ -237,7 +302,7 @@ impl TranscriberBin { ) -> Result<(), Error> { for (language, bin) in itertools::chain!( pad_state - .transcription_channels + .caption_channels .values() .map(|c| (c.language.as_str(), c.bin.clone())), pad_state @@ -261,7 +326,7 @@ impl TranscriberBin { pad_state: &TranscriberSinkPadState, pad_settings: &TranscriberSinkPadSettings, ) -> Result<(), Error> { - for channel in pad_state.transcription_channels.values() { + for channel in pad_state.caption_channels.values() { pad_state.link_transcription_channel(channel, state, pad_settings.passthrough)?; } @@ -269,7 +334,13 @@ impl TranscriberBin { pad_state.synthesis_channels.values(), pad_state.subtitle_channels.values() ) { - pad_state.expose_custom_output_pads(&self.obj(), state, channel)?; + self.expose_custom_output_pads( + &pad_state.transcription_bin, + &state.transcription_bin, + &state.internal_bin, + pad_state.serial, + channel, + )?; } Ok(()) @@ -310,6 +381,7 @@ impl TranscriberBin { bin.add_pad(&srcpad)?; Ok(CustomOutputChannel { + bin_description: bin_description.to_string(), bin, language: String::from(lang), latency, @@ -346,6 +418,7 @@ impl TranscriberBin { bin.add_pad(&srcpad)?; Ok(CustomOutputChannel { + bin_description: bin_description.to_string(), bin, language: String::from(lang), latency, @@ -357,8 +430,8 @@ impl TranscriberBin { &self, lang: &str, mux_method: MuxMethod, - caption_streams: Vec, - ) -> Result { + caption_streams: HashSet, + ) -> Result { let bin = gst::Bin::new(); let queue = gst::ElementFactory::make("queue").build()?; let textwrap = gst::ElementFactory::make("textwrap").build()?; @@ -369,7 +442,7 @@ impl TranscriberBin { } ( gst::ElementFactory::make("tttocea608").build()?, - caption_streams[0].clone(), + caption_streams.iter().next().unwrap().clone(), ) } MuxMethod::Cea708 => { @@ -447,13 +520,14 @@ impl TranscriberBin { bin.add_pad(&sinkpad)?; bin.add_pad(&srcpad)?; - Ok(TranscriptionChannel { + Ok(CaptionChannel { bin, textwrap, tttoceax08, language: String::from(lang), ccmux_pad_name, cccapsfilter, + caption_streams, }) } @@ -712,7 +786,7 @@ impl TranscriberBin { for pad in state.audio_sink_pads.values() { let ps = pad.imp().state.lock().unwrap(); if let Ok(pad_state) = ps.as_ref() { - for channel in pad_state.transcription_channels.values() { + for channel in pad_state.caption_channels.values() { channel.cccapsfilter.set_property("caps", &caps); } } @@ -834,7 +908,7 @@ impl TranscriberBin { pad_state.audio_tee.release_request_pad(&audio_tee_pad); } - for channel in pad_state.transcription_channels.values() { + for channel in pad_state.caption_channels.values() { let srcpad = pad_state .transcription_bin .static_pad(&format!("src_{}", channel.language)) @@ -869,7 +943,7 @@ impl TranscriberBin { gst::debug!(CAT, imp = sinkpad, "enabling transcription bin"); - for channel in pad_state.transcription_channels.values() { + for channel in pad_state.caption_channels.values() { let srcpad = pad_state .transcription_bin .static_pad(&format!("src_{}", channel.language)) @@ -1014,7 +1088,7 @@ impl TranscriberBin { pad ); - for channel in pad_state.transcription_channels.values() { + for channel in pad_state.caption_channels.values() { match mux_method { MuxMethod::Cea608 => channel.tttoceax08.set_property("mode", mode), MuxMethod::Cea708 => match mode { @@ -1081,7 +1155,7 @@ impl TranscriberBin { pad_state: &mut TranscriberSinkPadState, pad_settings: &TranscriberSinkPadSettings, ) -> Result<(), Error> { - self.construct_transcription_channels(pad_settings, mux_method, pad_state) + self.construct_caption_channels(pad_settings, mux_method, pad_state) .unwrap(); self.construct_synthesis_channels(accumulate_time, pad_settings, pad_state) @@ -1091,7 +1165,7 @@ impl TranscriberBin { .unwrap(); for k in pad_state - .transcription_channels + .caption_channels .keys() .chain(pad_state.synthesis_channels.keys()) .chain(pad_state.subtitle_channels.keys()) @@ -1145,11 +1219,11 @@ impl TranscriberBin { ) -> Result<(), Error> { pad_state.unlink_language_tees(self.obj().as_ref(), state, pad_state); - self.tear_down_transcription_channels(state, pad_state)?; + self.tear_down_caption_channels(state, pad_state)?; - self.tear_down_synthesis_channels(&self.obj(), state, pad_state)?; + self.tear_down_synthesis_channels(state, pad_state)?; - self.tear_down_subtitle_channels(&self.obj(), state, pad_state)?; + self.tear_down_subtitle_channels(state, pad_state)?; for (language, tee) in pad_state.language_tees.drain() { if let Some(filter) = pad_state.language_filters.remove(&language) { @@ -1168,41 +1242,102 @@ impl TranscriberBin { fn tear_down_custom_output_channel( &self, - topbin: &super::TranscriberBin, channel: CustomOutputChannel, - state: &State, - pad_state: &mut TranscriberSinkPadState, + pad_transcription_bin: &gst::Bin, + transcription_bin: &gst::Bin, + internal_bin: &gst::Bin, + serial: Option, ) -> Result<(), Error> { let mut pad_name = format!("src_{}_{}", channel.suffix, channel.language); - let srcpad = pad_state.transcription_bin.static_pad(&pad_name).unwrap(); + let srcpad = pad_transcription_bin.static_pad(&pad_name).unwrap(); - let _ = pad_state.transcription_bin.remove_pad(&srcpad); + let _ = pad_transcription_bin.remove_pad(&srcpad); - pad_state.transcription_bin.remove(&channel.bin)?; + let channel_sinkpad = channel.bin.static_pad("sink").unwrap(); - if let Some(serial) = pad_state.serial { + if let Some(tee_srcpad) = channel_sinkpad.peer() { + let _ = tee_srcpad.unlink(&channel_sinkpad); + if let Some(tee) = tee_srcpad + .parent() + .and_then(|p| p.downcast::().ok()) + { + let _ = tee.remove_pad(&tee_srcpad); + } + } + + pad_transcription_bin.remove(&channel.bin)?; + + let _ = channel.bin.set_state(gst::State::Null); + + if let Some(serial) = serial { pad_name = format!("{}_{}", pad_name, serial); } - let srcpad = state.transcription_bin.static_pad(&pad_name).unwrap(); - let _ = state.transcription_bin.remove_pad(&srcpad); + let srcpad = transcription_bin.static_pad(&pad_name).unwrap(); + let _ = transcription_bin.remove_pad(&srcpad); - let srcpad = state.internal_bin.static_pad(&pad_name).unwrap(); - let _ = state.internal_bin.remove_pad(&srcpad); - let srcpad = topbin.static_pad(&pad_name).unwrap(); - let _ = topbin.remove_pad(&srcpad); + let srcpad = internal_bin.static_pad(&pad_name).unwrap(); + let _ = internal_bin.remove_pad(&srcpad); + let srcpad = self.obj().static_pad(&pad_name).unwrap(); + let _ = self.obj().remove_pad(&srcpad); + + Ok(()) + } + + fn tear_down_caption_channel( + &self, + channel: CaptionChannel, + pad_transcription_bin: &gst::Bin, + ccmux: &gst::Element, + ) -> Result<(), Error> { + let channel_sinkpad = channel.bin.static_pad("sink").unwrap(); + + if let Some(tee_srcpad) = channel_sinkpad.peer() { + let _ = tee_srcpad.unlink(&channel_sinkpad); + if let Some(tee) = tee_srcpad + .parent() + .and_then(|p| p.downcast::().ok()) + { + let _ = tee.remove_pad(&tee_srcpad); + } + } + + let srcpad = pad_transcription_bin + .static_pad(&format!("src_{}", channel.language)) + .unwrap(); + + if let Some(peer) = srcpad.peer() { + // The source pad might not have been linked to the muxer initially, for + // instance in case of a collision with another source pad's + // translation-languages mapping + if peer.parent().and_downcast_ref::() == Some(ccmux) { + srcpad.unlink(&peer)?; + ccmux.release_request_pad(&peer); + } + } + + let _ = pad_transcription_bin.remove_pad(&srcpad); + + pad_transcription_bin.remove(&channel.bin)?; + + let _ = channel.bin.set_state(gst::State::Null); Ok(()) } fn tear_down_synthesis_channels( &self, - topbin: &super::TranscriberBin, state: &State, pad_state: &mut TranscriberSinkPadState, ) -> Result<(), Error> { let channels: Vec<_> = pad_state.synthesis_channels.drain().collect(); for (_, channel) in channels { - self.tear_down_custom_output_channel(topbin, channel, state, pad_state)?; + self.tear_down_custom_output_channel( + channel, + &pad_state.transcription_bin, + &state.transcription_bin, + &state.internal_bin, + pad_state.serial, + )?; } Ok(()) @@ -1210,46 +1345,34 @@ impl TranscriberBin { fn tear_down_subtitle_channels( &self, - topbin: &super::TranscriberBin, state: &State, pad_state: &mut TranscriberSinkPadState, ) -> Result<(), Error> { let channels: Vec<_> = pad_state.subtitle_channels.drain().collect(); for (_, channel) in channels { - self.tear_down_custom_output_channel(topbin, channel, state, pad_state)?; + self.tear_down_custom_output_channel( + channel, + &pad_state.transcription_bin, + &state.transcription_bin, + &state.internal_bin, + pad_state.serial, + )?; } Ok(()) } - fn tear_down_transcription_channels( + fn tear_down_caption_channels( &self, state: &State, pad_state: &mut TranscriberSinkPadState, ) -> Result<(), Error> { - for channel in pad_state.transcription_channels.values() { - let srcpad = pad_state - .transcription_bin - .static_pad(&format!("src_{}", channel.language)) - .unwrap(); + let channels: Vec<_> = pad_state.caption_channels.drain().collect(); - if let Some(peer) = srcpad.peer() { - // The source pad might not have been linked to the muxer initially, for - // instance in case of a collision with another source pad's - // translation-languages mapping - if peer.parent().and_downcast_ref::() == Some(state.ccmux.as_ref()) { - srcpad.unlink(&peer)?; - state.ccmux.release_request_pad(&peer); - } - } - - let _ = pad_state.transcription_bin.remove_pad(&srcpad); - - pad_state.transcription_bin.remove(&channel.bin)?; + for (_, channel) in channels { + self.tear_down_caption_channel(channel, &pad_state.transcription_bin, &state.ccmux)?; } - pad_state.transcription_channels.clear(); - Ok(()) } @@ -1298,7 +1421,7 @@ impl TranscriberBin { Ok(()) } - fn construct_transcription_channels( + fn construct_caption_channels( &self, settings: &TranscriberSinkPadSettings, mux_method: MuxMethod, @@ -1306,46 +1429,9 @@ impl TranscriberBin { ) -> Result<(), Error> { if let Some(ref map) = settings.translation_languages { for (key, value) in map.iter() { - let lowercased_key = key.to_lowercase(); - let (language_code, caption_streams) = match mux_method { - MuxMethod::Cea608 => { - if ["cc1", "cc3"].contains(&lowercased_key.as_str()) { - (value.get::()?, vec![lowercased_key.to_string()]) - } else if let Ok(caption_stream) = value.get::() { - if !["cc1", "cc3"].contains(&caption_stream.as_str()) { - anyhow::bail!( - "Unknown 608 channel {}, valid values are cc1, cc3", - caption_stream - ); - } - (key.to_string(), vec![caption_stream]) - } else { - anyhow::bail!("Unknown 608 channel/language {}", key); - } - } - MuxMethod::Cea708 => { - if let Ok(caption_stream) = value.get::() { - (key.to_string(), vec![caption_stream]) - } else if let Ok(caption_streams) = value.get::() { - let mut streams = vec![]; - for s in caption_streams.iter() { - let service = s.get::()?; - if ["cc1", "cc3"].contains(&service.as_str()) - || service.starts_with("708_") - { - streams.push(service); - } else { - anyhow::bail!("Unknown 708 service {}, valid values are cc1, cc3 or 708_*", key); - } - } - (key.to_string(), streams) - } else { - anyhow::bail!("Unknown 708 translation language field {}", key); - } - } - }; + let (language_code, caption_streams) = parse_language_pair(mux_method, key, value)?; - pad_state.transcription_channels.insert( + pad_state.caption_channels.insert( language_code.to_owned(), self.construct_transcription_channel( &language_code, @@ -1356,22 +1442,596 @@ impl TranscriberBin { } } else { let caption_streams = match mux_method { - MuxMethod::Cea608 => vec!["cc1".to_string()], - MuxMethod::Cea708 => vec!["cc1".to_string(), "708_1".to_string()], + MuxMethod::Cea608 => HashSet::from(["cc1".to_string()]), + MuxMethod::Cea708 => HashSet::from(["cc1".to_string(), "708_1".to_string()]), }; - pad_state.transcription_channels.insert( + pad_state.caption_channels.insert( "transcript".to_string(), self.construct_transcription_channel("transcript", mux_method, caption_streams)?, ); } - for channel in pad_state.transcription_channels.values() { + for channel in pad_state.caption_channels.values() { pad_state.transcription_bin.add(&channel.bin)?; } Ok(()) } + fn prepare_caption_channel_updates( + &self, + state: &State, + pad_state: &mut TranscriberSinkPadState, + settings: &TranscriberSinkPadSettings, + ) -> Result<(Vec, Vec), Error> { + let mut updates = HashMap::new(); + let mut old_languages: HashSet = + pad_state.caption_channels.keys().cloned().collect(); + + let translation_languages = + settings + .translation_languages + .clone() + .unwrap_or(match state.mux_method { + MuxMethod::Cea608 => gst::Structure::builder("languages") + .field("transcript", "cc1") + .build(), + MuxMethod::Cea708 => gst::Structure::builder("languages") + .field( + "transcript", + gst::List::from_values([ + "cc1".to_string().to_send_value(), + "708_1".to_string().to_send_value(), + ]), + ) + .build(), + }); + + for (key, value) in translation_languages.iter() { + let (language_code, caption_streams) = + parse_language_pair(state.mux_method, key, value)?; + + old_languages.remove(&language_code); + + if let Some(channel) = pad_state.caption_channels.get(&language_code) { + if channel.caption_streams != caption_streams { + updates.insert(language_code, CaptionChannelUpdate::Upsert(caption_streams)); + } + } else { + updates.insert(language_code, CaptionChannelUpdate::Upsert(caption_streams)); + } + } + + for language_code in old_languages.drain() { + updates.insert(language_code, CaptionChannelUpdate::Remove); + } + + let mut channels_to_remove = vec![]; + let mut channels_to_add = vec![]; + + for (language_code, update) in updates { + match update { + CaptionChannelUpdate::Remove => { + let transcription_channel = + pad_state.caption_channels.remove(&language_code).unwrap(); + + channels_to_remove.push(transcription_channel); + + gst::debug!( + CAT, + imp = self, + "caption channel {language_code} will be removed" + ); + } + CaptionChannelUpdate::Upsert(caption_streams) => { + if let Some(transcription_channel) = + pad_state.caption_channels.remove(&language_code) + { + channels_to_remove.push(transcription_channel); + + gst::debug!( + CAT, + imp = self, + "caption channel {language_code} will be removed" + ); + } + + let transcription_channel = self.construct_transcription_channel( + &language_code, + state.mux_method, + caption_streams, + )?; + channels_to_add.push(transcription_channel.clone()); + pad_state + .caption_channels + .insert(language_code.clone(), transcription_channel); + + gst::debug!( + CAT, + imp = self, + "caption channel {language_code} will be added" + ); + } + } + } + + Ok((channels_to_remove, channels_to_add)) + } + + fn prepare_custom_output_channel_updates( + &self, + settings: &Settings, + languages: &Option, + channels_map: &mut HashMap, + channel_type_logname: &str, + ) -> Result<(Vec, Vec), Error> { + let mut updates = HashMap::new(); + let mut old_languages: HashSet = channels_map.keys().cloned().collect(); + + if let Some(ref map) = languages { + for (key, value) in map.iter() { + let language_code = key.to_string(); + + old_languages.remove(&language_code); + + let bin_description = value.get::()?; + if let Some(channel) = channels_map.get(&language_code) { + if channel.bin_description != bin_description { + updates.insert(language_code, CustomChannelUpdate::Upsert(bin_description)); + } + } else { + updates.insert(language_code, CustomChannelUpdate::Upsert(bin_description)); + } + } + } + + for language_code in old_languages.drain() { + updates.insert(language_code, CustomChannelUpdate::Remove); + } + + let mut channels_to_remove = vec![]; + let mut channels_to_add = vec![]; + + for (language_code, update) in updates { + match update { + CustomChannelUpdate::Remove => { + let synthesis_channel = channels_map.remove(&language_code).unwrap(); + + channels_to_remove.push(synthesis_channel); + + gst::debug!( + CAT, + imp = self, + "{channel_type_logname} channel {language_code} will be removed" + ); + } + CustomChannelUpdate::Upsert(bin_description) => { + if let Some(synthesis_channel) = channels_map.remove(&language_code) { + channels_to_remove.push(synthesis_channel); + + gst::debug!( + CAT, + imp = self, + "{channel_type_logname} channel {language_code} will be removed" + ); + } + + let synthesis_channel = self.construct_synthesis_channel( + &language_code, + settings.accumulate_time, + &bin_description, + )?; + channels_to_add.push(synthesis_channel.clone()); + channels_map.insert(language_code.clone(), synthesis_channel); + + gst::debug!( + CAT, + imp = self, + "{channel_type_logname} channel {language_code} will be added" + ); + } + } + } + + Ok((channels_to_remove, channels_to_add)) + } + + #[allow(clippy::type_complexity)] + fn prepare_language_updates( + &self, + pad_state: &mut TranscriberSinkPadState, + pad_settings: &TranscriberSinkPadSettings, + mut old_languages: HashSet, + ) -> Result< + ( + Vec<(gst::Element, Option)>, + HashMap)>, + ), + Error, + > { + let mut languages_to_add: HashMap)> = + HashMap::new(); + + for k in pad_state + .caption_channels + .keys() + .chain(pad_state.synthesis_channels.keys()) + .chain(pad_state.subtitle_channels.keys()) + { + use std::collections::hash_map::Entry::*; + if let Vacant(e) = pad_state.language_tees.entry(k.clone()) { + let tee = gst::ElementFactory::make("tee") + .name(format!("tee-{}", k)) + .property("allow-not-linked", true) + .build()?; + + if let Some(val) = pad_settings + .language_filters + .as_ref() + .and_then(|f| f.value(k).ok()) + { + let filter = if val.is::() { + let bin_description = val.get::().unwrap(); + gst::parse::bin_from_description_full( + &bin_description, + true, + None, + gst::ParseFlags::NO_SINGLE_ELEMENT_BINS, + )? + } else if val.is::() { + val.get::().unwrap() + } else { + return Err(anyhow!( + "Value for language filter map must be string or element" + )); + }; + + languages_to_add.insert(k.clone(), (tee.clone(), Some(filter.clone()))); + pad_state.language_filters.insert(k.clone(), filter); + + gst::debug!(CAT, imp = self, "language {k} will be added"); + } else { + languages_to_add.insert(k.clone(), (tee.clone(), None)); + + gst::debug!(CAT, imp = self, "language {k} will be added"); + } + + e.insert(tee); + }; + + old_languages.remove(k); + } + + let mut languages_to_remove: Vec<(gst::Element, Option)> = vec![]; + + for language in old_languages.drain() { + if let Some(tee) = pad_state.language_tees.remove(&language) { + let filter = pad_state.language_filters.remove(&language); + languages_to_remove.push((tee, filter)); + + gst::debug!(CAT, imp = self, "language {language} will be removed"); + } + } + + Ok((languages_to_remove, languages_to_add)) + } + + fn remove_caption_channels( + &self, + pad_transcription_bin: &gst::Bin, + ccmux: &gst::Element, + mut channels_to_remove: Vec, + ) -> Result<(), Error> { + for channel in channels_to_remove.drain(..) { + self.tear_down_caption_channel(channel, pad_transcription_bin, ccmux)?; + } + + Ok(()) + } + + fn remove_custom_output_channels( + &self, + pad_transcription_bin: &gst::Bin, + transcription_bin: &gst::Bin, + internal_bin: &gst::Bin, + serial: Option, + mut channels_to_remove: Vec, + ) -> Result<(), Error> { + for channel in channels_to_remove.drain(..) { + self.tear_down_custom_output_channel( + channel, + pad_transcription_bin, + transcription_bin, + internal_bin, + serial, + )?; + } + + Ok(()) + } + + fn remove_languages( + &self, + pad_transcription_bin: &gst::Bin, + mut languages_to_remove: Vec<(gst::Element, Option)>, + ) { + for (tee, filter) in languages_to_remove.drain(..) { + let sinkpad = if let Some(ref filter) = filter { + filter.static_pad("sink").unwrap() + } else { + tee.static_pad("sink").unwrap() + }; + + if let Some(srcpad) = sinkpad.peer() { + let _ = srcpad.unlink(&sinkpad); + + if srcpad.name() != "src" { + srcpad + .parent() + .unwrap() + .downcast::() + .unwrap() + .release_request_pad(&srcpad); + } + } + + let _ = pad_transcription_bin.remove(&tee); + let _ = tee.set_state(gst::State::Null); + if let Some(filter) = filter { + let _ = pad_transcription_bin.remove(&filter); + let _ = filter.set_state(gst::State::Null); + } + } + } + + fn add_languages( + &self, + transcriber: &gst::Element, + pad_transcription_bin: &gst::Bin, + mut languages_to_add: HashMap)>, + ) -> Result<(), Error> { + for (language, (tee, filter)) in languages_to_add.drain() { + let transcriber_srcpad = match language.as_str() { + "transcript" => transcriber + .static_pad("src") + .ok_or(anyhow!("Failed to retrieve transcription source pad"))?, + language => { + let pad = transcriber + .request_pad_simple("translate_src_%u") + .ok_or(anyhow!("Failed to request translation source pad"))?; + pad.set_property("language-code", language); + pad + } + }; + + pad_transcription_bin.add(&tee)?; + tee.sync_state_with_parent()?; + + if let Some(filter) = filter { + pad_transcription_bin.add(&filter)?; + filter.link(&tee)?; + filter.sync_state_with_parent()?; + transcriber_srcpad.link(&filter.static_pad("sink").unwrap())?; + } else { + transcriber_srcpad.link(&tee.static_pad("sink").unwrap())?; + } + } + + Ok(()) + } + + fn add_caption_channels( + &self, + pad_transcription_bin: &gst::Bin, + ccmux: &gst::Element, + language_tees: &HashMap, + passthrough: bool, + mut channels_to_add: Vec, + ) -> Result<(), Error> { + for channel in channels_to_add.drain(..) { + pad_transcription_bin.add(&channel.bin)?; + + let srcpad = + gst::GhostPad::builder_with_target(&channel.bin.static_pad("src").unwrap()) + .unwrap() + .name(format!("src_{}", channel.language)) + .build(); + + pad_transcription_bin.add_pad(&srcpad)?; + + if !passthrough { + let sinkpad = ccmux + .static_pad(&channel.ccmux_pad_name) + .unwrap_or_else(|| ccmux.request_pad_simple(&channel.ccmux_pad_name).unwrap()); + + srcpad.link(&sinkpad)?; + } + + channel.bin.sync_state_with_parent()?; + + let tee = language_tees.get(&channel.language).unwrap(); + + let tee_srcpad = tee.request_pad_simple("src_%u").unwrap(); + + gst::debug!(CAT, obj = tee_srcpad, "Linking language tee to channel"); + + tee_srcpad.link(&channel.bin.static_pad("sink").unwrap())?; + } + + Ok(()) + } + + fn expose_custom_output_pads( + &self, + pad_transcription_bin: &gst::Bin, + transcription_bin: &gst::Bin, + internal_bin: &gst::Bin, + serial: Option, + channel: &CustomOutputChannel, + ) -> Result<(), Error> { + let mut pad_name = format!("src_{}_{}", channel.suffix, channel.language); + let srcpad = gst::GhostPad::builder_with_target(&channel.bin.static_pad("src").unwrap()) + .unwrap() + .name(&pad_name) + .build(); + + pad_transcription_bin.add_pad(&srcpad)?; + + if let Some(serial) = serial { + pad_name = format!("{}_{}", pad_name, serial); + } + + let srcpad = gst::GhostPad::builder_with_target(&srcpad) + .unwrap() + .name(pad_name.clone()) + .build(); + transcription_bin.add_pad(&srcpad)?; + + let srcpad = gst::GhostPad::with_target(&srcpad).unwrap(); + internal_bin.add_pad(&srcpad)?; + + let srcpad = gst::GhostPad::with_target(&srcpad).unwrap(); + self.obj().add_pad(&srcpad)?; + + Ok(()) + } + + fn add_custom_output_channels( + &self, + pad_transcription_bin: &gst::Bin, + transcription_bin: &gst::Bin, + internal_bin: &gst::Bin, + serial: Option, + language_tees: &HashMap, + mut channels_to_add: Vec, + ) -> Result<(), Error> { + for channel in channels_to_add.drain(..) { + pad_transcription_bin.add(&channel.bin)?; + + self.expose_custom_output_pads( + pad_transcription_bin, + transcription_bin, + internal_bin, + serial, + &channel, + )?; + + channel.bin.sync_state_with_parent()?; + + let tee = language_tees.get(&channel.language).unwrap(); + + let tee_srcpad = tee.request_pad_simple("src_%u").unwrap(); + + gst::debug!(CAT, obj = tee_srcpad, "Linking language tee to channel"); + + tee_srcpad.link(&channel.bin.static_pad("sink").unwrap())?; + } + + Ok(()) + } + + fn reconfigure_transcription_bin_dynamic(&self, pad: &TranscriberSinkPad) -> Result<(), Error> { + let mut s = self.state.lock().unwrap(); + let settings = self.settings.lock().unwrap(); + + gst::info!(CAT, imp = self, "Reconfiguring transcription bin"); + + if let Some(ref mut state) = s.as_mut() { + let mut ps = pad.state.lock().unwrap(); + let pad_state = ps.as_mut().unwrap(); + let pad_settings = pad.settings.lock().unwrap(); + + let old_languages: HashSet = pad_state + .caption_channels + .keys() + .chain(pad_state.synthesis_channels.keys()) + .chain(pad_state.subtitle_channels.keys()) + .cloned() + .collect(); + + gst::debug!(CAT, imp = self, "old languages: {old_languages:?}"); + + let (caption_channels_to_remove, caption_channels_to_add) = + self.prepare_caption_channel_updates(state, pad_state, &pad_settings)?; + + let (synthesis_channels_to_remove, synthesis_channels_to_add) = self + .prepare_custom_output_channel_updates( + &settings, + &pad_settings.synthesis_languages, + &mut pad_state.synthesis_channels, + "synthesis", + )?; + + let (subtitle_channels_to_remove, subtitle_channels_to_add) = self + .prepare_custom_output_channel_updates( + &settings, + &pad_settings.subtitle_languages, + &mut pad_state.subtitle_channels, + "subtitle", + )?; + + let (languages_to_remove, languages_to_add) = + self.prepare_language_updates(pad_state, &pad_settings, old_languages)?; + + let ccmux = state.ccmux.clone(); + let pad_transcription_bin = pad_state.transcription_bin.clone(); + let transcriber = pad_state.transcriber.clone().unwrap(); + let language_tees = pad_state.language_tees.clone(); + let passthrough = pad_settings.passthrough; + let serial = pad_state.serial; + let transcription_bin = state.transcription_bin.clone(); + let internal_bin = state.internal_bin.clone(); + + drop(s); + drop(settings); + drop(pad_settings); + drop(ps); + + gst::debug!( + CAT, + imp = self, + "update prepared, releasing locks and applying" + ); + + self.remove_caption_channels( + &pad_transcription_bin, + &ccmux, + caption_channels_to_remove, + )?; + + self.remove_custom_output_channels( + &pad_transcription_bin, + &transcription_bin, + &internal_bin, + serial, + [synthesis_channels_to_remove, subtitle_channels_to_remove].concat(), + )?; + + self.remove_languages(&pad_transcription_bin, languages_to_remove); + + self.add_languages(&transcriber, &pad_transcription_bin, languages_to_add)?; + + self.add_caption_channels( + &pad_transcription_bin, + &ccmux, + &language_tees, + passthrough, + caption_channels_to_add, + )?; + + self.add_custom_output_channels( + &pad_transcription_bin, + &transcription_bin, + &internal_bin, + serial, + &language_tees, + [synthesis_channels_to_add, subtitle_channels_to_add].concat(), + )?; + } + + Ok(()) + } + fn reconfigure_transcription_bin( &self, pad: &TranscriberSinkPad, @@ -2239,7 +2899,7 @@ impl ElementImpl for TranscriberBin { let _ = pad_state.transcription_bin.set_state(gst::State::Null); if let Some(ref mut state) = s.as_mut() { - for channel in pad_state.transcription_channels.values() { + for channel in pad_state.caption_channels.values() { if let Some(srcpad) = pad_state .transcription_bin .static_pad(&format!("src_{}", channel.language)) @@ -2515,7 +3175,7 @@ struct TranscriberSinkPadState { transcriber_resample: gst::Element, transcriber: Option, queue_passthrough: gst::Element, - transcription_channels: HashMap, + caption_channels: HashMap, synthesis_channels: HashMap, subtitle_channels: HashMap, srcpad_name: Option, @@ -2526,6 +3186,20 @@ struct TranscriberSinkPadState { language_filters: HashMap, } +impl TranscriberSinkPad { + fn uses_translation_bin(&self) -> bool { + let s = self.state.lock().unwrap(); + let state = s.as_ref().unwrap(); + + state + .transcriber + .as_ref() + .and_then(|t| t.factory()) + .map(|f| f.name() == "translationbin") + .unwrap_or(false) + } +} + impl TranscriberSinkPadState { fn try_new() -> Result { Ok(Self { @@ -2549,7 +3223,7 @@ impl TranscriberSinkPadState { .build() .ok(), queue_passthrough: gst::ElementFactory::make("queue").build()?, - transcription_channels: HashMap::new(), + caption_channels: HashMap::new(), synthesis_channels: HashMap::new(), subtitle_channels: HashMap::new(), srcpad_name: None, @@ -2691,7 +3365,7 @@ impl TranscriberSinkPadState { fn link_transcription_channel( &self, - channel: &TranscriptionChannel, + channel: &CaptionChannel, state: &State, passthrough: bool, ) -> Result<(), Error> { @@ -2712,39 +3386,6 @@ impl TranscriberSinkPadState { Ok(()) } - - fn expose_custom_output_pads( - &self, - topbin: &super::TranscriberBin, - state: &State, - channel: &CustomOutputChannel, - ) -> Result<(), Error> { - let mut pad_name = format!("src_{}_{}", channel.suffix, channel.language); - let srcpad = gst::GhostPad::builder_with_target(&channel.bin.static_pad("src").unwrap()) - .unwrap() - .name(&pad_name) - .build(); - - self.transcription_bin.add_pad(&srcpad)?; - - if let Some(serial) = self.serial { - pad_name = format!("{}_{}", pad_name, serial); - } - - let srcpad = gst::GhostPad::builder_with_target(&srcpad) - .unwrap() - .name(pad_name.clone()) - .build(); - state.transcription_bin.add_pad(&srcpad)?; - - let srcpad = gst::GhostPad::with_target(&srcpad).unwrap(); - state.internal_bin.add_pad(&srcpad)?; - - let srcpad = gst::GhostPad::with_target(&srcpad).unwrap(); - topbin.add_pad(&srcpad)?; - - Ok(()) - } } pub struct TranscriberSinkPad { @@ -2885,7 +3526,19 @@ impl ObjectImpl for TranscriberSinkPad { drop(settings); if let Some(this) = self.obj().parent().and_downcast::() { - this.imp().update_languages(&self.obj(), false); + if self.uses_translation_bin() { + if let Err(e) = this.imp().reconfigure_transcription_bin_dynamic(self) { + gst::error!(CAT, "Couldn't reconfigure caption channels: {e}"); + gst::element_imp_error!( + this.imp(), + gst::StreamError::Failed, + ["Couldn't reconfigure caption channels: {}", e] + ); + *this.imp().state.lock().unwrap() = None; + } + } else { + this.imp().update_languages(&self.obj(), false) + } } } "synthesis-languages" => { @@ -2903,7 +3556,19 @@ impl ObjectImpl for TranscriberSinkPad { drop(settings); if let Some(this) = self.obj().parent().and_downcast::() { - this.imp().update_languages(&self.obj(), false); + if self.uses_translation_bin() { + if let Err(e) = this.imp().reconfigure_transcription_bin_dynamic(self) { + gst::error!(CAT, "Couldn't reconfigure synthesis channels: {e}"); + gst::element_imp_error!( + this.imp(), + gst::StreamError::Failed, + ["Couldn't reconfigure synthesis channels: {}", e] + ); + *this.imp().state.lock().unwrap() = None; + } + } else { + this.imp().update_languages(&self.obj(), false) + } } } "subtitle-languages" => { @@ -2921,7 +3586,19 @@ impl ObjectImpl for TranscriberSinkPad { drop(settings); if let Some(this) = self.obj().parent().and_downcast::() { - this.imp().update_languages(&self.obj(), false); + if self.uses_translation_bin() { + if let Err(e) = this.imp().reconfigure_transcription_bin_dynamic(self) { + gst::error!(CAT, "Couldn't reconfigure subtitle channels: {e}"); + gst::element_imp_error!( + this.imp(), + gst::StreamError::Failed, + ["Couldn't reconfigure subtitle channels: {}", e] + ); + *this.imp().state.lock().unwrap() = None; + } + } else { + this.imp().update_languages(&self.obj(), false) + } } } "language-filters" => { diff --git a/video/closedcaption/src/translationbin/imp.rs b/video/closedcaption/src/translationbin/imp.rs index 034756c95..c46c75b20 100644 --- a/video/closedcaption/src/translationbin/imp.rs +++ b/video/closedcaption/src/translationbin/imp.rs @@ -64,6 +64,74 @@ pub struct TranslationBin { } impl TranslationBin { + fn prepare_translation_srcpad( + &self, + elem: &super::TranslationBin, + srcpad: &super::TranslationSrcPad, + input_language_code: &str, + translate_latency_ms: u32, + tee: &gst::Element, + ) -> Result<(), Error> { + let output_language_code = srcpad.imp().settings.lock().unwrap().language_code.clone(); + + let queue = gst::ElementFactory::make("queue").build()?; + let translator = gst::ElementFactory::make("awstranslate") + .property("input-language-code", input_language_code) + .property("output-language-code", output_language_code) + .build()?; + + if translator.has_property_with_type("latency", u32::static_type()) { + translator.set_property("latency", translate_latency_ms); + } + + elem.add_many([&queue, &translator])?; + queue.sync_state_with_parent()?; + translator.sync_state_with_parent()?; + + tee.link(&queue)?; + queue.link(&translator)?; + + srcpad.set_target(Some( + &translator + .static_pad("src") + .ok_or(anyhow!("No pad named src on translator"))?, + ))?; + + let mut pad_state = srcpad.imp().state.lock().unwrap(); + + pad_state.queue = Some(queue); + pad_state.translator = Some(translator); + + Ok(()) + } + + fn unprepare_translation_srcpad( + &self, + elem: &super::TranslationBin, + tee: &gst::Element, + srcpad: &super::TranslationSrcPad, + ) -> Result<(), Error> { + let (queue, translator) = { + let mut pad_state = srcpad.imp().state.lock().unwrap(); + + ( + pad_state.queue.take().unwrap(), + pad_state.translator.take().unwrap(), + ) + }; + + tee.unlink(&queue); + + elem.remove_many([&queue, &translator])?; + + srcpad.set_target(None::<&gst::Pad>)?; + + let _ = queue.set_state(gst::State::Null); + let _ = translator.set_state(gst::State::Null); + + Ok(()) + } + fn prepare(&self) -> Result<(), Error> { let (transcriber, srcpads) = { let state = self.state.lock().unwrap(); @@ -124,35 +192,13 @@ impl TranslationBin { .set_target(Some(&queue.static_pad("src").unwrap()))?; for srcpad in srcpads { - let output_language_code = srcpad.imp().settings.lock().unwrap().language_code.clone(); - - let queue = gst::ElementFactory::make("queue").build()?; - let translator = gst::ElementFactory::make("awstranslate") - .property("input-language-code", &language_code) - .property("output-language-code", output_language_code) - .build()?; - - if translator.has_property_with_type("latency", u32::static_type()) { - translator.set_property("latency", translate_latency_ms); - } - - obj.add_many([&queue, &translator])?; - queue.sync_state_with_parent()?; - translator.sync_state_with_parent()?; - - tee.link(&queue)?; - queue.link(&translator)?; - - srcpad.set_target(Some( - &translator - .static_pad("src") - .ok_or(anyhow!("No pad named src on translator"))?, - ))?; - - let mut pad_state = srcpad.imp().state.lock().unwrap(); - - pad_state.queue = Some(queue); - pad_state.translator = Some(translator); + self.prepare_translation_srcpad( + &obj, + &srcpad, + &language_code, + translate_latency_ms, + &tee, + )?; } let mut state = self.state.lock().unwrap(); @@ -176,6 +222,12 @@ impl TranslationBin { }; let obj = self.obj(); + let srcpads = self.state.lock().unwrap().srcpads.clone(); + + for srcpad in srcpads { + self.unprepare_translation_srcpad(&obj, &tee, &srcpad)?; + } + transcriber.unlink(&tee); obj.remove_many([&transcriber, &tee, &queue])?; @@ -183,26 +235,6 @@ impl TranslationBin { self.audio_sinkpad.set_target(None::<&gst::Pad>)?; self.transcript_srcpad.set_target(None::<&gst::Pad>)?; - let srcpads = self.state.lock().unwrap().srcpads.clone(); - - for srcpad in srcpads { - let (queue, translator) = { - let mut pad_state = srcpad.imp().state.lock().unwrap(); - - ( - pad_state.queue.take().unwrap(), - pad_state.translator.take().unwrap(), - ) - }; - - obj.remove_many([&queue, &translator])?; - - srcpad.set_target(None::<&gst::Pad>)?; - - let _ = queue.set_state(gst::State::Null); - let _ = translator.set_state(gst::State::Null); - } - let _ = transcriber.set_state(gst::State::Null); let _ = tee.set_state(gst::State::Null); let _ = queue.set_state(gst::State::Null); @@ -370,11 +402,40 @@ impl ElementImpl for TranslationBin { self.state.lock().unwrap().srcpads.insert(pad.clone()); + if let Some(tee) = self.state.lock().unwrap().tee.clone() { + let Settings { + translate_latency, + language_code, + .. + } = self.settings.lock().unwrap().clone(); + + let translate_latency_ms = translate_latency.mseconds() as u32; + + if let Err(err) = self.prepare_translation_srcpad( + &self.obj(), + &pad, + &language_code, + translate_latency_ms, + &tee, + ) { + gst::error!(CAT, "Failed to prepare translation source pad: {err:?}"); + return None; + } + } + Some(pad.upcast()) } fn release_pad(&self, pad: &gst::Pad) { - let _ = self.state.lock().unwrap().srcpads.remove(pad); + let srcpad = self.state.lock().unwrap().srcpads.take(pad); + + if let Some(srcpad) = srcpad { + if let Some(tee) = self.state.lock().unwrap().tee.clone() { + if let Err(err) = self.unprepare_translation_srcpad(&self.obj(), &tee, &srcpad) { + gst::warning!(CAT, "Failed to unprepare translation source pad: {err:?}"); + } + } + } let _ = self.obj().remove_pad(pad); } @@ -511,7 +572,6 @@ impl ObjectImpl for TranslationSrcPad { .nick("Language Code") .blurb("The language of the output stream") .default_value(Some(DEFAULT_OUTPUT_LANG_CODE)) - .mutable_ready() .build()] }); @@ -523,7 +583,11 @@ impl ObjectImpl for TranslationSrcPad { "language-code" => { let language_code: String = value.get().expect("type checked upstream"); let mut settings = self.settings.lock().unwrap(); - settings.language_code = language_code; + settings.language_code = language_code.clone(); + + if let Some(translator) = self.state.lock().unwrap().translator.as_ref() { + translator.set_property("output-language-code", language_code); + } } _ => unimplemented!(), }