From 17d79971375edae005ff012fe3dffe97ca041b99 Mon Sep 17 00:00:00 2001 From: Mathieu Duponchelle Date: Fri, 19 Apr 2024 20:12:46 +0200 Subject: [PATCH] transcriberbin: add support for consuming secondary audio streams In some situations, a translated alternate audio stream for a content might be available. Instead of going through transcription and translation of the original audio stream, it may be preferable for accuracy purposes to simply transcribe the secondary audio stream. This MR adds support for doing just that: * Secondary audio sink pads can be requested as "sink_audio_%u" * Sometimes audio source pads are added at that point to pass through the audio, as "src_audio_%u" * The main transcription bin now contains per-input stream transcription bins. Those can be individually controlled through properties on the sink pads, for instance translation-languages can be dynamically set per audio stream * Some properties that originally existed on the main element still remain, but are now simply mapped to the always audio sink pad * Releasing of secondary sink pads is nominally implemented, but not tested in states other than NULL An example launch line for this would be: ``` $ gst-launch-1.0 transcriberbin name=transcriberbin latency=8000 accumulate-time=0 \ cc-caps="closedcaption/x-cea-708, format=cc_data" sink_audio_0::language-code="es-US" \ sink_audio_0::translation-languages="languages, transcript=cc3" uridecodebin uri=file:///home/meh/Music/chaplin.mkv name=d d. ! videoconvert ! transcriberbin.sink_video d. ! clocksync ! audioconvert ! transcriberbin.sink_audio transcriberbin.src_video ! cea608overlay field=1 ! videoconvert ! autovideosink \ transcriberbin.src_audio ! audioconvert ! fakesink \ uridecodebin uri=file:///home/meh/Music/chaplin-spanish.webm name=d2 \ d2. ! audioconvert ! transcriberbin.sink_audio_0 \ transcriberbin.src_audio_0 ! 
fakesink ``` Part-of: --- docs/plugins/gst_plugins_cache.json | 89 +- video/closedcaption/src/transcriberbin/imp.rs | 1298 ++++++++++++----- video/closedcaption/src/transcriberbin/mod.rs | 12 +- 3 files changed, 1031 insertions(+), 368 deletions(-) diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index fcc4bcd1..c5f304c4 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -5585,7 +5585,14 @@ "sink_audio": { "caps": "audio/x-raw:\n", "direction": "sink", - "presence": "always" + "presence": "always", + "type": "GstTranscriberSinkPad" + }, + "sink_audio_%%u": { + "caps": "audio/x-raw:\n", + "direction": "sink", + "presence": "request", + "type": "GstTranscriberSinkPad" }, "sink_video": { "caps": "video/x-raw(ANY):\n", @@ -5597,6 +5604,12 @@ "direction": "src", "presence": "always" }, + "src_audio_%%u": { + "caps": "audio/x-raw:\n", + "direction": "src", + "presence": "sometimes", + "type": "GstTranscriberSrcPad" + }, "src_video": { "caps": "video/x-raw(ANY):\n", "direction": "src", @@ -5735,6 +5748,7 @@ "construct": false, "construct-only": false, "controllable": false, + "default": "languages, transcript=(string)cc1;", "mutable": "playing", "readable": true, "type": "GstStructure", @@ -6038,6 +6052,79 @@ } ] }, + "GstTranscriberSinkPad": { + "hierarchy": [ + "GstTranscriberSinkPad", + "GstGhostPad", + "GstProxyPad", + "GstPad", + "GstObject", + "GInitiallyUnowned", + "GObject" + ], + "kind": "object", + "properties": { + "language-code": { + "blurb": "The language of the input stream", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "en-US", + "mutable": "playing", + "readable": true, + "type": "gchararray", + "writable": true + }, + "mode": { + "blurb": "Which closed caption mode to operate in", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + 
"default": "roll-up2 (2)", + "mutable": "playing", + "readable": true, + "type": "GstTtToCea608Mode", + "writable": true + }, + "transcriber": { + "blurb": "The transcriber element to use", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "mutable": "ready", + "readable": true, + "type": "GstElement", + "writable": true + }, + "translation-languages": { + "blurb": "A map of language codes to caption channels, e.g. translation-languages=\"languages, transcript={CC1, 708_1}, fr={708_2, CC3}\" will map the French translation to CC1/service 1 and the original transcript to CC3/service 2", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "languages, transcript=(string)cc1;", + "mutable": "playing", + "readable": true, + "type": "GstStructure", + "writable": true + } + } + }, + "GstTranscriberSrcPad": { + "hierarchy": [ + "GstTranscriberSrcPad", + "GstGhostPad", + "GstProxyPad", + "GstPad", + "GstObject", + "GInitiallyUnowned", + "GObject" + ], + "kind": "object" + }, "GstTtToCea608Mode": { "kind": "enum", "values": [ diff --git a/video/closedcaption/src/transcriberbin/imp.rs b/video/closedcaption/src/transcriberbin/imp.rs index 7b85b946..03a20ae9 100644 --- a/video/closedcaption/src/transcriberbin/imp.rs +++ b/video/closedcaption/src/transcriberbin/imp.rs @@ -68,24 +68,22 @@ impl TranscriptionChannel { } } +/* Locking order: State, Settings, PadState, PadSettings */ + struct State { mux_method: MuxMethod, framerate: Option, - tearing_down: bool, + tearing_down: usize, internal_bin: gst::Bin, - audio_queue_passthrough: gst::Element, video_queue: gst::Element, - audio_tee: gst::Element, - transcriber_resample: gst::Element, - transcriber_aconv: gst::Element, - transcriber: gst::Element, ccmux: gst::Element, ccmux_filter: gst::Element, cccombiner: gst::Element, transcription_bin: gst::Bin, - transcription_channels: HashMap, cccapsfilter: 
gst::Element, transcription_valve: gst::Element, + audio_serial: u32, + audio_sink_pads: HashMap, } struct Settings { @@ -94,10 +92,7 @@ struct Settings { translate_latency: gst::ClockTime, passthrough: bool, accumulate_time: gst::ClockTime, - mode: Cea608Mode, caption_source: CaptionSource, - translation_languages: Option, - language_code: String, mux_method: MuxMethod, } @@ -111,10 +106,7 @@ impl Default for Settings { latency: DEFAULT_LATENCY, translate_latency: DEFAULT_TRANSLATE_LATENCY, accumulate_time: DEFAULT_ACCUMULATE, - mode: DEFAULT_MODE, caption_source: DEFAULT_CAPTION_SOURCE, - translation_languages: None, - language_code: String::from(DEFAULT_INPUT_LANG_CODE), mux_method: DEFAULT_MUX_METHOD, } } @@ -235,9 +227,34 @@ impl TranscriberBin { }) } - fn construct_transcription_bin(&self, state: &mut State) -> Result<(), Error> { - gst::debug!(CAT, imp: self, "Building transcription bin"); + fn link_input_audio_stream( + &self, + pad_name: &str, + pad_state: &TranscriberSinkPadState, + state: &mut State, + ) -> Result<(), Error> { + gst::debug!(CAT, imp: self, "Linking input audio stream {pad_name}"); + state + .internal_bin + .add_many([ + &pad_state.clocksync, + &pad_state.identity, + &pad_state.audio_tee, + &pad_state.queue_passthrough, + ]) + .unwrap(); + gst::Element::link_many([ + &pad_state.clocksync, + &pad_state.identity, + &pad_state.audio_tee, + ])?; + pad_state.audio_tee.link_pads( + Some("src_%u"), + &pad_state.queue_passthrough, + Some("sink"), + )?; + state.transcription_bin.add(&pad_state.transcription_bin)?; let aqueue_transcription = gst::ElementFactory::make("queue") .name("transqueue") .property("max-size-buffers", 0u32) @@ -245,58 +262,86 @@ impl TranscriberBin { .property("max-size-time", 5_000_000_000u64) .property_from_str("leaky", "downstream") .build()?; - let ccconverter = gst::ElementFactory::make("ccconverter").build()?; - state.transcription_bin.add_many([ + pad_state.transcription_bin.add_many([ &aqueue_transcription, - 
&state.transcriber_resample, - &state.transcriber_aconv, - &state.transcriber, - &state.ccmux, - &state.ccmux_filter, - &ccconverter, - &state.cccapsfilter, - &state.transcription_valve, + &pad_state.transcriber_resample, + &pad_state.transcriber_aconv, + &pad_state.transcriber, ])?; gst::Element::link_many([ &aqueue_transcription, - &state.transcriber_resample, - &state.transcriber_aconv, - &state.transcriber, + &pad_state.transcriber_resample, + &pad_state.transcriber_aconv, + &pad_state.transcriber, ])?; - gst::Element::link_many([ - &state.ccmux, - &state.ccmux_filter, - &ccconverter, - &state.cccapsfilter, - &state.transcription_valve, - ])?; - - for channel in state.transcription_channels.values() { - state.transcription_bin.add(&channel.bin)?; - - channel.link_transcriber(&state.transcriber)?; - - let ccmux_pad = state - .ccmux - .request_pad_simple(&channel.ccmux_pad_name) - .ok_or(anyhow!("Failed to request ccmux sink pad"))?; - channel.bin.static_pad("src").unwrap().link(&ccmux_pad)?; - } - - state.ccmux.set_property("latency", CEAX08MUX_LATENCY); - let transcription_audio_sinkpad = - gst::GhostPad::with_target(&aqueue_transcription.static_pad("sink").unwrap()).unwrap(); - let transcription_audio_srcpad = - gst::GhostPad::with_target(&state.transcription_valve.static_pad("src").unwrap()) - .unwrap(); + gst::GhostPad::builder_with_target(&aqueue_transcription.static_pad("sink").unwrap()) + .unwrap() + .name(pad_name) + .build(); + + pad_state + .transcription_bin + .add_pad(&transcription_audio_sinkpad)?; + + let transcription_audio_sinkpad = + gst::GhostPad::with_target(&transcription_audio_sinkpad).unwrap(); state .transcription_bin .add_pad(&transcription_audio_sinkpad)?; + + for channel in pad_state.transcription_channels.values() { + pad_state.transcription_bin.add(&channel.bin)?; + + channel.link_transcriber(&pad_state.transcriber)?; + + let srcpad = + gst::GhostPad::with_target(&channel.bin.static_pad("src").unwrap()).unwrap(); + + 
pad_state.transcription_bin.add_pad(&srcpad)?; + if state.ccmux.static_pad(&channel.ccmux_pad_name).is_none() { + let ccmux_pad = state + .ccmux + .request_pad_simple(&channel.ccmux_pad_name) + .ok_or(anyhow!("Failed to request ccmux sink pad"))?; + srcpad.link(&ccmux_pad)?; + } + } + + Ok(()) + } + + fn construct_transcription_bin(&self, state: &mut State) -> Result<(), Error> { + gst::debug!(CAT, imp: self, "Building transcription bin"); + + let ccconverter = gst::ElementFactory::make("ccconverter").build()?; + + state.transcription_bin.add_many([ + &state.ccmux, + &state.ccmux_filter, + &ccconverter, + &state.cccapsfilter, + &state.transcription_valve, + ])?; + + gst::Element::link_many([ + &state.ccmux, + &state.ccmux_filter, + &ccconverter, + &state.cccapsfilter, + &state.transcription_valve, + ])?; + + state.ccmux.set_property("latency", CEAX08MUX_LATENCY); + + let transcription_audio_srcpad = + gst::GhostPad::with_target(&state.transcription_valve.static_pad("src").unwrap()) + .unwrap(); + state .transcription_bin .add_pad(&transcription_audio_srcpad)?; @@ -309,39 +354,19 @@ impl TranscriberBin { } fn construct_internal_bin(&self, state: &mut State) -> Result<(), Error> { - let aclocksync = gst::ElementFactory::make("clocksync").build()?; + let vclocksync = gst::ElementFactory::make("clocksync") + .name("vclocksync") + .build()?; - let vclocksync = gst::ElementFactory::make("clocksync").build()?; - - state.internal_bin.add_many([ - &aclocksync, - &state.audio_tee, - &state.audio_queue_passthrough, - &vclocksync, - &state.video_queue, - &state.cccombiner, - ])?; - - aclocksync.link(&state.audio_tee)?; state - .audio_tee - .link_pads(Some("src_%u"), &state.audio_queue_passthrough, Some("sink"))?; + .internal_bin + .add_many([&vclocksync, &state.video_queue, &state.cccombiner])?; + vclocksync.link(&state.video_queue)?; state .video_queue .link_pads(Some("src"), &state.cccombiner, Some("sink"))?; - let internal_audio_sinkpad = - 
gst::GhostPad::builder_with_target(&aclocksync.static_pad("sink").unwrap()) - .unwrap() - .name("audio_sink") - .build(); - let internal_audio_srcpad = gst::GhostPad::builder_with_target( - &state.audio_queue_passthrough.static_pad("src").unwrap(), - ) - .unwrap() - .name("audio_src") - .build(); let internal_video_sinkpad = gst::GhostPad::builder_with_target(&vclocksync.static_pad("sink").unwrap()) .unwrap() @@ -353,8 +378,6 @@ impl TranscriberBin { .name("video_src") .build(); - state.internal_bin.add_pad(&internal_audio_sinkpad)?; - state.internal_bin.add_pad(&internal_audio_srcpad)?; state.internal_bin.add_pad(&internal_video_sinkpad)?; state.internal_bin.add_pad(&internal_video_srcpad)?; @@ -385,10 +408,6 @@ impl TranscriberBin { state.cccombiner.set_property("latency", 100.mseconds()); - self.audio_sinkpad - .set_target(Some(&state.internal_bin.static_pad("audio_sink").unwrap()))?; - self.audio_srcpad - .set_target(Some(&state.internal_bin.static_pad("audio_src").unwrap()))?; self.video_sinkpad .set_target(Some(&state.internal_bin.static_pad("video_sink").unwrap()))?; self.video_srcpad @@ -396,6 +415,35 @@ impl TranscriberBin { self.construct_transcription_bin(state)?; + let pad = self + .audio_sinkpad + .downcast_ref::() + .unwrap(); + // FIXME: replace this pattern with https://doc.rust-lang.org/nightly/std/sync/struct.MappedMutexGuard.html + let ps = pad.imp().state.lock().unwrap(); + let pad_state = ps.as_ref().unwrap(); + + self.link_input_audio_stream("sink_audio", pad_state, state)?; + + let internal_audio_sinkpad = + gst::GhostPad::builder_with_target(&pad_state.clocksync.static_pad("sink").unwrap()) + .unwrap() + .name("audio_sink") + .build(); + let internal_audio_srcpad = gst::GhostPad::builder_with_target( + &pad_state.queue_passthrough.static_pad("src").unwrap(), + ) + .unwrap() + .name("audio_src") + .build(); + + state.internal_bin.add_pad(&internal_audio_sinkpad)?; + state.internal_bin.add_pad(&internal_audio_srcpad)?; + + self.audio_sinkpad + 
.set_target(Some(&state.internal_bin.static_pad("audio_sink").unwrap()))?; + self.audio_srcpad + .set_target(Some(&state.internal_bin.static_pad("audio_src").unwrap()))?; Ok(()) } @@ -427,21 +475,42 @@ impl TranscriberBin { + settings.accumulate_time + CEAX08MUX_LATENCY; - for queue in [&state.audio_queue_passthrough, &state.video_queue] { - queue.set_property("max-size-bytes", 0u32); - queue.set_property("max-size-buffers", 0u32); - queue.set_property("max-size-time", max_size_time); + gst::debug!( + CAT, + "Calculated max size time for passthrough branches: {max_size_time}" + ); + + state.video_queue.set_property("max-size-bytes", 0u32); + state.video_queue.set_property("max-size-buffers", 0u32); + state + .video_queue + .set_property("max-size-time", max_size_time); + + for pad in state.audio_sink_pads.values() { + let ps = pad.imp().state.lock().unwrap(); + let pad_state = ps.as_ref().unwrap(); + let latency_ms = settings.latency.mseconds() as u32; + pad_state + .transcriber + .set_property("transcribe-latency", latency_ms); + + let translate_latency_ms = settings.translate_latency.mseconds() as u32; + pad_state + .transcriber + .set_property("translate-latency", translate_latency_ms); + pad_state + .queue_passthrough + .set_property("max-size-bytes", 0u32); + pad_state + .queue_passthrough + .set_property("max-size-buffers", 0u32); + pad_state + .queue_passthrough + .set_property("max-size-time", max_size_time); } - let latency_ms = settings.latency.mseconds() as u32; - state.transcriber.set_property("latency", latency_ms); - - let translate_latency_ms = settings.translate_latency.mseconds() as u32; - state - .transcriber - .set_property("translate-latency", translate_latency_ms); - if !settings.passthrough { + gst::debug!(CAT, imp: self, "Linking transcription bins and synchronizing state"); state .transcription_bin .link_pads(Some("src"), &state.cccombiner, Some("caption")) @@ -450,47 +519,58 @@ impl TranscriberBin { 
state.transcription_bin.set_locked_state(false); state.transcription_bin.sync_state_with_parent().unwrap(); - let transcription_sink_pad = state.transcription_bin.static_pad("sink").unwrap(); - // Might be linked already if "translation-languages" is set - if transcription_sink_pad.peer().is_none() { - let audio_tee_pad = state.audio_tee.request_pad_simple("src_%u").unwrap(); - audio_tee_pad.link(&transcription_sink_pad).unwrap(); + for pad in state.audio_sink_pads.values() { + let ps = pad.imp().state.lock().unwrap(); + let pad_state = ps.as_ref().unwrap(); + pad_state.transcription_bin.set_locked_state(false); + pad_state + .transcription_bin + .sync_state_with_parent() + .unwrap(); + let transcription_sink_pad = + state.transcription_bin.static_pad(&pad.name()).unwrap(); + // Might be linked already if "translation-languages" is set + if transcription_sink_pad.peer().is_none() { + let audio_tee_pad = pad_state.audio_tee.request_pad_simple("src_%u").unwrap(); + audio_tee_pad.link(&transcription_sink_pad).unwrap(); + } } } - drop(settings); - - self.setup_cc_mode(state); + for pad in state.audio_sink_pads.values() { + let ps = pad.imp().state.lock().unwrap(); + let pad_state = ps.as_ref().unwrap(); + let pad_settings = pad.imp().settings.lock().unwrap(); + self.setup_cc_mode(pad, pad_state, state.mux_method, pad_settings.mode); + } } - fn disable_transcription_bin(&self) { - let mut state = self.state.lock().unwrap(); + fn disable_transcription_bin(&self, state: &mut State) { + // At this point, we want to check whether passthrough + // has been unset in the meantime + let passthrough = self.settings.lock().unwrap().passthrough; - if let Some(ref mut state) = state.as_mut() { - state.tearing_down = false; + if passthrough { + gst::debug!(CAT, imp: self, "disabling transcription bin"); - // At this point, we want to check whether passthrough - // has been unset in the meantime - let passthrough = self.settings.lock().unwrap().passthrough; - - if passthrough { - 
gst::debug!(CAT, imp: self, "disabling transcription bin"); - - let bin_sink_pad = state.transcription_bin.static_pad("sink").unwrap(); + for pad in state.audio_sink_pads.values() { + let ps = pad.imp().state.lock().unwrap(); + let pad_state = ps.as_ref().unwrap(); + let bin_sink_pad = state.transcription_bin.static_pad(&pad.name()).unwrap(); if let Some(audio_tee_pad) = bin_sink_pad.peer() { audio_tee_pad.unlink(&bin_sink_pad).unwrap(); - state.audio_tee.release_request_pad(&audio_tee_pad); + pad_state.audio_tee.release_request_pad(&audio_tee_pad); } - - let bin_src_pad = state.transcription_bin.static_pad("src").unwrap(); - if let Some(cccombiner_pad) = bin_src_pad.peer() { - bin_src_pad.unlink(&cccombiner_pad).unwrap(); - state.cccombiner.release_request_pad(&cccombiner_pad); - } - - state.transcription_bin.set_locked_state(true); - state.transcription_bin.set_state(gst::State::Null).unwrap(); } + + let bin_src_pad = state.transcription_bin.static_pad("src").unwrap(); + if let Some(cccombiner_pad) = bin_src_pad.peer() { + bin_src_pad.unlink(&cccombiner_pad).unwrap(); + state.cccombiner.release_request_pad(&cccombiner_pad); + } + + state.transcription_bin.set_locked_state(true); + state.transcription_bin.set_state(gst::State::Null).unwrap(); } } @@ -499,25 +579,34 @@ impl TranscriberBin { if let Some(ref mut state) = s.as_mut() { if passthrough { - let sinkpad = state.transcription_bin.static_pad("sink").unwrap(); - let imp_weak = self.downgrade(); - state.tearing_down = true; + state.tearing_down = state.audio_sink_pads.len(); + let sinkpads = state.audio_sink_pads.clone(); drop(s); - let _ = sinkpad.add_probe( - gst::PadProbeType::IDLE - | gst::PadProbeType::BUFFER - | gst::PadProbeType::EVENT_DOWNSTREAM, - move |_pad, _info| { - let Some(imp) = imp_weak.upgrade() else { - return gst::PadProbeReturn::Remove; - }; + for sinkpad in sinkpads.values() { + let imp_weak = self.downgrade(); + let _ = sinkpad.add_probe( + gst::PadProbeType::IDLE + | 
gst::PadProbeType::BUFFER + | gst::PadProbeType::EVENT_DOWNSTREAM, + move |_pad, _info| { + let Some(imp) = imp_weak.upgrade() else { + return gst::PadProbeReturn::Remove; + }; - imp.disable_transcription_bin(); + let mut s = imp.state.lock().unwrap(); - gst::PadProbeReturn::Remove - }, - ); - } else if state.tearing_down { + if let Some(ref mut state) = s.as_mut() { + state.tearing_down -= 1; + if state.tearing_down == 0 { + imp.disable_transcription_bin(state); + } + } + + gst::PadProbeReturn::Remove + }, + ); + } + } else if state.tearing_down > 0 { // Do nothing, wait for the previous transcription bin // to finish tearing down } else { @@ -528,20 +617,29 @@ impl TranscriberBin { state.transcription_bin.set_locked_state(false); state.transcription_bin.sync_state_with_parent().unwrap(); - let audio_tee_pad = state.audio_tee.request_pad_simple("src_%u").unwrap(); - let transcription_sink_pad = state.transcription_bin.static_pad("sink").unwrap(); - audio_tee_pad.link(&transcription_sink_pad).unwrap(); + for pad in state.audio_sink_pads.values() { + let ps = pad.imp().state.lock().unwrap(); + let pad_state = ps.as_ref().unwrap(); + let audio_tee_pad = pad_state.audio_tee.request_pad_simple("src_%u").unwrap(); + let transcription_sink_pad = + state.transcription_bin.static_pad(&pad.name()).unwrap(); + audio_tee_pad.link(&transcription_sink_pad).unwrap(); + } } } } - fn setup_cc_mode(&self, state: &State) { - let mode = self.settings.lock().unwrap().mode; + fn setup_cc_mode( + &self, + pad: &super::TranscriberSinkPad, + pad_state: &TranscriberSinkPadState, + mux_method: MuxMethod, + mode: Cea608Mode, + ) { + gst::debug!(CAT, imp: self, "setting CC mode {:?} for pad {:?}", mode, pad); - gst::debug!(CAT, imp: self, "setting CC mode {:?}", mode); - - for channel in state.transcription_channels.values() { - match state.mux_method { + for channel in pad_state.transcription_channels.values() { + match mux_method { MuxMethod::Cea608 => 
channel.tttoceax08.set_property("mode", mode), MuxMethod::Cea708 => match mode { Cea608Mode::PopOn => channel.tttoceax08.set_property("mode", Cea708Mode::PopOn), @@ -571,6 +669,7 @@ impl TranscriberBin { fn relink_transcriber( &self, state: &mut State, + pad_state: &TranscriberSinkPadState, old_transcriber: &gst::Element, ) -> Result<(), Error> { gst::debug!( @@ -578,23 +677,23 @@ impl TranscriberBin { imp: self, "Relinking transcriber, old: {:?}, new: {:?}", old_transcriber, - state.transcriber + pad_state.transcriber ); - state.transcriber_aconv.unlink(old_transcriber); + pad_state.transcriber_aconv.unlink(old_transcriber); - for channel in state.transcription_channels.values() { + for channel in pad_state.transcription_channels.values() { old_transcriber.unlink(&channel.bin); } state.transcription_bin.remove(old_transcriber).unwrap(); old_transcriber.set_state(gst::State::Null).unwrap(); - state.transcription_bin.add(&state.transcriber)?; - state.transcriber.sync_state_with_parent().unwrap(); - state.transcriber_aconv.link(&state.transcriber)?; + state.transcription_bin.add(&pad_state.transcriber)?; + pad_state.transcriber.sync_state_with_parent().unwrap(); + pad_state.transcriber_aconv.link(&pad_state.transcriber)?; - for channel in state.transcription_channels.values() { - channel.link_transcriber(&state.transcriber)?; + for channel in pad_state.transcription_channels.values() { + channel.link_transcriber(&pad_state.transcriber)?; } Ok(()) @@ -602,7 +701,7 @@ impl TranscriberBin { fn construct_transcription_channels( &self, - settings: &Settings, + settings: &TranscriberSinkPadSettings, mux_method: MuxMethod, transcription_channels: &mut HashMap, ) -> Result<(), Error> { @@ -667,11 +766,18 @@ impl TranscriberBin { Ok(()) } - fn reconfigure_transcription_bin(&self, lang_code_only: bool) -> Result<(), Error> { + fn reconfigure_transcription_bin( + &self, + pad: &TranscriberSinkPad, + lang_code_only: bool, + ) -> Result<(), Error> { let mut state = 
self.state.lock().unwrap(); if let Some(ref mut state) = state.as_mut() { let settings = self.settings.lock().unwrap(); + let mut ps = pad.state.lock().unwrap(); + let pad_state = ps.as_mut().unwrap(); + let pad_settings = pad.settings.lock().unwrap(); gst::debug!( CAT, @@ -680,20 +786,26 @@ impl TranscriberBin { ); // Unlink sinkpad temporarily - let sinkpad = state.transcription_bin.static_pad("sink").unwrap(); + let sinkpad = state + .transcription_bin + .static_pad(&pad.obj().name()) + .unwrap(); let peer = sinkpad.peer(); if let Some(peer) = &peer { gst::debug!(CAT, imp: self, "Unlinking {:?}", peer); peer.unlink(&sinkpad)?; - state.audio_tee.release_request_pad(peer); + pad_state.audio_tee.release_request_pad(peer); } - state.transcription_bin.set_locked_state(true); - state.transcription_bin.set_state(gst::State::Null).unwrap(); + pad_state.transcription_bin.set_locked_state(true); + pad_state + .transcription_bin + .set_state(gst::State::Null) + .unwrap(); - state + pad_state .transcriber - .set_property("language-code", &settings.language_code); + .set_property("language-code", &pad_settings.language_code); if lang_code_only { if !settings.passthrough { @@ -701,64 +813,89 @@ impl TranscriberBin { drop(settings); + // While we haven't locked the state here, the state of the + // top level transcription bin might be locked, for instance + // at start up. 
Unlock and sync both the inner and top level + // bin states to ensure data flows in the correct state state.transcription_bin.set_locked_state(false); - state.transcription_bin.sync_state_with_parent()?; + state.transcription_bin.sync_state_with_parent().unwrap(); - let audio_tee_pad = state.audio_tee.request_pad_simple("src_%u").unwrap(); + pad_state.transcription_bin.set_locked_state(false); + pad_state.transcription_bin.sync_state_with_parent()?; + + let audio_tee_pad = pad_state.audio_tee.request_pad_simple("src_%u").unwrap(); audio_tee_pad.link(&sinkpad)?; } return Ok(()); } - for channel in state.transcription_channels.values() { + for channel in pad_state.transcription_channels.values() { let sinkpad = channel.bin.static_pad("sink").unwrap(); if let Some(peer) = sinkpad.peer() { peer.unlink(&sinkpad)?; if channel.language != "transcript" { - state.transcriber.release_request_pad(&peer); + pad_state.transcriber.release_request_pad(&peer); } } let srcpad = channel.bin.static_pad("src").unwrap(); if let Some(peer) = srcpad.peer() { - srcpad.unlink(&peer)?; - state.ccmux.release_request_pad(&peer); + // The source pad might not have been linked to the muxer initially, for + // instance in case of a collision with another source pad's + // translation-languages mapping + if peer.parent().and_downcast_ref::() + == Some(state.ccmux.as_ref()) + { + srcpad.unlink(&peer)?; + state.ccmux.release_request_pad(&peer); + } } - state.transcription_bin.remove(&channel.bin)?; + pad_state.transcription_bin.remove(&channel.bin)?; } - state.transcription_channels.clear(); + pad_state.transcription_channels.clear(); self.construct_transcription_channels( - &settings, + &pad_settings, state.mux_method, - &mut state.transcription_channels, + &mut pad_state.transcription_channels, )?; - for channel in state.transcription_channels.values() { - state.transcription_bin.add(&channel.bin)?; + for channel in pad_state.transcription_channels.values() { + 
pad_state.transcription_bin.add(&channel.bin)?; - channel.link_transcriber(&state.transcriber)?; + channel.link_transcriber(&pad_state.transcriber)?; - let ccmux_pad = state - .ccmux - .request_pad_simple(&channel.ccmux_pad_name) - .ok_or(anyhow!("Failed to request ccmux sink pad"))?; - channel.bin.static_pad("src").unwrap().link(&ccmux_pad)?; + let srcpad = pad_state.transcription_bin.static_pad("src").unwrap(); + + srcpad + .downcast_ref::() + .unwrap() + .set_target(channel.bin.static_pad("src").as_ref())?; + + if state.ccmux.static_pad(&channel.ccmux_pad_name).is_none() { + let ccmux_pad = state + .ccmux + .request_pad_simple(&channel.ccmux_pad_name) + .ok_or(anyhow!("Failed to request ccmux sink pad"))?; + srcpad.link(&ccmux_pad)?; + } } - drop(settings); - self.setup_cc_mode(state); + self.setup_cc_mode(&pad.obj(), pad_state, state.mux_method, pad_settings.mode); - if !self.settings.lock().unwrap().passthrough { + if !settings.passthrough { gst::debug!(CAT, imp: self, "Syncing state with parent"); - state.transcription_bin.set_locked_state(false); - state.transcription_bin.sync_state_with_parent()?; + let audio_tee_pad = pad_state.audio_tee.request_pad_simple("src_%u").unwrap(); - let audio_tee_pad = state.audio_tee.request_pad_simple("src_%u").unwrap(); + drop(pad_settings); + drop(settings); + + pad_state.transcription_bin.set_locked_state(false); + pad_state.transcription_bin.sync_state_with_parent()?; audio_tee_pad.link(&sinkpad)?; } } @@ -766,43 +903,81 @@ impl TranscriberBin { Ok(()) } - fn update_languages(&self, lang_code_only: bool) { - let s = self.state.lock().unwrap(); + fn update_languages(&self, pad: &super::TranscriberSinkPad, lang_code_only: bool) { + gst::debug!( + CAT, + imp: self, + "Schedule transcription/translation language update for pad {pad:?}" + ); - if let Some(state) = s.as_ref() { - gst::debug!( - CAT, - imp: self, - "Schedule transcription/translation language update" - ); + let Some(sinkpad) = pad + .imp() + .state + .lock() + 
.unwrap() + .as_ref() + .unwrap() + .transcription_bin + .static_pad(pad.name().as_str()) + else { + gst::debug!(CAT, imp: pad.imp(), "transcription bin not set up yet"); + return; + }; - let sinkpad = state.transcription_bin.static_pad("sink").unwrap(); - let imp_weak = self.downgrade(); - drop(s); + let imp_weak = self.downgrade(); + let pad_weak = pad.downgrade(); - let _ = sinkpad.add_probe( - gst::PadProbeType::IDLE - | gst::PadProbeType::BUFFER - | gst::PadProbeType::EVENT_DOWNSTREAM, - move |_pad, _info| { - let Some(imp) = imp_weak.upgrade() else { - return gst::PadProbeReturn::Remove; - }; + let _ = sinkpad.add_probe( + gst::PadProbeType::IDLE + | gst::PadProbeType::BUFFER + | gst::PadProbeType::EVENT_DOWNSTREAM, + move |_pad, _info| { + let Some(imp) = imp_weak.upgrade() else { + return gst::PadProbeReturn::Remove; + }; - if imp.reconfigure_transcription_bin(lang_code_only).is_err() { - gst::element_imp_error!( - imp, - gst::StreamError::Failed, - ["Couldn't reconfigure channels"] - ); - } + let Some(pad) = pad_weak.upgrade() else { + return gst::PadProbeReturn::Remove; + }; - gst::PadProbeReturn::Remove - }, - ); - } else { - gst::debug!(CAT, imp: self, "Transcriber is not configured yet"); + if let Err(e) = imp.reconfigure_transcription_bin(pad.imp(), lang_code_only) { + gst::error!(CAT, "Couldn't reconfigure channels: {e}"); + gst::element_imp_error!( + imp, + gst::StreamError::Failed, + ["Couldn't reconfigure channels: {}", e] + ); + *imp.state.lock().unwrap() = None; + } + + gst::PadProbeReturn::Remove + }, + ); + } + + fn any_sink_is_translating(&self, state: &State) -> bool { + for pad in state.audio_sink_pads.values() { + let ps = pad.imp().state.lock().unwrap(); + let pad_state = ps.as_ref().unwrap(); + if pad_state + .transcription_channels + .values() + .any(|c| c.language != "transcript") + { + return true; + } } + false + } + + fn any_sink_is_rollup(&self, state: &State) -> bool { + for pad in state.audio_sink_pads.values() { + let 
pad_settings = pad.imp().settings.lock().unwrap(); + if pad_settings.mode.is_rollup() { + return true; + } + } + false } #[allow(clippy::single_match)] @@ -819,16 +994,10 @@ impl TranscriberBin { if ret { let (_, mut min, _) = upstream_query.result(); + let state = self.state.lock().unwrap(); let (received_framerate, translating) = { - let state = self.state.lock().unwrap(); if let Some(state) = state.as_ref() { - ( - state.framerate, - state - .transcription_channels - .values() - .any(|c| c.language != "transcript"), - ) + (state.framerate, self.any_sink_is_translating(state)) } else { (None, false) } @@ -848,8 +1017,10 @@ impl TranscriberBin { .mul_div_floor(framerate.denom() as u64, framerate.numer() as u64) .unwrap(); } - } else if settings.mode.is_rollup() { - min += settings.accumulate_time; + } else if let Some(state) = state.as_ref() { + if self.any_sink_is_rollup(state) { + min += settings.accumulate_time; + } } q.set(true, min, gst::ClockTime::NONE); @@ -864,24 +1035,9 @@ impl TranscriberBin { fn build_state(&self) -> Result { let internal_bin = gst::Bin::with_name("internal"); let transcription_bin = gst::Bin::with_name("transcription-bin"); - let audio_tee = gst::ElementFactory::make("tee") - // Protect passthrough enable (and resulting dynamic reconfigure) - // from non-streaming thread - .property("allow-not-linked", true) - .build()?; let cccombiner = gst::ElementFactory::make("cccombiner") .name("cccombiner") .build()?; - let transcriber_resample = gst::ElementFactory::make("audioresample").build()?; - let transcriber_aconv = gst::ElementFactory::make("audioconvert").build()?; - let transcriber = gst::ElementFactory::make("awstranscriber") - .name("transcriber") - .property( - "language-code", - &self.settings.lock().unwrap().language_code, - ) - .build()?; - let audio_queue_passthrough = gst::ElementFactory::make("queue").build()?; let video_queue = gst::ElementFactory::make("queue").build()?; let cccapsfilter = 
gst::ElementFactory::make("capsfilter").build()?; let transcription_valve = gst::ElementFactory::make("valve") @@ -901,33 +1057,36 @@ impl TranscriberBin { }; let ccmux_filter = gst::ElementFactory::make("capsfilter").build()?; - let mut transcription_channels = HashMap::new(); - + let pad = self + .audio_sinkpad + .clone() + .downcast::() + .unwrap(); + let mut audio_sink_pads = HashMap::new(); + audio_sink_pads.insert(self.audio_sinkpad.name().to_string(), pad.clone()); + let mut ps = pad.imp().state.lock().unwrap(); + let pad_state = ps.as_mut().unwrap(); + let pad_settings = pad.imp().settings.lock().unwrap(); self.construct_transcription_channels( - &settings, + &pad_settings, settings.mux_method, - &mut transcription_channels, + &mut pad_state.transcription_channels, )?; - drop(settings); Ok(State { mux_method, framerate: None, internal_bin, - audio_queue_passthrough, video_queue, - transcriber_resample, - transcriber_aconv, - transcriber, ccmux, ccmux_filter, - audio_tee, cccombiner, transcription_bin, - transcription_channels, cccapsfilter, transcription_valve, - tearing_down: false, + tearing_down: 0, + audio_serial: 0, + audio_sink_pads, }) } @@ -969,15 +1128,35 @@ impl TranscriberBin { } } +impl ChildProxyImpl for TranscriberBin { + fn child_by_index(&self, _index: u32) -> Option { + None + } + + fn children_count(&self) -> u32 { + 0 + } + + fn child_by_name(&self, name: &str) -> Option { + if let Some(child) = self.parent_child_by_name(name) { + Some(child) + } else { + self.obj().static_pad(name).map(|pad| pad.upcast()) + } + } +} + #[glib::object_subclass] impl ObjectSubclass for TranscriberBin { const NAME: &'static str = "GstTranscriberBin"; type Type = super::TranscriberBin; type ParentType = gst::Bin; + type Interfaces = (gst::ChildProxy,); fn with_class(klass: &Self::Class) -> Self { let templ = klass.pad_template("sink_audio").unwrap(); - let audio_sinkpad = gst::GhostPad::from_template(&templ); + let audio_sinkpad = + 
gst::PadBuilder::::from_template(&templ).build(); let templ = klass.pad_template("src_audio").unwrap(); let audio_srcpad = gst::GhostPad::builder_from_template(&templ) .query_function(|pad, parent, query| { @@ -1013,7 +1192,7 @@ impl ObjectSubclass for TranscriberBin { Self { audio_srcpad, video_srcpad, - audio_sinkpad, + audio_sinkpad: audio_sinkpad.into(), video_sinkpad, state: Mutex::new(None), settings: Mutex::new(Settings::default()), @@ -1122,37 +1301,20 @@ impl ObjectImpl for TranscriberBin { ); } "mode" => { - let mut settings = self.settings.lock().unwrap(); - - let old_mode = settings.mode; - let new_mode = value.get().expect("type checked upstream"); - settings.mode = new_mode; - - if old_mode != new_mode { - drop(settings); - self.setup_cc_mode(self.state.lock().unwrap().as_ref().unwrap()); - } + self.audio_sinkpad.set_property( + "mode", + value.get::().expect("type checked upstream"), + ); } "cc-caps" => { let mut settings = self.settings.lock().unwrap(); settings.cc_caps = value.get().expect("type checked upstream"); } "transcriber" => { - let mut s = self.state.lock().unwrap(); - if let Some(ref mut state) = s.as_mut() { - let old_transcriber = state.transcriber.clone(); - state.transcriber = value.get().expect("type checked upstream"); - if old_transcriber != state.transcriber { - match self.relink_transcriber(state, &old_transcriber) { - Ok(()) => (), - Err(err) => { - gst::error!(CAT, "invalid transcriber: {}", err); - drop(s); - *self.state.lock().unwrap() = None; - } - } - } - } + self.audio_sinkpad.set_property( + "transcriber", + value.get::().expect("type checked upstream"), + ); } "caption-source" => { let mut settings = self.settings.lock().unwrap(); @@ -1170,19 +1332,12 @@ impl ObjectImpl for TranscriberBin { } } "translation-languages" => { - let mut settings = self.settings.lock().unwrap(); - settings.translation_languages = value - .get::>() - .expect("type checked upstream"); - gst::debug!( - CAT, - imp: self, - "Updated 
translation-languages {:?}", - settings.translation_languages + self.audio_sinkpad.set_property( + "translation-languages", + value + .get::>() + .expect("type checked upstream"), ); - drop(settings); - - self.update_languages(false); } "translate-latency" => { let mut settings = self.settings.lock().unwrap(); @@ -1191,25 +1346,12 @@ impl ObjectImpl for TranscriberBin { ); } "language-code" => { - let code = value - .get::>() - .expect("type checked upstream") - .unwrap_or_else(|| String::from(DEFAULT_INPUT_LANG_CODE)); - let mut settings = self.settings.lock().unwrap(); - if settings.language_code != code { - gst::debug!( - CAT, - imp: self, - "Updating language code {} -> {}", - settings.language_code, - code - ); - - settings.language_code = code; - drop(settings); - - self.update_languages(true) - } + self.audio_sinkpad.set_property( + "language-code", + value + .get::>() + .expect("type checked upstream"), + ); } "mux-method" => { let mut settings = self.settings.lock().unwrap(); @@ -1233,39 +1375,22 @@ impl ObjectImpl for TranscriberBin { let settings = self.settings.lock().unwrap(); (settings.accumulate_time.mseconds() as u32).to_value() } - "mode" => { - let settings = self.settings.lock().unwrap(); - settings.mode.to_value() - } + "mode" => self.audio_sinkpad.property("mode"), "cc-caps" => { let settings = self.settings.lock().unwrap(); settings.cc_caps.to_value() } - "transcriber" => { - let state = self.state.lock().unwrap(); - if let Some(state) = state.as_ref() { - state.transcriber.to_value() - } else { - let ret: Option = None; - ret.to_value() - } - } + "transcriber" => self.audio_sinkpad.property("transcriber"), "caption-source" => { let settings = self.settings.lock().unwrap(); settings.caption_source.to_value() } - "translation-languages" => { - let settings = self.settings.lock().unwrap(); - settings.translation_languages.to_value() - } + "translation-languages" => self.audio_sinkpad.property("translation-languages"), "translate-latency" => { 
let settings = self.settings.lock().unwrap(); (settings.translate_latency.mseconds() as u32).to_value() } - "language-code" => { - let settings = self.settings.lock().unwrap(); - settings.language_code.to_value() - } + "language-code" => self.audio_sinkpad.property("language-code"), "mux-method" => { let settings = self.settings.lock().unwrap(); settings.mux_method.to_value() @@ -1341,11 +1466,28 @@ impl ElementImpl for TranscriberBin { &caps, ) .unwrap(); - let audio_sink_pad_template = gst::PadTemplate::new( + let audio_sink_pad_template = gst::PadTemplate::with_gtype( "sink_audio", gst::PadDirection::Sink, gst::PadPresence::Always, &caps, + super::TranscriberSinkPad::static_type(), + ) + .unwrap(); + let secondary_audio_sink_pad_template = gst::PadTemplate::with_gtype( + "sink_audio_%u", + gst::PadDirection::Sink, + gst::PadPresence::Request, + &caps, + super::TranscriberSinkPad::static_type(), + ) + .unwrap(); + let secondary_audio_src_pad_template = gst::PadTemplate::with_gtype( + "src_audio_%u", + gst::PadDirection::Src, + gst::PadPresence::Sometimes, + &caps, + super::TranscriberSrcPad::static_type(), ) .unwrap(); @@ -1354,12 +1496,182 @@ impl ElementImpl for TranscriberBin { video_sink_pad_template, audio_src_pad_template, audio_sink_pad_template, + secondary_audio_sink_pad_template, + secondary_audio_src_pad_template, ] }); PAD_TEMPLATES.as_ref() } + fn release_pad(&self, pad: &gst::Pad) { + if self.obj().current_state() > gst::State::Null { + gst::fixme!(CAT, obj: pad, "releasing secondary audio stream while PLAYING is untested"); + } + + // In practice we will probably at least need some flushing here, + // and latency recalculating, but at least the basic skeleton for + // releasing is in place + + let Some(pad) = pad.downcast_ref::() else { + gst::error!(CAT, imp: self, "not a transcriber sink pad: {pad:?}"); + return; + }; + + let mut s = self.state.lock().unwrap(); + let ps = pad.imp().state.lock().unwrap(); + let pad_state = ps.as_ref().unwrap(); + 
+ pad_state.transcription_bin.set_locked_state(true); + let _ = pad_state.transcription_bin.set_state(gst::State::Null); + + if let Some(ref mut state) = s.as_mut() { + if let Some(srcpad) = pad_state.transcription_bin.static_pad("src") { + if let Some(peer) = srcpad.peer() { + let _ = state.ccmux.remove_pad(&peer); + } + } + + let _ = state.transcription_bin.remove(&pad_state.transcription_bin); + for srcpad in pad_state.audio_tee.iterate_src_pads() { + let srcpad = srcpad.unwrap(); + if let Some(peer) = srcpad.peer() { + if let Some(parent) = peer.parent() { + if parent == state.transcription_bin { + let _ = parent.downcast::().unwrap().remove_pad(&peer); + } + } + } + } + let _ = state.internal_bin.remove_many([ + &pad_state.clocksync, + &pad_state.identity, + &pad_state.audio_tee, + ]); + state.audio_sink_pads.remove(pad.name().as_str()); + } + + let srcpad = pad_state + .srcpad_name + .as_ref() + .and_then(|name| self.obj().static_pad(name)); + + drop(ps); + drop(s); + + let _ = pad.set_active(false); + let _ = self.obj().remove_pad(pad); + + if let Some(srcpad) = srcpad { + let _ = srcpad.set_active(false); + let _ = self.obj().remove_pad(&srcpad); + } + + self.obj() + .child_removed(pad.upcast_ref::(), &pad.name()); + } + + fn request_new_pad( + &self, + _templ: &gst::PadTemplate, + _name: Option<&str>, + _caps: Option<&gst::Caps>, + ) -> Option { + let element = self.obj(); + if element.current_state() > gst::State::Null { + gst::error!(CAT, "element pads can only be requested before starting"); + return None; + } + + let mut s = self.state.lock().unwrap(); + if let Some(ref mut state) = s.as_mut() { + let name = format!("sink_audio_{}", state.audio_serial); + + let templ = self.obj().pad_template("sink_audio_%u").unwrap(); + let sink_pad = gst::PadBuilder::::from_template(&templ) + .name(&name) + .build(); + + let src_pad = { + let settings = self.settings.lock().unwrap(); + let mut s = sink_pad.imp().state.lock().unwrap(); + let pad_state = match 
s.as_mut() { + Ok(s) => s, + Err(e) => { + gst::error!(CAT, "Failed to construct sink pad: {e}"); + return None; + } + }; + let pad_settings = sink_pad.imp().settings.lock().unwrap(); + self.construct_transcription_channels( + &pad_settings, + settings.mux_method, + &mut pad_state.transcription_channels, + ) + .unwrap(); + + if let Err(e) = self.link_input_audio_stream(&name, pad_state, state) { + gst::error!(CAT, "Failed to link secondary audio stream: {e}"); + return None; + } + + let internal_sink_pad = gst::GhostPad::builder_with_target( + &pad_state.clocksync.static_pad("sink").unwrap(), + ) + .unwrap() + .name(name.clone()) + .build(); + internal_sink_pad.set_active(true).unwrap(); + state.internal_bin.add_pad(&internal_sink_pad).unwrap(); + sink_pad.set_target(Some(&internal_sink_pad)).unwrap(); + sink_pad.set_active(true).unwrap(); + state + .audio_sink_pads + .insert(name.to_string(), sink_pad.clone()); + + let templ = self.obj().pad_template("src_audio_%u").unwrap(); + let name = format!("src_audio_{}", state.audio_serial); + let src_pad = gst::PadBuilder::::from_template(&templ) + .name(name.as_str()) + .query_function(|pad, parent, query| { + TranscriberBin::catch_panic_pad_function( + parent, + || false, + |transcriber| transcriber.src_query(pad.upcast_ref(), query), + ) + }) + .build(); + let internal_src_pad = gst::GhostPad::builder_with_target( + &pad_state.queue_passthrough.static_pad("src").unwrap(), + ) + .unwrap() + .name(&name) + .build(); + internal_src_pad.set_active(true).unwrap(); + state.internal_bin.add_pad(&internal_src_pad).unwrap(); + src_pad.set_target(Some(&internal_src_pad)).unwrap(); + src_pad.set_active(true).unwrap(); + + pad_state.srcpad_name = Some(name.clone()); + + src_pad + }; + + state.audio_serial += 1; + + drop(s); + + self.obj().add_pad(&sink_pad).unwrap(); + self.obj() + .child_added(sink_pad.upcast_ref::(), &sink_pad.name()); + self.obj().add_pad(&src_pad).unwrap(); + + Some(sink_pad.upcast()) + } else { + None + } + } 
+ #[allow(clippy::single_match)] fn change_state( &self, @@ -1403,32 +1715,27 @@ impl BinImpl for TranscriberBin { match msg.view() { MessageView::Error(m) => { - /* We must have a state here */ - let s = self.state.lock().unwrap(); - - if let Some(state) = s.as_ref() { - if msg.src() == Some(state.transcriber.upcast_ref()) { - gst::error!( - CAT, - imp: self, - "Transcriber has posted an error ({:?}), going back to passthrough", - m - ); - drop(s); - let mut settings = self.settings.lock().unwrap(); - settings.passthrough = true; - drop(settings); - self.obj().notify("passthrough"); - self.obj().call_async(move |bin| { - let thiz = bin.imp(); - thiz.block_and_update(true); - }); - } else { - drop(s); - self.parent_handle_message(msg); - } + let pad = self + .audio_sinkpad + .downcast_ref::() + .unwrap(); + let ps = pad.imp().state.lock().unwrap(); + let pad_state = ps.as_ref().unwrap(); + if msg.src() == Some(pad_state.transcriber.upcast_ref()) { + gst::error!( + CAT, + imp: self, + "Transcriber has posted an error ({m:?}), going back to passthrough", + ); + drop(ps); + self.settings.lock().unwrap().passthrough = true; + self.obj().notify("passthrough"); + self.obj().call_async(move |bin| { + let thiz = bin.imp(); + thiz.block_and_update(true); + }); } else { - drop(s); + drop(ps); self.parent_handle_message(msg); } } @@ -1436,3 +1743,262 @@ impl BinImpl for TranscriberBin { } } } + +#[derive(Debug, Clone)] +struct TranscriberSinkPadSettings { + translation_languages: Option, + language_code: String, + mode: Cea608Mode, +} + +impl Default for TranscriberSinkPadSettings { + fn default() -> Self { + Self { + translation_languages: Some( + gst::Structure::builder("languages") + .field("transcript", "cc1") + .build(), + ), + language_code: String::from(DEFAULT_INPUT_LANG_CODE), + mode: DEFAULT_MODE, + } + } +} + +struct TranscriberSinkPadState { + clocksync: gst::Element, + identity: gst::Element, + audio_tee: gst::Element, + transcription_bin: gst::Bin, + 
transcriber_aconv: gst::Element, + transcriber_resample: gst::Element, + transcriber: gst::Element, + queue_passthrough: gst::Element, + transcription_channels: HashMap, + srcpad_name: Option, +} + +impl TranscriberSinkPadState { + fn try_new() -> Result { + Ok(Self { + clocksync: gst::ElementFactory::make("clocksync").build()?, + identity: gst::ElementFactory::make("identity") + // We need to do that otherwise downstream may block for up to + // latency long until the allocation makes it through all branches. + // Audio buffer pools are fortunately not the most critical :) + .property("drop-allocation", true) + .build()?, + audio_tee: gst::ElementFactory::make("tee") + // Protect passthrough enable (and resulting dynamic reconfigure) + // from non-streaming thread + .property("allow-not-linked", true) + .build()?, + transcription_bin: gst::Bin::new(), + transcriber_resample: gst::ElementFactory::make("audioresample").build()?, + transcriber_aconv: gst::ElementFactory::make("audioconvert").build()?, + transcriber: gst::ElementFactory::make("awstranscriber").build()?, + queue_passthrough: gst::ElementFactory::make("queue").build()?, + transcription_channels: HashMap::new(), + srcpad_name: None, + }) + } +} + +pub struct TranscriberSinkPad { + state: Mutex>, + settings: Mutex, +} + +#[glib::object_subclass] +impl ObjectSubclass for TranscriberSinkPad { + const NAME: &'static str = "GstTranscriberSinkPad"; + type Type = super::TranscriberSinkPad; + type ParentType = gst::GhostPad; + + fn new() -> Self { + Self { + state: Mutex::new(TranscriberSinkPadState::try_new()), + settings: Mutex::new(TranscriberSinkPadSettings::default()), + } + } +} + +impl ObjectImpl for TranscriberSinkPad { + fn properties() -> &'static [glib::ParamSpec] { + static PROPERTIES: Lazy> = Lazy::new(|| { + vec![ + glib::ParamSpecBoxed::builder::("translation-languages") + .nick("Translation languages") + .blurb("A map of language codes to caption channels, e.g. 
translation-languages=\"languages, transcript={CC1, 708_1}, fr={708_2, CC3}\" will map the French translation to CC1/service 1 and the original transcript to CC3/service 2") + .mutable_playing() + .build(), + glib::ParamSpecString::builder("language-code") + .nick("Language Code") + .blurb("The language of the input stream") + .default_value(Some(DEFAULT_INPUT_LANG_CODE)) + .mutable_playing() + .build(), + glib::ParamSpecEnum::builder_with_default("mode", DEFAULT_MODE) + .nick("Mode") + .blurb("Which closed caption mode to operate in") + .mutable_playing() + .build(), + glib::ParamSpecObject::builder::("transcriber") + .nick("Transcriber") + .blurb("The transcriber element to use") + .mutable_ready() + .build(), + ] + }); + + PROPERTIES.as_ref() + } + + fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { + match pspec.name() { + "translation-languages" => { + if let Some(this) = self.obj().parent().and_downcast::() { + let mut settings = self.settings.lock().unwrap(); + settings.translation_languages = value + .get::>() + .expect("type checked upstream"); + gst::debug!( + CAT, + imp: self, + "Updated translation-languages {:?}", + settings.translation_languages + ); + drop(settings); + + this.imp().update_languages(&self.obj(), false); + } + } + "mode" => { + let mut settings = self.settings.lock().unwrap(); + + let old_mode = settings.mode; + let new_mode = value.get().expect("type checked upstream"); + settings.mode = new_mode; + + if old_mode != new_mode { + drop(settings); + if let Some(this) = self.obj().parent().and_downcast::() + { + if let Some(state) = this.imp().state.lock().unwrap().as_ref() { + let ps = self.state.lock().unwrap(); + let pad_state = ps.as_ref().unwrap(); + this.imp().setup_cc_mode( + &self.obj(), + pad_state, + state.mux_method, + new_mode, + ); + } + } + } + } + "language-code" => { + if let Some(this) = self.obj().parent().and_downcast::() { + let code = value + .get::>() + .expect("type checked 
upstream") + .unwrap_or_else(|| String::from(DEFAULT_INPUT_LANG_CODE)); + let mut settings = self.settings.lock().unwrap(); + if settings.language_code != code { + gst::debug!( + CAT, + imp: self, + "Updating language code {} -> {code}", + settings.language_code, + ); + + settings.language_code = code; + drop(settings); + + this.imp().update_languages(&self.obj(), true) + } + } + } + "transcriber" => { + if let Some(this) = self.obj().parent().and_downcast::() { + let mut s = this.imp().state.lock().unwrap(); + let mut ps = self.state.lock().unwrap(); + let pad_state = ps.as_mut().unwrap(); + let old_transcriber = pad_state.transcriber.clone(); + pad_state.transcriber = value.get().expect("type checked upstream"); + if old_transcriber != pad_state.transcriber { + if let Some(ref mut state) = s.as_mut() { + match this + .imp() + .relink_transcriber(state, pad_state, &old_transcriber) + { + Ok(()) => (), + Err(err) => { + gst::error!(CAT, "invalid transcriber: {err}"); + drop(s); + *this.imp().state.lock().unwrap() = None; + } + } + } + } + } + } + _ => unimplemented!(), + } + } + + fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { + match pspec.name() { + "translation-languages" => { + let settings = self.settings.lock().unwrap(); + settings.translation_languages.to_value() + } + "language-code" => { + let settings = self.settings.lock().unwrap(); + settings.language_code.to_value() + } + "mode" => { + let settings = self.settings.lock().unwrap(); + settings.mode.to_value() + } + "transcriber" => { + let ps = self.state.lock().unwrap(); + let pad_state = ps.as_ref().unwrap(); + pad_state.transcriber.to_value() + } + _ => unimplemented!(), + } + } +} + +impl GstObjectImpl for TranscriberSinkPad {} + +impl PadImpl for TranscriberSinkPad {} + +impl ProxyPadImpl for TranscriberSinkPad {} + +impl GhostPadImpl for TranscriberSinkPad {} + +#[derive(Debug, Default)] +pub struct TranscriberSrcPad {} + +#[glib::object_subclass] +impl ObjectSubclass for 
TranscriberSrcPad { + const NAME: &'static str = "GstTranscriberSrcPad"; + type Type = super::TranscriberSrcPad; + type ParentType = gst::GhostPad; + + fn new() -> Self { + Default::default() + } +} + +impl ObjectImpl for TranscriberSrcPad {} + +impl GstObjectImpl for TranscriberSrcPad {} + +impl PadImpl for TranscriberSrcPad {} + +impl ProxyPadImpl for TranscriberSrcPad {} + +impl GhostPadImpl for TranscriberSrcPad {} diff --git a/video/closedcaption/src/transcriberbin/mod.rs b/video/closedcaption/src/transcriberbin/mod.rs index 0d115e02..bcec76cd 100644 --- a/video/closedcaption/src/transcriberbin/mod.rs +++ b/video/closedcaption/src/transcriberbin/mod.rs @@ -30,7 +30,15 @@ enum MuxMethod { } glib::wrapper! { - pub struct TranscriberBin(ObjectSubclass) @extends gst::Bin, gst::Element, gst::Object; + pub struct TranscriberBin(ObjectSubclass) @extends gst::Bin, gst::Element, gst::Object, @implements gst::ChildProxy; +} + +glib::wrapper! { + pub struct TranscriberSinkPad(ObjectSubclass) @extends gst::GhostPad, gst::ProxyPad, gst::Pad, gst::Object; +} + +glib::wrapper! { + pub struct TranscriberSrcPad(ObjectSubclass) @extends gst::GhostPad, gst::ProxyPad, gst::Pad, gst::Object; } pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { @@ -38,6 +46,8 @@ pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { { CaptionSource::static_type().mark_as_plugin_api(gst::PluginAPIFlags::empty()); MuxMethod::static_type().mark_as_plugin_api(gst::PluginAPIFlags::empty()); + TranscriberSinkPad::static_type().mark_as_plugin_api(gst::PluginAPIFlags::empty()); + TranscriberSrcPad::static_type().mark_as_plugin_api(gst::PluginAPIFlags::empty()); } gst::Element::register(