textwrap: expose accumulate-time property

In its standard mode, textwrap simply splits up text in chained
buffers into multiple lines / buffers, not keeping any state.

When accumulate-time is specified, multiple input buffers will be
wrapped together, outputting one-line buffers of text once a
sufficient width (specified by the columns property) is reached,
or the interval between two input buffers is greater than
accumulate-time.

This is useful to format the output of an element such as
awstranscribe, which outputs its transcription with one buffer
per word.
This commit is contained in:
Mathieu Duponchelle 2020-12-07 19:44:50 +01:00
parent b062f63ec3
commit cbf1266a8c

View file

@ -20,11 +20,12 @@ use glib::subclass;
use glib::subclass::prelude::*; use glib::subclass::prelude::*;
use gst::prelude::*; use gst::prelude::*;
use gst::subclass::prelude::*; use gst::subclass::prelude::*;
use gst::{gst_error, gst_info}; use gst::{gst_error, gst_info, gst_log};
use std::default::Default; use std::default::Default;
use std::fs::File; use std::fs::File;
use std::io; use std::io;
use std::mem;
use std::sync::Mutex; use std::sync::Mutex;
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
@ -42,8 +43,9 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
const DEFAULT_DICTIONARY: Option<String> = None; const DEFAULT_DICTIONARY: Option<String> = None;
const DEFAULT_COLUMNS: u32 = 32; /* CEA 608 max columns */ const DEFAULT_COLUMNS: u32 = 32; /* CEA 608 max columns */
const DEFAULT_LINES: u32 = 0; const DEFAULT_LINES: u32 = 0;
const DEFAULT_ACCUMULATE: i64 = -1;
static PROPERTIES: [subclass::Property; 3] = [ static PROPERTIES: [subclass::Property; 4] = [
subclass::Property("dictionary", |name| { subclass::Property("dictionary", |name| {
glib::ParamSpec::string( glib::ParamSpec::string(
name, name,
@ -76,6 +78,17 @@ static PROPERTIES: [subclass::Property; 3] = [
glib::ParamFlags::READWRITE, glib::ParamFlags::READWRITE,
) )
}), }),
subclass::Property("accumulate-time", |name| {
glib::ParamSpec::int64(
name,
"accumulate-time",
"Cut-off time for input text accumulation (-1=do not accumulate)",
-1,
std::i64::MAX,
DEFAULT_ACCUMULATE,
glib::ParamFlags::READWRITE,
)
}),
]; ];
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
@ -83,6 +96,7 @@ struct Settings {
dictionary: Option<String>, dictionary: Option<String>,
columns: u32, columns: u32,
lines: u32, lines: u32,
accumulate_time: gst::ClockTime,
} }
impl Default for Settings { impl Default for Settings {
@ -91,17 +105,28 @@ impl Default for Settings {
dictionary: DEFAULT_DICTIONARY, dictionary: DEFAULT_DICTIONARY,
columns: DEFAULT_COLUMNS, /* CEA 608 max columns */ columns: DEFAULT_COLUMNS, /* CEA 608 max columns */
lines: DEFAULT_LINES, lines: DEFAULT_LINES,
accumulate_time: gst::CLOCK_TIME_NONE,
} }
} }
} }
struct State { struct State {
options: Option<textwrap::Options<'static, Box<dyn textwrap::WordSplitter + Send>>>, options: Option<textwrap::Options<'static, Box<dyn textwrap::WordSplitter + Send>>>,
current_text: String,
start_ts: gst::ClockTime,
end_ts: gst::ClockTime,
} }
impl Default for State { impl Default for State {
fn default() -> Self { fn default() -> Self {
Self { options: None } Self {
options: None,
current_text: "".to_string(),
start_ts: gst::CLOCK_TIME_NONE,
end_ts: gst::CLOCK_TIME_NONE,
}
} }
} }
@ -192,30 +217,128 @@ impl TextWrap {
gst::FlowError::Error gst::FlowError::Error
})?; })?;
let lines = self.settings.lock().unwrap().lines; let accumulate_time = self.settings.lock().unwrap().accumulate_time;
let mut state = self.state.lock().unwrap();
let data = { if accumulate_time.is_some() {
let state = self.state.lock().unwrap();
let options = state
.options
.as_ref()
.expect("We should have a wrapper by now");
textwrap::fill(data, options)
};
// If the lines property was set, we want to split the result into buffers
// of at most N lines. We compute the duration for each of those based on
// the total number of words, and the number of words in each of the split-up
// buffers.
if lines > 0 {
let mut bufferlist = gst::BufferList::new(); let mut bufferlist = gst::BufferList::new();
if state.end_ts.is_some() && state.end_ts + accumulate_time < buffer.get_pts() {
let mut buf = gst::Buffer::from_mut_slice(
mem::replace(&mut state.current_text, String::new()).into_bytes(),
);
{
let buf_mut = buf.get_mut().unwrap();
buf_mut.set_pts(state.start_ts);
buf_mut.set_duration(state.end_ts - state.start_ts);
}
bufferlist.get_mut().unwrap().add(buf);
state.start_ts = gst::CLOCK_TIME_NONE;
state.end_ts = gst::CLOCK_TIME_NONE;
}
let duration_per_word: gst::ClockTime = let duration_per_word: gst::ClockTime =
duration / data.split_whitespace().count() as u64; duration / data.split_whitespace().count() as u64;
for chunk in data.lines().collect::<Vec<&str>>().chunks(lines as usize) { if state.start_ts.is_none() {
let data = chunk.join("\n"); state.start_ts = buffer.get_pts();
let duration: gst::ClockTime = }
duration_per_word * data.split_whitespace().count() as u64;
state.end_ts = buffer.get_pts();
let words = data.split_whitespace();
let mut current_text = state.current_text.to_string();
for word in words {
if !current_text.is_empty() {
current_text.push(' ');
}
current_text.push_str(word);
let options = state
.options
.as_ref()
.expect("We should have a wrapper by now");
let lines = textwrap::wrap(&current_text, options);
let len = lines.len();
let mut trailing = "".to_string();
for (i, line) in lines.iter().enumerate() {
if i + 1 == len {
trailing = line.to_string();
} else {
let mut buf = gst::Buffer::from_mut_slice(line.to_string().into_bytes());
{
let buf_mut = buf.get_mut().unwrap();
buf_mut.set_pts(state.start_ts);
buf_mut.set_duration(state.end_ts - state.start_ts);
}
bufferlist.get_mut().unwrap().add(buf);
state.start_ts = state.end_ts;
}
}
current_text = trailing;
state.end_ts += duration_per_word;
}
state.current_text = current_text;
if state.current_text.is_empty() {
state.start_ts = gst::CLOCK_TIME_NONE;
state.end_ts = gst::CLOCK_TIME_NONE;
}
drop(state);
if bufferlist.is_empty() {
Ok(gst::FlowSuccess::Ok)
} else {
self.srcpad.push_list(bufferlist)
}
} else {
let lines = self.settings.lock().unwrap().lines;
let data = {
let options = state
.options
.as_ref()
.expect("We should have a wrapper by now");
textwrap::fill(data, options)
};
// If the lines property was set, we want to split the result into buffers
// of at most N lines. We compute the duration for each of those based on
// the total number of words, and the number of words in each of the split-up
// buffers.
if lines > 0 {
let mut bufferlist = gst::BufferList::new();
let duration_per_word: gst::ClockTime =
duration / data.split_whitespace().count() as u64;
for chunk in data.lines().collect::<Vec<&str>>().chunks(lines as usize) {
let data = chunk.join("\n");
let duration: gst::ClockTime =
duration_per_word * data.split_whitespace().count() as u64;
let mut buf = gst::Buffer::from_mut_slice(data.into_bytes());
{
let buf = buf.get_mut().unwrap();
buf.set_pts(pts);
buf.set_duration(duration);
pts += duration;
}
bufferlist.get_mut().unwrap().add(buf);
}
drop(state);
self.srcpad.push_list(bufferlist)
} else {
let mut buf = gst::Buffer::from_mut_slice(data.into_bytes()); let mut buf = gst::Buffer::from_mut_slice(data.into_bytes());
{ {
@ -223,24 +346,61 @@ impl TextWrap {
buf.set_pts(pts); buf.set_pts(pts);
buf.set_duration(duration); buf.set_duration(duration);
pts += duration;
} }
bufferlist.get_mut().unwrap().add(buf); drop(state);
self.srcpad.push(buf)
} }
}
}
self.srcpad.push_list(bufferlist) fn sink_event(&self, pad: &gst::Pad, element: &super::TextWrap, event: gst::Event) -> bool {
} else { gst_log!(CAT, obj: pad, "Handling event {:?}", event);
let mut buf = gst::Buffer::from_mut_slice(data.into_bytes());
{ use gst::EventView;
let buf = buf.get_mut().unwrap();
buf.set_pts(pts); match event.view() {
buf.set_duration(duration); EventView::Gap(_) => {
let state = self.state.lock().unwrap();
/* We are currently accumulating text, no need to forward the gap */
if state.start_ts.is_some() {
true
} else {
pad.event_default(Some(element), event)
}
} }
EventView::FlushStart(_) => {
let mut state = self.state.lock().unwrap();
let options = state.options.take();
*state = State::default();
state.options = options;
drop(state);
pad.event_default(Some(element), event)
}
EventView::Eos(_) => {
let mut state = self.state.lock().unwrap();
if !state.current_text.is_empty() {
let mut buf = gst::Buffer::from_mut_slice(
mem::replace(&mut state.current_text, String::new()).into_bytes(),
);
{
let buf_mut = buf.get_mut().unwrap();
buf_mut.set_pts(state.start_ts);
buf_mut.set_duration(state.end_ts - state.start_ts);
}
self.srcpad.push(buf) state.start_ts = gst::CLOCK_TIME_NONE;
state.end_ts = gst::CLOCK_TIME_NONE;
drop(state);
let _ = self.srcpad.push(buf);
} else {
drop(state);
}
pad.event_default(Some(element), event)
}
_ => pad.event_default(Some(element), event),
} }
} }
} }
@ -264,6 +424,13 @@ impl ObjectSubclass for TextWrap {
|textwrap, element| textwrap.sink_chain(pad, element, buffer), |textwrap, element| textwrap.sink_chain(pad, element, buffer),
) )
}) })
.event_function(|pad, parent, event| {
TextWrap::catch_panic_pad_function(
parent,
|| false,
|textwrap, element| textwrap.sink_event(pad, element, event),
)
})
.flags(gst::PadFlags::PROXY_CAPS | gst::PadFlags::FIXED_CAPS) .flags(gst::PadFlags::PROXY_CAPS | gst::PadFlags::FIXED_CAPS)
.build(); .build();
@ -344,6 +511,13 @@ impl ObjectImpl for TextWrap {
let mut settings = self.settings.lock().unwrap(); let mut settings = self.settings.lock().unwrap();
settings.lines = value.get_some().expect("type checked upstream"); settings.lines = value.get_some().expect("type checked upstream");
} }
subclass::Property("accumulate-time", ..) => {
let mut settings = self.settings.lock().unwrap();
settings.accumulate_time = match value.get_some().expect("type checked upstream") {
-1i64 => gst::CLOCK_TIME_NONE,
time => (time as u64).into(),
};
}
_ => unimplemented!(), _ => unimplemented!(),
} }
} }
@ -364,6 +538,13 @@ impl ObjectImpl for TextWrap {
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
settings.lines.to_value() settings.lines.to_value()
} }
subclass::Property("accumulate-time", ..) => {
let settings = self.settings.lock().unwrap();
match settings.accumulate_time.0 {
Some(time) => (time as i64).to_value(),
None => (-1i64).to_value(),
}
}
_ => unimplemented!(), _ => unimplemented!(),
} }
} }
@ -377,12 +558,9 @@ impl ElementImpl for TextWrap {
) -> Result<gst::StateChangeSuccess, gst::StateChangeError> { ) -> Result<gst::StateChangeSuccess, gst::StateChangeError> {
gst_info!(CAT, obj: element, "Changing state {:?}", transition); gst_info!(CAT, obj: element, "Changing state {:?}", transition);
match transition { if let gst::StateChange::PausedToReady = transition {
gst::StateChange::PausedToReady => { let mut state = self.state.lock().unwrap();
let mut state = self.state.lock().unwrap(); *state = State::default();
*state = State::default();
}
_ => (),
} }
let success = self.parent_change_state(element, transition)?; let success = self.parent_change_state(element, transition)?;