diff --git a/docs/plugins/gst_plugins_cache.json b/docs/plugins/gst_plugins_cache.json index e3a1aa99f..ced3e7b79 100644 --- a/docs/plugins/gst_plugins_cache.json +++ b/docs/plugins/gst_plugins_cache.json @@ -1579,6 +1579,20 @@ "type": "gchararray", "writable": true }, + "accumulator-lateness": { + "blurb": "By how much to shift input timestamps forward for accumulating", + "conditionally-available": false, + "construct": false, + "construct-only": false, + "controllable": false, + "default": "0", + "max": "-1", + "min": "0", + "mutable": "ready", + "readable": true, + "type": "guint", + "writable": true + }, "input-language-code": { "blurb": "The Language of the input stream", "conditionally-available": false, @@ -1592,7 +1606,7 @@ "writable": true }, "latency": { - "blurb": "Amount of milliseconds to allow AWS Polly", + "blurb": "Amount of milliseconds to allow AWS translate", "conditionally-available": false, "construct": false, "construct-only": false, diff --git a/net/aws/src/translate/imp.rs b/net/aws/src/translate/imp.rs index effd6bab7..7cc63b0bf 100644 --- a/net/aws/src/translate/imp.rs +++ b/net/aws/src/translate/imp.rs @@ -31,6 +31,7 @@ static AWS_BEHAVIOR_VERSION: LazyLock = LazyLock::new(aws_config::BehaviorVersion::v2023_11_09); const DEFAULT_LATENCY: gst::ClockTime = gst::ClockTime::from_seconds(2); +const DEFAULT_ACCUMULATOR_LATENESS: gst::ClockTime = gst::ClockTime::from_seconds(0); const DEFAULT_REGION: &str = "us-east-1"; const DEFAULT_INPUT_LANG_CODE: &str = "en-US"; const DEFAULT_OUTPUT_LANG_CODE: &str = "fr-FR"; @@ -40,6 +41,7 @@ const DEFAULT_TOKENIZATION_METHOD: TranslationTokenizationMethod = #[derive(Debug, Clone)] struct Settings { latency: gst::ClockTime, + accumulator_lateness: gst::ClockTime, input_language_code: String, output_language_code: String, access_key: Option, @@ -52,6 +54,7 @@ impl Default for Settings { fn default() -> Self { Self { latency: DEFAULT_LATENCY, + accumulator_lateness: DEFAULT_ACCUMULATOR_LATENESS, input_language_code: String::from(DEFAULT_INPUT_LANG_CODE), output_language_code: String::from(DEFAULT_OUTPUT_LANG_CODE), access_key: None, @@ -88,10 +91,6 @@ impl InputItems { self.0.first().map(|item| item.discont).unwrap_or(false) } - fn end_pts(&self) -> Option { - self.0.last().map(|item| item.end_pts) - } - fn is_empty(&self) -> bool { self.0.is_empty() } @@ -136,12 +135,17 @@ impl InputItems { Self(items) } - fn timeout(&mut self, now: gst::ClockTime, upstream_min: gst::ClockTime) -> Option { + fn timeout( + &mut self, + now: gst::ClockTime, + upstream_min: gst::ClockTime, + lateness: gst::ClockTime, + ) -> Option { if let Some(start_rtime) = self.start_rtime() { - if start_rtime + upstream_min < now { + if start_rtime + upstream_min + lateness < now { gst::debug!( CAT, - "draining on timeout: {start_rtime} + {upstream_min} < {now}", + "draining on timeout: {start_rtime} + {upstream_min} + {lateness} < {now}", ); Some(self.drain(true)) } else { @@ -315,13 +319,18 @@ impl Translate { self.srcpad.push_event(event) } Gap(gap) => { + let lateness = self.settings.lock().unwrap().accumulator_lateness; + let state = self.state.lock().unwrap(); let (pts, duration) = gap.get(); if state.accumulator.is_empty() { if let Some(translate_tx) = state.translate_tx.as_ref() { - let _ = translate_tx.send(TranslateInput::Gap { pts, duration }); + let _ = translate_tx.send(TranslateInput::Gap { + pts: pts + lateness, + duration, + }); } } @@ -395,13 +404,15 @@ impl Translate { return Ok(gst::FlowSuccess::Ok); } + let lateness = self.settings.lock().unwrap().accumulator_lateness; + loop { - let to_translate = self - .state - .lock() - .unwrap() - .accumulator - .timeout(now, upstream_min); + let to_translate = + self.state + .lock() + .unwrap() + .accumulator + .timeout(now, upstream_min, lateness); if let Some(to_translate) = to_translate { self.do_send(to_translate)?; } else { @@ -451,25 +462,19 @@ impl Translate { } async fn send(&self, to_translate: InputItems) -> Result, Error> { - let (input_lang, output_lang, latency, tokenization_method) = { + let (input_lang, output_lang, latency, tokenization_method, lateness) = { let settings = self.settings.lock().unwrap(); ( settings.input_language_code.clone(), settings.output_language_code.clone(), settings.latency, settings.tokenization_method, + settings.accumulator_lateness, ) }; let (client, segment, speaker) = { - let mut state = self.state.lock().unwrap(); - - state - .segment - .as_mut() - .unwrap() - .set_position(to_translate.end_pts()); - + let state = self.state.lock().unwrap(); ( state.client.as_ref().unwrap().clone(), state.segment.as_ref().unwrap().clone(), @@ -568,7 +573,13 @@ impl Translate { let mut translated_items_builder = gst::Structure::builder("awstranslate/items"); + let mut end_pts: Option = None; + for mut item in translated_items.drain(..) { + translated_items_builder = translated_items_builder.field(&item.content, item.pts); + + item.pts += lateness; + if let Some((upstream_live, upstream_min, _)) = upstream_latency { if upstream_live { if let Some(now) = self.obj().current_running_time() { @@ -588,8 +599,6 @@ impl Translate { } } - translated_items_builder = translated_items_builder.field(&item.content, item.pts); - let mut buf = gst::Buffer::from_mut_slice(item.content.into_bytes()); { let buf_mut = buf.get_mut().unwrap(); @@ -602,6 +611,8 @@ impl Translate { } } + end_pts = Some(item.pts); + output.push(TranslateOutput::Item(buf)); } @@ -621,6 +632,16 @@ impl Translate { .build(), ); + if let Some(end_pts) = end_pts { + self.state + .lock() + .unwrap() + .segment + .as_mut() + .unwrap() + .set_position(end_pts); + } + Ok(output) } @@ -962,6 +983,29 @@ impl ObjectImpl for Translate { .mutable_ready() .deprecated() .build(), + /** + * GstAwsTranslate:accumulator-lateness + * + * The element will accumulate input text until a deadline is + * reached, function of the first item running time and the + * upstream latency. + * + * For live cases where overall latency is to be kept low at the + * expense of synchronization, this property can be set to still + * accumulate reasonable amounts of text for translation. + * + * The timestamps of the translated text will then be shifted forward + * by the value of this property. + * + * Since: plugins-rs-0.14.0 + */ + glib::ParamSpecUInt::builder("accumulator-lateness") + .nick("Accumulator Latenness") + .blurb("By how much to shift input timestamps forward for accumulating") + .default_value(DEFAULT_ACCUMULATOR_LATENESS.mseconds() as u32) + .mutable_ready() + .deprecated() + .build(), glib::ParamSpecString::builder("access-key") .nick("Access Key") .blurb("AWS Access Key") @@ -1017,6 +1061,12 @@ impl ObjectImpl for Translate { value.get::().expect("type checked upstream").into(), ); } + "accumulator-lateness" => { + let mut settings = self.settings.lock().unwrap(); + settings.accumulator_lateness = gst::ClockTime::from_mseconds( + value.get::().expect("type checked upstream").into(), + ); + } "access-key" => { let mut settings = self.settings.lock().unwrap(); settings.access_key = value.get().expect("type checked upstream"); @@ -1052,6 +1102,10 @@ impl ObjectImpl for Translate { let settings = self.settings.lock().unwrap(); (settings.latency.mseconds() as u32).to_value() } + "accumulator-lateness" => { + let settings = self.settings.lock().unwrap(); + (settings.accumulator_lateness.mseconds() as u32).to_value() + } "access-key" => { let settings = self.settings.lock().unwrap(); settings.access_key.to_value() @@ -1163,8 +1217,7 @@ mod tests { assert!(accumulator.is_empty()); assert_eq!(accumulator.start_rtime(), None); assert_eq!(accumulator.start_pts(), None); - assert_eq!(accumulator.discont(), false); - assert_eq!(accumulator.end_pts(), None); + assert!(!accumulator.discont()); assert!(accumulator.drain(false).is_empty()); assert!(accumulator @@ -1209,11 +1262,7 @@ mod tests { accumulator.start_pts(), Some(gst::ClockTime::from_nseconds(0)) ); - assert_eq!(accumulator.discont(), true); - assert_eq!( - accumulator.end_pts(), - Some(gst::ClockTime::from_nseconds(3)) - ); + assert!(accumulator.discont()); assert!(!accumulator.drain(false).is_empty()); } @@ -1240,14 +1289,15 @@ mod tests { ]); let upstream_min = gst::ClockTime::from_nseconds(5); + let lateness = gst::ClockTime::from_nseconds(0); assert!(accumulator - .timeout(gst::ClockTime::from_nseconds(5), upstream_min) + .timeout(gst::ClockTime::from_nseconds(5), upstream_min, lateness) .is_none()); assert_eq!( accumulator - .timeout(gst::ClockTime::from_nseconds(6), upstream_min) + .timeout(gst::ClockTime::from_nseconds(6), upstream_min, lateness) .unwrap() .0 .len(), @@ -1287,14 +1337,15 @@ mod tests { ]); let upstream_min = gst::ClockTime::from_nseconds(5); + let lateness = gst::ClockTime::from_nseconds(0); assert!(accumulator - .timeout(gst::ClockTime::from_nseconds(5), upstream_min) + .timeout(gst::ClockTime::from_nseconds(5), upstream_min, lateness) .is_none()); assert_eq!( accumulator - .timeout(gst::ClockTime::from_nseconds(6), upstream_min) + .timeout(gst::ClockTime::from_nseconds(6), upstream_min, lateness) .unwrap() .0 .len(), @@ -1303,4 +1354,44 @@ mod tests { assert_eq!(accumulator.0.len(), 1); } + + #[test] + fn test_accumulator_lateness() { + let mut accumulator = InputItems(vec![ + InputItem { + content: "0".into(), + pts: gst::ClockTime::from_nseconds(0), + rtime: gst::ClockTime::from_nseconds(0), + end_pts: gst::ClockTime::from_nseconds(1), + is_punctuation: false, + discont: true, + }, + InputItem { + content: "2".into(), + pts: gst::ClockTime::from_nseconds(2), + rtime: gst::ClockTime::from_nseconds(2), + end_pts: gst::ClockTime::from_nseconds(3), + is_punctuation: false, + discont: false, + }, + ]); + + let upstream_min = gst::ClockTime::from_nseconds(5); + let lateness = gst::ClockTime::from_nseconds(10); + + assert!(accumulator + .timeout(gst::ClockTime::from_nseconds(5), upstream_min, lateness) + .is_none()); + + assert_eq!( + accumulator + .timeout(gst::ClockTime::from_nseconds(16), upstream_min, lateness) + .unwrap() + .0 + .len(), + 2 + ); + + assert!(accumulator.is_empty()); + } } diff --git a/net/aws/src/translate/mod.rs b/net/aws/src/translate/mod.rs index a0f77a0c7..d198a69f5 100644 --- a/net/aws/src/translate/mod.rs +++ b/net/aws/src/translate/mod.rs @@ -6,6 +6,27 @@ // // SPDX-License-Identifier: MPL-2.0 +/** + * SECTION:element-awstranslate + * + * `awstranslate` is an element that can be used to translate text from one + * language to another. + * + * When working with live data, the element will accumulate input text until + * the deadline is reached, comparing the current running time with the running + * time of the input items and the upstream latency. + * + * At this point the input items will be drained up until the first item ending + * with a punctuation symbol. + * + * The accumulator will also be drained upon receiving `rstranscribe/final-transcript` + * and `rstranscribe/speaker-change` custom events. + * + * When the user wants to use a very low / no latency upstream, and is willing to + * accept desynchronization in order to build up long-enough sentences for translation, + * they can set the #GstAwsTranslate:accumulator-lateness property to shift the input + * timestamps forward when accumulating. + */ use gst::glib; use gst::prelude::*;