net/aws/transcriber: fix translate lookahead

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1137>
This commit is contained in:
François Laignel 2023-03-16 10:29:24 +01:00
parent d5d6a4daf9
commit 162db2f3b9
3 changed files with 21 additions and 27 deletions

View file

@ -793,7 +793,7 @@
"construct": false, "construct": false,
"construct-only": false, "construct-only": false,
"controllable": false, "controllable": false,
"default": "5000", "default": "3000",
"max": "-1", "max": "-1",
"min": "0", "min": "0",
"mutable": "ready", "mutable": "ready",

View file

@ -60,7 +60,7 @@ const TRANSLATE_LATENCY_PROPERTY: &str = "translate-latency";
pub const DEFAULT_TRANSLATE_LATENCY: gst::ClockTime = gst::ClockTime::from_mseconds(500); pub const DEFAULT_TRANSLATE_LATENCY: gst::ClockTime = gst::ClockTime::from_mseconds(500);
const TRANSLATE_LOOKAHEAD_PROPERTY: &str = "translate-lookahead"; const TRANSLATE_LOOKAHEAD_PROPERTY: &str = "translate-lookahead";
pub const DEFAULT_TRANSLATE_LOOKAHEAD: gst::ClockTime = gst::ClockTime::from_seconds(5); pub const DEFAULT_TRANSLATE_LOOKAHEAD: gst::ClockTime = gst::ClockTime::from_seconds(3);
const DEFAULT_LATENESS: gst::ClockTime = gst::ClockTime::ZERO; const DEFAULT_LATENESS: gst::ClockTime = gst::ClockTime::ZERO;
pub const DEFAULT_INPUT_LANG_CODE: &str = "en-US"; pub const DEFAULT_INPUT_LANG_CODE: &str = "en-US";
@ -115,12 +115,12 @@ impl Default for Settings {
} }
} }
pub(super) struct State { struct State {
buffer_tx: Option<mpsc::Sender<gst::Buffer>>, buffer_tx: Option<mpsc::Sender<gst::Buffer>>,
transcriber_loop_handle: Option<task::JoinHandle<Result<(), gst::ErrorMessage>>>, transcriber_loop_handle: Option<task::JoinHandle<Result<(), gst::ErrorMessage>>>,
srcpads: BTreeSet<super::TranslateSrcPad>, srcpads: BTreeSet<super::TranslateSrcPad>,
pad_serial: u32, pad_serial: u32,
pub seqnum: gst::Seqnum, seqnum: gst::Seqnum,
} }
impl Default for State { impl Default for State {
@ -1059,25 +1059,21 @@ impl TranslationPadTask {
} }
if self.needs_translate && !self.translate_queue.is_empty() { if self.needs_translate && !self.translate_queue.is_empty() {
// Maximum delay for an item to be pushed to stream on time // Latency budget for an item to be pushed to stream on time
// Margin: // Margin:
// - 1 * GRANULARITY: the time it will take before we can check this again, // - 2 * GRANULARITY: to make sure we don't push items up to GRANULARITY late.
// without running late, in the case of a timeout. // - 1 * GRANULARITY: extra margin to account for additional overheads.
// - 2 * GRANULARITY: extra margin to account for additional overheads. let latency = self.our_latency.saturating_sub(3 * GRANULARITY);
// FIXME explain which ones.
let max_delay = self.our_latency.saturating_sub(3 * GRANULARITY);
// Estimated time of arrival for an item sent to translation now. // Estimated time of arrival for an item sent to translation now.
// (in transcript item ts base) // (in transcript item ts base)
let translation_eta = now + self.translate_latency - start_time; let translation_eta = now + self.translate_latency - start_time;
let deadline = translation_eta.saturating_sub(max_delay); if let Some(ready_items) =
self.translate_queue
if let Some(ready_items) = self .dequeue(latency, translation_eta, self.translate_lookahead)
.translate_queue
.dequeue(deadline, self.translate_lookahead)
{ {
gst::debug!(CAT, imp: self.pad, "Forcing {} transcripts to translation", ready_items.len()); gst::debug!(CAT, imp: self.pad, "Forcing to translation: {ready_items:?}");
if self.send_for_translation(ready_items).await.is_err() { if self.send_for_translation(ready_items).await.is_err() {
return false; return false;
} }
@ -1097,10 +1093,8 @@ impl TranslationPadTask {
); );
// Margin: // Margin:
// - 1 * GRANULARITY: the time it will take before we can check this again, // - 2 * GRANULARITY: to make sure we don't push items up to GRANULARITY late.
// without running late, in the case of a timeout. // - 1 * GRANULARITY: extra margin to account for additional overheads.
// - 2 * GRANULARITY: extra margin to account for additional overheads.
// FIXME explain which ones.
if item.pts + self.our_latency.saturating_sub(3 * GRANULARITY) < now - start_time { if item.pts + self.our_latency.saturating_sub(3 * GRANULARITY) < now - start_time {
/* Safe unwrap, we know we have an item */ /* Safe unwrap, we know we have an item */
let TranslatedItem { let TranslatedItem {
@ -1459,9 +1453,7 @@ impl TranslateSrcPad {
&elem_settings.language_code, &elem_settings.language_code,
pad_settings.language_code.as_deref(), pad_settings.language_code.as_deref(),
) { ) {
elem_settings.transcribe_latency elem_settings.transcribe_latency + elem_settings.translate_latency
+ elem_settings.translate_lookahead
+ elem_settings.translate_latency
} else { } else {
elem_settings.transcribe_latency elem_settings.transcribe_latency
} }

View file

@ -76,18 +76,20 @@ impl TranslateQueue {
/// Returns `Some(..)` if some items match the criteria. /// Returns `Some(..)` if some items match the criteria.
pub fn dequeue( pub fn dequeue(
&mut self, &mut self,
deadline: gst::ClockTime, latency: gst::ClockTime,
threshold: gst::ClockTime,
lookahead: gst::ClockTime, lookahead: gst::ClockTime,
) -> Option<Vec<TranscriptItem>> { ) -> Option<Vec<TranscriptItem>> {
if self.items.front()?.pts < deadline { let first_pts = self.items.front()?.pts;
if first_pts + latency > threshold {
// First item is too early to be sent to translation now // First item is too early to be sent to translation now
// we can wait for more items to accumulate. // we can wait for more items to accumulate.
return None; return None;
} }
// Can't wait any longer to send the first item to translation // Can't wait any longer to send the first item to translation
// Try to get up to lookahead more items to improve translation accuracy // Try to get up to lookahead worth of items to improve translation accuracy
let limit = deadline + lookahead; let limit = first_pts + lookahead;
let mut items_acc = vec![self.items.pop_front().unwrap()]; let mut items_acc = vec![self.items.pop_front().unwrap()];
while let Some(item) = self.items.front() { while let Some(item) = self.items.front() {