diff --git a/audio/elevenlabs/src/synthesizer/imp.rs b/audio/elevenlabs/src/synthesizer/imp.rs index 2ef79ec20..9e881952e 100644 --- a/audio/elevenlabs/src/synthesizer/imp.rs +++ b/audio/elevenlabs/src/synthesizer/imp.rs @@ -121,7 +121,8 @@ struct State { send_abort_handle: Option, previous_request_ids: VecDeque, outcaps: Option, - upstream_is_live: Option, + // (live, min, max) + upstream_latency: Option<(bool, gst::ClockTime, Option)>, } impl Default for State { @@ -132,7 +133,7 @@ impl Default for State { send_abort_handle: None, previous_request_ids: VecDeque::new(), outcaps: None, - upstream_is_live: None, + upstream_latency: None, } } } @@ -169,9 +170,9 @@ async fn send_first_request( } impl Synthesizer { - fn upstream_is_live(&self, state: &mut State) -> bool { - if let Some(upstream_is_live) = state.upstream_is_live { - return upstream_is_live; + fn upstream_latency(&self) -> Option<(bool, gst::ClockTime, Option)> { + if let Some(latency) = self.state.lock().unwrap().upstream_latency { + return Some(latency); } let mut peer_query = gst::query::Latency::new(); @@ -179,16 +180,20 @@ impl Synthesizer { let ret = self.sinkpad.peer_query(&mut peer_query); if ret { - let (is_live, _, _) = peer_query.result(); - gst::info!(CAT, imp = self, "queried upstream liveness: {is_live}"); + let upstream_latency = peer_query.result(); + gst::info!( + CAT, + imp = self, + "queried upstream latency: {upstream_latency:?}" + ); - state.upstream_is_live = Some(is_live); + self.state.lock().unwrap().upstream_latency = Some(upstream_latency); - is_live + Some(upstream_latency) } else { gst::trace!(CAT, imp = self, "could not query upstream latency"); - false + None } } @@ -271,7 +276,7 @@ impl Synthesizer { mut pts: gst::ClockTime, input_duration: gst::ClockTime, ) -> Result, Error> { - let (voice_id, model_id, language_code, retry_with_speed) = { + let (voice_id, model_id, language_code, retry_with_speed, our_latency) = { let settings = self.settings.lock().unwrap(); ( @@ -279,11 +284,14 @@ impl Synthesizer { settings.model_id.clone(), settings.language_code.as_ref().cloned(), settings.retry_with_speed, + settings.latency, ) }; - let (client, previous_request_ids, out_info, out_segment, upstream_is_live) = { - let mut state = self.state.lock().unwrap(); + let upstream_latency = self.upstream_latency(); + + let (client, previous_request_ids, out_info, out_segment) = { + let state = self.state.lock().unwrap(); let Some(client) = state.client.as_ref().cloned() else { return Ok(None); @@ -295,7 +303,6 @@ impl Synthesizer { gst_audio::AudioInfo::from_caps(state.outcaps.as_ref().expect("negotiated")) .unwrap(), state.out_segment.clone(), - self.upstream_is_live(&mut state), ) }; @@ -437,20 +444,26 @@ impl Synthesizer { } } - if upstream_is_live { - let current_rtime = self - .obj() - .current_running_time() - .expect("upstream is live and should have provided a clock"); + if let Some(upstream_latency) = upstream_latency { + let (upstream_live, upstream_min, _) = upstream_latency; - if buffer_rtime < current_rtime { - let delta = current_rtime - buffer_rtime; - gst::warning!( - CAT, - "received running time {buffer_rtime} < current rtime {current_rtime}, shifting forward by {delta}, consider increasing latency" - ); + if upstream_live { + let current_rtime = self + .obj() + .current_running_time() + .expect("upstream is live and should have provided a clock"); - pts += delta; + let deadline = buffer_rtime + upstream_min + our_latency; + + if deadline < current_rtime { + let delta = current_rtime - deadline; + gst::warning!( + CAT, + "received running time {buffer_rtime} < current rtime {current_rtime}, shifting forward by {delta}, consider increasing latency" + ); + + pts += delta; + } } }