diff --git a/audio/speechmatics/src/transcriber/imp.rs b/audio/speechmatics/src/transcriber/imp.rs index 32d0028e..740ca187 100644 --- a/audio/speechmatics/src/transcriber/imp.rs +++ b/audio/speechmatics/src/transcriber/imp.rs @@ -214,6 +214,7 @@ struct State { last_chained_buffer_rtime: Option, pad_serial: u32, srcpads: BTreeSet, + start_time: Option, } impl State {} @@ -231,6 +232,7 @@ impl Default for State { last_chained_buffer_rtime: gst::ClockTime::NONE, pad_serial: 0, srcpads: BTreeSet::new(), + start_time: None, } } } @@ -254,10 +256,14 @@ impl TranscriberSrcPad { .downcast::() .expect("parent is transcriber"); + let Some((start_time, now)) = transcriber.imp().get_start_time_and_now() else { + // Wait for the clock to be available + return true; + }; + let latency = gst::ClockTime::from_mseconds( transcriber.imp().settings.lock().unwrap().latency_ms as u64, ); - let now = transcriber.current_running_time().unwrap(); /* First, check our pending buffers */ let mut items = vec![]; @@ -268,7 +274,9 @@ impl TranscriberSrcPad { let mut state = self.state.lock().unwrap(); if let Some(ref mut accumulator_inner) = state.accumulator { - if now.saturating_sub(accumulator_inner.start_time) + granularity > latency { + if now.saturating_sub(accumulator_inner.start_time + start_time) + granularity + > latency + { gst::log!(CAT, "Finally draining accumulator"); gst::debug!( CAT, @@ -287,9 +295,30 @@ impl TranscriberSrcPad { state.send_eos && state.buffers.is_empty() && state.accumulator.is_none(); while let Some(buf) = state.buffers.front() { - if now.saturating_sub(buf.pts().unwrap()) + granularity > latency { + if now.saturating_sub(buf.pts().unwrap() + start_time) + granularity > latency { /* Safe unwrap, we know we have an item */ - let buf = state.buffers.pop_front().unwrap(); + let mut buf = state.buffers.pop_front().unwrap(); + { + let buf_mut = buf.make_mut(); + let mut pts = buf_mut.pts().unwrap() + start_time; + let mut duration = buf_mut.duration().unwrap(); + if let Some(position) = state.out_segment.position() { + if pts < position { + gst::debug!( + CAT, + imp = self, + "Adjusting item timing({:?} < {:?})", + pts, + position, + ); + duration = duration.saturating_sub(position - pts); + pts = position; + } + } + + buf_mut.set_pts(pts); + buf_mut.set_duration(duration); + } items.push(buf); } else { break; @@ -415,27 +444,11 @@ impl TranscriberSrcPad { fn enqueue_translation(&self, state: &mut TranscriberSrcPadState, translation: &Translation) { gst::log!(CAT, "Enqueuing {:?}", translation); for item in &translation.results { - let mut start_time = + let start_time = gst::ClockTime::from_nseconds((item.start_time as f64 * 1_000_000_000.0) as u64); - let mut end_time = + let end_time = gst::ClockTime::from_nseconds((item.end_time as f64 * 1_000_000_000.0) as u64); - if let Some(position) = state.out_segment.position() { - if start_time < position { - gst::debug!( - CAT, - imp = self, - "Adjusting item timing({:?} < {:?})", - start_time, - position, - ); - start_time = position; - if end_time < start_time { - end_time = start_time; - } - } - } - let mut buf = gst::Buffer::from_mut_slice(item.content.clone().into_bytes()); { @@ -452,28 +465,12 @@ impl TranscriberSrcPad { gst::log!(CAT, "Enqueuing {:?}", transcript); for item in &transcript.results { if let Some(alternative) = item.alternatives.first() { - let mut start_time = gst::ClockTime::from_nseconds( + let start_time = gst::ClockTime::from_nseconds( (item.start_time as f64 * 1_000_000_000.0) as u64, ); - let mut end_time = + let end_time = gst::ClockTime::from_nseconds((item.end_time as f64 * 1_000_000_000.0) as u64); - if let Some(position) = state.out_segment.position() { - if start_time < position { - gst::debug!( - CAT, - imp = self, - "Adjusting item timing({:?} < {:?})", - start_time, - position, - ); - start_time = position; - if end_time < start_time { - end_time = start_time; - } - } - } - if let Some(ref mut accumulator_inner) = state.accumulator { if item.type_ == "punctuation" { accumulator_inner.text.push_str(&alternative.content); @@ -814,13 +811,14 @@ impl Transcriber { Ok(_) => { let mut ret = gst::Pad::event_default(pad, Some(&*self.obj()), event); - let state = self.state.lock().unwrap(); + let mut state = self.state.lock().unwrap(); for srcpad in &state.srcpads { if let Err(err) = srcpad.imp().stop_task() { gst::error!(CAT, imp = self, "Failed to stop srcpad task: {}", err); ret = false; } } + state.start_time = None; ret } @@ -1280,6 +1278,22 @@ impl Transcriber { Ok(()) } + + fn get_start_time_and_now(&self) -> Option<(gst::ClockTime, gst::ClockTime)> { + let now = self.obj().current_running_time()?; + + let mut state = self.state.lock().unwrap(); + + if state.start_time.is_none() { + state.start_time = Some(now); + for pad in state.srcpads.iter() { + let mut sstate = pad.imp().state.lock().unwrap(); + sstate.out_segment.set_position(now); + } + } + + Some((state.start_time.unwrap(), now)) + } } // Implementation of gst::ChildProxy virtual methods.