net/aws/transcriber: desambiguify SrcPad output items queue

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1137>
This commit is contained in:
François Laignel 2023-03-16 12:17:38 +01:00
parent 162db2f3b9
commit 5a5ca76d9d

View file

@ -115,6 +115,33 @@ impl Default for Settings {
} }
} }
#[derive(Clone, Debug, Default)]
struct OutputItem {
pts: gst::ClockTime,
duration: gst::ClockTime,
content: String,
}
impl From<&TranscriptItem> for OutputItem {
fn from(item: &TranscriptItem) -> Self {
OutputItem {
pts: item.pts,
duration: item.duration,
content: item.content.clone(),
}
}
}
impl From<TranslatedItem> for OutputItem {
fn from(item: TranslatedItem) -> Self {
OutputItem {
pts: item.pts,
duration: item.duration,
content: item.content,
}
}
}
struct State { struct State {
buffer_tx: Option<mpsc::Sender<gst::Buffer>>, buffer_tx: Option<mpsc::Sender<gst::Buffer>>,
transcriber_loop_handle: Option<task::JoinHandle<Result<(), gst::ErrorMessage>>>, transcriber_loop_handle: Option<task::JoinHandle<Result<(), gst::ErrorMessage>>>,
@ -856,7 +883,7 @@ struct TranslationPadTask {
translate_latency: gst::ClockTime, translate_latency: gst::ClockTime,
translate_lookahead: gst::ClockTime, translate_lookahead: gst::ClockTime,
send_events: bool, send_events: bool,
translated_items: VecDeque<TranslatedItem>, output_items: VecDeque<OutputItem>,
our_latency: gst::ClockTime, our_latency: gst::ClockTime,
seqnum: gst::Seqnum, seqnum: gst::Seqnum,
send_eos: bool, send_eos: bool,
@ -882,7 +909,7 @@ impl TranslationPadTask {
translate_latency: DEFAULT_TRANSLATE_LATENCY, translate_latency: DEFAULT_TRANSLATE_LATENCY,
translate_lookahead: DEFAULT_TRANSLATE_LOOKAHEAD, translate_lookahead: DEFAULT_TRANSLATE_LOOKAHEAD,
send_events: true, send_events: true,
translated_items: VecDeque::new(), output_items: VecDeque::new(),
our_latency: DEFAULT_TRANSCRIBE_LATENCY, our_latency: DEFAULT_TRANSCRIBE_LATENCY,
seqnum: gst::Seqnum::next(), seqnum: gst::Seqnum::next(),
send_eos: false, send_eos: false,
@ -941,9 +968,7 @@ impl TranslationPadTask {
use broadcast::error::RecvError; use broadcast::error::RecvError;
match items_res { match items_res {
Ok(Items(transcript_items)) => { Ok(Items(transcript_items)) => {
for transcript_item in transcript_items.iter() { self.output_items.extend(transcript_items.iter().map(Into::into));
self.translated_items.push_back(transcript_item.into());
}
} }
Ok(Eos) => { Ok(Eos) => {
gst::debug!(CAT, imp: self.pad, "Got eos"); gst::debug!(CAT, imp: self.pad, "Got eos");
@ -999,7 +1024,7 @@ impl TranslationPadTask {
return Err(gst::error_msg!(gst::StreamError::Failed, ["{ERR}"])); return Err(gst::error_msg!(gst::StreamError::Failed, ["{ERR}"]));
}; };
self.translated_items.extend(translated_items); self.output_items.extend(translated_items.into_iter().map(Into::into));
self.pending_translations = self.pending_translations.saturating_sub(1); self.pending_translations = self.pending_translations.saturating_sub(1);
return Ok(()); return Ok(());
@ -1029,8 +1054,36 @@ impl TranslationPadTask {
}; };
for items in transcript_items.iter() { for items in transcript_items.iter() {
if let Some(ready_items) = self.translate_queue.push(items) { if let Some(items_to_translate) = self.translate_queue.push(items) {
self.send_for_translation(ready_items).await?; self.send_for_translation(items_to_translate).await?;
}
}
Ok(())
}
async fn dequeue_for_translation(
&mut self,
start_time: gst::ClockTime,
now: gst::ClockTime,
) -> Result<(), gst::ErrorMessage> {
if !self.translate_queue.is_empty() {
// Latency budget for an item to be pushed to stream on time
// Margin:
// - 2 * GRANULARITY: to make sure we don't push items up to GRANULARITY late.
// - 1 * GRANULARITY: extra margin to account for additional overheads.
let latency = self.our_latency.saturating_sub(3 * GRANULARITY);
// Estimated time of arrival for an item sent to translation now.
// (in transcript item ts base)
let translation_eta = now + self.translate_latency - start_time;
if let Some(items_to_translate) =
self.translate_queue
.dequeue(latency, translation_eta, self.translate_lookahead)
{
gst::debug!(CAT, imp: self.pad, "Forcing to translation: {items_to_translate:?}");
self.send_for_translation(items_to_translate).await?;
} }
} }
@ -1058,30 +1111,12 @@ impl TranslationPadTask {
discont_pending = pad_state.discont_pending; discont_pending = pad_state.discont_pending;
} }
if self.needs_translate && !self.translate_queue.is_empty() { if self.needs_translate && self.dequeue_for_translation(start_time, now).await.is_err() {
// Latency budget for an item to be pushed to stream on time return false;
// Margin:
// - 2 * GRANULARITY: to make sure we don't push items up to GRANULARITY late.
// - 1 * GRANULARITY: extra margin to account for additional overheads.
let latency = self.our_latency.saturating_sub(3 * GRANULARITY);
// Estimated time of arrival for an item sent to translation now.
// (in transcript item ts base)
let translation_eta = now + self.translate_latency - start_time;
if let Some(ready_items) =
self.translate_queue
.dequeue(latency, translation_eta, self.translate_lookahead)
{
gst::debug!(CAT, imp: self.pad, "Forcing to translation: {ready_items:?}");
if self.send_for_translation(ready_items).await.is_err() {
return false;
}
}
} }
/* First, check our pending buffers */ /* First, check our pending buffers */
while let Some(item) = self.translated_items.front() { while let Some(item) = self.output_items.front() {
// Note: items pts start from 0 + lateness // Note: items pts start from 0 + lateness
gst::trace!( gst::trace!(
CAT, CAT,
@ -1097,11 +1132,11 @@ impl TranslationPadTask {
// - 1 * GRANULARITY: extra margin to account for additional overheads. // - 1 * GRANULARITY: extra margin to account for additional overheads.
if item.pts + self.our_latency.saturating_sub(3 * GRANULARITY) < now - start_time { if item.pts + self.our_latency.saturating_sub(3 * GRANULARITY) < now - start_time {
/* Safe unwrap, we know we have an item */ /* Safe unwrap, we know we have an item */
let TranslatedItem { let OutputItem {
pts: item_pts, pts: item_pts,
mut duration, mut duration,
content, content,
} = self.translated_items.pop_front().unwrap(); } = self.output_items.pop_front().unwrap();
let mut pts = start_time + item_pts; let mut pts = start_time + item_pts;
@ -1173,7 +1208,7 @@ impl TranslationPadTask {
if self.send_eos if self.send_eos
&& self.pending_translations == 0 && self.pending_translations == 0
&& self.translated_items.is_empty() && self.output_items.is_empty()
&& self.translate_queue.is_empty() && self.translate_queue.is_empty()
{ {
/* We're EOS, we can pause and exit early */ /* We're EOS, we can pause and exit early */