elevenlabssynthesizer: fix running time checks

The deadline for pushing a buffer out of the element is not its running
time, but its running time added to the upstream latency + the element
latency.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/2348>
This commit is contained in:
Mathieu Duponchelle 2025-07-04 19:42:52 +02:00 committed by GStreamer Marge Bot
parent 6c1b331129
commit 464d8b6fbb

View file

@ -121,7 +121,8 @@ struct State {
send_abort_handle: Option<AbortHandle>,
previous_request_ids: VecDeque<String>,
outcaps: Option<gst::Caps>,
upstream_is_live: Option<bool>,
// (live, min, max)
upstream_latency: Option<(bool, gst::ClockTime, Option<gst::ClockTime>)>,
}
impl Default for State {
@ -132,7 +133,7 @@ impl Default for State {
send_abort_handle: None,
previous_request_ids: VecDeque::new(),
outcaps: None,
upstream_is_live: None,
upstream_latency: None,
}
}
}
@ -169,9 +170,9 @@ async fn send_first_request(
}
impl Synthesizer {
fn upstream_is_live(&self, state: &mut State) -> bool {
if let Some(upstream_is_live) = state.upstream_is_live {
return upstream_is_live;
fn upstream_latency(&self) -> Option<(bool, gst::ClockTime, Option<gst::ClockTime>)> {
if let Some(latency) = self.state.lock().unwrap().upstream_latency {
return Some(latency);
}
let mut peer_query = gst::query::Latency::new();
@ -179,16 +180,20 @@ impl Synthesizer {
let ret = self.sinkpad.peer_query(&mut peer_query);
if ret {
let (is_live, _, _) = peer_query.result();
gst::info!(CAT, imp = self, "queried upstream liveness: {is_live}");
let upstream_latency = peer_query.result();
gst::info!(
CAT,
imp = self,
"queried upstream latency: {upstream_latency:?}"
);
state.upstream_is_live = Some(is_live);
self.state.lock().unwrap().upstream_latency = Some(upstream_latency);
is_live
Some(upstream_latency)
} else {
gst::trace!(CAT, imp = self, "could not query upstream latency");
false
None
}
}
@ -271,7 +276,7 @@ impl Synthesizer {
mut pts: gst::ClockTime,
input_duration: gst::ClockTime,
) -> Result<Option<gst::Buffer>, Error> {
let (voice_id, model_id, language_code, retry_with_speed) = {
let (voice_id, model_id, language_code, retry_with_speed, our_latency) = {
let settings = self.settings.lock().unwrap();
(
@ -279,11 +284,14 @@ impl Synthesizer {
settings.model_id.clone(),
settings.language_code.as_ref().cloned(),
settings.retry_with_speed,
settings.latency,
)
};
let (client, previous_request_ids, out_info, out_segment, upstream_is_live) = {
let mut state = self.state.lock().unwrap();
let upstream_latency = self.upstream_latency();
let (client, previous_request_ids, out_info, out_segment) = {
let state = self.state.lock().unwrap();
let Some(client) = state.client.as_ref().cloned() else {
return Ok(None);
@ -295,7 +303,6 @@ impl Synthesizer {
gst_audio::AudioInfo::from_caps(state.outcaps.as_ref().expect("negotiated"))
.unwrap(),
state.out_segment.clone(),
self.upstream_is_live(&mut state),
)
};
@ -437,20 +444,26 @@ impl Synthesizer {
}
}
if upstream_is_live {
let current_rtime = self
.obj()
.current_running_time()
.expect("upstream is live and should have provided a clock");
if let Some(upstream_latency) = upstream_latency {
let (upstream_live, upstream_min, _) = upstream_latency;
if buffer_rtime < current_rtime {
let delta = current_rtime - buffer_rtime;
gst::warning!(
CAT,
"received running time {buffer_rtime} < current rtime {current_rtime}, shifting forward by {delta}, consider increasing latency"
);
if upstream_live {
let current_rtime = self
.obj()
.current_running_time()
.expect("upstream is live and should have provided a clock");
pts += delta;
let deadline = buffer_rtime + upstream_min + our_latency;
if deadline < current_rtime {
let delta = current_rtime - deadline;
gst::warning!(
CAT,
"received running time {buffer_rtime} < current rtime {current_rtime}, shifting forward by {delta}, consider increasing latency"
);
pts += delta;
}
}
}