From 635a83a8d779a803f27ed484ec26647a5bd4130a Mon Sep 17 00:00:00 2001 From: Mathieu Duponchelle Date: Wed, 16 Oct 2024 19:37:39 +0200 Subject: [PATCH] transcriberbin: notify passthrough at the appropriate time We want to enable passthrough internally, and only notify that internally it has been enabled once the transcriber has been unlinked. This way applications connected to the notify handler can synchronously update the properties and attempt to disable passthrough again. Doing so properly requires a refactoring of the transition to the passthrough state, with the currently set passthrough mode maintained separately from the target passthrough state. This commit also finishes the work left incomplete in 17d79971375edae005ff012fe3dffe97ca041b99 by moving the passthrough property to the sink pad class, making each transcriber passthrough state independent from the others. Also adds an example to demonstrate the behavior Part-of: --- Cargo.lock | 1 + docs/plugins/gst_plugins_cache.json | 24 +- video/closedcaption/Cargo.toml | 4 + .../examples/passthrough-notify.rs | 184 ++++++++ video/closedcaption/src/transcriberbin/imp.rs | 431 +++++++++++------- 5 files changed, 478 insertions(+), 166 deletions(-) create mode 100644 video/closedcaption/examples/passthrough-notify.rs diff --git a/Cargo.lock b/Cargo.lock index a8da69b8..69878499 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2481,6 +2481,7 @@ dependencies = [ "cea608-types", "cea708-types", "chrono", + "clap", "either", "gst-plugin-version-helper", "gstreamer", diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index efbf8ea6..b5944592 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -7759,18 +7759,6 @@ "type": "GstTranscriberBinMuxMethod", "writable": true }, - "passthrough": { - "blurb": "Whether transcription should occur", - "conditionally-available": false, - "construct": false, - "construct-only": false, - "controllable": false, - "default": "false", - "mutable": "playing", - "readable": true, - "type": "gboolean", - "writable": true - }, "transcriber": { "blurb": "The transcriber element to use", "conditionally-available": false, @@ -8153,6 +8141,18 @@ "type": "GstTtToCea608Mode", "writable": true }, + "passthrough": { + "blurb": "Whether transcription should occur", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "false", + "mutable": "playing", + "readable": true, + "type": "gboolean", + "writable": true + }, "transcriber": { "blurb": "The transcriber element to use", "conditionally-available": false, diff --git a/video/closedcaption/Cargo.toml b/video/closedcaption/Cargo.toml index 4ba26015..b5ea19f9 100644 --- a/video/closedcaption/Cargo.toml +++ b/video/closedcaption/Cargo.toml @@ -33,6 +33,7 @@ bitstream-io = "2.3" pretty_assertions = "1" rand = { version = "0.8", features = ["small_rng"] } gst-check.workspace = true +clap = { version = "4", features = ["derive"] } [lib] name = "gstrsclosedcaption" @@ -62,3 +63,6 @@ import_library = false [package.metadata.capi.pkg_config] requires_private = "gstreamer-1.0, gstreamer-base-1.0, gstreamer-video-1.0, gobject-2.0, glib-2.0, gmodule-2.0, pango, pangocairo, cairo-gobject" + +[[example]] +name = "passthrough-notify" diff --git a/video/closedcaption/examples/passthrough-notify.rs b/video/closedcaption/examples/passthrough-notify.rs new file mode 100644 index 00000000..e6d4d977 --- /dev/null +++ b/video/closedcaption/examples/passthrough-notify.rs @@ -0,0 +1,184 @@ +// This example creates a pipeline that will decode a file, pipe its audio +// and video streams through transcriberbin, and display the result +// with closed captions overlaid. +// +// At first the (AWS) transcriber will not have its access keys set, which +// means it should automatically go to passthrough=true. +// +// At this point we set the access keys and disable passthrough again. +// +// The expected result is for the terminal to display +// "Access key set, disabling passthrough" and for closed captions to be +// overlaid over the video. + +use anyhow::Error; +use clap::Parser; +use gst::glib; +use gst::prelude::*; + +#[derive(Debug, Default, Clone, clap::Parser)] +struct Args { + #[clap(long, help = "URI to transcribe")] + pub uri: String, + + #[clap(long, help = "Access key ID")] + pub access_key_id: String, + + #[clap(long, help = "Secret access key")] + pub secret_access_key: String, +} + +fn link_video_stream( + pipeline: &gst::Pipeline, + transcriberbin: &gst::Element, + pad: &gst::Pad, +) -> Result<(), Error> { + let conv = gst::ElementFactory::make("videoconvert").build()?; + + pipeline.add(&conv)?; + + conv.sync_state_with_parent()?; + + pad.link(&conv.static_pad("sink").unwrap())?; + + conv.link_pads(None, transcriberbin, Some("sink_video"))?; + + Ok(()) +} + +fn link_audio_stream( + pipeline: &gst::Pipeline, + transcriberbin: &gst::Element, + pad: &gst::Pad, +) -> Result<(), Error> { + let conv = gst::ElementFactory::make("audioconvert").build()?; + + pipeline.add(&conv)?; + + conv.sync_state_with_parent()?; + + pad.link(&conv.static_pad("sink").unwrap())?; + + conv.link_pads(None, transcriberbin, Some("sink_audio"))?; + + Ok(()) +} + +fn main() -> Result<(), Error> { + let args = Args::parse(); + + gst::init()?; + + let pipeline = gst::Pipeline::builder().build(); + + let uridecodebin = gst::ElementFactory::make("uridecodebin") + .property("uri", &args.uri) + .build()?; + let transcriberbin = gst::ElementFactory::make("transcriberbin").build()?; + let asink = gst::ElementFactory::make("fakesink").build()?; + let overlay = gst::ElementFactory::make("cea608overlay").build()?; + let vconv = gst::ElementFactory::make("videoconvert").build()?; + let vsink = gst::ElementFactory::make("autovideosink").build()?; + + uridecodebin.connect_pad_added(glib::clone!( + #[weak] + pipeline, + #[weak] + transcriberbin, + move |_element, pad| { + if pad + .current_caps() + .map(|c| c.structure(0).unwrap().name().starts_with("video/")) + .unwrap_or(false) + { + link_video_stream(&pipeline, &transcriberbin, pad) + .expect("Failed to link video stream"); + } else { + link_audio_stream(&pipeline, &transcriberbin, pad) + .expect("Failed to link audio stream"); + } + } + )); + + transcriberbin + .static_pad("sink_audio") + .unwrap() + .connect_closure( + "notify::passthrough", + false, + glib::closure!( + #[strong] + args, + move |pad: &gst::Pad, _pspec: &gst::glib::ParamSpec| { + let passthrough = pad.property::("passthrough"); + if passthrough { + let transcriber = pad.property::("transcriber"); + transcriber.set_property("access-key", &args.access_key_id); + transcriber.set_property("secret-access-key", &args.secret_access_key); + + eprintln!( + "Access key set, disabling passthrough, transcriber state: {:?}", + transcriber.state(gst::ClockTime::NONE) + ); + + pad.set_property("passthrough", false); + } + } + ), + ); + + pipeline.add_many([ + &uridecodebin, + &transcriberbin, + &asink, + &overlay, + &vconv, + &vsink, + ])?; + + transcriberbin.link_pads(Some("src_audio"), &asink, None)?; + transcriberbin.link_pads(Some("src_video"), &overlay, None)?; + + gst::Element::link_many([&overlay, &vconv, &vsink])?; + + pipeline.set_state(gst::State::Playing)?; + + let bus = pipeline.bus().expect("Pipeline should have a bus"); + + for msg in bus.iter_timed(gst::ClockTime::NONE) { + use gst::MessageView; + + match msg.view() { + MessageView::Eos(..) => { + println!("EOS"); + break; + } + MessageView::StateChanged(sc) => { + if msg.src() == Some(pipeline.upcast_ref()) { + pipeline.debug_to_dot_file( + gst::DebugGraphDetails::all(), + format!("{}-{:?}-{:?}", pipeline.name(), sc.old(), sc.current()), + ); + } + } + MessageView::Error(err) => { + pipeline.debug_to_dot_file(gst::DebugGraphDetails::ALL, "error"); + pipeline.set_state(gst::State::Null)?; + eprintln!( + "Got error from {}: {} ({})", + msg.src() + .map(|s| String::from(s.path_string())) + .unwrap_or_else(|| "None".into()), + err.error(), + err.debug().unwrap_or_else(|| "".into()), + ); + break; + } + _ => (), + } + } + + pipeline.set_state(gst::State::Null)?; + + Ok(()) +} diff --git a/video/closedcaption/src/transcriberbin/imp.rs b/video/closedcaption/src/transcriberbin/imp.rs index 0ff8d3ae..a896ed5d 100644 --- a/video/closedcaption/src/transcriberbin/imp.rs +++ b/video/closedcaption/src/transcriberbin/imp.rs @@ -38,6 +38,12 @@ const DEFAULT_MUX_METHOD: MuxMethod = MuxMethod::Cea608; const CEAX08MUX_LATENCY: gst::ClockTime = gst::ClockTime::from_mseconds(100); +enum TargetPassthroughState { + None, + Enabled, + Disabled, +} + /* One per language, including original */ struct TranscriptionChannel { bin: gst::Bin, @@ -79,7 +85,6 @@ impl TranscriptionChannel { struct State { mux_method: MuxMethod, framerate: Option, - tearing_down: usize, internal_bin: gst::Bin, video_queue: gst::Element, ccmux: gst::Element, @@ -96,7 +101,6 @@ struct Settings { cc_caps: gst::Caps, latency: gst::ClockTime, translate_latency: gst::ClockTime, - passthrough: bool, accumulate_time: gst::ClockTime, caption_source: CaptionSource, mux_method: MuxMethod, @@ -108,7 +112,6 @@ impl Default for Settings { cc_caps: gst::Caps::builder("closedcaption/x-cea-608") .field("format", "raw") .build(), - passthrough: DEFAULT_PASSTHROUGH, latency: DEFAULT_LATENCY, translate_latency: DEFAULT_TRANSLATE_LATENCY, accumulate_time: DEFAULT_ACCUMULATE, @@ -536,35 +539,32 @@ impl TranscriberBin { .set_property("max-size-time", max_size_time); } - if !settings.passthrough { - gst::debug!( - CAT, - imp = self, - "Linking transcription bins and synchronizing state" - ); - state + gst::debug!( + CAT, + imp = self, + "Linking transcription bins and synchronizing state" + ); + state + .transcription_bin + .link_pads(Some("src"), &state.cccombiner, Some("caption")) + .unwrap(); + + state.transcription_bin.set_locked_state(false); + state.transcription_bin.sync_state_with_parent().unwrap(); + + for pad in state.audio_sink_pads.values() { + let ps = pad.imp().state.lock().unwrap(); + let pad_state = ps.as_ref().unwrap(); + pad_state.transcription_bin.set_locked_state(false); + pad_state .transcription_bin - .link_pads(Some("src"), &state.cccombiner, Some("caption")) + .sync_state_with_parent() .unwrap(); - - state.transcription_bin.set_locked_state(false); - state.transcription_bin.sync_state_with_parent().unwrap(); - - for pad in state.audio_sink_pads.values() { - let ps = pad.imp().state.lock().unwrap(); - let pad_state = ps.as_ref().unwrap(); - pad_state.transcription_bin.set_locked_state(false); - pad_state - .transcription_bin - .sync_state_with_parent() - .unwrap(); - let transcription_sink_pad = - state.transcription_bin.static_pad(&pad.name()).unwrap(); - // Might be linked already if "translation-languages" is set - if transcription_sink_pad.peer().is_none() { - let audio_tee_pad = pad_state.audio_tee.request_pad_simple("src_%u").unwrap(); - audio_tee_pad.link(&transcription_sink_pad).unwrap(); - } + let transcription_sink_pad = state.transcription_bin.static_pad(&pad.name()).unwrap(); + // Might be linked already if "translation-languages" is set + if transcription_sink_pad.peer().is_none() { + let audio_tee_pad = pad_state.audio_tee.request_pad_simple("src_%u").unwrap(); + audio_tee_pad.link(&transcription_sink_pad).unwrap(); } } @@ -576,92 +576,161 @@ impl TranscriberBin { } } - fn disable_transcription_bin(&self, state: &mut State) { - // At this point, we want to check whether passthrough - // has been unset in the meantime - let passthrough = self.settings.lock().unwrap().passthrough; + fn disable_transcription_bin( + &self, + pad: &super::TranscriberSinkPad, + state: &mut State, + pad_state: &mut TranscriberSinkPadState, + ) { + gst::debug!(CAT, imp = self, "disabling transcription bin"); - if passthrough { - gst::debug!(CAT, imp = self, "disabling transcription bin"); - - for pad in state.audio_sink_pads.values() { - let ps = pad.imp().state.lock().unwrap(); - let pad_state = ps.as_ref().unwrap(); - let bin_sink_pad = state.transcription_bin.static_pad(&pad.name()).unwrap(); - if let Some(audio_tee_pad) = bin_sink_pad.peer() { - audio_tee_pad.unlink(&bin_sink_pad).unwrap(); - pad_state.audio_tee.release_request_pad(&audio_tee_pad); - } - } - - let bin_src_pad = state.transcription_bin.static_pad("src").unwrap(); - if let Some(cccombiner_pad) = bin_src_pad.peer() { - bin_src_pad.unlink(&cccombiner_pad).unwrap(); - state.cccombiner.release_request_pad(&cccombiner_pad); - } - - state.transcription_bin.set_locked_state(true); - state.transcription_bin.set_state(gst::State::Null).unwrap(); + let bin_sink_pad = state.transcription_bin.static_pad(&pad.name()).unwrap(); + if let Some(audio_tee_pad) = bin_sink_pad.peer() { + audio_tee_pad.unlink(&bin_sink_pad).unwrap(); + pad_state.audio_tee.release_request_pad(&audio_tee_pad); } + + for channel in pad_state.transcription_channels.values() { + let srcpad = pad_state + .transcription_bin + .static_pad(&format!("src_{}", channel.language)) + .unwrap(); + + if let Some(sinkpad) = state.ccmux.static_pad(&channel.ccmux_pad_name) { + srcpad.unlink(&sinkpad).unwrap(); + state.ccmux.release_request_pad(&sinkpad); + } + } + + pad_state.transcription_bin.set_locked_state(true); + pad_state + .transcription_bin + .set_state(gst::State::Null) + .unwrap(); } - fn block_and_update(&self, passthrough: bool) { - let mut s = self.state.lock().unwrap(); + fn enable_transcription_bin( + &self, + sinkpad: &TranscriberSinkPad, + state: &mut State, + pad_state: &mut TranscriberSinkPadState, + ) { + for channel in pad_state.transcription_channels.values() { + let srcpad = pad_state + .transcription_bin + .static_pad(&format!("src_{}", channel.language)) + .unwrap(); + let sinkpad = state + .ccmux + .static_pad(&channel.ccmux_pad_name) + .unwrap_or_else(|| { + state + .ccmux + .request_pad_simple(&channel.ccmux_pad_name) + .unwrap() + }); + srcpad.link(&sinkpad).unwrap(); + } + pad_state.transcription_bin.set_locked_state(false); + pad_state + .transcription_bin + .sync_state_with_parent() + .unwrap(); + let audio_tee_pad = pad_state.audio_tee.request_pad_simple("src_%u").unwrap(); + let transcription_sink_pad = state + .transcription_bin + .static_pad(&sinkpad.obj().name()) + .unwrap(); + audio_tee_pad.link(&transcription_sink_pad).unwrap(); + pad_state.target_passthrough_state = TargetPassthroughState::None; + } - if let Some(ref mut state) = s.as_mut() { - if passthrough { - state.tearing_down = state.audio_sink_pads.len(); - let sinkpads = state.audio_sink_pads.clone(); + fn block_and_update( + &self, + sinkpad: &TranscriberSinkPad, + mut s: std::sync::MutexGuard>, + mut ps: std::sync::MutexGuard>, + ) { + let Some(ref mut state) = s.as_mut() else { + return; + }; + + let Ok(ref mut pad_state) = ps.as_mut() else { + return; + }; + + match pad_state.target_passthrough_state { + TargetPassthroughState::Enabled => { drop(s); - for sinkpad in sinkpads.values() { - let imp_weak = self.downgrade(); - let _ = sinkpad.add_probe( - gst::PadProbeType::IDLE - | gst::PadProbeType::BUFFER - | gst::PadProbeType::EVENT_DOWNSTREAM, - move |_pad, _info| { - let Some(imp) = imp_weak.upgrade() else { - return gst::PadProbeReturn::Remove; - }; + drop(ps); + let imp_weak = self.downgrade(); + let _ = sinkpad.obj().add_probe( + gst::PadProbeType::IDLE + | gst::PadProbeType::BUFFER + | gst::PadProbeType::EVENT_DOWNSTREAM, + move |pad, _info| { + let Some(imp) = imp_weak.upgrade() else { + return gst::PadProbeReturn::Remove; + }; - let mut s = imp.state.lock().unwrap(); + let mut s = imp.state.lock().unwrap(); - if let Some(ref mut state) = s.as_mut() { - state.tearing_down -= 1; - if state.tearing_down == 0 { - imp.disable_transcription_bin(state); + let Some(ref mut state) = s.as_mut() else { + return gst::PadProbeReturn::Remove; + }; + + let pad_imp = pad + .downcast_ref::() + .unwrap() + .imp(); + let mut ps = pad_imp.state.lock().unwrap(); + let Ok(ref mut pad_state) = ps.as_mut() else { + return gst::PadProbeReturn::Remove; + }; + + match pad_state.target_passthrough_state { + TargetPassthroughState::Enabled => { + imp.disable_transcription_bin(pad, state, pad_state); + pad_state.target_passthrough_state = TargetPassthroughState::None; + // Now that we are done, make sure that this is reflected in our settings + let notify = { + let mut pad_settings = pad_imp.settings.lock().unwrap(); + let old_passthrough = pad_settings.passthrough; + pad_settings.passthrough = true; + !old_passthrough + }; + + if notify { + // We cannot notify from this thread, as this could cause + // the user to reset the passthrough property, which would + // in turn hang at link time (the probe would fire again + // and deadlock) + let pad_weak = pad.downgrade(); + imp.obj().call_async(move |_| { + let Some(pad) = pad_weak.upgrade() else { + return; + }; + pad.notify("passthrough"); + }); } } + TargetPassthroughState::Disabled => { + // Now at this point, even though we initially blocked in + // order to enable passthrough, the user may instead have + // requested disabling it in the meantime. + imp.enable_transcription_bin(pad_imp, state, pad_state); + } + _ => (), + } - gst::PadProbeReturn::Remove - }, - ); - } - } else if state.tearing_down > 0 { - // Do nothing, wait for the previous transcription bin - // to finish tearing down - } else { - state - .transcription_bin - .link_pads(Some("src"), &state.cccombiner, Some("caption")) - .unwrap(); - state.transcription_bin.set_locked_state(false); - state.transcription_bin.sync_state_with_parent().unwrap(); - - for pad in state.audio_sink_pads.values() { - let ps = pad.imp().state.lock().unwrap(); - let pad_state = ps.as_ref().unwrap(); - pad_state.transcription_bin.set_locked_state(false); - pad_state - .transcription_bin - .sync_state_with_parent() - .unwrap(); - let audio_tee_pad = pad_state.audio_tee.request_pad_simple("src_%u").unwrap(); - let transcription_sink_pad = - state.transcription_bin.static_pad(&pad.name()).unwrap(); - audio_tee_pad.link(&transcription_sink_pad).unwrap(); - } + gst::PadProbeReturn::Remove + }, + ); } + TargetPassthroughState::Disabled => { + self.enable_transcription_bin(sinkpad, state, pad_state); + } + _ => (), } } @@ -857,7 +926,7 @@ impl TranscriberBin { } if lang_code_only { - if !settings.passthrough { + if !pad_settings.passthrough { gst::debug!(CAT, imp = self, "Syncing state with parent"); drop(settings); @@ -942,7 +1011,7 @@ impl TranscriberBin { self.setup_cc_mode(&pad.obj(), pad_state, state.mux_method, pad_settings.mode); - if !settings.passthrough { + if !pad_settings.passthrough { gst::debug!(CAT, imp = self, "Syncing state with parent"); let audio_tee_pad = pad_state.audio_tee.request_pad_simple("src_%u").unwrap(); @@ -1036,6 +1105,16 @@ impl TranscriberBin { false } + fn all_sinks_are_passthrough(&self, state: &State) -> bool { + for pad in state.audio_sink_pads.values() { + let pad_settings = pad.imp().settings.lock().unwrap(); + if !pad_settings.passthrough { + return false; + } + } + true + } + #[allow(clippy::single_match)] fn src_query(&self, pad: &gst::Pad, query: &mut gst::QueryRef) -> bool { use gst::QueryViewMut; @@ -1051,16 +1130,20 @@ impl TranscriberBin { if ret { let (_, mut min, _) = upstream_query.result(); let state = self.state.lock().unwrap(); - let (received_framerate, translating) = { + let (received_framerate, translating, all_passthrough) = { if let Some(state) = state.as_ref() { - (state.framerate, self.any_sink_is_translating(state)) + ( + state.framerate, + self.any_sink_is_translating(state), + self.all_sinks_are_passthrough(state), + ) } else { - (None, false) + (None, false, true) } }; let settings = self.settings.lock().unwrap(); - if settings.passthrough || received_framerate.is_none() { + if all_passthrough || received_framerate.is_none() { min += settings.latency + settings.accumulate_time + CEAX08MUX_LATENCY; if translating { @@ -1103,12 +1186,23 @@ impl TranscriberBin { let settings = self.settings.lock().unwrap(); let mux_method = settings.mux_method; + let has_force_live = |factory_name: &str| { + gst::ElementFactory::find(factory_name) + .and_then(|f| f.load().ok()) + .map(|f| f.element_type()) + .and_then(glib::Class::::from_type) + .map(|k| k.has_property("force-live", Some(bool::static_type()))) + .unwrap_or(false) + }; + let ccmux = match mux_method { MuxMethod::Cea608 => gst::ElementFactory::make("cea608mux") .property_from_str("start-time-selection", "first") + .property_if("force-live", true, has_force_live("cea608mux")) .build()?, MuxMethod::Cea708 => gst::ElementFactory::make("cea708mux") .property_from_str("start-time-selection", "first") + .property_if("force-live", true, has_force_live("cea708mux")) .build()?, }; let ccmux_filter = gst::ElementFactory::make("capsfilter").build()?; @@ -1142,7 +1236,6 @@ impl TranscriberBin { transcription_bin, cccapsfilter, transcription_valve, - tearing_down: 0, audio_serial: 0, audio_sink_pads, }) @@ -1273,12 +1366,6 @@ impl ObjectImpl for TranscriberBin { fn properties() -> &'static [glib::ParamSpec] { static PROPERTIES: LazyLock> = LazyLock::new(|| { vec![ - glib::ParamSpecBoolean::builder("passthrough") - .nick("Passthrough") - .blurb("Whether transcription should occur") - .default_value(DEFAULT_PASSTHROUGH) - .mutable_playing() - .build(), glib::ParamSpecUInt::builder("latency") .nick("Latency") .blurb("Amount of milliseconds to allow the transcriber") @@ -1346,18 +1433,6 @@ impl ObjectImpl for TranscriberBin { fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { match pspec.name() { - "passthrough" => { - let mut settings = self.settings.lock().unwrap(); - - let old_passthrough = settings.passthrough; - let new_passthrough = value.get().expect("type checked upstream"); - settings.passthrough = new_passthrough; - - if old_passthrough != new_passthrough { - drop(settings); - self.block_and_update(new_passthrough); - } - } "latency" => { let mut settings = self.settings.lock().unwrap(); settings.latency = gst::ClockTime::from_mseconds( @@ -1437,10 +1512,6 @@ impl ObjectImpl for TranscriberBin { fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { match pspec.name() { - "passthrough" => { - let settings = self.settings.lock().unwrap(); - settings.passthrough.to_value() - } "latency" => { let settings = self.settings.lock().unwrap(); (settings.latency.mseconds() as u32).to_value() @@ -1798,27 +1869,42 @@ impl BinImpl for TranscriberBin { match msg.view() { MessageView::Error(m) => { - let pad = self - .audio_sinkpad - .downcast_ref::() - .unwrap(); - let ps = pad.imp().state.lock().unwrap(); - let pad_state = ps.as_ref().unwrap(); - if msg.src() == pad_state.transcriber.as_ref().map(|t| t.upcast_ref()) { - gst::error!( - CAT, - imp = self, - "Transcriber has posted an error ({m:?}), going back to passthrough", - ); - drop(ps); - self.settings.lock().unwrap().passthrough = true; - self.obj().notify("passthrough"); - self.obj().call_async(move |bin| { - let thiz = bin.imp(); - thiz.block_and_update(true); - }); - } else { - drop(ps); + let mut s = self.state.lock().unwrap(); + let Some(state) = s.as_mut() else { + drop(s); + self.parent_handle_message(msg); + return; + }; + let mut handled = false; + for pad in state.audio_sink_pads.values() { + let mut ps = pad.imp().state.lock().unwrap(); + let Ok(pad_state) = ps.as_mut() else { + continue; + }; + if msg.src() == pad_state.transcriber.as_ref().map(|t| t.upcast_ref()) { + gst::error!( + CAT, + imp = self, + "Transcriber has posted an error ({m:?}), going back to passthrough", + ); + pad_state.target_passthrough_state = TargetPassthroughState::Enabled; + let pad_weak = pad.downgrade(); + self.obj().call_async(move |bin| { + let Some(pad) = pad_weak.upgrade() else { + return; + }; + let thiz = bin.imp(); + let s = thiz.state.lock().unwrap(); + let ps = pad.imp().state.lock().unwrap(); + thiz.block_and_update(pad.imp(), s, ps); + }); + handled = true; + break; + } + } + + if !handled { + drop(s); self.parent_handle_message(msg); } } @@ -1832,6 +1918,7 @@ struct TranscriberSinkPadSettings { translation_languages: Option, language_code: String, mode: Cea608Mode, + passthrough: bool, } impl Default for TranscriberSinkPadSettings { @@ -1840,6 +1927,7 @@ impl Default for TranscriberSinkPadSettings { translation_languages: None, language_code: String::from(DEFAULT_INPUT_LANG_CODE), mode: DEFAULT_MODE, + passthrough: DEFAULT_PASSTHROUGH, } } } @@ -1855,6 +1943,7 @@ struct TranscriberSinkPadState { queue_passthrough: gst::Element, transcription_channels: HashMap, srcpad_name: Option, + target_passthrough_state: TargetPassthroughState, } impl TranscriberSinkPadState { @@ -1882,6 +1971,7 @@ impl TranscriberSinkPadState { queue_passthrough: gst::ElementFactory::make("queue").build()?, transcription_channels: HashMap::new(), srcpad_name: None, + target_passthrough_state: TargetPassthroughState::None, }) } } @@ -1909,6 +1999,12 @@ impl ObjectImpl for TranscriberSinkPad { fn properties() -> &'static [glib::ParamSpec] { static PROPERTIES: LazyLock> = LazyLock::new(|| { vec![ + glib::ParamSpecBoolean::builder("passthrough") + .nick("Passthrough") + .blurb("Whether transcription should occur") + .default_value(DEFAULT_PASSTHROUGH) + .mutable_playing() + .build(), glib::ParamSpecBoxed::builder::("translation-languages") .nick("Translation languages") .blurb("A map of language codes to caption channels, e.g. translation-languages=\"languages, transcript={CC1, 708_1}, fr={708_2, CC3}\" will map the French translation to CC1/service 1 and the original transcript to CC3/service 2") @@ -1938,6 +2034,29 @@ impl ObjectImpl for TranscriberSinkPad { fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { match pspec.name() { + "passthrough" => { + let Some(parent) = self.obj().parent().and_downcast::() + else { + return; + }; + let s = parent.imp().state.lock().unwrap(); + let mut ps = self.state.lock().unwrap(); + let mut pad_settings = self.settings.lock().unwrap(); + pad_settings.passthrough = value.get().expect("type checked upstream"); + + let Ok(ref mut pad_state) = ps.as_mut() else { + return; + }; + + if pad_settings.passthrough { + pad_state.target_passthrough_state = TargetPassthroughState::Enabled; + } else { + pad_state.target_passthrough_state = TargetPassthroughState::Disabled; + } + drop(pad_settings); + + parent.imp().block_and_update(self, s, ps); + } "translation-languages" => { let mut settings = self.settings.lock().unwrap(); settings.translation_languages = value @@ -2038,6 +2157,10 @@ impl ObjectImpl for TranscriberSinkPad { fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { match pspec.name() { + "passthrough" => { + let settings = self.settings.lock().unwrap(); + settings.passthrough.to_value() + } "translation-languages" => { let settings = self.settings.lock().unwrap(); settings.translation_languages.to_value()