gst-plugins-rs/net/aws/src/transcriber/translate.rs
François Laignel c3f17b22f3 aws: transcriber: add support for language identification
This commit adds support for [language identification] to the transcriber element
and makes use of the identified language in the translation pad.

Language identification is activated with either of the following properties
(which match the service API):

- 'identify-language' when a single language is expected in the stream.
- 'identify-multiple-languages' otherwise.

In both cases, the property 'language-options' must list the possible
languages. Ex.: "en-US,es-US,fr-FR".

The following pipeline identifies languages from a stream possibly containing
multiple languages, outputs the transcription to the 'src' pad and translates
when needed to French ('translate_src_0') & English ('translate_src_1'):

```shell
gst-launch-1.0 -e uridecodebin uri=file:///__PATH_TO_FILE__ ! audioconvert \
  ! awstranscriber name=t \
      access-key="__TO_BE_DEFINED__" secret-access-key="__TO_BE_DEFINED__" \
      identify-multiple-languages=true \
      language-options="en-US,es-US,fr-FR" \
      translate_src_0::language-code=fr \
      translate_src_1::language-code=en \
        t. ! fakesink dump=true \
        t.translate_src_0 ! fakesink dump=true \
        t.translate_src_1 ! fakesink dump=true
```

[language identification]: https://docs.aws.amazon.com/transcribe/latest/dg/lang-id-stream.html
2024-04-05 13:43:22 +02:00

647 lines
22 KiB
Rust

// Copyright (C) 2023 François Laignel <francois@centricular.com>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
use gst::glib;
use gst::subclass::prelude::*;
use aws_sdk_translate as aws_translate;
use aws_sdk_translate::error::ProvideErrorMetadata;
use futures::channel::mpsc;
use futures::prelude::*;
use std::ops::ControlFlow;
use std::sync::Arc;
use super::imp::TranslateSrcPad;
use super::transcribe::TranscriptItem;
use super::{TranslationTokenizationMethod, CAT};
/// Opening tag wrapped around each transcript item before it is submitted for
/// translation when span based tokenization is used.
const SPAN_START: &str = "<span>";
/// Closing tag matching [`SPAN_START`].
const SPAN_END: &str = "</span>";
/// A piece of output text together with its timing information.
///
/// Items are either copied from a `TranscriptItem` or built from the text
/// returned by the translation service.
#[derive(Debug)]
pub struct TranslatedItem {
/// Timestamp at which the item starts.
pub pts: gst::ClockTime,
/// Duration covered by the item.
pub duration: gst::ClockTime,
/// Text of the item.
pub content: String,
}
impl From<&TranscriptItem> for TranslatedItem {
fn from(transcript_item: &TranscriptItem) -> Self {
TranslatedItem {
pts: transcript_item.pts,
duration: transcript_item.duration,
content: transcript_item.content.clone(),
}
}
}
/// State of the task which translates transcript items for one translation pad.
pub struct TranslateLoop {
/// Pad this loop works for; used as the logging context.
pad: glib::subclass::ObjectImplRef<TranslateSrcPad>,
/// Client used to call the translation service.
client: aws_translate::Client,
/// Language code the text is translated to.
output_lang: String,
/// How the translated text is split back into timed items.
tokenization_method: TranslationTokenizationMethod,
/// Receives the batches of transcript items to process.
transcript_rx: mpsc::Receiver<Arc<Vec<TranscriptItem>>>,
/// Sends the resulting items.
translate_tx: mpsc::Sender<Vec<TranslatedItem>>,
}
impl TranslateLoop {
pub fn new(
imp: &super::imp::Transcriber,
pad: &TranslateSrcPad,
output_lang: &str,
tokenization_method: TranslationTokenizationMethod,
transcript_rx: mpsc::Receiver<Arc<Vec<TranscriptItem>>>,
translate_tx: mpsc::Sender<Vec<TranslatedItem>>,
) -> Self {
let aws_config = imp.aws_config.lock().unwrap();
let aws_config = aws_config
.as_ref()
.expect("aws_config must be initialized at this stage");
TranslateLoop {
pad: pad.ref_counted(),
client: aws_sdk_translate::Client::new(aws_config),
output_lang: output_lang.to_string(),
tokenization_method,
transcript_rx,
translate_tx,
}
}
/// Checks that the output language is known to the translation service.
///
/// # Errors
///
/// Returns a `gst::LibraryError::Failed` error message if the
/// `list_languages` request fails or if none of the returned language codes
/// equals the configured output language.
pub async fn check_language(&self) -> Result<(), gst::ErrorMessage> {
    let language_list = match self.client.list_languages().send().await {
        Ok(language_list) => language_list,
        Err(err) => {
            let err = format!(
                "Failed to call list_languages service: {err}: {}",
                err.meta()
            );
            gst::info!(CAT, imp: self.pad, "{err}");
            return Err(gst::error_msg!(gst::LibraryError::Failed, ["{err}"]));
        }
    };

    let is_known = language_list
        .languages()
        .iter()
        .any(|candidate| candidate.language_code() == self.output_lang);
    if is_known {
        return Ok(());
    }

    let err = format!("Unknown output languages: {}", self.output_lang);
    gst::info!(CAT, imp: self.pad, "{err}");
    Err(gst::error_msg!(gst::LibraryError::Failed, ["{err}"]))
}
/// Runs the translation loop.
///
/// Each batch received from the transcript channel is split into runs of
/// consecutive items sharing the same language. Every run is handed to
/// `handle_transcript_items`, which translates it when its language differs
/// from the output language and forwards the result.
///
/// With span based tokenization, each item of a run which needs translation
/// is wrapped in `<span>` tags so that the translated text can be split back
/// into timed items.
///
/// The loop ends when the transcript channel is closed or as soon as the
/// translated items can no longer be sent.
///
/// # Errors
///
/// Returns the error reported by `handle_transcript_items` when the
/// translation request fails.
pub async fn run(mut self) -> Result<(), gst::ErrorMessage> {
    use TranslationTokenizationMethod as Tokenization;

    while let Some(transcript_items) = self.transcript_rx.next().await {
        if transcript_items.is_empty() {
            continue;
        }

        // Timing of the items buffered in `content`
        let mut ts_duration_list: Vec<(gst::ClockTime, gst::ClockTime)> = vec![];
        // Text of the current run of items
        let mut content = String::new();
        // Language of the current run of items
        let mut content_lang = Option::<String>::None;
        let mut needs_translation = false;

        let mut it = transcript_items.iter().peekable();
        while let Some(item) = it.next() {
            let lang_changed = if !content.is_empty() {
                // Some items already buffered
                match (content_lang.as_ref(), item.lang_code.as_ref()) {
                    (Some(clang), Some(ilang)) => {
                        // Content and new item langs are defined
                        !clang.eq_ignore_ascii_case(ilang.as_str())
                    }
                    (None, Some(_)) | (Some(_), None) => {
                        // Content uses an undefined lang
                        // but new item's lang is defined
                        // or Content uses a defined lang
                        // but incoming item's lang is undefined
                        true
                    }
                    (None, None) => false,
                }
            } else {
                false
            };

            // Flush the buffered run before starting a new one
            if lang_changed
                && self
                    .handle_transcript_items(
                        &mut ts_duration_list,
                        &mut content,
                        needs_translation,
                        content_lang.take(),
                    )
                    .await?
                    .is_break()
            {
                gst::info!(CAT, imp: self.pad, "exiting translation loop");
                // A plain `break` would only leave the per-item loop and
                // the next batch would still be processed (and possibly
                // translated) even though nobody can receive the result.
                return Ok(());
            }

            if content.is_empty() {
                // Either first item or content drained above
                content_lang = item.lang_code.as_ref().map(|lang| lang.to_string());
                needs_translation = self.needs_translation(content_lang.as_deref());
            }

            // Separate items with a space, except before a punctuation
            // and after the last item of the batch
            let suffix = match it.peek() {
                Some(next_item) => {
                    if next_item.is_punctuation {
                        ""
                    } else {
                        " "
                    }
                }
                None => "",
            };

            ts_duration_list.push((item.pts, item.duration));

            let item_content =
                if needs_translation && self.tokenization_method == Tokenization::SpanBased {
                    format!("{SPAN_START}{}{SPAN_END}{}", item.content, suffix)
                } else {
                    format!("{}{}", item.content, suffix)
                };
            content.push_str(&item_content);
        }

        // Flush what remains for this batch
        if !content.is_empty()
            && self
                .handle_transcript_items(
                    &mut ts_duration_list,
                    &mut content,
                    needs_translation,
                    content_lang.take(),
                )
                .await?
                .is_break()
        {
            gst::info!(CAT, imp: self.pad, "exiting translation loop");
            return Ok(());
        }
    }

    Ok(())
}
/// Tells whether text in language `lang` must be translated.
///
/// No translation is needed when `lang` is undefined or when it starts with
/// the output language code, e.g. `fr-FR` for output language `fr`.
///
/// The comparison ignores ASCII case on both sides, so that output language
/// codes with an upper case part, such as `fr-CA`, match the same input
/// language.
#[inline]
fn needs_translation(&self, lang: Option<&str>) -> bool {
    let Some(lang) = lang else { return false };

    // Compare bytes so that slicing can't split a multi-byte character.
    let output_lang = self.output_lang.as_bytes();
    lang.as_bytes()
        .get(..output_lang.len())
        // `lang` is shorter than the output language: can't be a match
        .map_or(true, |prefix| !prefix.eq_ignore_ascii_case(output_lang))
}
/// Translates the buffered `content` if needed and sends the resulting items.
///
/// `content` and `ts_duration_list` are emptied by this call.
///
/// When `needs_translation` is set, `content` is submitted to the translation
/// service with `content_lang` as the source language, otherwise it is used
/// as is. With span based tokenization, translated text is split back into
/// one item per span; in any other case a single item covering the whole
/// `ts_duration_list` is produced.
///
/// Returns `ControlFlow::Break` if the items could not be sent because the
/// channel is terminated, `ControlFlow::Continue` otherwise.
///
/// # Errors
///
/// Returns a `gst::LibraryError::Failed` error message if the translation
/// request fails.
///
/// # Panics
///
/// Panics if `ts_duration_list` is empty when a single item must be built.
async fn handle_transcript_items(
    &mut self,
    ts_duration_list: &mut Vec<(gst::ClockTime, gst::ClockTime)>,
    content: &mut String,
    needs_translation: bool,
    content_lang: Option<String>,
) -> Result<ControlFlow<()>, gst::ErrorMessage> {
    use std::mem;
    use TranslationTokenizationMethod as Tokenization;

    let output_text = if !needs_translation {
        gst::debug!(CAT, imp: self.pad,
            "Not translating: '{content}' from {content_lang:?} to {} with {ts_duration_list:?}",
            self.output_lang,
        );

        mem::take(content)
    } else {
        gst::debug!(CAT, imp: self.pad,
            "Translating: '{content}' from {content_lang:?} to {} with {ts_duration_list:?}",
            self.output_lang,
        );

        let request = self
            .client
            .translate_text()
            .set_source_language_code(content_lang)
            .set_target_language_code(Some(self.output_lang.clone()))
            .set_text(Some(mem::take(content)));

        let translated_text = match request.send().await {
            Ok(response) => response.translated_text,
            Err(err) => {
                let err = format!("Failed to call translation service: {err}: {}", err.meta());
                gst::info!(CAT, imp: self.pad, "{err}");
                return Err(gst::error_msg!(gst::LibraryError::Failed, ["{err}"]));
            }
        };

        gst::debug!(CAT, imp: self.pad, "Got translation: '{translated_text}'");

        translated_text
    };

    let is_span_based =
        needs_translation && self.tokenization_method == Tokenization::SpanBased;

    let translated_items = if is_span_based {
        span_tokenize_items(&output_text, ts_duration_list.drain(..))
    } else {
        // Single item spanning from the first pts to the end of the last item
        let &(first_pts, _) = ts_duration_list.first().expect("at least one item");
        let &(last_pts, last_duration) = ts_duration_list.last().expect("at least one item");
        ts_duration_list.clear();

        vec![TranslatedItem {
            pts: first_pts,
            duration: last_pts.saturating_sub(first_pts) + last_duration,
            content: output_text,
        }]
    };

    gst::trace!(CAT, imp: self.pad, "Sending {translated_items:?}");

    match self.translate_tx.send(translated_items).await {
        Ok(()) => Ok(ControlFlow::Continue(())),
        Err(_) => {
            gst::info!(CAT, imp: self.pad, "translation chan terminated");
            Ok(ControlFlow::Break(()))
        }
    }
}
}
/// Splits `translation` into timed items using its `<span>` / `</span>` tags.
///
/// `translation` is expected to be the text returned by the `Translate` ws
/// for a content in which each item was wrapped in id-less span tags. Every
/// tag ends the text collected so far, so both serial `<span></span>` and
/// nested `<span><span></span></span>` are accepted, and text found outside
/// of any span is an item too. Whitespaces leading an item are dropped.
///
/// Items get their pts and duration from `ts_duration_list`, in order:
///
/// - If the text contains more items than there are timings, the extra items
///   are appended to the last timed item, separated by a space.
/// - If the text contains fewer items than there are timings, the last item
///   is assigned a duration which extends up to the end of the last timing.
///
/// NOTE(review): a `<` which doesn't start a tag is only given up on after
/// 8 bytes, so a tag starting within that window is kept as plain text.
fn span_tokenize_items(
    translation: &str,
    ts_duration_list: impl IntoIterator<Item = (gst::ClockTime, gst::ClockTime)>,
) -> Vec<TranslatedItem> {
    let mut items: Vec<TranslatedItem> = Vec::new();
    let mut timings = ts_duration_list.into_iter();

    // Text collected for the item being built
    let mut text = String::new();
    // Characters which could turn out to be a span tag
    let mut tag = String::new();

    for ch in translation.chars() {
        if ch.is_whitespace() && text.is_empty() {
            // An item doesn't start with whitespaces
            continue;
        }

        if tag.is_empty() {
            match ch {
                // Could be the beginning of a tag
                '<' => tag.push(ch),
                _ => text.push(ch),
            }
            continue;
        }

        tag.push(ch);

        let tag_len = tag.len();
        if tag_len < SPAN_START.len() {
            // Too short to decide
            continue;
        }
        if tag_len > SPAN_END.len() {
            // Too long for a tag: this was regular text
            text.push_str(&tag);
            tag.clear();
            continue;
        }
        if tag != SPAN_START && tag != SPAN_END {
            // Not a tag, at least not yet
            continue;
        }

        // Found a tag: it terminates the pending text, if any
        tag.clear();
        if text.is_empty() {
            continue;
        }

        match timings.next() {
            Some((pts, duration)) => items.push(TranslatedItem {
                pts,
                duration,
                content: std::mem::take(&mut text),
            }),
            None => {
                // No timing left: the text goes to the last item
                if let Some(last) = items.last_mut() {
                    if !last.content.ends_with(' ') {
                        last.content.push(' ');
                    }
                    last.content.push_str(&text);
                    text.clear();
                }
            }
        }
    }

    // An unfinished tag is regular text
    text.push_str(&tag);

    if text.is_empty() {
        // Nothing left to add: the last item takes over the unused timings
        if let Some((last_pts, last_duration)) = timings.last() {
            if let Some(last) = items.last_mut() {
                last.duration = last_pts.saturating_sub(last.pts) + last_duration;
            }
        }

        return items;
    }

    match timings.next() {
        Some((pts, mut duration)) => {
            if let Some((last_pts, last_duration)) = timings.last() {
                // The final item also covers the unused timings
                duration = last_pts.saturating_sub(pts) + last_duration;
            }
            items.push(TranslatedItem {
                pts,
                duration,
                content: text,
            });
        }
        None => {
            // No timing left: the text goes to the last item
            if let Some(last) = items.last_mut() {
                if !last.content.ends_with(' ') {
                    last.content.push(' ');
                }
                last.content.push_str(&text);
            }
        }
    }

    items
}
#[cfg(test)]
mod tests {
    use super::span_tokenize_items;
    use gst::prelude::*;

    /// Tokenizes `input` with the `(pts, duration)` pairs from `timings` and
    /// checks that the result is exactly the `(pts, duration, content)`
    /// triples from `expected`. All times are in seconds.
    fn check(input: &str, timings: &[(u64, u64)], expected: &[(u64, u64, &str)]) {
        let ts_duration_list: Vec<(gst::ClockTime, gst::ClockTime)> = timings
            .iter()
            .map(|&(pts, duration)| (pts.seconds(), duration.seconds()))
            .collect();

        let mut items = span_tokenize_items(input, ts_duration_list).into_iter();

        for &(pts, duration, content) in expected {
            let item = items.next().unwrap();
            assert_eq!(item.pts, pts.seconds());
            assert_eq!(item.duration, duration.seconds());
            assert_eq!(item.content, content);
        }

        assert!(items.next().is_none());
    }

    #[test]
    fn serial_spans() {
        check(
            "<span>first</span> <span>second</span> <span>third</span>",
            &[(0, 1), (1, 2), (4, 3)],
            &[(0, 1, "first"), (1, 2, "second"), (4, 3, "third")],
        );
    }

    #[test]
    fn serial_and_nested_spans() {
        check(
            "<span>first</span> <span>second <span>third</span></span> <span>fourth</span>",
            &[(0, 1), (1, 2), (3, 1), (4, 2)],
            &[
                (0, 1, "first"),
                (1, 2, "second "),
                (3, 1, "third"),
                (4, 2, "fourth"),
            ],
        );
    }

    #[test]
    fn nonspaned_serial_and_nested_spans() {
        check(
            "Initial <span>first</span> <span>second <span>third</span></span> <span>fourth</span> final",
            &[(0, 1), (1, 1), (2, 1), (3, 1), (4, 1), (5, 1)],
            &[
                (0, 1, "Initial "),
                (1, 1, "first"),
                (2, 1, "second "),
                (3, 1, "third"),
                (4, 1, "fourth"),
                (5, 1, "final"),
            ],
        );
    }

    #[test]
    fn more_parsed_items() {
        check(
            "<span>first</span> <span>second</span> <span>third</span> <span>fourth</span>",
            &[(0, 1), (1, 2), (4, 3)],
            &[(0, 1, "first"), (1, 2, "second"), (4, 3, "third fourth")],
        );
    }

    #[test]
    fn more_parsed_items_nonspan_final() {
        check(
            "<span>first</span> <span>second</span> <span>third</span> final",
            &[(0, 1), (1, 2), (4, 3)],
            &[(0, 1, "first"), (1, 2, "second"), (4, 3, "third final")],
        );
    }

    #[test]
    fn less_parsed_items() {
        check(
            "<span>first</span> <span>second</span>",
            &[(0, 1), (1, 2), (4, 3)],
            &[(0, 1, "first"), (1, 6, "second")],
        );
    }

    #[test]
    fn less_parsed_items_nonspan_final() {
        check(
            "<span>first</span> final",
            &[(0, 1), (1, 2), (4, 3)],
            &[(0, 1, "first"), (1, 6, "final")],
        );
    }

    #[test]
    fn utf8_input() {
        check(
            "caractères accentués",
            &[(0, 1)],
            &[(0, 1, "caractères accentués")],
        );
    }
}