aws/polly: bring in shift forward logic from 11labs

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/2510>
This commit is contained in:
Mathieu Duponchelle 2025-08-28 15:50:38 +02:00 committed by GStreamer Marge Bot
parent 98eec93dd5
commit a162b82b73

View file

@ -73,6 +73,8 @@ struct State {
client: Option<aws_sdk_polly::Client>,
send_abort_handle: Option<AbortHandle>,
in_format: Option<aws_sdk_polly::types::TextType>,
// (live, min, max)
upstream_latency: Option<(bool, gst::ClockTime, Option<gst::ClockTime>)>,
}
impl Default for State {
@ -82,6 +84,7 @@ impl Default for State {
client: None,
send_abort_handle: None,
in_format: None,
upstream_latency: None,
}
}
}
@ -95,6 +98,33 @@ pub struct Polly {
}
impl Polly {
fn upstream_latency(&self) -> Option<(bool, gst::ClockTime, Option<gst::ClockTime>)> {
if let Some(latency) = self.state.lock().unwrap().upstream_latency {
return Some(latency);
}
let mut peer_query = gst::query::Latency::new();
let ret = self.sinkpad.peer_query(&mut peer_query);
if ret {
let upstream_latency = peer_query.result();
gst::info!(
CAT,
imp = self,
"queried upstream latency: {upstream_latency:?}"
);
self.state.lock().unwrap().upstream_latency = Some(upstream_latency);
Some(upstream_latency)
} else {
gst::trace!(CAT, imp = self, "could not query upstream latency");
None
}
}
fn sink_event(&self, pad: &gst::Pad, event: gst::Event) -> bool {
gst::log!(CAT, obj = pad, "Handling event {event:?}");
@ -164,16 +194,30 @@ impl Polly {
content: String,
mut pts: gst::ClockTime,
input_duration: gst::ClockTime,
) -> Result<gst::Buffer, Error> {
let (client, in_format) = {
) -> Result<Option<gst::Buffer>, Error> {
let (client, in_format, out_segment) = {
let state = self.state.lock().unwrap();
(
state.client.as_ref().expect("connected").clone(),
state.in_format.as_ref().expect("received caps").clone(),
state.out_segment.clone(),
)
};
let Some(buffer_rtime) = out_segment.to_running_time(pts) else {
gst::warning!(
CAT,
imp = self,
"buffer PTS {pts} not in segment {out_segment:?}"
);
return Ok(None);
};
let our_latency = self.settings.lock().unwrap().latency;
let upstream_latency = self.upstream_latency();
gst::debug!(CAT, imp = self, "synthesizing speech from text {content}");
let job = {
@ -262,6 +306,29 @@ impl Polly {
}
}
if let Some(upstream_latency) = upstream_latency {
let (upstream_live, upstream_min, _) = upstream_latency;
if upstream_live {
let current_rtime = self
.obj()
.current_running_time()
.expect("upstream is live and should have provided a clock");
let deadline = buffer_rtime + upstream_min + our_latency;
if deadline < current_rtime {
let delta = current_rtime - deadline;
gst::error!(
CAT,
"received running time {buffer_rtime} + {upstream_min} + {our_latency} < current rtime {current_rtime}, shifting forward by {delta}, consider increasing latency"
);
pts += delta;
}
}
}
let discont = state
.out_segment
.position()
@ -288,7 +355,7 @@ impl Polly {
state.out_segment.set_position(pts + duration);
Ok(buf)
Ok(Some(buf))
}
fn do_send(
@ -296,7 +363,7 @@ impl Polly {
content: String,
pts: gst::ClockTime,
duration: gst::ClockTime,
) -> Result<gst::Buffer, gst::FlowError> {
) -> Result<Option<gst::Buffer>, gst::FlowError> {
self.ensure_connection().map_err(|err| {
gst::element_imp_error!(self, gst::StreamError::Failed, ["Streaming failed: {err}"]);
gst::FlowError::Error
@ -359,7 +426,9 @@ impl Polly {
gst::FlowError::Error
})?;
let mut outbuf = self.do_send(data, pts, duration)?;
let Some(mut outbuf) = self.do_send(data, pts, duration)? else {
return Ok(gst::FlowSuccess::Ok);
};
{
let outbuf_mut = outbuf.get_mut().unwrap();
@ -414,7 +483,9 @@ impl Polly {
let content = list_content.join(" ");
let mut outbuf = self.do_send(content, pts, duration)?;
let Some(mut outbuf) = self.do_send(content, pts, duration)? else {
return Ok(gst::FlowSuccess::Ok);
};
{
let outbuf_mut = outbuf.get_mut().unwrap();