From 4406851ae7f27aacb438f16a1409cc270bf05915 Mon Sep 17 00:00:00 2001 From: Mathieu Duponchelle Date: Mon, 18 Nov 2024 20:05:38 +0100 Subject: [PATCH] transcriberbin: add support for speech synthesis This commit adds a new "synthesis-languages" property. Users can set it to define a map of languages (typically translations) that should then be routed through a "synthesis" bin, with its description specifiable as the value of the map. The output of this bin is then exposed as a new pad on the top-level bin. Part-of: --- docs/plugins/gst_plugins_cache.json | 11 + video/closedcaption/src/transcriberbin/imp.rs | 545 ++++++++++++++---- 2 files changed, 441 insertions(+), 115 deletions(-) diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index 40be09ee..8189ab59 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -8402,6 +8402,17 @@ "type": "gboolean", "writable": true }, + "synthesis-languages": { + "blurb": "A map of language codes to bin descriptions, e.g. synthesis-languages=\"languages, fr=awspolly\" will use the awspolly element to synthesize speech from French translations", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "mutable": "playing", + "readable": true, + "type": "GstStructure", + "writable": true + }, "transcriber": { "blurb": "The transcriber element to use", "conditionally-available": false, diff --git a/video/closedcaption/src/transcriberbin/imp.rs b/video/closedcaption/src/transcriberbin/imp.rs index 71fe8996..889a68e6 100644 --- a/video/closedcaption/src/transcriberbin/imp.rs +++ b/video/closedcaption/src/transcriberbin/imp.rs @@ -56,32 +56,10 @@ struct TranscriptionChannel { ccmux_pad_name: String, } -impl TranscriptionChannel { - // Returns the name of the transcriber source pad - fn link_transcriber(&self, transcriber: &gst::Element) -> Result { - let transcriber_src_pad = match self.language.as_str() { - "transcript" => transcriber - .static_pad("src") - .ok_or(anyhow!("Failed to retrieve transcription source pad"))?, - language => { - let pad = transcriber - .request_pad_simple("translate_src_%u") - .ok_or(anyhow!("Failed to request translation source pad"))?; - pad.set_property("language-code", language); - pad - } - }; - - gst::debug!( - CAT, - obj = transcriber_src_pad, - "Linking transcriber source pad to channel" - ); - - transcriber_src_pad.link(&self.bin.static_pad("sink").unwrap())?; - - Ok(transcriber_src_pad.name().to_string()) - } +struct SynthesisChannel { + bin: gst::Bin, + language: String, + latency: gst::ClockTime, } /* Locking order: State, Settings, PadState, PadSettings */ @@ -139,6 +117,153 @@ pub struct TranscriberBin { } impl TranscriberBin { + fn link_transcriber_to_channel( + &self, + topbin: &super::TranscriberBin, + state: &State, + pad_state: &TranscriberSinkPadState, + language: &str, + channel_bin: &gst::Bin, + ) -> Result<(), Error> { + let Some(ref transcriber) = pad_state.transcriber else { + return Err(anyhow!("no transcriber to link")); + }; + + let language_tee = pad_state + .language_tees + .get(language) + .expect("language tee created"); + + let tee_sinkpad = language_tee.static_pad("sink").unwrap(); + + if !tee_sinkpad.is_linked() { + let transcriber_src_pad = match language { + "transcript" => transcriber + .static_pad("src") + .ok_or(anyhow!("Failed to retrieve transcription source pad"))?, + language => { + let pad = transcriber + .request_pad_simple("translate_src_%u") + .ok_or(anyhow!("Failed to request translation source pad"))?; + pad.set_property("language-code", language); + pad + } + }; + + gst::debug!( + CAT, + obj = transcriber_src_pad, + "Linking transcriber source pad to language tee" + ); + + transcriber_src_pad.link(&tee_sinkpad)?; + + pad_state.expose_unsynced_pads(topbin, state, transcriber_src_pad.name().as_str())?; + } + + let tee_srcpad = language_tee.request_pad_simple("src_%u").unwrap(); + + gst::debug!(CAT, obj = tee_srcpad, "Linking language tee to channel"); + + tee_srcpad.link(&channel_bin.static_pad("sink").unwrap())?; + + Ok(()) + } + + fn link_transcriber_to_channels( + &self, + state: &State, + pad_state: &TranscriberSinkPadState, + ) -> Result<(), Error> { + for (language, bin) in Iterator::chain( + pad_state + .transcription_channels + .values() + .map(|c| (c.language.as_str(), c.bin.clone())), + pad_state + .synthesis_channels + .values() + .map(|c| (c.language.as_str(), c.bin.clone())), + ) { + self.link_transcriber_to_channel(&self.obj(), state, pad_state, language, &bin)?; + } + + Ok(()) + } + + fn expose_channel_outputs( + &self, + state: &State, + pad_state: &TranscriberSinkPadState, + pad_settings: &TranscriberSinkPadSettings, + ) -> Result<(), Error> { + for channel in pad_state.transcription_channels.values() { + pad_state.link_transcription_channel(channel, state, pad_settings.passthrough)?; + } + + for channel in pad_state.synthesis_channels.values() { + pad_state.expose_synthesis_pads(&self.obj(), state, channel)?; + } + + Ok(()) + } + + fn construct_synthesis_channel( + &self, + lang: &str, + accumulate_time: gst::ClockTime, + bin_description: &str, + ) -> Result { + let bin = gst::Bin::new(); + let queue = gst::ElementFactory::make("queue").build()?; + let textwrap = gst::ElementFactory::make("textwrap").build()?; + let synthesizer = gst::parse::bin_from_description_full( + bin_description, + true, + None, + gst::ParseFlags::NO_SINGLE_ELEMENT_BINS, + )?; + + bin.add_many([&queue, &textwrap, &synthesizer])?; + gst::Element::link_many([&queue, &textwrap, &synthesizer])?; + + queue.set_property("max-size-buffers", 0u32); + queue.set_property("max-size-time", 0u64); + + textwrap.set_property("lines", 1u32); + textwrap.set_property("columns", u32::MAX); + textwrap.set_property("accumulate-time", accumulate_time); + + let sinkpad = gst::GhostPad::with_target(&queue.static_pad("sink").unwrap()).unwrap(); + bin.add_pad(&sinkpad)?; + + let srcpad = gst::GhostPad::with_target(&synthesizer.static_pad("src").unwrap()).unwrap(); + bin.add_pad(&srcpad)?; + + // FIXME: this won't work if an actual bin gets created, in which + // case we will need to figure out how to query the latency properly + + if synthesizer.is::() { + gst::warning!( + CAT, + imp = self, + "Synthesis element is bin, cannot get latency" + ); + } + + let latency = if synthesizer.has_property_with_type("latency", u32::static_type()) { + gst::ClockTime::from_mseconds(synthesizer.property::("latency") as u64) + } else { + gst::ClockTime::ZERO + }; + + Ok(SynthesisChannel { + bin, + language: String::from(lang), + latency, + }) + } + fn construct_transcription_channel( &self, lang: &str, @@ -320,20 +445,9 @@ impl TranscriberBin { .transcription_bin .add_pad(&transcription_audio_sinkpad)?; - for channel in pad_state.transcription_channels.values() { - pad_state.transcription_bin.add(&channel.bin)?; + self.link_transcriber_to_channels(state, pad_state)?; - if let Some(ref transcriber) = pad_state.transcriber { - let srcpad_name = channel.link_transcriber(transcriber)?; - pad_state.link_transcriber_pads( - &self.obj(), - &srcpad_name, - channel, - state, - pad_settings.passthrough, - )?; - } - } + self.expose_channel_outputs(state, pad_state, pad_settings)?; Ok(()) } @@ -816,28 +930,156 @@ impl TranscriberBin { ); if let Some(old_transcriber) = old_transcriber { - gst::debug!( - CAT, - imp = self, - "Unlinking old transcriber {old_transcriber:?}" - ); + gst::debug!(CAT, obj = old_transcriber, "Unlinking old transcriber"); pad_state.transcriber_aconv.unlink(old_transcriber); - for channel in pad_state.transcription_channels.values() { - pad_state.unlink_transcriber_pads(self.obj().as_ref(), state, channel); - } + pad_state.unlink_language_tees(self.obj().as_ref(), state, pad_state); let _ = pad_state.transcription_bin.remove(old_transcriber); old_transcriber.set_state(gst::State::Null).unwrap(); } if let Some(ref transcriber) = pad_state.transcriber { - gst::debug!(CAT, imp = self, "Linking new transcriber {transcriber:?}"); + gst::debug!(CAT, obj = transcriber, "Linking new transcriber"); pad_state.transcription_bin.add(transcriber)?; transcriber.sync_state_with_parent().unwrap(); pad_state.transcriber_aconv.link(transcriber)?; - for channel in pad_state.transcription_channels.values() { - let srcpad_name = channel.link_transcriber(transcriber)?; - pad_state.expose_unsynced_pads(self.obj().as_ref(), state, &srcpad_name)?; + self.link_transcriber_to_channels(state, pad_state)?; + } + + Ok(()) + } + + fn construct_channels( + &self, + accumulate_time: gst::ClockTime, + mux_method: MuxMethod, + pad_state: &mut TranscriberSinkPadState, + pad_settings: &TranscriberSinkPadSettings, + ) -> Result<(), Error> { + self.construct_transcription_channels(pad_settings, mux_method, pad_state) + .unwrap(); + + self.construct_synthesis_channels(accumulate_time, pad_settings, pad_state) + .unwrap(); + + for k in pad_state + .transcription_channels + .keys() + .chain(pad_state.synthesis_channels.keys()) + { + use std::collections::hash_map::Entry::*; + if let Vacant(e) = pad_state.language_tees.entry(k.clone()) { + let tee = gst::ElementFactory::make("tee") + .name(format!("tee-{}", k)) + .property("allow-not-linked", true) + .build()?; + + pad_state.transcription_bin.add(&tee)?; + + e.insert(tee); + }; + } + + Ok(()) + } + + fn tear_down_channels( + &self, + state: &State, + pad_state: &mut TranscriberSinkPadState, + ) -> Result<(), Error> { + pad_state.unlink_language_tees(self.obj().as_ref(), state, pad_state); + + self.tear_down_transcription_channels(state, pad_state)?; + + self.tear_down_synthesis_channels(&self.obj(), state, pad_state)?; + + for (_, tee) in pad_state.language_tees.drain() { + let _ = pad_state.transcription_bin.remove(&tee); + } + + Ok(()) + } + + fn tear_down_synthesis_channels( + &self, + topbin: &super::TranscriberBin, + state: &State, + pad_state: &mut TranscriberSinkPadState, + ) -> Result<(), Error> { + for channel in pad_state.synthesis_channels.values() { + let mut pad_name = format!("src_synthesis_{}", channel.language); + let srcpad = pad_state.transcription_bin.static_pad(&pad_name).unwrap(); + + let _ = pad_state.transcription_bin.remove_pad(&srcpad); + + pad_state.transcription_bin.remove(&channel.bin)?; + + if let Some(serial) = pad_state.serial { + pad_name = format!("{}_{}", pad_name, serial); + } + let srcpad = state.transcription_bin.static_pad(&pad_name).unwrap(); + let _ = state.transcription_bin.remove_pad(&srcpad); + + let srcpad = state.internal_bin.static_pad(&pad_name).unwrap(); + let _ = state.internal_bin.remove_pad(&srcpad); + let srcpad = topbin.static_pad(&pad_name).unwrap(); + let _ = topbin.remove_pad(&srcpad); + } + + pad_state.synthesis_channels.clear(); + + Ok(()) + } + + fn tear_down_transcription_channels( + &self, + state: &State, + pad_state: &mut TranscriberSinkPadState, + ) -> Result<(), Error> { + for channel in pad_state.transcription_channels.values() { + let srcpad = pad_state + .transcription_bin + .static_pad(&format!("src_{}", channel.language)) + .unwrap(); + + if let Some(peer) = srcpad.peer() { + // The source pad might not have been linked to the muxer initially, for + // instance in case of a collision with another source pad's + // translation-languages mapping + if peer.parent().and_downcast_ref::() == Some(state.ccmux.as_ref()) { + srcpad.unlink(&peer)?; + state.ccmux.release_request_pad(&peer); + } + } + + let _ = pad_state.transcription_bin.remove_pad(&srcpad); + + pad_state.transcription_bin.remove(&channel.bin)?; + } + + pad_state.transcription_channels.clear(); + + Ok(()) + } + + fn construct_synthesis_channels( + &self, + accumulate_time: gst::ClockTime, + pad_settings: &TranscriberSinkPadSettings, + pad_state: &mut TranscriberSinkPadState, + ) -> Result<(), Error> { + if let Some(ref map) = pad_settings.synthesis_languages { + for (language, value) in map.iter() { + let bin_description = value.get::()?; + pad_state.synthesis_channels.insert( + language.to_string(), + self.construct_synthesis_channel(language, accumulate_time, &bin_description)?, + ); + } + + for channel in pad_state.synthesis_channels.values() { + pad_state.transcription_bin.add(&channel.bin)?; } } @@ -848,7 +1090,7 @@ impl TranscriberBin { &self, settings: &TranscriberSinkPadSettings, mux_method: MuxMethod, - transcription_channels: &mut HashMap, + pad_state: &mut TranscriberSinkPadState, ) -> Result<(), Error> { if let Some(ref map) = settings.translation_languages { for (key, value) in map.iter() { @@ -891,7 +1133,7 @@ impl TranscriberBin { } }; - transcription_channels.insert( + pad_state.transcription_channels.insert( language_code.to_owned(), self.construct_transcription_channel( &language_code, @@ -905,11 +1147,16 @@ impl TranscriberBin { MuxMethod::Cea608 => vec!["cc1".to_string()], MuxMethod::Cea708 => vec!["cc1".to_string(), "708_1".to_string()], }; - transcription_channels.insert( + pad_state.transcription_channels.insert( "transcript".to_string(), self.construct_transcription_channel("transcript", mux_method, caption_streams)?, ); } + + for channel in pad_state.transcription_channels.values() { + pad_state.transcription_bin.add(&channel.bin)?; + } + Ok(()) } @@ -979,53 +1226,18 @@ impl TranscriberBin { return Ok(()); } - for channel in pad_state.transcription_channels.values() { - pad_state.unlink_transcriber_pads(self.obj().as_ref(), state, channel); + self.tear_down_channels(state, pad_state)?; - let srcpad = pad_state - .transcription_bin - .static_pad(&format!("src_{}", channel.language)) - .unwrap(); - - if let Some(peer) = srcpad.peer() { - // The source pad might not have been linked to the muxer initially, for - // instance in case of a collision with another source pad's - // translation-languages mapping - if peer.parent().and_downcast_ref::() - == Some(state.ccmux.as_ref()) - { - srcpad.unlink(&peer)?; - state.ccmux.release_request_pad(&peer); - } - } - - let _ = pad_state.transcription_bin.remove_pad(&srcpad); - - pad_state.transcription_bin.remove(&channel.bin)?; - } - - pad_state.transcription_channels.clear(); - - self.construct_transcription_channels( - &pad_settings, + self.construct_channels( + settings.accumulate_time, state.mux_method, - &mut pad_state.transcription_channels, + pad_state, + &pad_settings, )?; - for channel in pad_state.transcription_channels.values() { - pad_state.transcription_bin.add(&channel.bin)?; + self.link_transcriber_to_channels(state, pad_state)?; - if let Some(ref transcriber) = pad_state.transcriber { - let srcpad_name = channel.link_transcriber(transcriber)?; - pad_state.link_transcriber_pads( - &self.obj(), - &srcpad_name, - channel, - state, - pad_settings.passthrough, - )?; - } - } + self.expose_channel_outputs(state, pad_state, &pad_settings)?; self.setup_cc_mode(&pad.obj(), pad_state, state.mux_method, pad_settings.mode); @@ -1127,6 +1339,24 @@ impl TranscriberBin { min } + fn synthesis_latency(&self, state: &State) -> gst::ClockTime { + let mut ret = gst::ClockTime::ZERO; + + for pad in state.audio_sink_pads.values() { + let ps = pad.imp().state.lock().unwrap(); + + if let Ok(pad_state) = ps.as_ref() { + for channel in pad_state.synthesis_channels.values() { + if channel.latency > ret { + ret = channel.latency; + } + } + } + } + + ret + } + #[allow(clippy::single_match)] fn src_query(&self, pad: &gst::Pad, query: &mut gst::QueryRef) -> bool { use gst::QueryViewMut; @@ -1138,8 +1368,10 @@ impl TranscriberBin { let state = self.state.lock().unwrap(); if let Some(state) = state.as_ref() { let upstream_min = self.query_upstream_latency(state); + let synthesis_min = self.synthesis_latency(state); let settings = self.settings.lock().unwrap(); let min = upstream_min + + synthesis_min + settings.latency + settings.accumulate_time + CEAX08MUX_LATENCY @@ -1213,10 +1445,11 @@ impl TranscriberBin { .as_mut() .map_err(|err| anyhow!("Sink pad state creation failed: {err}"))?; let pad_settings = pad.imp().settings.lock().unwrap(); - self.construct_transcription_channels( - &pad_settings, + self.construct_channels( + settings.accumulate_time, settings.mux_method, - &mut pad_state.transcription_channels, + pad_state, + &pad_settings, )?; Ok(State { @@ -1719,10 +1952,11 @@ impl ElementImpl for TranscriberBin { } }; let pad_settings = sink_pad.imp().settings.lock().unwrap(); - self.construct_transcription_channels( - &pad_settings, + self.construct_channels( + settings.accumulate_time, settings.mux_method, - &mut pad_state.transcription_channels, + pad_state, + &pad_settings, ) .unwrap(); @@ -1881,6 +2115,7 @@ impl BinImpl for TranscriberBin { #[derive(Debug, Clone)] struct TranscriberSinkPadSettings { translation_languages: Option, + synthesis_languages: Option, language_code: String, mode: Cea608Mode, passthrough: bool, @@ -1890,6 +2125,7 @@ impl Default for TranscriberSinkPadSettings { fn default() -> Self { Self { translation_languages: None, + synthesis_languages: None, language_code: String::from(DEFAULT_INPUT_LANG_CODE), mode: DEFAULT_MODE, passthrough: DEFAULT_PASSTHROUGH, @@ -1907,9 +2143,11 @@ struct TranscriberSinkPadState { transcriber: Option, queue_passthrough: gst::Element, transcription_channels: HashMap, + synthesis_channels: HashMap, srcpad_name: Option, target_passthrough_state: TargetPassthroughState, serial: Option, + language_tees: HashMap, } impl TranscriberSinkPadState { @@ -1936,9 +2174,11 @@ impl TranscriberSinkPadState { .ok(), queue_passthrough: gst::ElementFactory::make("queue").build()?, transcription_channels: HashMap::new(), + synthesis_channels: HashMap::new(), srcpad_name: None, target_passthrough_state: TargetPassthroughState::None, serial: None, + language_tees: HashMap::new(), }) } @@ -1976,19 +2216,38 @@ impl TranscriberSinkPadState { } } - fn unlink_transcriber_pads( + fn unlink_language_tees( &self, topbin: &super::TranscriberBin, state: &State, - channel: &TranscriptionChannel, + pad_state: &TranscriberSinkPadState, ) { - let sinkpad = channel.bin.static_pad("sink").unwrap(); - let Some(srcpad) = sinkpad.peer() else { - return; - }; - srcpad.unlink(&sinkpad).unwrap(); + for tee in pad_state.language_tees.values() { + let sinkpad = tee.static_pad("sink").unwrap(); - self.remove_unsynced_pads(topbin, state, srcpad.name().as_str()); + if let Some(srcpad) = sinkpad.peer() { + let _ = srcpad.unlink(&sinkpad); + self.remove_unsynced_pads(topbin, state, srcpad.name().as_str()); + if srcpad.name() != "src" { + srcpad + .parent() + .unwrap() + .downcast::() + .unwrap() + .release_request_pad(&srcpad); + } + } + + for srcpad in tee.iterate_src_pads() { + let Ok(srcpad) = srcpad else { + continue; + }; + + if let Some(sinkpad) = srcpad.peer() { + let _ = srcpad.unlink(&sinkpad); + } + } + } } fn expose_unsynced_pads( @@ -2030,16 +2289,12 @@ impl TranscriberSinkPadState { Ok(()) } - fn link_transcriber_pads( + fn link_transcription_channel( &self, - topbin: &super::TranscriberBin, - srcpad_name: &str, channel: &TranscriptionChannel, state: &State, passthrough: bool, ) -> Result<(), Error> { - self.expose_unsynced_pads(topbin, state, srcpad_name)?; - let srcpad = gst::GhostPad::builder_with_target(&channel.bin.static_pad("src").unwrap()) .unwrap() .name(format!("src_{}", channel.language)) @@ -2057,6 +2312,39 @@ impl TranscriberSinkPadState { Ok(()) } + + fn expose_synthesis_pads( + &self, + topbin: &super::TranscriberBin, + state: &State, + channel: &SynthesisChannel, + ) -> Result<(), Error> { + let mut pad_name = format!("src_synthesis_{}", channel.language); + let srcpad = gst::GhostPad::builder_with_target(&channel.bin.static_pad("src").unwrap()) + .unwrap() + .name(&pad_name) + .build(); + + self.transcription_bin.add_pad(&srcpad)?; + + if let Some(serial) = self.serial { + pad_name = format!("{}_{}", pad_name, serial); + } + + let srcpad = gst::GhostPad::builder_with_target(&srcpad) + .unwrap() + .name(pad_name.clone()) + .build(); + state.transcription_bin.add_pad(&srcpad)?; + + let srcpad = gst::GhostPad::with_target(&srcpad).unwrap(); + state.internal_bin.add_pad(&srcpad)?; + + let srcpad = gst::GhostPad::with_target(&srcpad).unwrap(); + topbin.add_pad(&srcpad)?; + + Ok(()) + } } pub struct TranscriberSinkPad { @@ -2109,6 +2397,11 @@ impl ObjectImpl for TranscriberSinkPad { .blurb("The transcriber element to use") .mutable_ready() .build(), + glib::ParamSpecBoxed::builder::("synthesis-languages") + .nick("Synthesis languages") + .blurb("A map of language codes to bin descriptions, e.g. synthesis-languages=\"languages, fr=awspolly\" will use the awspolly element to synthesize speech from French translations") + .mutable_playing() + .build(), ] }); @@ -2165,6 +2458,24 @@ impl ObjectImpl for TranscriberSinkPad { this.imp().update_languages(&self.obj(), false); } } + "synthesis-languages" => { + let mut settings = self.settings.lock().unwrap(); + settings.synthesis_languages = value + .get::>() + .expect("type checked upstream"); + gst::debug!( + CAT, + imp = self, + "Updated synthesis-languages {:?}", + settings.synthesis_languages + ); + + drop(settings); + + if let Some(this) = self.obj().parent().and_downcast::() { + this.imp().update_languages(&self.obj(), false); + } + } "mode" => { let mut settings = self.settings.lock().unwrap(); @@ -2255,6 +2566,10 @@ impl ObjectImpl for TranscriberSinkPad { let settings = self.settings.lock().unwrap(); settings.translation_languages.to_value() } + "synthesis-languages" => { + let settings = self.settings.lock().unwrap(); + settings.synthesis_languages.to_value() + } "language-code" => { let settings = self.settings.lock().unwrap(); settings.language_code.to_value()