diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index fcc4bcd1..c5f304c4 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -5585,7 +5585,14 @@ "sink_audio": { "caps": "audio/x-raw:\n", "direction": "sink", - "presence": "always" + "presence": "always", + "type": "GstTranscriberSinkPad" + }, + "sink_audio_%%u": { + "caps": "audio/x-raw:\n", + "direction": "sink", + "presence": "request", + "type": "GstTranscriberSinkPad" }, "sink_video": { "caps": "video/x-raw(ANY):\n", @@ -5597,6 +5604,12 @@ "direction": "src", "presence": "always" }, + "src_audio_%%u": { + "caps": "audio/x-raw:\n", + "direction": "src", + "presence": "sometimes", + "type": "GstTranscriberSrcPad" + }, "src_video": { "caps": "video/x-raw(ANY):\n", "direction": "src", @@ -5735,6 +5748,7 @@ "construct": false, "construct-only": false, "controllable": false, + "default": "languages, transcript=(string)cc1;", "mutable": "playing", "readable": true, "type": "GstStructure", @@ -6038,6 +6052,79 @@ } ] }, + "GstTranscriberSinkPad": { + "hierarchy": [ + "GstTranscriberSinkPad", + "GstGhostPad", + "GstProxyPad", + "GstPad", + "GstObject", + "GInitiallyUnowned", + "GObject" + ], + "kind": "object", + "properties": { + "language-code": { + "blurb": "The language of the input stream", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "en-US", + "mutable": "playing", + "readable": true, + "type": "gchararray", + "writable": true + }, + "mode": { + "blurb": "Which closed caption mode to operate in", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "roll-up2 (2)", + "mutable": "playing", + "readable": true, + "type": "GstTtToCea608Mode", + "writable": true + }, + "transcriber": { + "blurb": "The transcriber element to use", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "mutable": "ready", + "readable": true, + "type": "GstElement", + "writable": true + }, + "translation-languages": { + "blurb": "A map of language codes to caption channels, e.g. translation-languages=\"languages, transcript={CC1, 708_1}, fr={708_2, CC3}\" will map the French translation to CC1/service 1 and the original transcript to CC3/service 2", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "languages, transcript=(string)cc1;", + "mutable": "playing", + "readable": true, + "type": "GstStructure", + "writable": true + } + } + }, + "GstTranscriberSrcPad": { + "hierarchy": [ + "GstTranscriberSrcPad", + "GstGhostPad", + "GstProxyPad", + "GstPad", + "GstObject", + "GInitiallyUnowned", + "GObject" + ], + "kind": "object" + }, "GstTtToCea608Mode": { "kind": "enum", "values": [ diff --git a/video/closedcaption/src/transcriberbin/imp.rs b/video/closedcaption/src/transcriberbin/imp.rs index 7b85b946..03a20ae9 100644 --- a/video/closedcaption/src/transcriberbin/imp.rs +++ b/video/closedcaption/src/transcriberbin/imp.rs @@ -68,24 +68,22 @@ impl TranscriptionChannel { } } +/* Locking order: State, Settings, PadState, PadSettings */ + struct State { mux_method: MuxMethod, framerate: Option, - tearing_down: bool, + tearing_down: usize, internal_bin: gst::Bin, - audio_queue_passthrough: gst::Element, video_queue: gst::Element, - audio_tee: gst::Element, - transcriber_resample: gst::Element, - transcriber_aconv: gst::Element, - transcriber: gst::Element, ccmux: gst::Element, ccmux_filter: gst::Element, cccombiner: gst::Element, transcription_bin: gst::Bin, - transcription_channels: HashMap, cccapsfilter: gst::Element, transcription_valve: gst::Element, + audio_serial: u32, + audio_sink_pads: HashMap, } struct Settings { @@ -94,10 +92,7 @@ struct Settings { translate_latency: gst::ClockTime, passthrough: bool, accumulate_time: gst::ClockTime, - mode: Cea608Mode, caption_source: CaptionSource, - translation_languages: Option, - language_code: String, mux_method: MuxMethod, } @@ -111,10 +106,7 @@ impl Default for Settings { latency: DEFAULT_LATENCY, translate_latency: DEFAULT_TRANSLATE_LATENCY, accumulate_time: DEFAULT_ACCUMULATE, - mode: DEFAULT_MODE, caption_source: DEFAULT_CAPTION_SOURCE, - translation_languages: None, - language_code: String::from(DEFAULT_INPUT_LANG_CODE), mux_method: DEFAULT_MUX_METHOD, } } @@ -235,9 +227,34 @@ impl TranscriberBin { }) } - fn construct_transcription_bin(&self, state: &mut State) -> Result<(), Error> { - gst::debug!(CAT, imp: self, "Building transcription bin"); + fn link_input_audio_stream( + &self, + pad_name: &str, + pad_state: &TranscriberSinkPadState, + state: &mut State, + ) -> Result<(), Error> { + gst::debug!(CAT, imp: self, "Linking input audio stream {pad_name}"); + state + .internal_bin + .add_many([ + &pad_state.clocksync, + &pad_state.identity, + &pad_state.audio_tee, + &pad_state.queue_passthrough, + ]) + .unwrap(); + gst::Element::link_many([ + &pad_state.clocksync, + &pad_state.identity, + &pad_state.audio_tee, + ])?; + pad_state.audio_tee.link_pads( + Some("src_%u"), + &pad_state.queue_passthrough, + Some("sink"), + )?; + state.transcription_bin.add(&pad_state.transcription_bin)?; let aqueue_transcription = gst::ElementFactory::make("queue") .name("transqueue") .property("max-size-buffers", 0u32) @@ -245,58 +262,86 @@ impl TranscriberBin { .property("max-size-time", 5_000_000_000u64) .property_from_str("leaky", "downstream") .build()?; - let ccconverter = gst::ElementFactory::make("ccconverter").build()?; - state.transcription_bin.add_many([ + pad_state.transcription_bin.add_many([ &aqueue_transcription, - &state.transcriber_resample, - &state.transcriber_aconv, - &state.transcriber, - &state.ccmux, - &state.ccmux_filter, - &ccconverter, - &state.cccapsfilter, - &state.transcription_valve, + &pad_state.transcriber_resample, + &pad_state.transcriber_aconv, + &pad_state.transcriber, ])?; gst::Element::link_many([ &aqueue_transcription, - &state.transcriber_resample, - &state.transcriber_aconv, - &state.transcriber, + &pad_state.transcriber_resample, + &pad_state.transcriber_aconv, + &pad_state.transcriber, ])?; - gst::Element::link_many([ - &state.ccmux, - &state.ccmux_filter, - &ccconverter, - &state.cccapsfilter, - &state.transcription_valve, - ])?; - - for channel in state.transcription_channels.values() { - state.transcription_bin.add(&channel.bin)?; - - channel.link_transcriber(&state.transcriber)?; - - let ccmux_pad = state - .ccmux - .request_pad_simple(&channel.ccmux_pad_name) - .ok_or(anyhow!("Failed to request ccmux sink pad"))?; - channel.bin.static_pad("src").unwrap().link(&ccmux_pad)?; - } - - state.ccmux.set_property("latency", CEAX08MUX_LATENCY); - let transcription_audio_sinkpad = - gst::GhostPad::with_target(&aqueue_transcription.static_pad("sink").unwrap()).unwrap(); - let transcription_audio_srcpad = - gst::GhostPad::with_target(&state.transcription_valve.static_pad("src").unwrap()) - .unwrap(); + gst::GhostPad::builder_with_target(&aqueue_transcription.static_pad("sink").unwrap()) + .unwrap() + .name(pad_name) + .build(); + + pad_state + .transcription_bin + .add_pad(&transcription_audio_sinkpad)?; + + let transcription_audio_sinkpad = + gst::GhostPad::with_target(&transcription_audio_sinkpad).unwrap(); state .transcription_bin .add_pad(&transcription_audio_sinkpad)?; + + for channel in pad_state.transcription_channels.values() { + pad_state.transcription_bin.add(&channel.bin)?; + + channel.link_transcriber(&pad_state.transcriber)?; + + let srcpad = + gst::GhostPad::with_target(&channel.bin.static_pad("src").unwrap()).unwrap(); + + pad_state.transcription_bin.add_pad(&srcpad)?; + if state.ccmux.static_pad(&channel.ccmux_pad_name).is_none() { + let ccmux_pad = state + .ccmux + .request_pad_simple(&channel.ccmux_pad_name) + .ok_or(anyhow!("Failed to request ccmux sink pad"))?; + srcpad.link(&ccmux_pad)?; + } + } + + Ok(()) + } + + fn construct_transcription_bin(&self, state: &mut State) -> Result<(), Error> { + gst::debug!(CAT, imp: self, "Building transcription bin"); + + let ccconverter = gst::ElementFactory::make("ccconverter").build()?; + + state.transcription_bin.add_many([ + &state.ccmux, + &state.ccmux_filter, + &ccconverter, + &state.cccapsfilter, + &state.transcription_valve, + ])?; + + gst::Element::link_many([ + &state.ccmux, + &state.ccmux_filter, + &ccconverter, + &state.cccapsfilter, + &state.transcription_valve, + ])?; + + state.ccmux.set_property("latency", CEAX08MUX_LATENCY); + + let transcription_audio_srcpad = + gst::GhostPad::with_target(&state.transcription_valve.static_pad("src").unwrap()) + .unwrap(); + state .transcription_bin .add_pad(&transcription_audio_srcpad)?; @@ -309,39 +354,19 @@ impl TranscriberBin { } fn construct_internal_bin(&self, state: &mut State) -> Result<(), Error> { - let aclocksync = gst::ElementFactory::make("clocksync").build()?; + let vclocksync = gst::ElementFactory::make("clocksync") + .name("vclocksync") + .build()?; - let vclocksync = gst::ElementFactory::make("clocksync").build()?; - - state.internal_bin.add_many([ - &aclocksync, - &state.audio_tee, - &state.audio_queue_passthrough, - &vclocksync, - &state.video_queue, - &state.cccombiner, - ])?; - - aclocksync.link(&state.audio_tee)?; state - .audio_tee - .link_pads(Some("src_%u"), &state.audio_queue_passthrough, Some("sink"))?; + .internal_bin + .add_many([&vclocksync, &state.video_queue, &state.cccombiner])?; + vclocksync.link(&state.video_queue)?; state .video_queue .link_pads(Some("src"), &state.cccombiner, Some("sink"))?; - let internal_audio_sinkpad = - gst::GhostPad::builder_with_target(&aclocksync.static_pad("sink").unwrap()) - .unwrap() - .name("audio_sink") - .build(); - let internal_audio_srcpad = gst::GhostPad::builder_with_target( - &state.audio_queue_passthrough.static_pad("src").unwrap(), - ) - .unwrap() - .name("audio_src") - .build(); let internal_video_sinkpad = gst::GhostPad::builder_with_target(&vclocksync.static_pad("sink").unwrap()) .unwrap() @@ -353,8 +378,6 @@ impl TranscriberBin { .name("video_src") .build(); - state.internal_bin.add_pad(&internal_audio_sinkpad)?; - state.internal_bin.add_pad(&internal_audio_srcpad)?; state.internal_bin.add_pad(&internal_video_sinkpad)?; state.internal_bin.add_pad(&internal_video_srcpad)?; @@ -385,10 +408,6 @@ impl TranscriberBin { state.cccombiner.set_property("latency", 100.mseconds()); - self.audio_sinkpad - .set_target(Some(&state.internal_bin.static_pad("audio_sink").unwrap()))?; - self.audio_srcpad - .set_target(Some(&state.internal_bin.static_pad("audio_src").unwrap()))?; self.video_sinkpad .set_target(Some(&state.internal_bin.static_pad("video_sink").unwrap()))?; self.video_srcpad @@ -396,6 +415,35 @@ impl TranscriberBin { self.construct_transcription_bin(state)?; + let pad = self + .audio_sinkpad + .downcast_ref::() + .unwrap(); + // FIXME: replace this pattern with https://doc.rust-lang.org/nightly/std/sync/struct.MappedMutexGuard.html + let ps = pad.imp().state.lock().unwrap(); + let pad_state = ps.as_ref().unwrap(); + + self.link_input_audio_stream("sink_audio", pad_state, state)?; + + let internal_audio_sinkpad = + gst::GhostPad::builder_with_target(&pad_state.clocksync.static_pad("sink").unwrap()) + .unwrap() + .name("audio_sink") + .build(); + let internal_audio_srcpad = gst::GhostPad::builder_with_target( + &pad_state.queue_passthrough.static_pad("src").unwrap(), + ) + .unwrap() + .name("audio_src") + .build(); + + state.internal_bin.add_pad(&internal_audio_sinkpad)?; + state.internal_bin.add_pad(&internal_audio_srcpad)?; + + self.audio_sinkpad + .set_target(Some(&state.internal_bin.static_pad("audio_sink").unwrap()))?; + self.audio_srcpad + .set_target(Some(&state.internal_bin.static_pad("audio_src").unwrap()))?; Ok(()) } @@ -427,21 +475,42 @@ impl TranscriberBin { + settings.accumulate_time + CEAX08MUX_LATENCY; - for queue in [&state.audio_queue_passthrough, &state.video_queue] { - queue.set_property("max-size-bytes", 0u32); - queue.set_property("max-size-buffers", 0u32); - queue.set_property("max-size-time", max_size_time); + gst::debug!( + CAT, + "Calculated max size time for passthrough branches: {max_size_time}" + ); + + state.video_queue.set_property("max-size-bytes", 0u32); + state.video_queue.set_property("max-size-buffers", 0u32); + state + .video_queue + .set_property("max-size-time", max_size_time); + + for pad in state.audio_sink_pads.values() { + let ps = pad.imp().state.lock().unwrap(); + let pad_state = ps.as_ref().unwrap(); + let latency_ms = settings.latency.mseconds() as u32; + pad_state + .transcriber + .set_property("transcribe-latency", latency_ms); + + let translate_latency_ms = settings.translate_latency.mseconds() as u32; + pad_state + .transcriber + .set_property("translate-latency", translate_latency_ms); + pad_state + .queue_passthrough + .set_property("max-size-bytes", 0u32); + pad_state + .queue_passthrough + .set_property("max-size-buffers", 0u32); + pad_state + .queue_passthrough + .set_property("max-size-time", max_size_time); } - let latency_ms = settings.latency.mseconds() as u32; - state.transcriber.set_property("latency", latency_ms); - - let translate_latency_ms = settings.translate_latency.mseconds() as u32; - state - .transcriber - .set_property("translate-latency", translate_latency_ms); - if !settings.passthrough { + gst::debug!(CAT, imp: self, "Linking transcription bins and synchronizing state"); state .transcription_bin .link_pads(Some("src"), &state.cccombiner, Some("caption")) @@ -450,47 +519,58 @@ impl TranscriberBin { state.transcription_bin.set_locked_state(false); state.transcription_bin.sync_state_with_parent().unwrap(); - let transcription_sink_pad = state.transcription_bin.static_pad("sink").unwrap(); - // Might be linked already if "translation-languages" is set - if transcription_sink_pad.peer().is_none() { - let audio_tee_pad = state.audio_tee.request_pad_simple("src_%u").unwrap(); - audio_tee_pad.link(&transcription_sink_pad).unwrap(); + for pad in state.audio_sink_pads.values() { + let ps = pad.imp().state.lock().unwrap(); + let pad_state = ps.as_ref().unwrap(); + pad_state.transcription_bin.set_locked_state(false); + pad_state + .transcription_bin + .sync_state_with_parent() + .unwrap(); + let transcription_sink_pad = + state.transcription_bin.static_pad(&pad.name()).unwrap(); + // Might be linked already if "translation-languages" is set + if transcription_sink_pad.peer().is_none() { + let audio_tee_pad = pad_state.audio_tee.request_pad_simple("src_%u").unwrap(); + audio_tee_pad.link(&transcription_sink_pad).unwrap(); + } } } - drop(settings); - - self.setup_cc_mode(state); + for pad in state.audio_sink_pads.values() { + let ps = pad.imp().state.lock().unwrap(); + let pad_state = ps.as_ref().unwrap(); + let pad_settings = pad.imp().settings.lock().unwrap(); + self.setup_cc_mode(pad, pad_state, state.mux_method, pad_settings.mode); + } } - fn disable_transcription_bin(&self) { - let mut state = self.state.lock().unwrap(); + fn disable_transcription_bin(&self, state: &mut State) { + // At this point, we want to check whether passthrough + // has been unset in the meantime + let passthrough = self.settings.lock().unwrap().passthrough; - if let Some(ref mut state) = state.as_mut() { - state.tearing_down = false; + if passthrough { + gst::debug!(CAT, imp: self, "disabling transcription bin"); - // At this point, we want to check whether passthrough - // has been unset in the meantime - let passthrough = self.settings.lock().unwrap().passthrough; - - if passthrough { - gst::debug!(CAT, imp: self, "disabling transcription bin"); - - let bin_sink_pad = state.transcription_bin.static_pad("sink").unwrap(); + for pad in state.audio_sink_pads.values() { + let ps = pad.imp().state.lock().unwrap(); + let pad_state = ps.as_ref().unwrap(); + let bin_sink_pad = state.transcription_bin.static_pad(&pad.name()).unwrap(); if let Some(audio_tee_pad) = bin_sink_pad.peer() { audio_tee_pad.unlink(&bin_sink_pad).unwrap(); - state.audio_tee.release_request_pad(&audio_tee_pad); + pad_state.audio_tee.release_request_pad(&audio_tee_pad); } - - let bin_src_pad = state.transcription_bin.static_pad("src").unwrap(); - if let Some(cccombiner_pad) = bin_src_pad.peer() { - bin_src_pad.unlink(&cccombiner_pad).unwrap(); - state.cccombiner.release_request_pad(&cccombiner_pad); - } - - state.transcription_bin.set_locked_state(true); - state.transcription_bin.set_state(gst::State::Null).unwrap(); } + + let bin_src_pad = state.transcription_bin.static_pad("src").unwrap(); + if let Some(cccombiner_pad) = bin_src_pad.peer() { + bin_src_pad.unlink(&cccombiner_pad).unwrap(); + state.cccombiner.release_request_pad(&cccombiner_pad); + } + + state.transcription_bin.set_locked_state(true); + state.transcription_bin.set_state(gst::State::Null).unwrap(); } } @@ -499,25 +579,34 @@ impl TranscriberBin { if let Some(ref mut state) = s.as_mut() { if passthrough { - let sinkpad = state.transcription_bin.static_pad("sink").unwrap(); - let imp_weak = self.downgrade(); - state.tearing_down = true; + state.tearing_down = state.audio_sink_pads.len(); + let sinkpads = state.audio_sink_pads.clone(); drop(s); - let _ = sinkpad.add_probe( - gst::PadProbeType::IDLE - | gst::PadProbeType::BUFFER - | gst::PadProbeType::EVENT_DOWNSTREAM, - move |_pad, _info| { - let Some(imp) = imp_weak.upgrade() else { - return gst::PadProbeReturn::Remove; - }; + for sinkpad in sinkpads.values() { + let imp_weak = self.downgrade(); + let _ = sinkpad.add_probe( + gst::PadProbeType::IDLE + | gst::PadProbeType::BUFFER + | gst::PadProbeType::EVENT_DOWNSTREAM, + move |_pad, _info| { + let Some(imp) = imp_weak.upgrade() else { + return gst::PadProbeReturn::Remove; + }; - imp.disable_transcription_bin(); + let mut s = imp.state.lock().unwrap(); - gst::PadProbeReturn::Remove - }, - ); - } else if state.tearing_down { + if let Some(ref mut state) = s.as_mut() { + state.tearing_down -= 1; + if state.tearing_down == 0 { + imp.disable_transcription_bin(state); + } + } + + gst::PadProbeReturn::Remove + }, + ); + } + } else if state.tearing_down > 0 { // Do nothing, wait for the previous transcription bin // to finish tearing down } else { @@ -528,20 +617,29 @@ impl TranscriberBin { state.transcription_bin.set_locked_state(false); state.transcription_bin.sync_state_with_parent().unwrap(); - let audio_tee_pad = state.audio_tee.request_pad_simple("src_%u").unwrap(); - let transcription_sink_pad = state.transcription_bin.static_pad("sink").unwrap(); - audio_tee_pad.link(&transcription_sink_pad).unwrap(); + for pad in state.audio_sink_pads.values() { + let ps = pad.imp().state.lock().unwrap(); + let pad_state = ps.as_ref().unwrap(); + let audio_tee_pad = pad_state.audio_tee.request_pad_simple("src_%u").unwrap(); + let transcription_sink_pad = + state.transcription_bin.static_pad(&pad.name()).unwrap(); + audio_tee_pad.link(&transcription_sink_pad).unwrap(); + } } } } - fn setup_cc_mode(&self, state: &State) { - let mode = self.settings.lock().unwrap().mode; + fn setup_cc_mode( + &self, + pad: &super::TranscriberSinkPad, + pad_state: &TranscriberSinkPadState, + mux_method: MuxMethod, + mode: Cea608Mode, + ) { + gst::debug!(CAT, imp: self, "setting CC mode {:?} for pad {:?}", mode, pad); - gst::debug!(CAT, imp: self, "setting CC mode {:?}", mode); - - for channel in state.transcription_channels.values() { - match state.mux_method { + for channel in pad_state.transcription_channels.values() { + match mux_method { MuxMethod::Cea608 => channel.tttoceax08.set_property("mode", mode), MuxMethod::Cea708 => match mode { Cea608Mode::PopOn => channel.tttoceax08.set_property("mode", Cea708Mode::PopOn), @@ -571,6 +669,7 @@ impl TranscriberBin { fn relink_transcriber( &self, state: &mut State, + pad_state: &TranscriberSinkPadState, old_transcriber: &gst::Element, ) -> Result<(), Error> { gst::debug!( @@ -578,23 +677,23 @@ impl TranscriberBin { imp: self, "Relinking transcriber, old: {:?}, new: {:?}", old_transcriber, - state.transcriber + pad_state.transcriber ); - state.transcriber_aconv.unlink(old_transcriber); + pad_state.transcriber_aconv.unlink(old_transcriber); - for channel in state.transcription_channels.values() { + for channel in pad_state.transcription_channels.values() { old_transcriber.unlink(&channel.bin); } state.transcription_bin.remove(old_transcriber).unwrap(); old_transcriber.set_state(gst::State::Null).unwrap(); - state.transcription_bin.add(&state.transcriber)?; - state.transcriber.sync_state_with_parent().unwrap(); - state.transcriber_aconv.link(&state.transcriber)?; + state.transcription_bin.add(&pad_state.transcriber)?; + pad_state.transcriber.sync_state_with_parent().unwrap(); + pad_state.transcriber_aconv.link(&pad_state.transcriber)?; - for channel in state.transcription_channels.values() { - channel.link_transcriber(&state.transcriber)?; + for channel in pad_state.transcription_channels.values() { + channel.link_transcriber(&pad_state.transcriber)?; } Ok(()) @@ -602,7 +701,7 @@ impl TranscriberBin { fn construct_transcription_channels( &self, - settings: &Settings, + settings: &TranscriberSinkPadSettings, mux_method: MuxMethod, transcription_channels: &mut HashMap, ) -> Result<(), Error> { @@ -667,11 +766,18 @@ impl TranscriberBin { Ok(()) } - fn reconfigure_transcription_bin(&self, lang_code_only: bool) -> Result<(), Error> { + fn reconfigure_transcription_bin( + &self, + pad: &TranscriberSinkPad, + lang_code_only: bool, + ) -> Result<(), Error> { let mut state = self.state.lock().unwrap(); if let Some(ref mut state) = state.as_mut() { let settings = self.settings.lock().unwrap(); + let mut ps = pad.state.lock().unwrap(); + let pad_state = ps.as_mut().unwrap(); + let pad_settings = pad.settings.lock().unwrap(); gst::debug!( CAT, @@ -680,20 +786,26 @@ impl TranscriberBin { ); // Unlink sinkpad temporarily - let sinkpad = state.transcription_bin.static_pad("sink").unwrap(); + let sinkpad = state + .transcription_bin + .static_pad(&pad.obj().name()) + .unwrap(); let peer = sinkpad.peer(); if let Some(peer) = &peer { gst::debug!(CAT, imp: self, "Unlinking {:?}", peer); peer.unlink(&sinkpad)?; - state.audio_tee.release_request_pad(peer); + pad_state.audio_tee.release_request_pad(peer); } - state.transcription_bin.set_locked_state(true); - state.transcription_bin.set_state(gst::State::Null).unwrap(); + pad_state.transcription_bin.set_locked_state(true); + pad_state + .transcription_bin + .set_state(gst::State::Null) + .unwrap(); - state + pad_state .transcriber - .set_property("language-code", &settings.language_code); + .set_property("language-code", &pad_settings.language_code); if lang_code_only { if !settings.passthrough { @@ -701,64 +813,89 @@ impl TranscriberBin { drop(settings); + // While we haven't locked the state here, the state of the + // top level transcription bin might be locked, for instance + // at start up. Unlock and sync both the inner and top level + // bin states to ensure data flows in the correct state state.transcription_bin.set_locked_state(false); - state.transcription_bin.sync_state_with_parent()?; + state.transcription_bin.sync_state_with_parent().unwrap(); - let audio_tee_pad = state.audio_tee.request_pad_simple("src_%u").unwrap(); + pad_state.transcription_bin.set_locked_state(false); + pad_state.transcription_bin.sync_state_with_parent()?; + + let audio_tee_pad = pad_state.audio_tee.request_pad_simple("src_%u").unwrap(); audio_tee_pad.link(&sinkpad)?; } return Ok(()); } - for channel in state.transcription_channels.values() { + for channel in pad_state.transcription_channels.values() { let sinkpad = channel.bin.static_pad("sink").unwrap(); if let Some(peer) = sinkpad.peer() { peer.unlink(&sinkpad)?; if channel.language != "transcript" { - state.transcriber.release_request_pad(&peer); + pad_state.transcriber.release_request_pad(&peer); } } let srcpad = channel.bin.static_pad("src").unwrap(); if let Some(peer) = srcpad.peer() { - srcpad.unlink(&peer)?; - state.ccmux.release_request_pad(&peer); + // The source pad might not have been linked to the muxer initially, for + // instance in case of a collision with another source pad's + // translation-languages mapping + if peer.parent().and_downcast_ref::() + == Some(state.ccmux.as_ref()) + { + srcpad.unlink(&peer)?; + state.ccmux.release_request_pad(&peer); + } } - state.transcription_bin.remove(&channel.bin)?; + pad_state.transcription_bin.remove(&channel.bin)?; } - state.transcription_channels.clear(); + pad_state.transcription_channels.clear(); self.construct_transcription_channels( - &settings, + &pad_settings, state.mux_method, - &mut state.transcription_channels, + &mut pad_state.transcription_channels, )?; - for channel in state.transcription_channels.values() { - state.transcription_bin.add(&channel.bin)?; + for channel in pad_state.transcription_channels.values() { + pad_state.transcription_bin.add(&channel.bin)?; - channel.link_transcriber(&state.transcriber)?; + channel.link_transcriber(&pad_state.transcriber)?; - let ccmux_pad = state - .ccmux - .request_pad_simple(&channel.ccmux_pad_name) - .ok_or(anyhow!("Failed to request ccmux sink pad"))?; - channel.bin.static_pad("src").unwrap().link(&ccmux_pad)?; + let srcpad = pad_state.transcription_bin.static_pad("src").unwrap(); + + srcpad + .downcast_ref::() + .unwrap() + .set_target(channel.bin.static_pad("src").as_ref())?; + + if state.ccmux.static_pad(&channel.ccmux_pad_name).is_none() { + let ccmux_pad = state + .ccmux + .request_pad_simple(&channel.ccmux_pad_name) + .ok_or(anyhow!("Failed to request ccmux sink pad"))?; + srcpad.link(&ccmux_pad)?; + } } - drop(settings); - self.setup_cc_mode(state); + self.setup_cc_mode(&pad.obj(), pad_state, state.mux_method, pad_settings.mode); - if !self.settings.lock().unwrap().passthrough { + if !settings.passthrough { gst::debug!(CAT, imp: self, "Syncing state with parent"); - state.transcription_bin.set_locked_state(false); - state.transcription_bin.sync_state_with_parent()?; + let audio_tee_pad = pad_state.audio_tee.request_pad_simple("src_%u").unwrap(); - let audio_tee_pad = state.audio_tee.request_pad_simple("src_%u").unwrap(); + drop(pad_settings); + drop(settings); + + pad_state.transcription_bin.set_locked_state(false); + pad_state.transcription_bin.sync_state_with_parent()?; audio_tee_pad.link(&sinkpad)?; } } @@ -766,43 +903,81 @@ impl TranscriberBin { Ok(()) } - fn update_languages(&self, lang_code_only: bool) { - let s = self.state.lock().unwrap(); + fn update_languages(&self, pad: &super::TranscriberSinkPad, lang_code_only: bool) { + gst::debug!( + CAT, + imp: self, + "Schedule transcription/translation language update for pad {pad:?}" + ); - if let Some(state) = s.as_ref() { - gst::debug!( - CAT, - imp: self, - "Schedule transcription/translation language update" - ); + let Some(sinkpad) = pad + .imp() + .state + .lock() + .unwrap() + .as_ref() + .unwrap() + .transcription_bin + .static_pad(pad.name().as_str()) + else { + gst::debug!(CAT, imp: pad.imp(), "transcription bin not set up yet"); + return; + }; - let sinkpad = state.transcription_bin.static_pad("sink").unwrap(); - let imp_weak = self.downgrade(); - drop(s); + let imp_weak = self.downgrade(); + let pad_weak = pad.downgrade(); - let _ = sinkpad.add_probe( - gst::PadProbeType::IDLE - | gst::PadProbeType::BUFFER - | gst::PadProbeType::EVENT_DOWNSTREAM, - move |_pad, _info| { - let Some(imp) = imp_weak.upgrade() else { - return gst::PadProbeReturn::Remove; - }; + let _ = sinkpad.add_probe( + gst::PadProbeType::IDLE + | gst::PadProbeType::BUFFER + | gst::PadProbeType::EVENT_DOWNSTREAM, + move |_pad, _info| { + let Some(imp) = imp_weak.upgrade() else { + return gst::PadProbeReturn::Remove; + }; - if imp.reconfigure_transcription_bin(lang_code_only).is_err() { - gst::element_imp_error!( - imp, - gst::StreamError::Failed, - ["Couldn't reconfigure channels"] - ); - } + let Some(pad) = pad_weak.upgrade() else { + return gst::PadProbeReturn::Remove; + }; - gst::PadProbeReturn::Remove - }, - ); - } else { - gst::debug!(CAT, imp: self, "Transcriber is not configured yet"); + if let Err(e) = imp.reconfigure_transcription_bin(pad.imp(), lang_code_only) { + gst::error!(CAT, "Couldn't reconfigure channels: {e}"); + gst::element_imp_error!( + imp, + gst::StreamError::Failed, + ["Couldn't reconfigure channels: {}", e] + ); + *imp.state.lock().unwrap() = None; + } + + gst::PadProbeReturn::Remove + }, + ); + } + + fn any_sink_is_translating(&self, state: &State) -> bool { + for pad in state.audio_sink_pads.values() { + let ps = pad.imp().state.lock().unwrap(); + let pad_state = ps.as_ref().unwrap(); + if pad_state + .transcription_channels + .values() + .any(|c| c.language != "transcript") + { + return true; + } } + false + } + + fn any_sink_is_rollup(&self, state: &State) -> bool { + for pad in state.audio_sink_pads.values() { + let pad_settings = pad.imp().settings.lock().unwrap(); + if pad_settings.mode.is_rollup() { + return true; + } + } + false } #[allow(clippy::single_match)] @@ -819,16 +994,10 @@ impl TranscriberBin { if ret { let (_, mut min, _) = upstream_query.result(); + let state = self.state.lock().unwrap(); let (received_framerate, translating) = { - let state = self.state.lock().unwrap(); if let Some(state) = state.as_ref() { - ( - state.framerate, - state - .transcription_channels - .values() - .any(|c| c.language != "transcript"), - ) + (state.framerate, self.any_sink_is_translating(state)) } else { (None, false) } @@ -848,8 +1017,10 @@ impl TranscriberBin { .mul_div_floor(framerate.denom() as u64, framerate.numer() as u64) .unwrap(); } - } else if settings.mode.is_rollup() { - min += settings.accumulate_time; + } else if let Some(state) = state.as_ref() { + if self.any_sink_is_rollup(state) { + min += settings.accumulate_time; + } } q.set(true, min, gst::ClockTime::NONE); @@ -864,24 +1035,9 @@ impl TranscriberBin { fn build_state(&self) -> Result { let internal_bin = gst::Bin::with_name("internal"); let transcription_bin = gst::Bin::with_name("transcription-bin"); - let audio_tee = gst::ElementFactory::make("tee") - // Protect passthrough enable (and resulting dynamic reconfigure) - // from non-streaming thread - .property("allow-not-linked", true) - .build()?; let cccombiner = gst::ElementFactory::make("cccombiner") .name("cccombiner") .build()?; - let transcriber_resample = gst::ElementFactory::make("audioresample").build()?; - let transcriber_aconv = gst::ElementFactory::make("audioconvert").build()?; - let transcriber = gst::ElementFactory::make("awstranscriber") - .name("transcriber") - .property( - "language-code", - &self.settings.lock().unwrap().language_code, - ) - .build()?; - let audio_queue_passthrough = gst::ElementFactory::make("queue").build()?; let video_queue = gst::ElementFactory::make("queue").build()?; let cccapsfilter = gst::ElementFactory::make("capsfilter").build()?; let transcription_valve = gst::ElementFactory::make("valve") @@ -901,33 +1057,36 @@ impl TranscriberBin { }; let ccmux_filter = gst::ElementFactory::make("capsfilter").build()?; - let mut transcription_channels = HashMap::new(); - + let pad = self + .audio_sinkpad + .clone() + .downcast::() + .unwrap(); + let mut audio_sink_pads = HashMap::new(); + audio_sink_pads.insert(self.audio_sinkpad.name().to_string(), pad.clone()); + let mut ps = pad.imp().state.lock().unwrap(); + let pad_state = ps.as_mut().unwrap(); + let pad_settings = pad.imp().settings.lock().unwrap(); self.construct_transcription_channels( - &settings, + &pad_settings, settings.mux_method, - &mut transcription_channels, + &mut pad_state.transcription_channels, )?; - drop(settings); Ok(State { mux_method, framerate: None, internal_bin, - audio_queue_passthrough, video_queue, - transcriber_resample, - transcriber_aconv, - transcriber, ccmux, ccmux_filter, - audio_tee, cccombiner, transcription_bin, - transcription_channels, cccapsfilter, transcription_valve, - tearing_down: false, + tearing_down: 0, + audio_serial: 0, + audio_sink_pads, }) } @@ -969,15 +1128,35 @@ impl TranscriberBin { } } +impl ChildProxyImpl for TranscriberBin { + fn child_by_index(&self, _index: u32) -> Option { + None + } + + fn children_count(&self) -> u32 { + 0 + } + + fn child_by_name(&self, name: &str) -> Option { + if let Some(child) = self.parent_child_by_name(name) { + Some(child) + } else { + self.obj().static_pad(name).map(|pad| pad.upcast()) + } + } +} + #[glib::object_subclass] impl ObjectSubclass for TranscriberBin { const NAME: &'static str = "GstTranscriberBin"; type Type = super::TranscriberBin; type ParentType = gst::Bin; + type Interfaces = (gst::ChildProxy,); fn with_class(klass: &Self::Class) -> Self { let templ = klass.pad_template("sink_audio").unwrap(); - let audio_sinkpad = gst::GhostPad::from_template(&templ); + let audio_sinkpad = + gst::PadBuilder::::from_template(&templ).build(); let templ = klass.pad_template("src_audio").unwrap(); let audio_srcpad = gst::GhostPad::builder_from_template(&templ) .query_function(|pad, parent, query| { @@ -1013,7 +1192,7 @@ impl ObjectSubclass for TranscriberBin { Self { audio_srcpad, video_srcpad, - audio_sinkpad, + audio_sinkpad: audio_sinkpad.into(), video_sinkpad, state: Mutex::new(None), settings: Mutex::new(Settings::default()), @@ -1122,37 +1301,20 @@ impl ObjectImpl for TranscriberBin { ); } "mode" => { - let mut settings = self.settings.lock().unwrap(); - - let old_mode = settings.mode; - let new_mode = value.get().expect("type checked upstream"); - settings.mode = new_mode; - - if old_mode != new_mode { - drop(settings); - self.setup_cc_mode(self.state.lock().unwrap().as_ref().unwrap()); - } + self.audio_sinkpad.set_property( + "mode", + value.get::().expect("type checked upstream"), + ); } "cc-caps" => { let mut settings = self.settings.lock().unwrap(); settings.cc_caps = value.get().expect("type checked upstream"); } "transcriber" => { - let mut s = self.state.lock().unwrap(); - if let Some(ref mut state) = s.as_mut() { - let old_transcriber = state.transcriber.clone(); - state.transcriber = value.get().expect("type checked upstream"); - if old_transcriber != state.transcriber { - match self.relink_transcriber(state, &old_transcriber) { - Ok(()) => (), - Err(err) => { - gst::error!(CAT, "invalid transcriber: {}", err); - drop(s); - *self.state.lock().unwrap() = None; - } - } - } - } + self.audio_sinkpad.set_property( + "transcriber", + value.get::().expect("type checked upstream"), + ); } "caption-source" => { let mut settings = self.settings.lock().unwrap(); @@ -1170,19 +1332,12 @@ impl ObjectImpl for TranscriberBin { } } "translation-languages" => { - let mut settings = self.settings.lock().unwrap(); - settings.translation_languages = value - .get::>() - .expect("type checked upstream"); - gst::debug!( - CAT, - imp: self, - "Updated translation-languages {:?}", - settings.translation_languages + self.audio_sinkpad.set_property( + "translation-languages", + value + .get::>() + .expect("type checked upstream"), ); - drop(settings); - - self.update_languages(false); } "translate-latency" => { let mut settings = self.settings.lock().unwrap(); @@ -1191,25 +1346,12 @@ impl ObjectImpl for TranscriberBin { ); } "language-code" => { - let code = value - .get::>() - .expect("type checked upstream") - .unwrap_or_else(|| String::from(DEFAULT_INPUT_LANG_CODE)); - let mut settings = self.settings.lock().unwrap(); - if settings.language_code != code { - gst::debug!( - CAT, - imp: self, - "Updating language code {} -> {}", - settings.language_code, - code - ); - - settings.language_code = code; - drop(settings); - - self.update_languages(true) - } + self.audio_sinkpad.set_property( + "language-code", + value + .get::>() + .expect("type checked upstream"), + ); } "mux-method" => { let mut settings = self.settings.lock().unwrap(); @@ -1233,39 +1375,22 @@ impl ObjectImpl for TranscriberBin { let settings = self.settings.lock().unwrap(); (settings.accumulate_time.mseconds() as u32).to_value() } - "mode" => { - let settings = self.settings.lock().unwrap(); - settings.mode.to_value() - } + "mode" => self.audio_sinkpad.property("mode"), "cc-caps" => { let settings = self.settings.lock().unwrap(); settings.cc_caps.to_value() } - "transcriber" => { - let state = self.state.lock().unwrap(); - if let Some(state) = state.as_ref() { - state.transcriber.to_value() - } else { - let ret: Option = None; - ret.to_value() - } - } + "transcriber" => self.audio_sinkpad.property("transcriber"), "caption-source" => { let settings = self.settings.lock().unwrap(); settings.caption_source.to_value() } - "translation-languages" => { - let settings = self.settings.lock().unwrap(); - settings.translation_languages.to_value() - } + "translation-languages" => self.audio_sinkpad.property("translation-languages"), "translate-latency" => { let settings = self.settings.lock().unwrap(); (settings.translate_latency.mseconds() as u32).to_value() } - "language-code" => { - let settings = self.settings.lock().unwrap(); - settings.language_code.to_value() - } + "language-code" => self.audio_sinkpad.property("language-code"), "mux-method" => { let settings = self.settings.lock().unwrap(); settings.mux_method.to_value() @@ -1341,11 +1466,28 @@ impl ElementImpl for TranscriberBin { &caps, ) .unwrap(); - let audio_sink_pad_template = gst::PadTemplate::new( + let audio_sink_pad_template = gst::PadTemplate::with_gtype( "sink_audio", gst::PadDirection::Sink, gst::PadPresence::Always, &caps, + super::TranscriberSinkPad::static_type(), + ) + .unwrap(); + let secondary_audio_sink_pad_template = gst::PadTemplate::with_gtype( + "sink_audio_%u", + gst::PadDirection::Sink, + gst::PadPresence::Request, + &caps, + super::TranscriberSinkPad::static_type(), + ) + .unwrap(); + let secondary_audio_src_pad_template = gst::PadTemplate::with_gtype( + "src_audio_%u", + gst::PadDirection::Src, + gst::PadPresence::Sometimes, + &caps, + super::TranscriberSrcPad::static_type(), ) .unwrap(); @@ -1354,12 +1496,182 @@ impl ElementImpl for TranscriberBin { video_sink_pad_template, audio_src_pad_template, audio_sink_pad_template, + secondary_audio_sink_pad_template, + secondary_audio_src_pad_template, ] }); PAD_TEMPLATES.as_ref() } + fn release_pad(&self, pad: &gst::Pad) { + if self.obj().current_state() > gst::State::Null { + gst::fixme!(CAT, obj: pad, "releasing secondary audio stream while PLAYING is untested"); + } + + // In practice we will probably at least need some flushing here, + // and latency recalculating, but at least the basic skeleton for + // releasing is in place + + let Some(pad) = pad.downcast_ref::() else { + gst::error!(CAT, imp: self, "not a transcriber sink pad: {pad:?}"); + return; + }; + + let mut s = self.state.lock().unwrap(); + let ps = pad.imp().state.lock().unwrap(); + let pad_state = ps.as_ref().unwrap(); + + pad_state.transcription_bin.set_locked_state(true); + let _ = pad_state.transcription_bin.set_state(gst::State::Null); + + if let Some(ref mut state) = s.as_mut() { + if let Some(srcpad) = pad_state.transcription_bin.static_pad("src") { + if let Some(peer) = srcpad.peer() { + let _ = state.ccmux.remove_pad(&peer); + } + } + + let _ = state.transcription_bin.remove(&pad_state.transcription_bin); + for srcpad in pad_state.audio_tee.iterate_src_pads() { + let srcpad = srcpad.unwrap(); + if let Some(peer) = srcpad.peer() { + if let Some(parent) = peer.parent() { + if parent == state.transcription_bin { + let _ = parent.downcast::().unwrap().remove_pad(&peer); + } + } + } + } + let _ = state.internal_bin.remove_many([ + &pad_state.clocksync, + &pad_state.identity, + &pad_state.audio_tee, + ]); + state.audio_sink_pads.remove(pad.name().as_str()); + } + + let srcpad = pad_state + .srcpad_name + .as_ref() + .and_then(|name| self.obj().static_pad(name)); + + drop(ps); + drop(s); + + let _ = pad.set_active(false); + let _ = self.obj().remove_pad(pad); + + if let Some(srcpad) = srcpad { + let _ = srcpad.set_active(false); + let _ = self.obj().remove_pad(&srcpad); + } + + self.obj() + .child_removed(pad.upcast_ref::(), &pad.name()); + } + + fn request_new_pad( + &self, + _templ: &gst::PadTemplate, + _name: Option<&str>, + _caps: Option<&gst::Caps>, + ) -> Option { + let element = self.obj(); + if element.current_state() > gst::State::Null { + gst::error!(CAT, "element pads can only be requested before starting"); + return None; + } + + let mut s = self.state.lock().unwrap(); + if let Some(ref mut state) = s.as_mut() { + let name = format!("sink_audio_{}", state.audio_serial); + + let templ = self.obj().pad_template("sink_audio_%u").unwrap(); + let sink_pad = gst::PadBuilder::::from_template(&templ) + .name(&name) + .build(); + + let src_pad = { + let settings = self.settings.lock().unwrap(); + let mut s = sink_pad.imp().state.lock().unwrap(); + let pad_state = match s.as_mut() { + Ok(s) => s, + Err(e) => { + gst::error!(CAT, "Failed to construct sink pad: {e}"); + return None; + } + }; + let pad_settings = sink_pad.imp().settings.lock().unwrap(); + self.construct_transcription_channels( + &pad_settings, + settings.mux_method, + &mut pad_state.transcription_channels, + ) + .unwrap(); + + if let Err(e) = self.link_input_audio_stream(&name, pad_state, state) { + gst::error!(CAT, "Failed to link secondary audio stream: {e}"); + return None; + } + + let internal_sink_pad = gst::GhostPad::builder_with_target( + &pad_state.clocksync.static_pad("sink").unwrap(), + ) + .unwrap() + .name(name.clone()) + .build(); + internal_sink_pad.set_active(true).unwrap(); + state.internal_bin.add_pad(&internal_sink_pad).unwrap(); + sink_pad.set_target(Some(&internal_sink_pad)).unwrap(); + sink_pad.set_active(true).unwrap(); + state + .audio_sink_pads + .insert(name.to_string(), sink_pad.clone()); + + let templ = self.obj().pad_template("src_audio_%u").unwrap(); + let name = format!("src_audio_{}", state.audio_serial); + let src_pad = gst::PadBuilder::::from_template(&templ) + .name(name.as_str()) + .query_function(|pad, parent, query| { + TranscriberBin::catch_panic_pad_function( + parent, + || false, + |transcriber| transcriber.src_query(pad.upcast_ref(), query), + ) + }) + .build(); + let internal_src_pad = gst::GhostPad::builder_with_target( + &pad_state.queue_passthrough.static_pad("src").unwrap(), + ) + .unwrap() + .name(&name) + .build(); + internal_src_pad.set_active(true).unwrap(); + state.internal_bin.add_pad(&internal_src_pad).unwrap(); + src_pad.set_target(Some(&internal_src_pad)).unwrap(); + src_pad.set_active(true).unwrap(); + + pad_state.srcpad_name = Some(name.clone()); + + src_pad + }; + + state.audio_serial += 1; + + drop(s); + + self.obj().add_pad(&sink_pad).unwrap(); + self.obj() + .child_added(sink_pad.upcast_ref::(), &sink_pad.name()); + self.obj().add_pad(&src_pad).unwrap(); + + Some(sink_pad.upcast()) + } else { + None + } + } + #[allow(clippy::single_match)] fn change_state( &self, @@ -1403,32 +1715,27 @@ impl BinImpl for TranscriberBin { match msg.view() { MessageView::Error(m) => { - /* We must have a state here */ - let s = self.state.lock().unwrap(); - - if let Some(state) = s.as_ref() { - if msg.src() == Some(state.transcriber.upcast_ref()) { - gst::error!( - CAT, - imp: self, - "Transcriber has posted an error ({:?}), going back to passthrough", - m - ); - drop(s); - let mut settings = self.settings.lock().unwrap(); - settings.passthrough = true; - drop(settings); - self.obj().notify("passthrough"); - self.obj().call_async(move |bin| { - let thiz = bin.imp(); - thiz.block_and_update(true); - }); - } else { - drop(s); - self.parent_handle_message(msg); - } + let pad = self + .audio_sinkpad + .downcast_ref::() + .unwrap(); + let ps = pad.imp().state.lock().unwrap(); + let pad_state = ps.as_ref().unwrap(); + if msg.src() == Some(pad_state.transcriber.upcast_ref()) { + gst::error!( + CAT, + imp: self, + "Transcriber has posted an error ({m:?}), going back to passthrough", + ); + drop(ps); + self.settings.lock().unwrap().passthrough = true; + self.obj().notify("passthrough"); + self.obj().call_async(move |bin| { + let thiz = bin.imp(); + thiz.block_and_update(true); + }); } else { - drop(s); + drop(ps); self.parent_handle_message(msg); } } @@ -1436,3 +1743,262 @@ impl BinImpl for TranscriberBin { } } } + +#[derive(Debug, Clone)] +struct TranscriberSinkPadSettings { + translation_languages: Option, + language_code: String, + mode: Cea608Mode, +} + +impl Default for TranscriberSinkPadSettings { + fn default() -> Self { + Self { + translation_languages: Some( + gst::Structure::builder("languages") + .field("transcript", "cc1") + .build(), + ), + language_code: String::from(DEFAULT_INPUT_LANG_CODE), + mode: DEFAULT_MODE, + } + } +} + +struct TranscriberSinkPadState { + clocksync: gst::Element, + identity: gst::Element, + audio_tee: gst::Element, + transcription_bin: gst::Bin, + transcriber_aconv: gst::Element, + transcriber_resample: gst::Element, + transcriber: gst::Element, + queue_passthrough: gst::Element, + transcription_channels: HashMap, + srcpad_name: Option, +} + +impl TranscriberSinkPadState { + fn try_new() -> Result { + Ok(Self { + clocksync: gst::ElementFactory::make("clocksync").build()?, + identity: gst::ElementFactory::make("identity") + // We need to do that otherwise downstream may block for up to + // latency long until the allocation makes it through all branches. + // Audio buffer pools are fortunately not the most critical :) + .property("drop-allocation", true) + .build()?, + audio_tee: gst::ElementFactory::make("tee") + // Protect passthrough enable (and resulting dynamic reconfigure) + // from non-streaming thread + .property("allow-not-linked", true) + .build()?, + transcription_bin: gst::Bin::new(), + transcriber_resample: gst::ElementFactory::make("audioresample").build()?, + transcriber_aconv: gst::ElementFactory::make("audioconvert").build()?, + transcriber: gst::ElementFactory::make("awstranscriber").build()?, + queue_passthrough: gst::ElementFactory::make("queue").build()?, + transcription_channels: HashMap::new(), + srcpad_name: None, + }) + } +} + +pub struct TranscriberSinkPad { + state: Mutex>, + settings: Mutex, +} + +#[glib::object_subclass] +impl ObjectSubclass for TranscriberSinkPad { + const NAME: &'static str = "GstTranscriberSinkPad"; + type Type = super::TranscriberSinkPad; + type ParentType = gst::GhostPad; + + fn new() -> Self { + Self { + state: Mutex::new(TranscriberSinkPadState::try_new()), + settings: Mutex::new(TranscriberSinkPadSettings::default()), + } + } +} + +impl ObjectImpl for TranscriberSinkPad { + fn properties() -> &'static [glib::ParamSpec] { + static PROPERTIES: Lazy> = Lazy::new(|| { + vec![ + glib::ParamSpecBoxed::builder::("translation-languages") + .nick("Translation languages") + .blurb("A map of language codes to caption channels, e.g. translation-languages=\"languages, transcript={CC1, 708_1}, fr={708_2, CC3}\" will map the French translation to CC1/service 1 and the original transcript to CC3/service 2") + .mutable_playing() + .build(), + glib::ParamSpecString::builder("language-code") + .nick("Language Code") + .blurb("The language of the input stream") + .default_value(Some(DEFAULT_INPUT_LANG_CODE)) + .mutable_playing() + .build(), + glib::ParamSpecEnum::builder_with_default("mode", DEFAULT_MODE) + .nick("Mode") + .blurb("Which closed caption mode to operate in") + .mutable_playing() + .build(), + glib::ParamSpecObject::builder::("transcriber") + .nick("Transcriber") + .blurb("The transcriber element to use") + .mutable_ready() + .build(), + ] + }); + + PROPERTIES.as_ref() + } + + fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { + match pspec.name() { + "translation-languages" => { + if let Some(this) = self.obj().parent().and_downcast::() { + let mut settings = self.settings.lock().unwrap(); + settings.translation_languages = value + .get::>() + .expect("type checked upstream"); + gst::debug!( + CAT, + imp: self, + "Updated translation-languages {:?}", + settings.translation_languages + ); + drop(settings); + + this.imp().update_languages(&self.obj(), false); + } + } + "mode" => { + let mut settings = self.settings.lock().unwrap(); + + let old_mode = settings.mode; + let new_mode = value.get().expect("type checked upstream"); + settings.mode = new_mode; + + if old_mode != new_mode { + drop(settings); + if let Some(this) = self.obj().parent().and_downcast::() + { + if let Some(state) = this.imp().state.lock().unwrap().as_ref() { + let ps = self.state.lock().unwrap(); + let pad_state = ps.as_ref().unwrap(); + this.imp().setup_cc_mode( + &self.obj(), + pad_state, + state.mux_method, + new_mode, + ); + } + } + } + } + "language-code" => { + if let Some(this) = self.obj().parent().and_downcast::() { + let code = value + .get::>() + .expect("type checked upstream") + .unwrap_or_else(|| String::from(DEFAULT_INPUT_LANG_CODE)); + let mut settings = self.settings.lock().unwrap(); + if settings.language_code != code { + gst::debug!( + CAT, + imp: self, + "Updating language code {} -> {code}", + settings.language_code, + ); + + settings.language_code = code; + drop(settings); + + this.imp().update_languages(&self.obj(), true) + } + } + } + "transcriber" => { + if let Some(this) = self.obj().parent().and_downcast::() { + let mut s = this.imp().state.lock().unwrap(); + let mut ps = self.state.lock().unwrap(); + let pad_state = ps.as_mut().unwrap(); + let old_transcriber = pad_state.transcriber.clone(); + pad_state.transcriber = value.get().expect("type checked upstream"); + if old_transcriber != pad_state.transcriber { + if let Some(ref mut state) = s.as_mut() { + match this + .imp() + .relink_transcriber(state, pad_state, &old_transcriber) + { + Ok(()) => (), + Err(err) => { + gst::error!(CAT, "invalid transcriber: {err}"); + drop(s); + *this.imp().state.lock().unwrap() = None; + } + } + } + } + } + } + _ => unimplemented!(), + } + } + + fn property(&self, _id: usize, pspec: &glib::ParamSpec) -> glib::Value { + match pspec.name() { + "translation-languages" => { + let settings = self.settings.lock().unwrap(); + settings.translation_languages.to_value() + } + "language-code" => { + let settings = self.settings.lock().unwrap(); + settings.language_code.to_value() + } + "mode" => { + let settings = self.settings.lock().unwrap(); + settings.mode.to_value() + } + "transcriber" => { + let ps = self.state.lock().unwrap(); + let pad_state = ps.as_ref().unwrap(); + pad_state.transcriber.to_value() + } + _ => unimplemented!(), + } + } +} + +impl GstObjectImpl for TranscriberSinkPad {} + +impl PadImpl for TranscriberSinkPad {} + +impl ProxyPadImpl for TranscriberSinkPad {} + +impl GhostPadImpl for TranscriberSinkPad {} + +#[derive(Debug, Default)] +pub struct TranscriberSrcPad {} + +#[glib::object_subclass] +impl ObjectSubclass for TranscriberSrcPad { + const NAME: &'static str = "GstTranscriberSrcPad"; + type Type = super::TranscriberSrcPad; + type ParentType = gst::GhostPad; + + fn new() -> Self { + Default::default() + } +} + +impl ObjectImpl for TranscriberSrcPad {} + +impl GstObjectImpl for TranscriberSrcPad {} + +impl PadImpl for TranscriberSrcPad {} + +impl ProxyPadImpl for TranscriberSrcPad {} + +impl GhostPadImpl for TranscriberSrcPad {} diff --git a/video/closedcaption/src/transcriberbin/mod.rs b/video/closedcaption/src/transcriberbin/mod.rs index 0d115e02..bcec76cd 100644 --- a/video/closedcaption/src/transcriberbin/mod.rs +++ b/video/closedcaption/src/transcriberbin/mod.rs @@ -30,7 +30,15 @@ enum MuxMethod { } glib::wrapper! { - pub struct TranscriberBin(ObjectSubclass) @extends gst::Bin, gst::Element, gst::Object; + pub struct TranscriberBin(ObjectSubclass) @extends gst::Bin, gst::Element, gst::Object, @implements gst::ChildProxy; +} + +glib::wrapper! { + pub struct TranscriberSinkPad(ObjectSubclass) @extends gst::GhostPad, gst::ProxyPad, gst::Pad, gst::Object; +} + +glib::wrapper! { + pub struct TranscriberSrcPad(ObjectSubclass) @extends gst::GhostPad, gst::ProxyPad, gst::Pad, gst::Object; } pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { @@ -38,6 +46,8 @@ pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { { CaptionSource::static_type().mark_as_plugin_api(gst::PluginAPIFlags::empty()); MuxMethod::static_type().mark_as_plugin_api(gst::PluginAPIFlags::empty()); + TranscriberSinkPad::static_type().mark_as_plugin_api(gst::PluginAPIFlags::empty()); + TranscriberSrcPad::static_type().mark_as_plugin_api(gst::PluginAPIFlags::empty()); } gst::Element::register(