From c51a65d9737756c76771c2e039c298dfa8573147 Mon Sep 17 00:00:00 2001 From: Mathieu Duponchelle Date: Tue, 7 Jan 2025 17:35:09 +0100 Subject: [PATCH] awstranscriber, speechmatics: store language tags on translation source pads In order to do so we need to activate the pad as soon as it is added, which means we can no longer start the task at this point, instead wait for stream-start to do so now. Part-of: --- audio/speechmatics/src/transcriber/imp.rs | 63 ++++++++++++- net/aws/src/transcriber/imp.rs | 102 ++++++++++++++++++---- 2 files changed, 144 insertions(+), 21 deletions(-) diff --git a/audio/speechmatics/src/transcriber/imp.rs b/audio/speechmatics/src/transcriber/imp.rs index f29908edb..cef6bd60a 100644 --- a/audio/speechmatics/src/transcriber/imp.rs +++ b/audio/speechmatics/src/transcriber/imp.rs @@ -827,9 +827,7 @@ impl Transcriber { _mode: gst::PadMode, active: bool, ) -> Result<(), gst::LoggableError> { - if active { - pad.imp().start_task()?; - } else { + if !active { pad.imp().stop_task()?; } @@ -966,6 +964,36 @@ impl Transcriber { true } gst::EventView::Tag(_) => true, + gst::EventView::StreamStart(e) => { + let srcpads = self.state.lock().unwrap().srcpads.clone(); + for srcpad in &srcpads { + let sev = gst::event::StreamStart::builder("transcription") + .seqnum(e.seqnum()) + .build(); + if !srcpad.push_event(sev) { + gst::error!(CAT, obj = srcpad, "Failed to push stream start event"); + return false; + } + + let unsynced_pad = srcpad.imp().state.lock().unwrap().unsynced_pad.clone(); + + if let Some(pad) = unsynced_pad { + let sev = gst::event::StreamStart::builder("unsynced-transcription") + .seqnum(e.seqnum()) + .build(); + if !pad.push_event(sev) { + gst::error!(CAT, obj = pad, "Failed to push stream start event"); + return false; + } + } + + if let Err(err) = srcpad.imp().start_task() { + gst::error!(CAT, imp = self, "Failed to start srcpad task: {}", err); + return false; + } + } + true + } gst::EventView::Caps(_) => { let srcpads = self.state.lock().unwrap().srcpads.clone(); @@ -1852,6 +1880,9 @@ impl ElementImpl for Transcriber { self.obj().add_pad(&pad).unwrap(); self.obj().add_pad(&unsynced_srcpad).unwrap(); + pad.set_active(true).unwrap(); + unsynced_srcpad.set_active(true).unwrap(); + self.obj().child_added(&pad, &pad.name()); Some(pad.upcast()) @@ -1989,7 +2020,31 @@ impl ObjectImpl for TranscriberSrcPad { fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { match pspec.name() { OUTPUT_LANG_CODE_PROPERTY => { - self.settings.lock().unwrap().language_code = value.get().unwrap() + let language_code: Option = value.get().unwrap(); + + self.settings.lock().unwrap().language_code = language_code.clone(); + + if let Some(language_code) = language_code { + // Make sure our tags do not get overwritten + let sev = gst::event::StreamStart::builder("transcription").build(); + let _ = self.obj().store_sticky_event(&sev); + + let mut tl = gst::TagList::new(); + tl.make_mut().add::( + &language_code.as_str(), + gst::TagMergeMode::Append, + ); + let ev = gst::event::Tag::builder(tl).build(); + let _ = self.obj().store_sticky_event(&ev); + + if let Some(pad) = self.state.lock().unwrap().unsynced_pad.as_ref() { + // Make sure our tags do not get overwritten + let sev = + gst::event::StreamStart::builder("unsynced-transcription").build(); + let _ = pad.store_sticky_event(&sev); + let _ = pad.store_sticky_event(&ev); + } + } } _ => unimplemented!(), } diff --git a/net/aws/src/transcriber/imp.rs b/net/aws/src/transcriber/imp.rs index 51745fc19..59248c261 100644 --- a/net/aws/src/transcriber/imp.rs +++ b/net/aws/src/transcriber/imp.rs @@ -279,7 +279,16 @@ impl Transcriber { gst::info!(CAT, "Received caps {c:?}"); true } - StreamStart(_) => true, + StreamStart(_) => { + let state = self.state.lock().unwrap(); + match self.start_srcpad_tasks(&state) { + Err(err) => { + gst::error!(CAT, imp = self, "Failed to start srcpad tasks: {err}"); + false + } + Ok(_) => true, + } + } _ => gst::Pad::event_default(pad, Some(&*self.obj()), event), } } @@ -707,6 +716,18 @@ impl ObjectSubclass for Transcriber { } } +fn store_language_tag(pad: &gst::Pad, stream_id: &str, language_code: &str) { + // Make sure our tags do not get overwritten + let sev = gst::event::StreamStart::builder(stream_id).build(); + let _ = pad.store_sticky_event(&sev); + + let mut tl = gst::TagList::new(); + tl.make_mut() + .add::(&language_code, gst::TagMergeMode::Append); + let ev = gst::event::Tag::builder(tl).build(); + let _ = pad.store_sticky_event(&ev); +} + impl ObjectImpl for Transcriber { fn properties() -> &'static [glib::ParamSpec] { static PROPERTIES: LazyLock> = LazyLock::new(|| { @@ -809,28 +830,64 @@ impl ObjectImpl for Transcriber { fn constructed(&self) { self.parent_constructed(); + let language_code = self.settings.lock().unwrap().language_code.clone(); + let obj = self.obj(); obj.add_pad(&self.sinkpad).unwrap(); obj.add_pad(&self.static_srcpad).unwrap(); - obj.add_pad( - self.static_srcpad - .imp() - .state - .lock() - .unwrap() - .unsynced_pad - .as_ref() - .unwrap(), - ) - .unwrap(); + self.static_srcpad.set_active(true).unwrap(); + + store_language_tag( + self.static_srcpad.upcast_ref(), + "transcription", + &language_code, + ); + + let pad_state = self.static_srcpad.imp().state.lock().unwrap(); + + let unsynced_pad = pad_state.unsynced_pad.as_ref().unwrap(); + + obj.add_pad(unsynced_pad).unwrap(); + unsynced_pad.set_active(true).unwrap(); + + store_language_tag( + unsynced_pad.upcast_ref(), + "unsynced-transcription", + &language_code, + ); + obj.set_element_flags(gst::ElementFlags::PROVIDE_CLOCK | gst::ElementFlags::REQUIRE_CLOCK); } fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { match pspec.name() { "language-code" => { + let language_code: String = value.get().expect("type checked upstream"); + + store_language_tag( + self.static_srcpad.upcast_ref(), + "transcription", + &language_code, + ); + + if let Some(unsynced_pad) = self + .static_srcpad + .imp() + .state + .lock() + .unwrap() + .unsynced_pad + .as_ref() + { + store_language_tag( + unsynced_pad.upcast_ref(), + "unsynced-transcription", + &language_code, + ); + } + let mut settings = self.settings.lock().unwrap(); - settings.language_code = value.get().expect("type checked upstream"); + settings.language_code = language_code; } DEPRECATED_LATENCY_PROPERTY => { let mut settings = self.settings.lock().unwrap(); @@ -1120,6 +1177,9 @@ impl ElementImpl for Transcriber { self.obj().add_pad(&pad).unwrap(); self.obj().add_pad(&static_unsynced_srcpad).unwrap(); + pad.set_active(true).unwrap(); + static_unsynced_srcpad.set_active(true).unwrap(); + let _ = self .obj() .post_message(gst::message::Latency::builder().src(&*self.obj()).build()); @@ -1873,9 +1933,7 @@ impl TranslateSrcPad { _mode: gst::PadMode, active: bool, ) -> Result<(), gst::LoggableError> { - if active { - pad.imp().start_task()?; - } else { + if !active { pad.imp().stop_task(); } @@ -1971,7 +2029,17 @@ impl ObjectImpl for TranslateSrcPad { fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) { match pspec.name() { OUTPUT_LANG_CODE_PROPERTY => { - self.settings.lock().unwrap().language_code = value.get().unwrap() + let language_code: Option = value.get().unwrap(); + + self.settings.lock().unwrap().language_code = language_code.clone(); + + if let Some(language_code) = language_code { + store_language_tag(self.obj().upcast_ref(), "transcription", &language_code); + + if let Some(pad) = self.state.lock().unwrap().unsynced_pad.as_ref() { + store_language_tag(pad, "unsynced-transcription", &language_code); + } + } } TRANSLATION_TOKENIZATION_PROPERTY => { self.settings.lock().unwrap().tokenization_method = value.get().unwrap()