text: migrate to new ClockTime design

This commit is contained in:
François Laignel 2021-05-26 19:03:09 +02:00
parent 88dfd97df6
commit b8ad30610b
7 changed files with 124 additions and 106 deletions

View file

@ -30,6 +30,7 @@ git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs"
package="gstreamer-check" package="gstreamer-check"
[features] [features]
default = ["gst/ser_de"]
# GStreamer 1.14 is required for static linking # GStreamer 1.14 is required for static linking
static = ["gst/v1_14"] static = ["default", "gst/v1_14"]

View file

@ -32,8 +32,8 @@ enum Line<'a> {
format: String, format: String,
}, },
Buffer { Buffer {
pts: i64, pts: Option<gst::ClockTime>,
duration: i64, duration: Option<gst::ClockTime>,
#[serde(borrow)] #[serde(borrow)]
data: &'a serde_json::value::RawValue, data: &'a serde_json::value::RawValue,
}, },
@ -49,8 +49,8 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
#[derive(Debug)] #[derive(Debug)]
struct State { struct State {
start_ts: gst::ClockTime, start_ts: Option<gst::ClockTime>,
end_ts: gst::ClockTime, end_ts: Option<gst::ClockTime>,
current_line: String, current_line: String,
format: Option<String>, format: Option<String>,
} }
@ -58,8 +58,8 @@ struct State {
impl Default for State { impl Default for State {
fn default() -> Self { fn default() -> Self {
Self { Self {
start_ts: gst::CLOCK_TIME_NONE, start_ts: None,
end_ts: gst::CLOCK_TIME_NONE, end_ts: None,
current_line: "".to_string(), current_line: "".to_string(),
format: None, format: None,
} }
@ -146,8 +146,8 @@ impl JsonGstEnc {
})?; })?;
let line = Line::Buffer { let line = Line::Buffer {
pts: pts.unwrap_or(std::u64::MAX) as i64, pts,
duration: duration.unwrap_or(std::u64::MAX) as i64, duration,
data, data,
}; };

View file

@ -43,7 +43,7 @@ struct PullState {
need_stream_start: bool, need_stream_start: bool,
stream_id: String, stream_id: String,
offset: u64, offset: u64,
duration: gst::ClockTime, duration: Option<gst::ClockTime>,
} }
impl PullState { impl PullState {
@ -52,7 +52,7 @@ impl PullState {
need_stream_start: true, need_stream_start: true,
stream_id: pad.create_stream_id(element, Some("src")).to_string(), stream_id: pad.create_stream_id(element, Some("src")).to_string(),
offset: 0, offset: 0,
duration: gst::CLOCK_TIME_NONE, duration: None,
} }
} }
} }
@ -64,7 +64,7 @@ struct State {
need_caps: bool, need_caps: bool,
format: Option<String>, format: Option<String>,
pending_events: Vec<gst::Event>, pending_events: Vec<gst::Event>,
last_position: gst::ClockTime, last_position: Option<gst::ClockTime>,
segment: gst::FormattedSegment<gst::ClockTime>, segment: gst::FormattedSegment<gst::ClockTime>,
// Pull mode // Pull mode
@ -87,7 +87,7 @@ impl Default for State {
need_caps: true, need_caps: true,
format: None, format: None,
pending_events: Vec::new(), pending_events: Vec::new(),
last_position: gst::CLOCK_TIME_NONE, last_position: None,
segment: gst::FormattedSegment::<gst::ClockTime>::new(), segment: gst::FormattedSegment::<gst::ClockTime>::new(),
pull: None, pull: None,
seeking: false, seeking: false,
@ -106,8 +106,8 @@ enum Line<'a> {
format: String, format: String,
}, },
Buffer { Buffer {
pts: u64, pts: Option<gst::ClockTime>,
duration: u64, duration: Option<gst::ClockTime>,
#[serde(borrow)] #[serde(borrow)]
data: &'a serde_json::value::RawValue, data: &'a serde_json::value::RawValue,
}, },
@ -189,12 +189,12 @@ impl State {
&mut self, &mut self,
_element: &super::JsonGstParse, _element: &super::JsonGstParse,
buffer: &mut gst::buffer::Buffer, buffer: &mut gst::buffer::Buffer,
pts: gst::ClockTime, pts: Option<gst::ClockTime>,
duration: gst::ClockTime, duration: Option<gst::ClockTime>,
) { ) {
let buffer = buffer.get_mut().unwrap(); let buffer = buffer.get_mut().unwrap();
self.last_position = pts + duration; self.last_position = pts.zip(duration).map(|(pts, duration)| pts + duration);
buffer.set_pts(pts); buffer.set_pts(pts);
@ -252,30 +252,34 @@ impl JsonGstParse {
CAT, CAT,
obj: element, obj: element,
"Got buffer with timestamp {} and duration {}", "Got buffer with timestamp {} and duration {}",
pts, pts.display(),
duration duration.display(),
); );
let pts: gst::ClockTime = pts.into();
let duration: gst::ClockTime = duration.into();
if !seeking { if !seeking {
let data = data.to_string().clone(); let data = data.to_string().clone();
let mut events = state.create_events(element); let mut events = state.create_events(element);
let mut buffer = gst::Buffer::from_slice(data); let mut buffer = gst::Buffer::from_slice(data);
if state.last_position < pts { if let Some(last_position) = state.last_position {
events.push(gst::event::Gap::new( if let Some(duration) = pts.map(|pts| pts.checked_sub(last_position)) {
state.last_position, events.push(
pts - state.last_position, gst::event::Gap::builder(last_position)
)); .duration(duration)
.build(),
);
}
} }
state.add_buffer_metadata(element, &mut buffer, pts, duration); state.add_buffer_metadata(element, &mut buffer, pts, duration);
let send_eos = state.segment.stop().is_some() let send_eos = state
&& buffer.pts() + buffer.duration() >= state.segment.stop(); .segment
.stop()
.zip(buffer.pts())
.zip(buffer.duration())
.map_or(false, |((stop, pts), duration)| pts + duration >= stop);
// Drop our state mutex while we push out buffers or events // Drop our state mutex while we push out buffers or events
drop(state); drop(state);
@ -339,10 +343,14 @@ impl JsonGstParse {
fn handle_skipped_line( fn handle_skipped_line(
&self, &self,
element: &super::JsonGstParse, element: &super::JsonGstParse,
pts: gst::ClockTime, pts: impl Into<Option<gst::ClockTime>>,
mut state: MutexGuard<State>, mut state: MutexGuard<State>,
) -> MutexGuard<State> { ) -> MutexGuard<State> {
if pts >= state.segment.start() { if pts
.into()
.zip(state.segment.start())
.map_or(false, |(pts, start)| pts >= start)
{
state.seeking = false; state.seeking = false;
state.discont = true; state.discont = true;
state.replay_last_line = true; state.replay_last_line = true;
@ -445,9 +453,9 @@ impl JsonGstParse {
)); ));
} }
let size = match q.result().try_into().unwrap() { let size = match q.result().try_into().ok().flatten() {
gst::format::Bytes(Some(size)) => size, Some(gst::format::Bytes(size)) => size,
gst::format::Bytes(None) => { None => {
return Err(gst::loggable_error!( return Err(gst::loggable_error!(
CAT, CAT,
"Failed to query upstream duration" "Failed to query upstream duration"
@ -495,7 +503,7 @@ impl JsonGstParse {
data: _data, data: _data,
}) = serde_json::from_slice(&line) }) = serde_json::from_slice(&line)
{ {
last_pts = Some((pts + duration).into()); last_pts = pts.zip(duration).map(|(pts, duration)| pts + duration);
} }
} }
@ -549,16 +557,11 @@ impl JsonGstParse {
if scan_duration { if scan_duration {
match self.scan_duration(element) { match self.scan_duration(element) {
Ok(Some(pts)) => { Ok(pts) => {
let mut state = self.state.lock().unwrap(); let mut state = self.state.lock().unwrap();
let mut pull = state.pull.as_mut().unwrap(); let mut pull = state.pull.as_mut().unwrap();
pull.duration = pts; pull.duration = pts;
} }
Ok(None) => {
let mut state = self.state.lock().unwrap();
let mut pull = state.pull.as_mut().unwrap();
pull.duration = 0.into();
}
Err(err) => { Err(err) => {
err.log(); err.log();
@ -643,7 +646,7 @@ impl JsonGstParse {
state.need_segment = true; state.need_segment = true;
state.need_caps = true; state.need_caps = true;
state.pending_events.clear(); state.pending_events.clear();
state.last_position = 0.into(); state.last_position = None;
state.last_raw_line = [].to_vec(); state.last_raw_line = [].to_vec();
state.format = None; state.format = None;
} }
@ -702,7 +705,7 @@ impl JsonGstParse {
let (rate, flags, start_type, start, stop_type, stop) = event.get(); let (rate, flags, start_type, start, stop_type, stop) = event.get();
let mut start: gst::ClockTime = match start.try_into() { let mut start: Option<gst::ClockTime> = match start.try_into() {
Ok(start) => start, Ok(start) => start,
Err(_) => { Err(_) => {
gst_error!(CAT, obj: element, "seek has invalid format"); gst_error!(CAT, obj: element, "seek has invalid format");
@ -710,7 +713,7 @@ impl JsonGstParse {
} }
}; };
let mut stop: gst::ClockTime = match stop.try_into() { let mut stop: Option<gst::ClockTime> = match stop.try_into() {
Ok(stop) => stop, Ok(stop) => stop,
Err(_) => { Err(_) => {
gst_error!(CAT, obj: element, "seek has invalid format"); gst_error!(CAT, obj: element, "seek has invalid format");
@ -750,11 +753,17 @@ impl JsonGstParse {
let pull = state.pull.as_ref().unwrap(); let pull = state.pull.as_ref().unwrap();
if start_type == gst::SeekType::Set { if start_type == gst::SeekType::Set {
start = start.min(pull.duration).unwrap_or(start); start = start
.zip(pull.duration)
.map(|(start, duration)| start.min(duration))
.or(start);
} }
if stop_type == gst::SeekType::Set { if stop_type == gst::SeekType::Set {
stop = stop.min(pull.duration).unwrap_or(stop); stop = stop
.zip(pull.duration)
.map(|(stop, duration)| stop.min(duration))
.or(stop);
} }
state.seeking = true; state.seeking = true;
@ -817,7 +826,7 @@ impl JsonGstParse {
if let Some(pull) = state.pull.as_ref() { if let Some(pull) = state.pull.as_ref() {
q.set( q.set(
true, true,
gst::GenericFormattedValue::Time(0.into()), gst::GenericFormattedValue::Time(Some(gst::ClockTime::ZERO)),
gst::GenericFormattedValue::Time(pull.duration), gst::GenericFormattedValue::Time(pull.duration),
); );
true true

View file

@ -55,8 +55,8 @@ fn test_enc() {
); );
let buf = h.pull().expect("Couldn't pull buffer"); let buf = h.pull().expect("Couldn't pull buffer");
assert_eq!(buf.pts(), 0.into()); assert_eq!(buf.pts(), Some(gst::ClockTime::ZERO));
assert_eq!(buf.duration(), 2 * gst::SECOND); assert_eq!(buf.duration(), Some(2 * gst::ClockTime::SECOND));
let map = buf.map_readable().expect("Couldn't map buffer readable"); let map = buf.map_readable().expect("Couldn't map buffer readable");
assert_eq!( assert_eq!(
std::str::from_utf8(map.as_ref()), std::str::from_utf8(map.as_ref()),
@ -98,7 +98,7 @@ fn test_parse() {
let buf = h.pull().expect("Couldn't pull buffer"); let buf = h.pull().expect("Couldn't pull buffer");
let map = buf.map_readable().expect("Couldn't map buffer readable"); let map = buf.map_readable().expect("Couldn't map buffer readable");
assert_eq!(buf.pts(), 0.into()); assert_eq!(buf.pts(), Some(gst::ClockTime::ZERO));
assert_eq!(buf.duration(), 2 * gst::SECOND); assert_eq!(buf.duration(), Some(2 * gst::ClockTime::SECOND));
assert_eq!(std::str::from_utf8(map.as_ref()), Ok("{\"foo\":42}")); assert_eq!(std::str::from_utf8(map.as_ref()), Ok("{\"foo\":42}"));
} }

View file

@ -64,8 +64,8 @@ fn test_replace_all() {
let buf = h.pull().expect("Couldn't pull buffer"); let buf = h.pull().expect("Couldn't pull buffer");
assert_eq!(buf.pts(), 0.into()); assert_eq!(buf.pts(), Some(gst::ClockTime::ZERO));
assert_eq!(buf.duration(), 2 * gst::SECOND); assert_eq!(buf.duration(), Some(2 * gst::ClockTime::SECOND));
let map = buf.map_readable().expect("Couldn't map buffer readable"); let map = buf.map_readable().expect("Couldn't map buffer readable");

View file

@ -15,6 +15,7 @@
// Free Software Foundation, Inc., 51 Franklin Street, Suite 500, // Free Software Foundation, Inc., 51 Franklin Street, Suite 500,
// Boston, MA 02110-1335, USA. // Boston, MA 02110-1335, USA.
use glib::translate::{from_glib, IntoGlib};
use gst::glib; use gst::glib;
use gst::prelude::*; use gst::prelude::*;
use gst::subclass::prelude::*; use gst::subclass::prelude::*;
@ -48,7 +49,7 @@ struct Settings {
dictionary: Option<String>, dictionary: Option<String>,
columns: u32, columns: u32,
lines: u32, lines: u32,
accumulate_time: gst::ClockTime, accumulate_time: Option<gst::ClockTime>,
} }
impl Default for Settings { impl Default for Settings {
@ -57,7 +58,7 @@ impl Default for Settings {
dictionary: DEFAULT_DICTIONARY, dictionary: DEFAULT_DICTIONARY,
columns: DEFAULT_COLUMNS, /* CEA 608 max columns */ columns: DEFAULT_COLUMNS, /* CEA 608 max columns */
lines: DEFAULT_LINES, lines: DEFAULT_LINES,
accumulate_time: gst::CLOCK_TIME_NONE, accumulate_time: None,
} }
} }
} }
@ -66,8 +67,8 @@ struct State {
options: Option<textwrap::Options<'static, Box<dyn textwrap::WordSplitter + Send>>>, options: Option<textwrap::Options<'static, Box<dyn textwrap::WordSplitter + Send>>>,
current_text: String, current_text: String,
start_ts: gst::ClockTime, start_ts: Option<gst::ClockTime>,
end_ts: gst::ClockTime, end_ts: Option<gst::ClockTime>,
} }
impl Default for State { impl Default for State {
@ -76,8 +77,8 @@ impl Default for State {
options: None, options: None,
current_text: "".to_string(), current_text: "".to_string(),
start_ts: gst::CLOCK_TIME_NONE, start_ts: None,
end_ts: gst::CLOCK_TIME_NONE, end_ts: None,
} }
} }
} }
@ -145,25 +146,18 @@ impl TextWrap {
) -> Result<gst::FlowSuccess, gst::FlowError> { ) -> Result<gst::FlowSuccess, gst::FlowError> {
self.update_wrapper(element); self.update_wrapper(element);
let mut pts: gst::ClockTime = buffer let mut pts = buffer.pts().ok_or_else(|| {
.pts() gst_error!(CAT, obj: element, "Need timestamped buffers");
.ok_or_else(|| { gst::FlowError::Error
gst_error!(CAT, obj: element, "Need timestamped buffers"); })?;
gst::FlowError::Error
})?
.into();
let duration: gst::ClockTime = buffer let duration = buffer.duration().ok_or_else(|| {
.duration() gst_error!(CAT, obj: element, "Need buffers with duration");
.ok_or_else(|| { gst::FlowError::Error
gst_error!(CAT, obj: element, "Need buffers with duration"); })?;
gst::FlowError::Error
})?
.into();
let data = buffer.map_readable().map_err(|_| { let data = buffer.map_readable().map_err(|_| {
gst_error!(CAT, obj: element, "Can't map buffer readable"); gst_error!(CAT, obj: element, "Can't map buffer readable");
gst::FlowError::Error gst::FlowError::Error
})?; })?;
@ -180,19 +174,30 @@ impl TextWrap {
let mut bufferlist = gst::BufferList::new(); let mut bufferlist = gst::BufferList::new();
let n_lines = std::cmp::max(self.settings.lock().unwrap().lines, 1); let n_lines = std::cmp::max(self.settings.lock().unwrap().lines, 1);
if state.start_ts.is_some() && state.start_ts + accumulate_time < buffer.pts() { if state
.start_ts
.zip(accumulate_time)
.map_or(false, |(start_ts, accumulate_time)| {
start_ts + accumulate_time < pts
})
{
let mut buf = gst::Buffer::from_mut_slice( let mut buf = gst::Buffer::from_mut_slice(
mem::replace(&mut state.current_text, String::new()).into_bytes(), mem::replace(&mut state.current_text, String::new()).into_bytes(),
); );
{ {
let buf_mut = buf.get_mut().unwrap(); let buf_mut = buf.get_mut().unwrap();
buf_mut.set_pts(state.start_ts); buf_mut.set_pts(state.start_ts);
buf_mut.set_duration(state.end_ts - state.start_ts); buf_mut.set_duration(
state
.end_ts
.zip(state.start_ts)
.and_then(|(end_ts, start_ts)| end_ts.checked_sub(start_ts)),
);
} }
bufferlist.get_mut().unwrap().add(buf); bufferlist.get_mut().unwrap().add(buf);
state.start_ts = gst::CLOCK_TIME_NONE; state.start_ts = None;
state.end_ts = gst::CLOCK_TIME_NONE; state.end_ts = None;
} }
let duration_per_word: gst::ClockTime = let duration_per_word: gst::ClockTime =
@ -230,6 +235,10 @@ impl TextWrap {
.collect::<Vec<String>>() .collect::<Vec<String>>()
.join("\n"); .join("\n");
} else { } else {
let duration = state
.end_ts
.zip(state.start_ts)
.and_then(|(end_ts, start_ts)| end_ts.checked_sub(start_ts));
let contents = chunk let contents = chunk
.iter() .iter()
.map(|l| l.to_string()) .map(|l| l.to_string())
@ -240,14 +249,14 @@ impl TextWrap {
obj: element, obj: element,
"Outputting contents {}, ts: {}, duration: {}", "Outputting contents {}, ts: {}, duration: {}",
contents.to_string(), contents.to_string(),
state.start_ts, state.start_ts.display(),
state.end_ts - state.start_ts duration.display(),
); );
let mut buf = gst::Buffer::from_mut_slice(contents.into_bytes()); let mut buf = gst::Buffer::from_mut_slice(contents.into_bytes());
{ {
let buf_mut = buf.get_mut().unwrap(); let buf_mut = buf.get_mut().unwrap();
buf_mut.set_pts(state.start_ts); buf_mut.set_pts(state.start_ts);
buf_mut.set_duration(state.end_ts - state.start_ts); buf_mut.set_duration(duration);
} }
bufferlist.get_mut().unwrap().add(buf); bufferlist.get_mut().unwrap().add(buf);
state.start_ts = state.end_ts; state.start_ts = state.end_ts;
@ -255,14 +264,14 @@ impl TextWrap {
} }
current_text = trailing; current_text = trailing;
state.end_ts += duration_per_word; state.end_ts = state.end_ts.map(|end_ts| end_ts + duration_per_word);
} }
state.current_text = current_text; state.current_text = current_text;
if state.current_text.is_empty() { if state.current_text.is_empty() {
state.start_ts = gst::CLOCK_TIME_NONE; state.start_ts = None;
state.end_ts = gst::CLOCK_TIME_NONE; state.end_ts = None;
} }
drop(state); drop(state);
@ -362,11 +371,16 @@ impl TextWrap {
{ {
let buf_mut = buf.get_mut().unwrap(); let buf_mut = buf.get_mut().unwrap();
buf_mut.set_pts(state.start_ts); buf_mut.set_pts(state.start_ts);
buf_mut.set_duration(state.end_ts - state.start_ts); buf_mut.set_duration(
state
.end_ts
.zip(state.start_ts)
.and_then(|(end_ts, start_ts)| end_ts.checked_sub(start_ts)),
);
} }
state.start_ts = gst::CLOCK_TIME_NONE; state.start_ts = None;
state.end_ts = gst::CLOCK_TIME_NONE; state.end_ts = None;
drop(state); drop(state);
let _ = self.srcpad.push(buf); let _ = self.srcpad.push(buf);
@ -402,8 +416,7 @@ impl TextWrap {
.lock() .lock()
.unwrap() .unwrap()
.accumulate_time .accumulate_time
.unwrap_or(0) .unwrap_or(gst::ClockTime::ZERO);
.into();
gst_info!( gst_info!(
CAT, CAT,
obj: element, obj: element,
@ -411,7 +424,7 @@ impl TextWrap {
our_latency, our_latency,
min min
); );
q.set(live, our_latency + min, gst::CLOCK_TIME_NONE); q.set(live, our_latency + min, gst::ClockTime::NONE);
} }
ret ret
} }
@ -549,16 +562,14 @@ impl ObjectImpl for TextWrap {
"accumulate-time" => { "accumulate-time" => {
let mut settings = self.settings.lock().unwrap(); let mut settings = self.settings.lock().unwrap();
let old_accumulate_time = settings.accumulate_time; let old_accumulate_time = settings.accumulate_time;
settings.accumulate_time = match value.get().expect("type checked upstream") { settings.accumulate_time =
-1i64 => gst::CLOCK_TIME_NONE, unsafe { from_glib(value.get::<i64>().expect("type checked upstream")) };
time => (time as u64).into(),
};
if settings.accumulate_time != old_accumulate_time { if settings.accumulate_time != old_accumulate_time {
gst_debug!( gst_debug!(
CAT, CAT,
obj: obj, obj: obj,
"Accumulate time changed: {}", "Accumulate time changed: {}",
settings.accumulate_time settings.accumulate_time.display(),
); );
drop(settings); drop(settings);
let _ = obj.post_message(gst::message::Latency::builder().src(obj).build()); let _ = obj.post_message(gst::message::Latency::builder().src(obj).build());
@ -584,10 +595,7 @@ impl ObjectImpl for TextWrap {
} }
"accumulate-time" => { "accumulate-time" => {
let settings = self.settings.lock().unwrap(); let settings = self.settings.lock().unwrap();
match settings.accumulate_time.0 { settings.accumulate_time.into_glib().to_value()
Some(time) => (time as i64).to_value(),
None => (-1i64).to_value(),
}
} }
_ => unimplemented!(), _ => unimplemented!(),
} }

View file

@ -56,8 +56,8 @@ fn test_columns() {
let buf = h.pull().expect("Couldn't pull buffer"); let buf = h.pull().expect("Couldn't pull buffer");
assert_eq!(buf.pts(), 0.into()); assert_eq!(buf.pts(), Some(gst::ClockTime::ZERO));
assert_eq!(buf.duration(), 2 * gst::SECOND); assert_eq!(buf.duration(), Some(2 * gst::ClockTime::SECOND));
let map = buf.map_readable().expect("Couldn't map buffer readable"); let map = buf.map_readable().expect("Couldn't map buffer readable");
@ -95,8 +95,8 @@ fn test_lines() {
let buf = h.pull().expect("Couldn't pull buffer"); let buf = h.pull().expect("Couldn't pull buffer");
assert_eq!(buf.pts(), 0.into()); assert_eq!(buf.pts(), Some(gst::ClockTime::ZERO));
assert_eq!(buf.duration(), gst::SECOND); assert_eq!(buf.duration(), Some(gst::ClockTime::SECOND));
let expected_output = "Split\nthis"; let expected_output = "Split\nthis";
@ -109,8 +109,8 @@ fn test_lines() {
let buf = h.pull().expect("Couldn't pull buffer"); let buf = h.pull().expect("Couldn't pull buffer");
assert_eq!(buf.pts(), gst::SECOND); assert_eq!(buf.pts(), Some(gst::ClockTime::SECOND));
assert_eq!(buf.duration(), gst::SECOND); assert_eq!(buf.duration(), Some(gst::ClockTime::SECOND));
let expected_output = "text\nup"; let expected_output = "text\nup";