awstranscriber, speechmatics: store language tags on translation source pads

In order to do so we need to activate the pad as soon as it is added,
which means we can no longer start the task at this point, instead wait
for stream-start to do so now.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/2029>
This commit is contained in:
Mathieu Duponchelle 2025-01-07 17:35:09 +01:00 committed by GStreamer Marge Bot
parent 2ec5747ec7
commit c51a65d973
2 changed files with 144 additions and 21 deletions

View file

@ -827,9 +827,7 @@ impl Transcriber {
_mode: gst::PadMode,
active: bool,
) -> Result<(), gst::LoggableError> {
if active {
pad.imp().start_task()?;
} else {
if !active {
pad.imp().stop_task()?;
}
@ -966,6 +964,36 @@ impl Transcriber {
true
}
gst::EventView::Tag(_) => true,
gst::EventView::StreamStart(e) => {
let srcpads = self.state.lock().unwrap().srcpads.clone();
for srcpad in &srcpads {
let sev = gst::event::StreamStart::builder("transcription")
.seqnum(e.seqnum())
.build();
if !srcpad.push_event(sev) {
gst::error!(CAT, obj = srcpad, "Failed to push stream start event");
return false;
}
let unsynced_pad = srcpad.imp().state.lock().unwrap().unsynced_pad.clone();
if let Some(pad) = unsynced_pad {
let sev = gst::event::StreamStart::builder("unsynced-transcription")
.seqnum(e.seqnum())
.build();
if !pad.push_event(sev) {
gst::error!(CAT, obj = pad, "Failed to push stream start event");
return false;
}
}
if let Err(err) = srcpad.imp().start_task() {
gst::error!(CAT, imp = self, "Failed to start srcpad task: {}", err);
return false;
}
}
true
}
gst::EventView::Caps(_) => {
let srcpads = self.state.lock().unwrap().srcpads.clone();
@ -1852,6 +1880,9 @@ impl ElementImpl for Transcriber {
self.obj().add_pad(&pad).unwrap();
self.obj().add_pad(&unsynced_srcpad).unwrap();
pad.set_active(true).unwrap();
unsynced_srcpad.set_active(true).unwrap();
self.obj().child_added(&pad, &pad.name());
Some(pad.upcast())
@ -1989,7 +2020,31 @@ impl ObjectImpl for TranscriberSrcPad {
fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
match pspec.name() {
OUTPUT_LANG_CODE_PROPERTY => {
self.settings.lock().unwrap().language_code = value.get().unwrap()
let language_code: Option<String> = value.get().unwrap();
self.settings.lock().unwrap().language_code = language_code.clone();
if let Some(language_code) = language_code {
// Make sure our tags do not get overwritten
let sev = gst::event::StreamStart::builder("transcription").build();
let _ = self.obj().store_sticky_event(&sev);
let mut tl = gst::TagList::new();
tl.make_mut().add::<gst::tags::LanguageCode>(
&language_code.as_str(),
gst::TagMergeMode::Append,
);
let ev = gst::event::Tag::builder(tl).build();
let _ = self.obj().store_sticky_event(&ev);
if let Some(pad) = self.state.lock().unwrap().unsynced_pad.as_ref() {
// Make sure our tags do not get overwritten
let sev =
gst::event::StreamStart::builder("unsynced-transcription").build();
let _ = pad.store_sticky_event(&sev);
let _ = pad.store_sticky_event(&ev);
}
}
}
_ => unimplemented!(),
}

View file

@ -279,7 +279,16 @@ impl Transcriber {
gst::info!(CAT, "Received caps {c:?}");
true
}
StreamStart(_) => true,
StreamStart(_) => {
let state = self.state.lock().unwrap();
match self.start_srcpad_tasks(&state) {
Err(err) => {
gst::error!(CAT, imp = self, "Failed to start srcpad tasks: {err}");
false
}
Ok(_) => true,
}
}
_ => gst::Pad::event_default(pad, Some(&*self.obj()), event),
}
}
@ -707,6 +716,18 @@ impl ObjectSubclass for Transcriber {
}
}
fn store_language_tag(pad: &gst::Pad, stream_id: &str, language_code: &str) {
// Make sure our tags do not get overwritten
let sev = gst::event::StreamStart::builder(stream_id).build();
let _ = pad.store_sticky_event(&sev);
let mut tl = gst::TagList::new();
tl.make_mut()
.add::<gst::tags::LanguageCode>(&language_code, gst::TagMergeMode::Append);
let ev = gst::event::Tag::builder(tl).build();
let _ = pad.store_sticky_event(&ev);
}
impl ObjectImpl for Transcriber {
fn properties() -> &'static [glib::ParamSpec] {
static PROPERTIES: LazyLock<Vec<glib::ParamSpec>> = LazyLock::new(|| {
@ -809,28 +830,64 @@ impl ObjectImpl for Transcriber {
fn constructed(&self) {
self.parent_constructed();
let language_code = self.settings.lock().unwrap().language_code.clone();
let obj = self.obj();
obj.add_pad(&self.sinkpad).unwrap();
obj.add_pad(&self.static_srcpad).unwrap();
obj.add_pad(
self.static_srcpad
.imp()
.state
.lock()
.unwrap()
.unsynced_pad
.as_ref()
.unwrap(),
)
.unwrap();
self.static_srcpad.set_active(true).unwrap();
store_language_tag(
self.static_srcpad.upcast_ref(),
"transcription",
&language_code,
);
let pad_state = self.static_srcpad.imp().state.lock().unwrap();
let unsynced_pad = pad_state.unsynced_pad.as_ref().unwrap();
obj.add_pad(unsynced_pad).unwrap();
unsynced_pad.set_active(true).unwrap();
store_language_tag(
unsynced_pad.upcast_ref(),
"unsynced-transcription",
&language_code,
);
obj.set_element_flags(gst::ElementFlags::PROVIDE_CLOCK | gst::ElementFlags::REQUIRE_CLOCK);
}
fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
match pspec.name() {
"language-code" => {
let language_code: String = value.get().expect("type checked upstream");
store_language_tag(
self.static_srcpad.upcast_ref(),
"transcription",
&language_code,
);
if let Some(unsynced_pad) = self
.static_srcpad
.imp()
.state
.lock()
.unwrap()
.unsynced_pad
.as_ref()
{
store_language_tag(
unsynced_pad.upcast_ref(),
"unsynced-transcription",
&language_code,
);
}
let mut settings = self.settings.lock().unwrap();
settings.language_code = value.get().expect("type checked upstream");
settings.language_code = language_code;
}
DEPRECATED_LATENCY_PROPERTY => {
let mut settings = self.settings.lock().unwrap();
@ -1120,6 +1177,9 @@ impl ElementImpl for Transcriber {
self.obj().add_pad(&pad).unwrap();
self.obj().add_pad(&static_unsynced_srcpad).unwrap();
pad.set_active(true).unwrap();
static_unsynced_srcpad.set_active(true).unwrap();
let _ = self
.obj()
.post_message(gst::message::Latency::builder().src(&*self.obj()).build());
@ -1873,9 +1933,7 @@ impl TranslateSrcPad {
_mode: gst::PadMode,
active: bool,
) -> Result<(), gst::LoggableError> {
if active {
pad.imp().start_task()?;
} else {
if !active {
pad.imp().stop_task();
}
@ -1971,7 +2029,17 @@ impl ObjectImpl for TranslateSrcPad {
fn set_property(&self, _id: usize, value: &glib::Value, pspec: &glib::ParamSpec) {
match pspec.name() {
OUTPUT_LANG_CODE_PROPERTY => {
self.settings.lock().unwrap().language_code = value.get().unwrap()
let language_code: Option<String> = value.get().unwrap();
self.settings.lock().unwrap().language_code = language_code.clone();
if let Some(language_code) = language_code {
store_language_tag(self.obj().upcast_ref(), "transcription", &language_code);
if let Some(pad) = self.state.lock().unwrap().unsynced_pad.as_ref() {
store_language_tag(pad, "unsynced-transcription", &language_code);
}
}
}
TRANSLATION_TOKENIZATION_PROPERTY => {
self.settings.lock().unwrap().tokenization_method = value.get().unwrap()