awstranslate: expose new accumulator-lateness property

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/2221>
This commit is contained in:
Mathieu Duponchelle 2025-05-02 13:03:47 +02:00 committed by GStreamer Marge Bot
parent 9a784d5979
commit 2e03d4f693
3 changed files with 163 additions and 37 deletions

View file

@ -1579,6 +1579,20 @@
"type": "gchararray", "type": "gchararray",
"writable": true "writable": true
}, },
"accumulator-lateness": {
"blurb": "By how much to shift input timestamps forward for accumulating",
"conditionally-available": false,
"construct": false,
"construct-only": false,
"controllable": false,
"default": "0",
"max": "-1",
"min": "0",
"mutable": "ready",
"readable": true,
"type": "guint",
"writable": true
},
"input-language-code": { "input-language-code": {
"blurb": "The Language of the input stream", "blurb": "The Language of the input stream",
"conditionally-available": false, "conditionally-available": false,
@ -1592,7 +1606,7 @@
"writable": true "writable": true
}, },
"latency": { "latency": {
"blurb": "Amount of milliseconds to allow AWS Polly", "blurb": "Amount of milliseconds to allow AWS translate",
"conditionally-available": false, "conditionally-available": false,
"construct": false, "construct": false,
"construct-only": false, "construct-only": false,

View file

@ -31,6 +31,7 @@ static AWS_BEHAVIOR_VERSION: LazyLock<aws_config::BehaviorVersion> =
LazyLock::new(aws_config::BehaviorVersion::v2023_11_09); LazyLock::new(aws_config::BehaviorVersion::v2023_11_09);
const DEFAULT_LATENCY: gst::ClockTime = gst::ClockTime::from_seconds(2); const DEFAULT_LATENCY: gst::ClockTime = gst::ClockTime::from_seconds(2);
const DEFAULT_ACCUMULATOR_LATENESS: gst::ClockTime = gst::ClockTime::from_seconds(0);
const DEFAULT_REGION: &str = "us-east-1"; const DEFAULT_REGION: &str = "us-east-1";
const DEFAULT_INPUT_LANG_CODE: &str = "en-US"; const DEFAULT_INPUT_LANG_CODE: &str = "en-US";
const DEFAULT_OUTPUT_LANG_CODE: &str = "fr-FR"; const DEFAULT_OUTPUT_LANG_CODE: &str = "fr-FR";
@ -40,6 +41,7 @@ const DEFAULT_TOKENIZATION_METHOD: TranslationTokenizationMethod =
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
struct Settings { struct Settings {
latency: gst::ClockTime, latency: gst::ClockTime,
accumulator_lateness: gst::ClockTime,
input_language_code: String, input_language_code: String,
output_language_code: String, output_language_code: String,
access_key: Option<String>, access_key: Option<String>,
@ -52,6 +54,7 @@ impl Default for Settings {
fn default() -> Self { fn default() -> Self {
Self { Self {
latency: DEFAULT_LATENCY, latency: DEFAULT_LATENCY,
accumulator_lateness: DEFAULT_ACCUMULATOR_LATENESS,
input_language_code: String::from(DEFAULT_INPUT_LANG_CODE), input_language_code: String::from(DEFAULT_INPUT_LANG_CODE),
output_language_code: String::from(DEFAULT_OUTPUT_LANG_CODE), output_language_code: String::from(DEFAULT_OUTPUT_LANG_CODE),
access_key: None, access_key: None,
@ -88,10 +91,6 @@ impl InputItems {
self.0.first().map(|item| item.discont).unwrap_or(false) self.0.first().map(|item| item.discont).unwrap_or(false)
} }
fn end_pts(&self) -> Option<gst::ClockTime> {
self.0.last().map(|item| item.end_pts)
}
fn is_empty(&self) -> bool { fn is_empty(&self) -> bool {
self.0.is_empty() self.0.is_empty()
} }
@ -136,12 +135,17 @@ impl InputItems {
Self(items) Self(items)
} }
fn timeout(&mut self, now: gst::ClockTime, upstream_min: gst::ClockTime) -> Option<Self> { fn timeout(
&mut self,
now: gst::ClockTime,
upstream_min: gst::ClockTime,
lateness: gst::ClockTime,
) -> Option<Self> {
if let Some(start_rtime) = self.start_rtime() { if let Some(start_rtime) = self.start_rtime() {
if start_rtime + upstream_min < now { if start_rtime + upstream_min + lateness < now {
gst::debug!( gst::debug!(
CAT, CAT,
"draining on timeout: {start_rtime} + {upstream_min} < {now}", "draining on timeout: {start_rtime} + {upstream_min} + {lateness} < {now}",
); );
Some(self.drain(true)) Some(self.drain(true))
} else { } else {
@ -315,13 +319,18 @@ impl Translate {
self.srcpad.push_event(event) self.srcpad.push_event(event)
} }
Gap(gap) => { Gap(gap) => {
let lateness = self.settings.lock().unwrap().accumulator_lateness;
let state = self.state.lock().unwrap(); let state = self.state.lock().unwrap();
let (pts, duration) = gap.get(); let (pts, duration) = gap.get();
if state.accumulator.is_empty() { if state.accumulator.is_empty() {
if let Some(translate_tx) = state.translate_tx.as_ref() { if let Some(translate_tx) = state.translate_tx.as_ref() {
let _ = translate_tx.send(TranslateInput::Gap { pts, duration }); let _ = translate_tx.send(TranslateInput::Gap {
pts: pts + lateness,
duration,
});
} }
} }
@ -395,13 +404,15 @@ impl Translate {
return Ok(gst::FlowSuccess::Ok); return Ok(gst::FlowSuccess::Ok);
} }
let lateness = self.settings.lock().unwrap().accumulator_lateness;
loop { loop {
let to_translate = self let to_translate =
.state self.state
.lock() .lock()
.unwrap() .unwrap()
.accumulator .accumulator
.timeout(now, upstream_min); .timeout(now, upstream_min, lateness);
if let Some(to_translate) = to_translate { if let Some(to_translate) = to_translate {
self.do_send(to_translate)?; self.do_send(to_translate)?;
} else { } else {
@ -451,25 +462,19 @@ impl Translate {
} }
async fn send(&self, to_translate: InputItems) -> Result<Vec<TranslateOutput>, Error> { async fn send(&self, to_translate: InputItems) -> Result<Vec<TranslateOutput>, Error> {
let (input_lang, output_lang, latency, tokenization_method) = { let (input_lang, output_lang, latency, tokenization_method, lateness) = {
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
( (
settings.input_language_code.clone(), settings.input_language_code.clone(),
settings.output_language_code.clone(), settings.output_language_code.clone(),
settings.latency, settings.latency,
settings.tokenization_method, settings.tokenization_method,
settings.accumulator_lateness,
) )
}; };
let (client, segment, speaker) = { let (client, segment, speaker) = {
let mut state = self.state.lock().unwrap(); let state = self.state.lock().unwrap();
state
.segment
.as_mut()
.unwrap()
.set_position(to_translate.end_pts());
( (
state.client.as_ref().unwrap().clone(), state.client.as_ref().unwrap().clone(),
state.segment.as_ref().unwrap().clone(), state.segment.as_ref().unwrap().clone(),
@ -568,7 +573,13 @@ impl Translate {
let mut translated_items_builder = gst::Structure::builder("awstranslate/items"); let mut translated_items_builder = gst::Structure::builder("awstranslate/items");
let mut end_pts: Option<gst::ClockTime> = None;
for mut item in translated_items.drain(..) { for mut item in translated_items.drain(..) {
translated_items_builder = translated_items_builder.field(&item.content, item.pts);
item.pts += lateness;
if let Some((upstream_live, upstream_min, _)) = upstream_latency { if let Some((upstream_live, upstream_min, _)) = upstream_latency {
if upstream_live { if upstream_live {
if let Some(now) = self.obj().current_running_time() { if let Some(now) = self.obj().current_running_time() {
@ -588,8 +599,6 @@ impl Translate {
} }
} }
translated_items_builder = translated_items_builder.field(&item.content, item.pts);
let mut buf = gst::Buffer::from_mut_slice(item.content.into_bytes()); let mut buf = gst::Buffer::from_mut_slice(item.content.into_bytes());
{ {
let buf_mut = buf.get_mut().unwrap(); let buf_mut = buf.get_mut().unwrap();
@ -602,6 +611,8 @@ impl Translate {
} }
} }
end_pts = Some(item.pts);
output.push(TranslateOutput::Item(buf)); output.push(TranslateOutput::Item(buf));
} }
@ -621,6 +632,16 @@ impl Translate {
.build(), .build(),
); );
if let Some(end_pts) = end_pts {
self.state
.lock()
.unwrap()
.segment
.as_mut()
.unwrap()
.set_position(end_pts);
}
Ok(output) Ok(output)
} }
@ -962,6 +983,29 @@ impl ObjectImpl for Translate {
.mutable_ready() .mutable_ready()
.deprecated() .deprecated()
.build(), .build(),
/**
* GstAwsTranslate:accumulator-lateness
*
* The element will accumulate input text until a deadline is
* reached, function of the first item running time and the
* upstream latency.
*
* For live cases where overall latency is to be kept low at the
* expense of synchronization, this property can be set to still
* accumulate reasonable amounts of text for translation.
*
* The timestamps of the translated text will then be shifted forward
* by the value of this property.
*
* Since: plugins-rs-0.14.0
*/
glib::ParamSpecUInt::builder("accumulator-lateness")
.nick("Accumulator Latenness")
.blurb("By how much to shift input timestamps forward for accumulating")
.default_value(DEFAULT_ACCUMULATOR_LATENESS.mseconds() as u32)
.mutable_ready()
.deprecated()
.build(),
glib::ParamSpecString::builder("access-key") glib::ParamSpecString::builder("access-key")
.nick("Access Key") .nick("Access Key")
.blurb("AWS Access Key") .blurb("AWS Access Key")
@ -1017,6 +1061,12 @@ impl ObjectImpl for Translate {
value.get::<u32>().expect("type checked upstream").into(), value.get::<u32>().expect("type checked upstream").into(),
); );
} }
"accumulator-lateness" => {
let mut settings = self.settings.lock().unwrap();
settings.accumulator_lateness = gst::ClockTime::from_mseconds(
value.get::<u32>().expect("type checked upstream").into(),
);
}
"access-key" => { "access-key" => {
let mut settings = self.settings.lock().unwrap(); let mut settings = self.settings.lock().unwrap();
settings.access_key = value.get().expect("type checked upstream"); settings.access_key = value.get().expect("type checked upstream");
@ -1052,6 +1102,10 @@ impl ObjectImpl for Translate {
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
(settings.latency.mseconds() as u32).to_value() (settings.latency.mseconds() as u32).to_value()
} }
"accumulator-lateness" => {
let settings = self.settings.lock().unwrap();
(settings.accumulator_lateness.mseconds() as u32).to_value()
}
"access-key" => { "access-key" => {
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
settings.access_key.to_value() settings.access_key.to_value()
@ -1163,8 +1217,7 @@ mod tests {
assert!(accumulator.is_empty()); assert!(accumulator.is_empty());
assert_eq!(accumulator.start_rtime(), None); assert_eq!(accumulator.start_rtime(), None);
assert_eq!(accumulator.start_pts(), None); assert_eq!(accumulator.start_pts(), None);
assert_eq!(accumulator.discont(), false); assert!(!accumulator.discont());
assert_eq!(accumulator.end_pts(), None);
assert!(accumulator.drain(false).is_empty()); assert!(accumulator.drain(false).is_empty());
assert!(accumulator assert!(accumulator
@ -1209,11 +1262,7 @@ mod tests {
accumulator.start_pts(), accumulator.start_pts(),
Some(gst::ClockTime::from_nseconds(0)) Some(gst::ClockTime::from_nseconds(0))
); );
assert_eq!(accumulator.discont(), true); assert!(accumulator.discont());
assert_eq!(
accumulator.end_pts(),
Some(gst::ClockTime::from_nseconds(3))
);
assert!(!accumulator.drain(false).is_empty()); assert!(!accumulator.drain(false).is_empty());
} }
@ -1240,14 +1289,15 @@ mod tests {
]); ]);
let upstream_min = gst::ClockTime::from_nseconds(5); let upstream_min = gst::ClockTime::from_nseconds(5);
let lateness = gst::ClockTime::from_nseconds(0);
assert!(accumulator assert!(accumulator
.timeout(gst::ClockTime::from_nseconds(5), upstream_min) .timeout(gst::ClockTime::from_nseconds(5), upstream_min, lateness)
.is_none()); .is_none());
assert_eq!( assert_eq!(
accumulator accumulator
.timeout(gst::ClockTime::from_nseconds(6), upstream_min) .timeout(gst::ClockTime::from_nseconds(6), upstream_min, lateness)
.unwrap() .unwrap()
.0 .0
.len(), .len(),
@ -1287,14 +1337,15 @@ mod tests {
]); ]);
let upstream_min = gst::ClockTime::from_nseconds(5); let upstream_min = gst::ClockTime::from_nseconds(5);
let lateness = gst::ClockTime::from_nseconds(0);
assert!(accumulator assert!(accumulator
.timeout(gst::ClockTime::from_nseconds(5), upstream_min) .timeout(gst::ClockTime::from_nseconds(5), upstream_min, lateness)
.is_none()); .is_none());
assert_eq!( assert_eq!(
accumulator accumulator
.timeout(gst::ClockTime::from_nseconds(6), upstream_min) .timeout(gst::ClockTime::from_nseconds(6), upstream_min, lateness)
.unwrap() .unwrap()
.0 .0
.len(), .len(),
@ -1303,4 +1354,44 @@ mod tests {
assert_eq!(accumulator.0.len(), 1); assert_eq!(accumulator.0.len(), 1);
} }
#[test]
fn test_accumulator_lateness() {
let mut accumulator = InputItems(vec![
InputItem {
content: "0".into(),
pts: gst::ClockTime::from_nseconds(0),
rtime: gst::ClockTime::from_nseconds(0),
end_pts: gst::ClockTime::from_nseconds(1),
is_punctuation: false,
discont: true,
},
InputItem {
content: "2".into(),
pts: gst::ClockTime::from_nseconds(2),
rtime: gst::ClockTime::from_nseconds(2),
end_pts: gst::ClockTime::from_nseconds(3),
is_punctuation: false,
discont: false,
},
]);
let upstream_min = gst::ClockTime::from_nseconds(5);
let lateness = gst::ClockTime::from_nseconds(10);
assert!(accumulator
.timeout(gst::ClockTime::from_nseconds(5), upstream_min, lateness)
.is_none());
assert_eq!(
accumulator
.timeout(gst::ClockTime::from_nseconds(16), upstream_min, lateness)
.unwrap()
.0
.len(),
2
);
assert!(accumulator.is_empty());
}
} }

View file

@ -6,6 +6,27 @@
// //
// SPDX-License-Identifier: MPL-2.0 // SPDX-License-Identifier: MPL-2.0
/**
* SECTION:element-awstranslate
*
* `awstranslate` is an element that can be used to translate text from one
* language to another.
*
* When working with live data, the element will accumulate input text until
* the deadline is reached, comparing the current running time with the running
* time of the input items and the upstream latency.
*
* At this point the input items will be drained up until the first item ending
* with a punctuation symbol.
*
* The accumulator will also be drained upon receiving `rstranscribe/final-transcript`
* and `rstranscribe/speaker-change` custom events.
*
* When the user wants to use a very low / no latency upstream, and is willing to
* accept desynchronization in order to build up long-enough sentences for translation,
* they can set the #GstAwsTranslate:accumulator-lateness property to shift the input
* timestamps forward when accumulating.
*/
use gst::glib; use gst::glib;
use gst::prelude::*; use gst::prelude::*;