awstranscriber: further decouple output from input

As awstranscriber might in theory push out gap events without
any flow of input data, it needs to send its mandatory events
(stream-start, caps, segment) independently.

In addition, track a start time and use it to offset the 0-based
timestamps returned by AWS in order to output buffers timestamped
in the running-time domain, and perform item timing adjustment
only when dequeuing, instead of when queuing.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/525>
This commit is contained in:
Mathieu Duponchelle 2021-06-22 20:53:49 +02:00
parent 77c59f4f13
commit 9415c50200

View file

@ -19,7 +19,8 @@ use gst::glib;
use gst::prelude::*;
use gst::subclass::prelude::*;
use gst::{
element_error, error_msg, gst_debug, gst_error, gst_info, gst_log, gst_trace, loggable_error,
element_error, error_msg, gst_debug, gst_error, gst_info, gst_log, gst_trace, gst_warning,
loggable_error,
};
use std::default::Default;
@ -36,6 +37,7 @@ use futures::future::{abortable, AbortHandle};
use futures::prelude::*;
use tokio::runtime;
use std::cmp::Ordering;
use std::collections::VecDeque;
use std::pin::Pin;
use std::sync::Mutex;
@ -149,6 +151,8 @@ struct State {
send_eos: bool,
discont: bool,
partial_index: usize,
send_events: bool,
start_time: Option<gst::ClockTime>,
}
impl Default for State {
@ -165,6 +169,8 @@ impl Default for State {
send_eos: false,
discont: true,
partial_index: 0,
send_events: true,
start_time: None,
}
}
}
@ -206,38 +212,59 @@ impl Transcriber {
/* First, check our pending buffers */
let mut items = vec![];
let (latency, now, mut last_position, send_eos, seqnum) = {
let mut state = self.state.lock().unwrap();
// Multiply GRANULARITY by 2 in order to not send buffers that
// are less than GRANULARITY away too late
let latency = self.settings.lock().unwrap().latency - 2 * GRANULARITY;
let now = element.current_running_time();
let send_eos = state.send_eos && state.buffers.is_empty();
while let Some(buf) = state.buffers.front() {
if now
.zip(buf.pts())
.map_or(false, |(now, pts)| now - pts > latency)
{
/* Safe unwrap, we know we have an item */
let buf = state.buffers.pop_front().unwrap();
items.push(buf);
} else {
break;
}
let now = match element.current_running_time() {
Some(now) => now,
None => {
return true;
}
(
latency,
now,
state.out_segment.position(),
send_eos,
state.seqnum,
)
};
let latency = self.settings.lock().unwrap().latency;
let mut state = self.state.lock().unwrap();
if state.start_time.is_none() {
state.start_time = Some(now);
state.out_segment.set_position(now);
}
let start_time = state.start_time.unwrap();
let mut last_position = state.out_segment.position().unwrap();
let send_eos = state.send_eos && state.buffers.is_empty();
while let Some(buf) = state.buffers.front() {
let pts = buf.pts().unwrap();
gst_trace!(
CAT,
obj: element,
"Checking now {} if item is ready for dequeuing, PTS {}, threshold {} vs {}",
now,
pts,
pts + latency.saturating_sub(3 * GRANULARITY),
now - start_time
);
if pts + latency.saturating_sub(3 * GRANULARITY) < now - start_time {
/* Safe unwrap, we know we have an item */
let mut buf = state.buffers.pop_front().unwrap();
{
let buf_mut = buf.get_mut().unwrap();
buf_mut.set_pts(start_time + pts);
}
items.push(buf);
} else {
break;
}
}
let seqnum = state.seqnum;
drop(state);
/* We're EOS, we can pause and exit early */
if send_eos {
let _ = self.srcpad.pause_task();
@ -248,64 +275,80 @@ impl Transcriber {
}
for mut buf in items.drain(..) {
let delta = buf
.pts()
.zip(last_position)
.map(|(pts, last_pos)| pts.checked_sub(last_pos));
if let Some(delta) = delta {
let last_pos = last_position.expect("defined since delta could be computed");
let mut pts = buf.pts().unwrap();
let mut duration = buf.duration().unwrap();
let gap_event = gst::event::Gap::builder(last_pos)
.duration(delta)
.seqnum(seqnum)
.build();
gst_log!(
CAT,
"Pushing gap: {} -> {}",
last_pos,
buf.pts().display()
);
if !self.srcpad.push_event(gap_event) {
return false;
match pts.cmp(&last_position) {
Ordering::Greater => {
let gap_event = gst::event::Gap::builder(last_position)
.duration(pts - last_position)
.seqnum(seqnum)
.build();
gst_log!(CAT, "Pushing gap: {} -> {}", last_position, pts);
if !self.srcpad.push_event(gap_event) {
return false;
}
}
Ordering::Less => {
let delta = last_position - pts;
gst_warning!(
CAT,
obj: element,
"Updating item PTS ({} < {}), consider increasing latency",
pts,
last_position
);
pts = last_position;
duration = duration.saturating_sub(delta);
{
let buf_mut = buf.get_mut().unwrap();
buf_mut.set_pts(pts);
buf_mut.set_duration(duration);
}
}
_ => (),
}
last_position = buf
.pts()
.zip(buf.duration())
.map(|(pts, duration)| pts + duration);
{
let buf = buf.get_mut().unwrap();
buf.set_pts(buf.pts());
}
gst_log!(
CAT,
"Pushing buffer: {} -> {}",
buf.pts().display(),
buf.pts()
.zip(buf.duration())
.map(|(pts, duration)| pts + duration)
.display(),
);
last_position = pts + duration;
gst_debug!(CAT, "Pushing buffer: {} -> {}", pts, pts + duration);
if self.srcpad.push(buf).is_err() {
return false;
}
}
/* next, push a gap if we're lagging behind the target position */
gst_trace!(
CAT,
obj: element,
"Checking now: {} if we need to push a gap, last_position: {}, threshold: {}",
now,
last_position,
last_position + latency.saturating_sub(GRANULARITY)
);
let duration = now
.zip(last_position)
.and_then(|(now, last_position)| now.checked_sub(last_position))
.and_then(|delta| delta.checked_sub(latency));
if let Some(duration) = duration {
let last_pos = last_position.expect("defined since duration could be computed");
let gap_event = gst::event::Gap::builder(last_pos)
if now > last_position + latency.saturating_sub(GRANULARITY) {
let duration = now - last_position - latency.saturating_sub(GRANULARITY);
let gap_event = gst::event::Gap::builder(last_position)
.duration(duration)
.seqnum(seqnum)
.build();
let next_position = last_pos + duration;
gst_log!(CAT, "Pushing gap: {} -> {}", last_pos, next_position,);
last_position = Some(next_position);
gst_log!(
CAT,
"Pushing gap: {} -> {}",
last_position,
last_position + duration
);
last_position += duration;
if !self.srcpad.push_event(gap_event) {
return false;
}
@ -328,9 +371,9 @@ impl Transcriber {
partial: bool,
) {
for item in &alternative.items[state.partial_index..] {
let mut start_time =
let start_time =
gst::ClockTime::from_nseconds((item.start_time as f64 * 1_000_000_000.0) as u64);
let mut end_time =
let end_time =
gst::ClockTime::from_nseconds((item.end_time as f64 * 1_000_000_000.0) as u64);
if !item.stable {
@ -338,7 +381,13 @@ impl Transcriber {
}
/* Should be sent now */
gst_debug!(CAT, obj: element, "Item is ready: {}", item.content);
gst_debug!(
CAT,
obj: element,
"Item is ready for queuing: {}, PTS {}",
item.content,
start_time
);
let mut buf = gst::Buffer::from_mut_slice(item.content.clone().into_bytes());
{
@ -349,28 +398,6 @@ impl Transcriber {
state.discont = false;
}
if state
.out_segment
.position()
.map_or(false, |pos| start_time < pos)
{
let pos = state
.out_segment
.position()
.expect("position checked above");
gst_debug!(
CAT,
obj: element,
"Adjusting item timing({} < {})",
start_time,
pos,
);
start_time = pos;
if end_time < start_time {
end_time = start_time;
}
}
buf.set_pts(start_time);
buf.set_duration(end_time - start_time);
}
@ -390,6 +417,44 @@ impl Transcriber {
element: &super::Transcriber,
receiver: &mut mpsc::Receiver<Message>,
) -> Result<(), gst::ErrorMessage> {
let mut events = {
let mut events = vec![];
let mut state = self.state.lock().unwrap();
if state.send_events {
events.push(
gst::event::StreamStart::builder("transcription")
.seqnum(state.seqnum)
.build(),
);
let caps = gst::Caps::builder("text/x-raw")
.field("format", &"utf8")
.build();
events.push(
gst::event::Caps::builder(&caps)
.seqnum(state.seqnum)
.build(),
);
events.push(
gst::event::Segment::builder(&state.out_segment)
.seqnum(state.seqnum)
.build(),
);
state.send_events = false;
}
events
};
for event in events.drain(..) {
gst_info!(CAT, obj: element, "Sending {:?}", event);
self.srcpad.push_event(event);
}
let future = async move {
let msg = match receiver.next().await {
Some(msg) => msg,
@ -611,15 +676,19 @@ impl Transcriber {
},
EventView::FlushStart(_) => {
gst_info!(CAT, obj: element, "Received flush start, disconnecting");
self.disconnect(element);
let mut ret = pad.event_default(Some(element), event);
match self.srcpad.stop_task() {
Err(err) => {
gst_error!(CAT, obj: element, "Failed to stop srcpad task: {}", err);
self.disconnect(element);
ret = false;
}
Ok(_) => (),
Ok(_) => {
self.disconnect(element);
}
};
ret
@ -652,34 +721,19 @@ impl Transcriber {
Ok(segment) => segment,
};
let event = {
let mut state = self.state.lock().unwrap();
let mut state = self.state.lock().unwrap();
state.out_segment.set_time(segment.time());
state.out_segment.set_position(gst::ClockTime::ZERO);
state.in_segment = segment;
state.seqnum = e.seqnum();
state.in_segment = segment;
state.seqnum = e.seqnum();
gst::event::Segment::builder(&state.out_segment)
.seqnum(state.seqnum)
.build()
};
gst_info!(CAT, "Sending our own segment: {:?}", event);
pad.event_default(Some(element), event)
true
}
EventView::Tag(_) => true,
EventView::Caps(e) => {
gst_info!(CAT, "Received caps {:?}", e);
let caps = gst::Caps::builder("text/x-raw")
.field("format", &"utf8")
.build();
let seqnum = self.state.lock().unwrap().seqnum;
self.srcpad
.push_event(gst::event::Caps::builder(&caps).seqnum(seqnum).build())
true
}
EventView::StreamStart(_) => true,
_ => pad.event_default(Some(element), event),
}
}
@ -773,7 +827,7 @@ impl Transcriber {
}
fn ensure_connection(&self, element: &super::Transcriber) -> Result<(), gst::ErrorMessage> {
let mut state = self.state.lock().unwrap();
let state = self.state.lock().unwrap();
if state.connected {
return Ok(());
@ -838,6 +892,9 @@ impl Transcriber {
},
);
drop(settings);
drop(state);
let url = signed.generate_presigned_url(&creds, &std::time::Duration::from_secs(60), true);
let (ws, _) = {
@ -890,6 +947,8 @@ impl Transcriber {
}
};
let mut state = self.state.lock().unwrap();
let (future, abort_handle) = abortable(future);
state.recv_abort_handle = Some(abort_handle);
@ -1159,16 +1218,12 @@ impl ElementImpl for Transcriber {
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
gst_info!(CAT, obj: element, "Changing state {:?}", transition);
let mut success = self.parent_change_state(element, transition)?;
match transition {
gst::StateChange::PausedToReady => {
self.disconnect(element);
}
_ => (),
}
let mut success = self.parent_change_state(element, transition)?;
match transition {
gst::StateChange::ReadyToPaused => {
success = gst::StateChangeSuccess::NoPreroll;
}