diff --git a/Cargo.lock b/Cargo.lock index a8da69b8c..698784991 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2481,6 +2481,7 @@ dependencies = [ "cea608-types", "cea708-types", "chrono", + "clap", "either", "gst-plugin-version-helper", "gstreamer", diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index efbf8ea66..b59445928 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -7759,18 +7759,6 @@ "type": "GstTranscriberBinMuxMethod", "writable": true }, - "passthrough": { - "blurb": "Whether transcription should occur", - "conditionally-available": false, - "construct": false, - "construct-only": false, - "controllable": false, - "default": "false", - "mutable": "playing", - "readable": true, - "type": "gboolean", - "writable": true - }, "transcriber": { "blurb": "The transcriber element to use", "conditionally-available": false, @@ -8153,6 +8141,18 @@ "type": "GstTtToCea608Mode", "writable": true }, + "passthrough": { + "blurb": "Whether transcription should occur", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "false", + "mutable": "playing", + "readable": true, + "type": "gboolean", + "writable": true + }, "transcriber": { "blurb": "The transcriber element to use", "conditionally-available": false, diff --git a/video/closedcaption/Cargo.toml b/video/closedcaption/Cargo.toml index 4ba260150..b5ea19f97 100644 --- a/video/closedcaption/Cargo.toml +++ b/video/closedcaption/Cargo.toml @@ -33,6 +33,7 @@ bitstream-io = "2.3" pretty_assertions = "1" rand = { version = "0.8", features = ["small_rng"] } gst-check.workspace = true +clap = { version = "4", features = ["derive"] } [lib] name = "gstrsclosedcaption" @@ -62,3 +63,6 @@ import_library = false [package.metadata.capi.pkg_config] requires_private = "gstreamer-1.0, gstreamer-base-1.0, gstreamer-video-1.0, gobject-2.0, glib-2.0, gmodule-2.0, pango, pangocairo, cairo-gobject" + +[[example]] +name = "passthrough-notify" diff --git a/video/closedcaption/examples/passthrough-notify.rs b/video/closedcaption/examples/passthrough-notify.rs new file mode 100644 index 000000000..e6d4d9772 --- /dev/null +++ b/video/closedcaption/examples/passthrough-notify.rs @@ -0,0 +1,184 @@ +// This example creates a pipeline that will decode a file, pipe its audio +// and video streams through transcriberbin, and display the result +// with closed captions overlaid. +// +// At first the (AWS) transcriber will not have its access keys set, which +// means it should automatically go to passthrough=true. +// +// At this point we set the access keys and disable passthrough again. +// +// The expected result is for the terminal to display +// "Access key set, disabling passthrough" and for closed captions to be +// overlaid over the video. + +use anyhow::Error; +use clap::Parser; +use gst::glib; +use gst::prelude::*; + +#[derive(Debug, Default, Clone, clap::Parser)] +struct Args { + #[clap(long, help = "URI to transcribe")] + pub uri: String, + + #[clap(long, help = "Access key ID")] + pub access_key_id: String, + + #[clap(long, help = "Secret access key")] + pub secret_access_key: String, +} + +fn link_video_stream( + pipeline: &gst::Pipeline, + transcriberbin: &gst::Element, + pad: &gst::Pad, +) -> Result<(), Error> { + let conv = gst::ElementFactory::make("videoconvert").build()?; + + pipeline.add(&conv)?; + + conv.sync_state_with_parent()?; + + pad.link(&conv.static_pad("sink").unwrap())?; + + conv.link_pads(None, transcriberbin, Some("sink_video"))?; + + Ok(()) +} + +fn link_audio_stream( + pipeline: &gst::Pipeline, + transcriberbin: &gst::Element, + pad: &gst::Pad, +) -> Result<(), Error> { + let conv = gst::ElementFactory::make("audioconvert").build()?; + + pipeline.add(&conv)?; + + conv.sync_state_with_parent()?; + + pad.link(&conv.static_pad("sink").unwrap())?; + + conv.link_pads(None, transcriberbin, Some("sink_audio"))?; + + Ok(()) +} + +fn main() -> Result<(), Error> { + let args = Args::parse(); + + gst::init()?; + + let pipeline = gst::Pipeline::builder().build(); + + let uridecodebin = gst::ElementFactory::make("uridecodebin") + .property("uri", &args.uri) + .build()?; + let transcriberbin = gst::ElementFactory::make("transcriberbin").build()?; + let asink = gst::ElementFactory::make("fakesink").build()?; + let overlay = gst::ElementFactory::make("cea608overlay").build()?; + let vconv = gst::ElementFactory::make("videoconvert").build()?; + let vsink = gst::ElementFactory::make("autovideosink").build()?; + + uridecodebin.connect_pad_added(glib::clone!( + #[weak] + pipeline, + #[weak] + transcriberbin, + move |_element, pad| { + if pad + .current_caps() + .map(|c| c.structure(0).unwrap().name().starts_with("video/")) + .unwrap_or(false) + { + link_video_stream(&pipeline, &transcriberbin, pad) + .expect("Failed to link video stream"); + } else { + link_audio_stream(&pipeline, &transcriberbin, pad) + .expect("Failed to link audio stream"); + } + } + )); + + transcriberbin + .static_pad("sink_audio") + .unwrap() + .connect_closure( + "notify::passthrough", + false, + glib::closure!( + #[strong] + args, + move |pad: &gst::Pad, _pspec: &gst::glib::ParamSpec| { + let passthrough = pad.property::("passthrough"); + if passthrough { + let transcriber = pad.property::("transcriber"); + transcriber.set_property("access-key", &args.access_key_id); + transcriber.set_property("secret-access-key", &args.secret_access_key); + + eprintln!( + "Access key set, disabling passthrough, transcriber state: {:?}", + transcriber.state(gst::ClockTime::NONE) + ); + + pad.set_property("passthrough", false); + } + } + ), + ); + + pipeline.add_many([ + &uridecodebin, + &transcriberbin, + &asink, + &overlay, + &vconv, + &vsink, + ])?; + + transcriberbin.link_pads(Some("src_audio"), &asink, None)?; + transcriberbin.link_pads(Some("src_video"), &overlay, None)?; + + gst::Element::link_many([&overlay, &vconv, &vsink])?; + + pipeline.set_state(gst::State::Playing)?; + + let bus = pipeline.bus().expect("Pipeline should have a bus"); + + for msg in bus.iter_timed(gst::ClockTime::NONE) { + use gst::MessageView; + + match msg.view() { + MessageView::Eos(..) => { + println!("EOS"); + break; + } + MessageView::StateChanged(sc) => { + if msg.src() == Some(pipeline.upcast_ref()) { + pipeline.debug_to_dot_file( + gst::DebugGraphDetails::all(), + format!("{}-{:?}-{:?}", pipeline.name(), sc.old(), sc.current()), + ); + } + } + MessageView::Error(err) => { + pipeline.debug_to_dot_file(gst::DebugGraphDetails::ALL, "error"); + pipeline.set_state(gst::State::Null)?; + eprintln!( + "Got error from {}: {} ({})", + msg.src() + .map(|s| String::from(s.path_string())) + .unwrap_or_else(|| "None".into()), + err.error(), + err.debug().unwrap_or_else(|| "".into()), + ); + break; + } + _ => (), + } + } + + pipeline.set_state(gst::State::Null)?; + + Ok(()) +} diff --git a/video/closedcaption/src/transcriberbin/imp.rs b/video/closedcaption/src/transcriberbin/imp.rs index 0ff8d3aeb..a896ed5d3 100644 --- a/video/closedcaption/src/transcriberbin/imp.rs +++ b/video/closedcaption/src/transcriberbin/imp.rs @@ -38,6 +38,12 @@ const DEFAULT_MUX_METHOD: MuxMethod = MuxMethod::Cea608; const CEAX08MUX_LATENCY: gst::ClockTime = gst::ClockTime::from_mseconds(100); +enum TargetPassthroughState { + None, + Enabled, + Disabled, +} + /* One per language, including original */ struct TranscriptionChannel { bin: gst::Bin, @@ -79,7 +85,6 @@ impl TranscriptionChannel { struct State { mux_method: MuxMethod, framerate: Option, - tearing_down: usize, internal_bin: gst::Bin, video_queue: gst::Element, ccmux: gst::Element, @@ -96,7 +101,6 @@ struct Settings { cc_caps: gst::Caps, latency: gst::ClockTime, translate_latency: gst::ClockTime, - passthrough: bool, accumulate_time: gst::ClockTime, caption_source: CaptionSource, mux_method: MuxMethod, @@ -108,7 +112,6 @@ impl Default for Settings { cc_caps: gst::Caps::builder("closedcaption/x-cea-608") .field("format", "raw") .build(), - passthrough: DEFAULT_PASSTHROUGH, latency: DEFAULT_LATENCY, translate_latency: DEFAULT_TRANSLATE_LATENCY, accumulate_time: DEFAULT_ACCUMULATE, @@ -536,35 +539,32 @@ impl TranscriberBin { .set_property("max-size-time", max_size_time); } - if !settings.passthrough { - gst::debug!( - CAT, - imp = self, - "Linking transcription bins and synchronizing state" - ); - state + gst::debug!( + CAT, + imp = self, + "Linking transcription bins and synchronizing state" + ); + state + .transcription_bin + .link_pads(Some("src"), &state.cccombiner, Some("caption")) + .unwrap(); + + state.transcription_bin.set_locked_state(false); + state.transcription_bin.sync_state_with_parent().unwrap(); + + for pad in state.audio_sink_pads.values() { + let ps = pad.imp().state.lock().unwrap(); + let pad_state = ps.as_ref().unwrap(); + pad_state.transcription_bin.set_locked_state(false); + pad_state .transcription_bin - .link_pads(Some("src"), &state.cccombiner, Some("caption")) + .sync_state_with_parent() .unwrap(); - - state.transcription_bin.set_locked_state(false); - state.transcription_bin.sync_state_with_parent().unwrap(); - - for pad in state.audio_sink_pads.values() { - let ps = pad.imp().state.lock().unwrap(); - let pad_state = ps.as_ref().unwrap(); - pad_state.transcription_bin.set_locked_state(false); - pad_state - .transcription_bin - .sync_state_with_parent() - .unwrap(); - let transcription_sink_pad = - state.transcription_bin.static_pad(&pad.name()).unwrap(); - // Might be linked already if "translation-languages" is set - if transcription_sink_pad.peer().is_none() { - let audio_tee_pad = pad_state.audio_tee.request_pad_simple("src_%u").unwrap(); - audio_tee_pad.link(&transcription_sink_pad).unwrap(); - } + let transcription_sink_pad = state.transcription_bin.static_pad(&pad.name()).unwrap(); + // Might be linked already if "translation-languages" is set + if transcription_sink_pad.peer().is_none() { + let audio_tee_pad = pad_state.audio_tee.request_pad_simple("src_%u").unwrap(); + audio_tee_pad.link(&transcription_sink_pad).unwrap(); } } @@ -576,92 +576,161 @@ impl TranscriberBin { } } - fn disable_transcription_bin(&self, state: &mut State) { - // At this point, we want to check whether passthrough - // has been unset in the meantime - let passthrough = self.settings.lock().unwrap().passthrough; + fn disable_transcription_bin( + &self, + pad: &super::TranscriberSinkPad, + state: &mut State, + pad_state: &mut TranscriberSinkPadState, + ) { + gst::debug!(CAT, imp = self, "disabling transcription bin"); - if passthrough { - gst::debug!(CAT, imp = self, "disabling transcription bin"); - - for pad in state.audio_sink_pads.values() { - let ps = pad.imp().state.lock().unwrap(); - let pad_state = ps.as_ref().unwrap(); - let bin_sink_pad = state.transcription_bin.static_pad(&pad.name()).unwrap(); - if let Some(audio_tee_pad) = bin_sink_pad.peer() { - audio_tee_pad.unlink(&bin_sink_pad).unwrap(); - pad_state.audio_tee.release_request_pad(&audio_tee_pad); - } - } - - let bin_src_pad = state.transcription_bin.static_pad("src").unwrap(); - if let Some(cccombiner_pad) = bin_src_pad.peer() { - bin_src_pad.unlink(&cccombiner_pad).unwrap(); - state.cccombiner.release_request_pad(&cccombiner_pad); - } - - state.transcription_bin.set_locked_state(true); - state.transcription_bin.set_state(gst::State::Null).unwrap(); + let bin_sink_pad = state.transcription_bin.static_pad(&pad.name()).unwrap(); + if let Some(audio_tee_pad) = bin_sink_pad.peer() { + audio_tee_pad.unlink(&bin_sink_pad).unwrap(); + pad_state.audio_tee.release_request_pad(&audio_tee_pad); } + + for channel in pad_state.transcription_channels.values() { + let srcpad = pad_state + .transcription_bin + .static_pad(&format!("src_{}", channel.language)) + .unwrap(); + + if let Some(sinkpad) = state.ccmux.static_pad(&channel.ccmux_pad_name) { + srcpad.unlink(&sinkpad).unwrap(); + state.ccmux.release_request_pad(&sinkpad); + } + } + + pad_state.transcription_bin.set_locked_state(true); + pad_state + .transcription_bin + .set_state(gst::State::Null) + .unwrap(); } - fn block_and_update(&self, passthrough: bool) { - let mut s = self.state.lock().unwrap(); + fn enable_transcription_bin( + &self, + sinkpad: &TranscriberSinkPad, + state: &mut State, + pad_state: &mut TranscriberSinkPadState, + ) { + for channel in pad_state.transcription_channels.values() { + let srcpad = pad_state + .transcription_bin + .static_pad(&format!("src_{}", channel.language)) + .unwrap(); + let sinkpad = state + .ccmux + .static_pad(&channel.ccmux_pad_name) + .unwrap_or_else(|| { + state + .ccmux + .request_pad_simple(&channel.ccmux_pad_name) + .unwrap() + }); + srcpad.link(&sinkpad).unwrap(); + } + pad_state.transcription_bin.set_locked_state(false); + pad_state + .transcription_bin + .sync_state_with_parent() + .unwrap(); + let audio_tee_pad = pad_state.audio_tee.request_pad_simple("src_%u").unwrap(); + let transcription_sink_pad = state + .transcription_bin + .static_pad(&sinkpad.obj().name()) + .unwrap(); + audio_tee_pad.link(&transcription_sink_pad).unwrap(); + pad_state.target_passthrough_state = TargetPassthroughState::None; + } - if let Some(ref mut state) = s.as_mut() { - if passthrough { - state.tearing_down = state.audio_sink_pads.len(); - let sinkpads = state.audio_sink_pads.clone(); + fn block_and_update( + &self, + sinkpad: &TranscriberSinkPad, + mut s: std::sync::MutexGuard>, + mut ps: std::sync::MutexGuard>, + ) { + let Some(ref mut state) = s.as_mut() else { + return; + }; + + let Ok(ref mut pad_state) = ps.as_mut() else { + return; + }; + + match pad_state.target_passthrough_state { + TargetPassthroughState::Enabled => { drop(s); - for sinkpad in sinkpads.values() { - let imp_weak = self.downgrade(); - let _ = sinkpad.add_probe( - gst::PadProbeType::IDLE - | gst::PadProbeType::BUFFER - | gst::PadProbeType::EVENT_DOWNSTREAM, - move |_pad, _info| { - let Some(imp) = imp_weak.upgrade() else { - return gst::PadProbeReturn::Remove; - }; + drop(ps); + let imp_weak = self.downgrade(); + let _ = sinkpad.obj().add_probe( + gst::PadProbeType::IDLE + | gst::PadProbeType::BUFFER + | gst::PadProbeType::EVENT_DOWNSTREAM, + move |pad, _info| { + let Some(imp) = imp_weak.upgrade() else { + return gst::PadProbeReturn::Remove; + }; - let mut s = imp.state.lock().unwrap(); + let mut s = imp.state.lock().unwrap(); - if let Some(ref mut state) = s.as_mut() { - state.tearing_down -= 1; - if state.tearing_down == 0 { - imp.disable_transcription_bin(state); + let Some(ref mut state) = s.as_mut() else { + return gst::PadProbeReturn::Remove; + }; + + let pad_imp = pad + .downcast_ref::() + .unwrap() + .imp(); + let mut ps = pad_imp.state.lock().unwrap(); + let Ok(ref mut pad_state) = ps.as_mut() else { + return gst::PadProbeReturn::Remove; + }; + + match pad_state.target_passthrough_state { + TargetPassthroughState::Enabled => { + imp.disable_transcription_bin(pad, state, pad_state); + pad_state.target_passthrough_state = TargetPassthroughState::None; + // Now that we are done, make sure that this is reflected in our settings + let notify = { + let mut pad_settings = pad_imp.settings.lock().unwrap(); + let old_passthrough = pad_settings.passthrough; + pad_settings.passthrough = true; + !old_passthrough + }; + + if notify { + // We cannot notify from this thread, as this could cause + // the user to reset the passthrough property, which would + // in turn hang at link time (the probe would fire again + // and deadlock) + let pad_weak = pad.downgrade(); + imp.obj().call_async(move |_| { + let Some(pad) = pad_weak.upgrade() else { + return; + }; + pad.notify("passthrough"); + }); } } + TargetPassthroughState::Disabled => { + // Now at this point, even though we initially blocked in + // order to enable passthrough, the user may instead have + // requested disabling it in the meantime. + imp.enable_transcription_bin(pad_imp, state, pad_state); + } + _ => (), + } - gst::PadProbeReturn::Remove - }, - ); - } - } else if state.tearing_down > 0 { - // Do nothing, wait for the previous transcription bin - // to finish tearing down - } else { - state - .transcription_bin - .link_pads(Some("src"), &state.cccombiner, Some("caption")) - .unwrap(); - state.transcription_bin.set_locked_state(false); - state.transcription_bin.sync_state_with_parent().unwrap(); - - for pad in state.audio_sink_pads.values() { - let ps = pad.imp().state.lock().unwrap(); - let pad_state = ps.as_ref().unwrap(); - pad_state.transcription_bin.set_locked_state(false); - pad_state - .transcription_bin - .sync_state_with_parent() - .unwrap(); - let audio_tee_pad = pad_state.audio_tee.request_pad_simple("src_%u").unwrap(); - let transcription_sink_pad = - state.transcription_bin.static_pad(&pad.name()).unwrap(); - audio_tee_pad.link(&transcription_sink_pad).unwrap(); - } + gst::PadProbeReturn::Remove + }, + ); } + TargetPassthroughState::Disabled => { + self.enable_transcription_bin(sinkpad, state, pad_state); + } + _ => (), } } @@ -857,7 +926,7 @@ impl TranscriberBin { } if lang_code_only { - if !settings.passthrough { + if !pad_settings.passthrough { gst::debug!(CAT, imp = self, "Syncing state with parent"); drop(settings); @@ -942,7 +1011,7 @@ impl TranscriberBin { self.setup_cc_mode(&pad.obj(), pad_state, state.mux_method, pad_settings.mode); - if !settings.passthrough { + if !pad_settings.passthrough { gst::debug!(CAT, imp = self, "Syncing state with parent"); let audio_tee_pad = pad_state.audio_tee.request_pad_simple("src_%u").unwrap(); @@ -1036,6 +1105,16 @@ impl TranscriberBin { false } + fn all_sinks_are_passthrough(&self, state: &State) -> bool { + for pad in state.audio_sink_pads.values() { + let pad_settings = pad.imp().settings.lock().unwrap(); + if !pad_settings.passthrough { + return false; + } + } + true + } + #[allow(clippy::single_match)] fn src_query(&self, pad: &gst::Pad, query: &mut gst::QueryRef) -> bool { use gst::QueryViewMut; @@ -1051,16 +1130,20 @@ impl TranscriberBin { if ret { let (_, mut min, _) = upstream_query.result(); let state = self.state.lock().unwrap(); - let (received_framerate, translating) = { + let (received_framerate, translating, all_passthrough) = { if let Some(state) = state.as_ref() { - (state.framerate, self.any_sink_is_translating(state)) + ( + state.framerate, + self.any_sink_is_translating(state), + self.all_sinks_are_passthrough(state), + ) } else { - (None, false) + (None, false, true) } }; let settings = self.settings.lock().unwrap(); - if settings.passthrough || received_framerate.is_none() { + if all_passthrough || received_framerate.is_none() { min += settings.latency + settings.accumulate_time + CEAX08MUX_LATENCY; if translating { @@ -1103,12 +1186,23 @@ impl TranscriberBin { let settings = self.settings.lock().unwrap(); let mux_method = settings.mux_method; + let has_force_live = |factory_name: &str| { + gst::ElementFactory::find(factory_name) + .and_then(|f| f.load().ok()) + .map(|f| f.element_type()) + .and_then(glib::Class::::from_type) + .map(|k| k.has_property("force-live", Some(bool::static_type()))) + .unwrap_or(false) + }; + let ccmux = match mux_method { MuxMethod::Cea608 => gst::ElementFactory::make("cea608mux") .property_from_str("start-time-selection", "first") + .property_if("force-live", true, has_force_live("cea608mux")) .build()?, MuxMethod::Cea708 => gst::ElementFactory::make("cea708mux") .property_from_str("start-time-selection", "first") + .property_if("force-live", true, has_force_live("cea708mux")) .build()?, }; let ccmux_filter = gst::ElementFactory::make("capsfilter").build()?; @@ -1142,7 +1236,6 @@ impl TranscriberBin { transcription_bin, cccapsfilter, transcription_valve, - tearing_down: 0, audio_serial: 0, audio_sink_pads, }) @@ -1273,12 +1366,6 @@ impl ObjectImpl for TranscriberBin { fn properties() -> &'static [glib::ParamSpec] { static PROPERTIES: LazyLock> = LazyLock::new(|| { vec![ - glib::ParamSpecBoolean::builder("passthrough") - .nick("Passthrough") - .blurb("Whether transcription should occur") - .default_value(DEFAULT_PASSTHROUGH) - .mutable_playing() - .build(), glib::ParamSpecUInt::builder("latency") .nick("Latency") .blurb("Amount of milliseconds to allow the transcriber") @@ -1346,18 +1433,6 @@ impl ObjectImpl for TranscriberBin { fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { match pspec.name() { - "passthrough" => { - let mut settings = self.settings.lock().unwrap(); - - let old_passthrough = settings.passthrough; - let new_passthrough = value.get().expect("type checked upstream"); - settings.passthrough = new_passthrough; - - if old_passthrough != new_passthrough { - drop(settings); - self.block_and_update(new_passthrough); - } - } "latency" => { let mut settings = self.settings.lock().unwrap(); settings.latency = gst::ClockTime::from_mseconds( @@ -1437,10 +1512,6 @@ impl ObjectImpl for TranscriberBin { fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { match pspec.name() { - "passthrough" => { - let settings = self.settings.lock().unwrap(); - settings.passthrough.to_value() - } "latency" => { let settings = self.settings.lock().unwrap(); (settings.latency.mseconds() as u32).to_value() @@ -1798,27 +1869,42 @@ impl BinImpl for TranscriberBin { match msg.view() { MessageView::Error(m) => { - let pad = self - .audio_sinkpad - .downcast_ref::() - .unwrap(); - let ps = pad.imp().state.lock().unwrap(); - let pad_state = ps.as_ref().unwrap(); - if msg.src() == pad_state.transcriber.as_ref().map(|t| t.upcast_ref()) { - gst::error!( - CAT, - imp = self, - "Transcriber has posted an error ({m:?}), going back to passthrough", - ); - drop(ps); - self.settings.lock().unwrap().passthrough = true; - self.obj().notify("passthrough"); - self.obj().call_async(move |bin| { - let thiz = bin.imp(); - thiz.block_and_update(true); - }); - } else { - drop(ps); + let mut s = self.state.lock().unwrap(); + let Some(state) = s.as_mut() else { + drop(s); + self.parent_handle_message(msg); + return; + }; + let mut handled = false; + for pad in state.audio_sink_pads.values() { + let mut ps = pad.imp().state.lock().unwrap(); + let Ok(pad_state) = ps.as_mut() else { + continue; + }; + if msg.src() == pad_state.transcriber.as_ref().map(|t| t.upcast_ref()) { + gst::error!( + CAT, + imp = self, + "Transcriber has posted an error ({m:?}), going back to passthrough", + ); + pad_state.target_passthrough_state = TargetPassthroughState::Enabled; + let pad_weak = pad.downgrade(); + self.obj().call_async(move |bin| { + let Some(pad) = pad_weak.upgrade() else { + return; + }; + let thiz = bin.imp(); + let s = thiz.state.lock().unwrap(); + let ps = pad.imp().state.lock().unwrap(); + thiz.block_and_update(pad.imp(), s, ps); + }); + handled = true; + break; + } + } + + if !handled { + drop(s); self.parent_handle_message(msg); } } @@ -1832,6 +1918,7 @@ struct TranscriberSinkPadSettings { translation_languages: Option, language_code: String, mode: Cea608Mode, + passthrough: bool, } impl Default for TranscriberSinkPadSettings { @@ -1840,6 +1927,7 @@ impl Default for TranscriberSinkPadSettings { translation_languages: None, language_code: String::from(DEFAULT_INPUT_LANG_CODE), mode: DEFAULT_MODE, + passthrough: DEFAULT_PASSTHROUGH, } } } @@ -1855,6 +1943,7 @@ struct TranscriberSinkPadState { queue_passthrough: gst::Element, transcription_channels: HashMap, srcpad_name: Option, + target_passthrough_state: TargetPassthroughState, } impl TranscriberSinkPadState { @@ -1882,6 +1971,7 @@ impl TranscriberSinkPadState { queue_passthrough: gst::ElementFactory::make("queue").build()?, transcription_channels: HashMap::new(), srcpad_name: None, + target_passthrough_state: TargetPassthroughState::None, }) } } @@ -1909,6 +1999,12 @@ impl ObjectImpl for TranscriberSinkPad { fn properties() -> &'static [glib::ParamSpec] { static PROPERTIES: LazyLock> = LazyLock::new(|| { vec![ + glib::ParamSpecBoolean::builder("passthrough") + .nick("Passthrough") + .blurb("Whether transcription should occur") + .default_value(DEFAULT_PASSTHROUGH) + .mutable_playing() + .build(), glib::ParamSpecBoxed::builder::("translation-languages") .nick("Translation languages") .blurb("A map of language codes to caption channels, e.g. translation-languages=\"languages, transcript={CC1, 708_1}, fr={708_2, CC3}\" will map the French translation to CC1/service 1 and the original transcript to CC3/service 2") @@ -1938,6 +2034,29 @@ impl ObjectImpl for TranscriberSinkPad { fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { match pspec.name() { + "passthrough" => { + let Some(parent) = self.obj().parent().and_downcast::() + else { + return; + }; + let s = parent.imp().state.lock().unwrap(); + let mut ps = self.state.lock().unwrap(); + let mut pad_settings = self.settings.lock().unwrap(); + pad_settings.passthrough = value.get().expect("type checked upstream"); + + let Ok(ref mut pad_state) = ps.as_mut() else { + return; + }; + + if pad_settings.passthrough { + pad_state.target_passthrough_state = TargetPassthroughState::Enabled; + } else { + pad_state.target_passthrough_state = TargetPassthroughState::Disabled; + } + drop(pad_settings); + + parent.imp().block_and_update(self, s, ps); + } "translation-languages" => { let mut settings = self.settings.lock().unwrap(); settings.translation_languages = value @@ -2038,6 +2157,10 @@ impl ObjectImpl for TranscriberSinkPad { fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { match pspec.name() { + "passthrough" => { + let settings = self.settings.lock().unwrap(); + settings.passthrough.to_value() + } "translation-languages" => { let settings = self.settings.lock().unwrap(); settings.translation_languages.to_value()