cea608tojson: expose "unbuffered" property

In this mode, cues are output as soon as they are ready for
display, without a duration. This can be useful in live mode,
when downstream is OK with determining the duration after the
fact, through clear=True.

The consequence of this is that the current roll-up window will
be output repetitively, it is up to downstream to deal with that
how it prefers.

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/554>
This commit is contained in:
Mathieu Duponchelle 2021-09-03 02:13:39 +02:00
parent ed90b338f8
commit bc587a09f8

View file

@ -38,6 +38,9 @@ use atomic_refcell::AtomicRefCell;
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::sync::Mutex;
const DEFAULT_UNBUFFERED: bool = false;
#[derive(Debug)] #[derive(Debug)]
struct TimestampedLines { struct TimestampedLines {
@ -178,6 +181,19 @@ impl From<Row> for Line {
} }
} }
#[derive(Clone)]
struct Settings {
unbuffered: bool,
}
impl Default for Settings {
fn default() -> Self {
Self {
unbuffered: DEFAULT_UNBUFFERED,
}
}
}
struct State { struct State {
mode: Option<Cea608Mode>, mode: Option<Cea608Mode>,
last_cc_data: Option<u16>, last_cc_data: Option<u16>,
@ -189,6 +205,7 @@ struct State {
clear: Option<bool>, clear: Option<bool>,
cursor: Cursor, cursor: Cursor,
pending_lines: Option<TimestampedLines>, pending_lines: Option<TimestampedLines>,
settings: Settings,
} }
impl Default for State { impl Default for State {
@ -209,6 +226,7 @@ impl Default for State {
underline: false, underline: false,
}, },
pending_lines: None, pending_lines: None,
settings: Settings::default(),
} }
} }
} }
@ -218,6 +236,7 @@ pub struct Cea608ToJson {
sinkpad: gst::Pad, sinkpad: gst::Pad,
state: AtomicRefCell<State>, state: AtomicRefCell<State>,
settings: Mutex<Settings>,
} }
static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| { static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
@ -413,14 +432,14 @@ impl State {
) -> Option<TimestampedLines> { ) -> Option<TimestampedLines> {
if mode.is_rollup() && self.mode == Some(Cea608Mode::PopOn) { if mode.is_rollup() && self.mode == Some(Cea608Mode::PopOn) {
// https://www.law.cornell.edu/cfr/text/47/79.101 (f)(2)(v) // https://www.law.cornell.edu/cfr/text/47/79.101 (f)(2)(v)
let _ = self.drain(element); let _ = self.drain(element, true);
} }
let ret = if Some(mode) != self.mode { let ret = if Some(mode) != self.mode {
if self.mode == Some(Cea608Mode::PopOn) { if self.mode == Some(Cea608Mode::PopOn) {
self.drain_pending(element) self.drain_pending(element)
} else { } else {
self.drain(element) self.drain(element, true)
} }
} else { } else {
None None
@ -443,12 +462,19 @@ impl State {
ret ret
} }
fn drain(&mut self, element: &super::Cea608ToJson) -> Option<TimestampedLines> { fn drain(&mut self, element: &super::Cea608ToJson, flush: bool) -> Option<TimestampedLines> {
gst_log!(CAT, obj: element, "Draining"); gst_log!(CAT, obj: element, "Draining");
let pts = self.first_pts; let pts = if self.settings.unbuffered {
self.current_pts
} else {
self.first_pts
};
let duration = match self.mode { let duration = if self.settings.unbuffered {
self.current_duration
} else {
match self.mode {
Some(Cea608Mode::PopOn) => gst::ClockTime::NONE, Some(Cea608Mode::PopOn) => gst::ClockTime::NONE,
_ => self _ => self
.current_pts .current_pts
@ -456,13 +482,14 @@ impl State {
.map(|(cur_pts, cur_duration)| cur_pts + cur_duration) .map(|(cur_pts, cur_duration)| cur_pts + cur_duration)
.zip(self.first_pts) .zip(self.first_pts)
.and_then(|(cur_end, first_pts)| cur_end.checked_sub(first_pts)), .and_then(|(cur_end, first_pts)| cur_end.checked_sub(first_pts)),
}
}; };
self.first_pts = gst::ClockTime::NONE; self.first_pts = gst::ClockTime::NONE;
let mut lines: Vec<Line> = vec![]; let mut lines: Vec<Line> = vec![];
// Wish BTreeMap had a drain() method if flush {
for (_idx, row) in std::mem::take(&mut self.rows).into_iter() { for (_idx, row) in std::mem::take(&mut self.rows).into_iter() {
if !row.is_empty() { if !row.is_empty() {
let mut line: Line = row.into(); let mut line: Line = row.into();
@ -472,6 +499,15 @@ impl State {
} }
self.rows.clear(); self.rows.clear();
} else {
for row in self.rows.values() {
if !row.is_empty() {
let mut line: Line = row.clone().into();
line.carriage_return = self.carriage_return.take();
lines.push(line);
}
}
}
let clear = self.clear.take(); let clear = self.clear.take();
@ -528,6 +564,37 @@ impl State {
gst_log!(CAT, obj: element, "preamble: {:?}", preamble); gst_log!(CAT, obj: element, "preamble: {:?}", preamble);
let drain_roll_up = self.cursor.row != preamble.row as u32;
// In unbuffered mode, we output the whole roll-up window
// and need to move it when the preamble relocates it
// https://www.law.cornell.edu/cfr/text/47/79.101 (f)(1)(ii)
if self.settings.unbuffered {
if let Some(mode) = self.mode {
if mode.is_rollup() && self.cursor.row != preamble.row as u32 {
let offset = match mode {
Cea608Mode::RollUp2 => 1,
Cea608Mode::RollUp3 => 2,
Cea608Mode::RollUp4 => 3,
_ => unreachable!(),
};
let current_top_row = self.cursor.row.saturating_sub(offset);
let new_row_offset = preamble.row - self.cursor.row as i32;
for row in current_top_row..self.cursor.row {
if let Some(mut row) = self.rows.remove(&row) {
let new_row = row.row as i32 + new_row_offset;
if new_row >= 0 {
row.row = new_row as u32;
self.rows.insert(row.row, row);
}
}
}
}
}
}
self.cursor.row = preamble.row as u32; self.cursor.row = preamble.row as u32;
self.cursor.col = preamble.col as usize; self.cursor.col = preamble.col as usize;
self.cursor.style = preamble.style; self.cursor.style = preamble.style;
@ -535,17 +602,33 @@ impl State {
if let Some(mode) = self.mode { if let Some(mode) = self.mode {
match mode { match mode {
// The relocation is potentially destructive, let us drain
Cea608Mode::RollUp2 Cea608Mode::RollUp2
| Cea608Mode::RollUp3 | Cea608Mode::RollUp3
| Cea608Mode::RollUp4 | Cea608Mode::RollUp4
| Cea608Mode::PaintOn => { | Cea608Mode::PaintOn => {
let ret = self.drain(element); if self.settings.unbuffered {
/* We only need to drain when the roll-up window was relocated */
let ret = if drain_roll_up {
self.drain(element, true)
} else {
None
};
if let std::collections::btree_map::Entry::Vacant(e) =
self.rows.entry(self.cursor.row)
{
e.insert(Row::new(self.cursor.row));
}
ret
// The relocation is potentially destructive, let us drain
} else {
let ret = self.drain(element, true);
self.rows.insert(self.cursor.row, Row::new(self.cursor.row)); self.rows.insert(self.cursor.row, Row::new(self.cursor.row));
ret ret
} }
}
Cea608Mode::PopOn => { Cea608Mode::PopOn => {
let row = self.cursor.row; let row = self.cursor.row;
self.rows.entry(row).or_insert_with(|| Row::new(row)); self.rows.entry(row).or_insert_with(|| Row::new(row));
@ -582,7 +665,7 @@ impl State {
self.drain_pending(element) self.drain_pending(element)
} }
_ => { _ => {
let ret = self.drain(element); let ret = self.drain(element, true);
self.clear = Some(true); self.clear = Some(true);
ret ret
} }
@ -603,8 +686,34 @@ impl State {
if let Some(mode) = self.mode { if let Some(mode) = self.mode {
// https://www.law.cornell.edu/cfr/text/47/79.101 (f)(2)(i) (f)(3)(i) // https://www.law.cornell.edu/cfr/text/47/79.101 (f)(2)(i) (f)(3)(i)
if mode.is_rollup() { if mode.is_rollup() {
let ret = self.drain(element); let ret = if self.settings.unbuffered {
let offset = match mode {
Cea608Mode::RollUp2 => 1,
Cea608Mode::RollUp3 => 2,
Cea608Mode::RollUp4 => 3,
_ => unreachable!(),
};
let top_row = self.cursor.row.saturating_sub(offset);
// https://www.law.cornell.edu/cfr/text/47/79.101 (f)(1)(iii)
self.rows.remove(&top_row);
for row in top_row + 1..self.cursor.row + 1 {
if let Some(mut row) = self.rows.remove(&row) {
row.row -= 1;
self.rows.insert(row.row, row);
}
}
self.rows.insert(self.cursor.row, Row::new(self.cursor.row));
self.drain(element, false)
} else {
let ret = self.drain(element, true);
self.carriage_return = Some(true); self.carriage_return = Some(true);
ret
};
return ret; return ret;
} }
} }
@ -626,8 +735,13 @@ impl State {
// https://www.law.cornell.edu/cfr/text/47/79.101 (f)(2) // https://www.law.cornell.edu/cfr/text/47/79.101 (f)(2)
self.update_mode(element, Cea608Mode::PopOn); self.update_mode(element, Cea608Mode::PopOn);
self.first_pts = self.current_pts; self.first_pts = self.current_pts;
let ret = if self.settings.unbuffered {
self.drain(element, true)
} else {
let ret = self.drain_pending(element); let ret = self.drain_pending(element);
self.pending_lines = self.drain(element); self.pending_lines = self.drain(element, true);
ret
};
return ret; return ret;
} }
ffi::eia608_control_t_eia608_tab_offset_0 ffi::eia608_control_t_eia608_tab_offset_0
@ -714,9 +828,15 @@ impl State {
gst_log!(CAT, obj: element, "control!"); gst_log!(CAT, obj: element, "control!");
return self.decode_control(element, cc_data); return self.decode_control(element, cc_data);
} else if is_basicna(cc_data) || is_specialna(cc_data) || is_westeu(cc_data) { } else if is_basicna(cc_data) || is_specialna(cc_data) || is_westeu(cc_data) {
if let Some(mode) = self.mode {
self.mode?; self.mode?;
gst_log!(CAT, obj: element, "text"); gst_log!(CAT, obj: element, "text");
self.decode_text(element, cc_data); self.decode_text(element, cc_data);
if mode.is_rollup() && self.settings.unbuffered {
return self.drain(element, false);
}
}
} else if is_preamble(cc_data) { } else if is_preamble(cc_data) {
gst_log!(CAT, obj: element, "preamble"); gst_log!(CAT, obj: element, "preamble");
return self.decode_preamble(element, cc_data); return self.decode_preamble(element, cc_data);
@ -799,6 +919,14 @@ impl Cea608ToJson {
if let Some(lines) = state.handle_cc_data(element, pts, duration, cc_data) { if let Some(lines) = state.handle_cc_data(element, pts, duration, cc_data) {
drop(state); drop(state);
self.output(element, lines) self.output(element, lines)
} else if state.settings.unbuffered {
drop(state);
self.srcpad.push_event(
gst::event::Gap::builder(pts.unwrap())
.duration(duration)
.build(),
);
Ok(gst::FlowSuccess::Ok)
} else { } else {
Ok(gst::FlowSuccess::Ok) Ok(gst::FlowSuccess::Ok)
} }
@ -818,7 +946,9 @@ impl Cea608ToJson {
} }
EventView::FlushStop(..) => { EventView::FlushStop(..) => {
let mut state = self.state.borrow_mut(); let mut state = self.state.borrow_mut();
let old_settings = state.settings.clone();
*state = State::default(); *state = State::default();
state.settings = old_settings;
drop(state); drop(state);
pad.event_default(Some(element), event) pad.event_default(Some(element), event)
} }
@ -826,7 +956,7 @@ impl Cea608ToJson {
if let Some(lines) = self.state.borrow_mut().drain_pending(element) { if let Some(lines) = self.state.borrow_mut().drain_pending(element) {
let _ = self.output(element, lines); let _ = self.output(element, lines);
} }
if let Some(lines) = self.state.borrow_mut().drain(element) { if let Some(lines) = self.state.borrow_mut().drain(element, true) {
let _ = self.output(element, lines); let _ = self.output(element, lines);
} }
@ -872,6 +1002,7 @@ impl ObjectSubclass for Cea608ToJson {
srcpad, srcpad,
sinkpad, sinkpad,
state: AtomicRefCell::new(State::default()), state: AtomicRefCell::new(State::default()),
settings: Mutex::new(Settings::default()),
} }
} }
} }
@ -883,6 +1014,47 @@ impl ObjectImpl for Cea608ToJson {
obj.add_pad(&self.sinkpad).unwrap(); obj.add_pad(&self.sinkpad).unwrap();
obj.add_pad(&self.srcpad).unwrap(); obj.add_pad(&self.srcpad).unwrap();
} }
fn properties() -> &'static [glib::ParamSpec] {
static PROPERTIES: Lazy<Vec<glib::ParamSpec>> = Lazy::new(|| {
vec![glib::ParamSpec::new_boolean(
"unbuffered",
"Unbuffered",
"Whether captions should be output at display time, \
instead of waiting to determine durations. Useful with live input",
DEFAULT_UNBUFFERED,
glib::ParamFlags::READWRITE | gst::PARAM_FLAG_MUTABLE_READY,
)]
});
PROPERTIES.as_ref()
}
fn set_property(
&self,
_obj: &Self::Type,
_id: usize,
value: &glib::Value,
pspec: &glib::ParamSpec,
) {
match pspec.name() {
"unbuffered" => {
self.settings.lock().unwrap().unbuffered =
value.get().expect("type checked upstream");
}
_ => unimplemented!(),
}
}
fn property(&self, _obj: &Self::Type, _id: usize, pspec: &glib::ParamSpec) -> glib::Value {
match pspec.name() {
"unbuffered" => {
let settings = self.settings.lock().unwrap();
settings.unbuffered.to_value()
}
_ => unimplemented!(),
}
}
} }
impl ElementImpl for Cea608ToJson { impl ElementImpl for Cea608ToJson {
@ -941,6 +1113,7 @@ impl ElementImpl for Cea608ToJson {
gst::StateChange::ReadyToPaused => { gst::StateChange::ReadyToPaused => {
let mut state = self.state.borrow_mut(); let mut state = self.state.borrow_mut();
*state = State::default(); *state = State::default();
state.settings = self.settings.lock().unwrap().clone();
} }
_ => (), _ => (),
} }