speechmaticstranscriber: store and use a start time

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1869>
This commit is contained in:
Mathieu Duponchelle 2024-10-22 13:27:17 +02:00 committed by GStreamer Marge Bot
parent a81b7f380f
commit dc1d63419e

View file

@ -214,6 +214,7 @@ struct State {
last_chained_buffer_rtime: Option<gst::ClockTime>, last_chained_buffer_rtime: Option<gst::ClockTime>,
pad_serial: u32, pad_serial: u32,
srcpads: BTreeSet<super::TranscriberSrcPad>, srcpads: BTreeSet<super::TranscriberSrcPad>,
start_time: Option<gst::ClockTime>,
} }
impl State {} impl State {}
@ -231,6 +232,7 @@ impl Default for State {
last_chained_buffer_rtime: gst::ClockTime::NONE, last_chained_buffer_rtime: gst::ClockTime::NONE,
pad_serial: 0, pad_serial: 0,
srcpads: BTreeSet::new(), srcpads: BTreeSet::new(),
start_time: None,
} }
} }
} }
@ -254,10 +256,14 @@ impl TranscriberSrcPad {
.downcast::<super::Transcriber>() .downcast::<super::Transcriber>()
.expect("parent is transcriber"); .expect("parent is transcriber");
let Some((start_time, now)) = transcriber.imp().get_start_time_and_now() else {
// Wait for the clock to be available
return true;
};
let latency = gst::ClockTime::from_mseconds( let latency = gst::ClockTime::from_mseconds(
transcriber.imp().settings.lock().unwrap().latency_ms as u64, transcriber.imp().settings.lock().unwrap().latency_ms as u64,
); );
let now = transcriber.current_running_time().unwrap();
/* First, check our pending buffers */ /* First, check our pending buffers */
let mut items = vec![]; let mut items = vec![];
@ -268,7 +274,9 @@ impl TranscriberSrcPad {
let mut state = self.state.lock().unwrap(); let mut state = self.state.lock().unwrap();
if let Some(ref mut accumulator_inner) = state.accumulator { if let Some(ref mut accumulator_inner) = state.accumulator {
if now.saturating_sub(accumulator_inner.start_time) + granularity > latency { if now.saturating_sub(accumulator_inner.start_time + start_time) + granularity
> latency
{
gst::log!(CAT, "Finally draining accumulator"); gst::log!(CAT, "Finally draining accumulator");
gst::debug!( gst::debug!(
CAT, CAT,
@ -287,9 +295,30 @@ impl TranscriberSrcPad {
state.send_eos && state.buffers.is_empty() && state.accumulator.is_none(); state.send_eos && state.buffers.is_empty() && state.accumulator.is_none();
while let Some(buf) = state.buffers.front() { while let Some(buf) = state.buffers.front() {
if now.saturating_sub(buf.pts().unwrap()) + granularity > latency { if now.saturating_sub(buf.pts().unwrap() + start_time) + granularity > latency {
/* Safe unwrap, we know we have an item */ /* Safe unwrap, we know we have an item */
let buf = state.buffers.pop_front().unwrap(); let mut buf = state.buffers.pop_front().unwrap();
{
let buf_mut = buf.make_mut();
let mut pts = buf_mut.pts().unwrap() + start_time;
let mut duration = buf_mut.duration().unwrap();
if let Some(position) = state.out_segment.position() {
if pts < position {
gst::debug!(
CAT,
imp = self,
"Adjusting item timing({:?} < {:?})",
pts,
position,
);
duration = duration.saturating_sub(position - pts);
pts = position;
}
}
buf_mut.set_pts(pts);
buf_mut.set_duration(duration);
}
items.push(buf); items.push(buf);
} else { } else {
break; break;
@ -415,27 +444,11 @@ impl TranscriberSrcPad {
fn enqueue_translation(&self, state: &mut TranscriberSrcPadState, translation: &Translation) { fn enqueue_translation(&self, state: &mut TranscriberSrcPadState, translation: &Translation) {
gst::log!(CAT, "Enqueuing {:?}", translation); gst::log!(CAT, "Enqueuing {:?}", translation);
for item in &translation.results { for item in &translation.results {
let mut start_time = let start_time =
gst::ClockTime::from_nseconds((item.start_time as f64 * 1_000_000_000.0) as u64); gst::ClockTime::from_nseconds((item.start_time as f64 * 1_000_000_000.0) as u64);
let mut end_time = let end_time =
gst::ClockTime::from_nseconds((item.end_time as f64 * 1_000_000_000.0) as u64); gst::ClockTime::from_nseconds((item.end_time as f64 * 1_000_000_000.0) as u64);
if let Some(position) = state.out_segment.position() {
if start_time < position {
gst::debug!(
CAT,
imp = self,
"Adjusting item timing({:?} < {:?})",
start_time,
position,
);
start_time = position;
if end_time < start_time {
end_time = start_time;
}
}
}
let mut buf = gst::Buffer::from_mut_slice(item.content.clone().into_bytes()); let mut buf = gst::Buffer::from_mut_slice(item.content.clone().into_bytes());
{ {
@ -452,28 +465,12 @@ impl TranscriberSrcPad {
gst::log!(CAT, "Enqueuing {:?}", transcript); gst::log!(CAT, "Enqueuing {:?}", transcript);
for item in &transcript.results { for item in &transcript.results {
if let Some(alternative) = item.alternatives.first() { if let Some(alternative) = item.alternatives.first() {
let mut start_time = gst::ClockTime::from_nseconds( let start_time = gst::ClockTime::from_nseconds(
(item.start_time as f64 * 1_000_000_000.0) as u64, (item.start_time as f64 * 1_000_000_000.0) as u64,
); );
let mut end_time = let end_time =
gst::ClockTime::from_nseconds((item.end_time as f64 * 1_000_000_000.0) as u64); gst::ClockTime::from_nseconds((item.end_time as f64 * 1_000_000_000.0) as u64);
if let Some(position) = state.out_segment.position() {
if start_time < position {
gst::debug!(
CAT,
imp = self,
"Adjusting item timing({:?} < {:?})",
start_time,
position,
);
start_time = position;
if end_time < start_time {
end_time = start_time;
}
}
}
if let Some(ref mut accumulator_inner) = state.accumulator { if let Some(ref mut accumulator_inner) = state.accumulator {
if item.type_ == "punctuation" { if item.type_ == "punctuation" {
accumulator_inner.text.push_str(&alternative.content); accumulator_inner.text.push_str(&alternative.content);
@ -814,13 +811,14 @@ impl Transcriber {
Ok(_) => { Ok(_) => {
let mut ret = gst::Pad::event_default(pad, Some(&*self.obj()), event); let mut ret = gst::Pad::event_default(pad, Some(&*self.obj()), event);
let state = self.state.lock().unwrap(); let mut state = self.state.lock().unwrap();
for srcpad in &state.srcpads { for srcpad in &state.srcpads {
if let Err(err) = srcpad.imp().stop_task() { if let Err(err) = srcpad.imp().stop_task() {
gst::error!(CAT, imp = self, "Failed to stop srcpad task: {}", err); gst::error!(CAT, imp = self, "Failed to stop srcpad task: {}", err);
ret = false; ret = false;
} }
} }
state.start_time = None;
ret ret
} }
@ -1280,6 +1278,22 @@ impl Transcriber {
Ok(()) Ok(())
} }
fn get_start_time_and_now(&self) -> Option<(gst::ClockTime, gst::ClockTime)> {
let now = self.obj().current_running_time()?;
let mut state = self.state.lock().unwrap();
if state.start_time.is_none() {
state.start_time = Some(now);
for pad in state.srcpads.iter() {
let mut sstate = pad.imp().state.lock().unwrap();
sstate.out_segment.set_position(now);
}
}
Some((state.start_time.unwrap(), now))
}
} }
// Implementation of gst::ChildProxy virtual methods. // Implementation of gst::ChildProxy virtual methods.