sccparse: Add support for seeking

Only pull mode can support seeking for now and reverse playback
is not implemented yet. Note that this restriction is the same as that of
mccparse.
This commit is contained in:
Seungha Yang 2020-11-27 21:40:40 +09:00 committed by Sebastian Dröge
parent 8b8380992f
commit 58786fa0b5
3 changed files with 743 additions and 56 deletions

View file

@ -20,8 +20,13 @@ use glib::subclass;
use glib::subclass::prelude::*;
use gst::prelude::*;
use gst::subclass::prelude::*;
use gst::{gst_debug, gst_element_error, gst_error, gst_fixme, gst_log, gst_trace, gst_warning};
use gst::{
gst_debug, gst_element_error, gst_error, gst_fixme, gst_info, gst_log, gst_loggable_error,
gst_trace, gst_warning,
};
use std::cmp;
use std::convert::TryInto;
use std::sync::{Mutex, MutexGuard};
use once_cell::sync::Lazy;
@ -38,6 +43,28 @@ static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
)
});
#[derive(Debug)]
struct PullState {
need_stream_start: bool,
stream_id: String,
offset: u64,
duration: gst::ClockTime,
}
impl PullState {
fn new(element: &super::SccParse, pad: &gst::Pad) -> Self {
Self {
need_stream_start: true,
stream_id: pad
.create_stream_id(element, Some("src"))
.unwrap()
.to_string(),
offset: 0,
duration: gst::CLOCK_TIME_NONE,
}
}
}
#[derive(Debug)]
struct State {
reader: LineReader<gst::MappedBuffer<gst::buffer::Readable>>,
@ -47,6 +74,16 @@ struct State {
framerate: Option<gst::Fraction>,
last_position: gst::ClockTime,
last_timecode: Option<gst_video::ValidVideoTimeCode>,
segment: gst::FormattedSegment<gst::ClockTime>,
// Pull mode
pull: Option<PullState>,
// seeking
seeking: bool,
discont: bool,
seek_seqnum: Option<gst::Seqnum>,
need_flush_stop: bool,
}
impl Default for State {
@ -59,10 +96,43 @@ impl Default for State {
framerate: None,
last_position: gst::CLOCK_TIME_NONE,
last_timecode: None,
segment: gst::FormattedSegment::<gst::ClockTime>::new(),
pull: None,
seeking: false,
discont: false,
seek_seqnum: None,
need_flush_stop: false,
}
}
}
fn parse_timecode(
framerate: gst::Fraction,
tc: &TimeCode,
) -> Result<gst_video::ValidVideoTimeCode, gst::FlowError> {
use std::convert::TryFrom;
let timecode = gst_video::VideoTimeCode::new(
framerate,
None,
if tc.drop_frame {
gst_video::VideoTimeCodeFlags::DROP_FRAME
} else {
gst_video::VideoTimeCodeFlags::empty()
},
tc.hours,
tc.minutes,
tc.seconds,
tc.frames,
0,
);
match gst_video::ValidVideoTimeCode::try_from(timecode).map_err(|_| gst::FlowError::Error) {
Ok(timecode) => Ok(timecode),
Err(timecode) => Err(timecode),
}
}
impl State {
#[allow(clippy::type_complexity)]
fn get_line(
@ -84,28 +154,11 @@ impl State {
fn handle_timecode(
&mut self,
tc: TimeCode,
tc: &TimeCode,
framerate: gst::Fraction,
element: &super::SccParse,
) -> Result<gst_video::ValidVideoTimeCode, gst::FlowError> {
use std::convert::TryFrom;
let timecode = gst_video::VideoTimeCode::new(
framerate,
None,
if tc.drop_frame {
gst_video::VideoTimeCodeFlags::DROP_FRAME
} else {
gst_video::VideoTimeCodeFlags::empty()
},
tc.hours,
tc.minutes,
tc.seconds,
tc.frames,
0,
);
match gst_video::ValidVideoTimeCode::try_from(timecode) {
match parse_timecode(framerate, &tc) {
Ok(timecode) => Ok(timecode),
Err(timecode) => {
let last_timecode =
@ -176,6 +229,61 @@ impl State {
.unwrap_or(gst::CLOCK_TIME_NONE),
);
}
fn create_events(
&mut self,
element: &super::SccParse,
framerate: Option<gst::Fraction>,
) -> Vec<gst::Event> {
let mut events = Vec::new();
if self.need_flush_stop {
let mut b = gst::event::FlushStop::builder(true);
if let Some(seek_seqnum) = self.seek_seqnum {
b = b.seqnum(seek_seqnum);
}
events.push(b.build());
self.need_flush_stop = false;
}
if let Some(pull) = &mut self.pull {
if pull.need_stream_start {
events.push(gst::event::StreamStart::new(&pull.stream_id));
pull.need_stream_start = false;
}
}
if let Some(framerate) = framerate {
if self.framerate != Some(framerate) {
self.framerate = Some(framerate);
let caps = gst::Caps::builder("closedcaption/x-cea-608")
.field("format", &"raw")
.field("framerate", &framerate)
.build();
self.framerate = Some(framerate);
events.push(gst::event::Caps::new(&caps));
gst_info!(CAT, obj: element, "Caps changed to {:?}", &caps);
}
}
if self.need_segment {
let mut b = gst::event::Segment::builder(&self.segment);
if let Some(seek_seqnum) = self.seek_seqnum {
b = b.seqnum(seek_seqnum);
}
events.push(b.build());
self.need_segment = false;
}
events.extend(self.pending_events.drain(..));
events
}
}
pub struct SccParse {
@ -228,7 +336,12 @@ impl SccParse {
break Err(gst::FlowError::Error);
}
Ok(None) => break Ok(gst::FlowSuccess::Ok),
Ok(None) => {
if drain && state.pull.is_some() {
break Err(gst::FlowError::Eos);
}
break Ok(gst::FlowSuccess::Ok);
}
}
}
}
@ -256,27 +369,52 @@ impl SccParse {
gst::Fraction::new(30, 1)
};
let mut events = Vec::new();
let mut timecode = state.handle_timecode(&tc, framerate, element)?;
let start_time = gst::ClockTime::from(timecode.nsec_since_daily_jam());
let segment_start = state.segment.get_start();
let clip_buffers = if state.seeking {
// If we are in the middle of seeking, check whether this line
// contains start frame, and if so, unset seeking flag
let num_bufs = (data.len() / 2) as i64;
let mut end_timecode = parse_timecode(framerate, &tc).unwrap();
// add one more frame here so that add duration of the last frame
end_timecode.add_frames(num_bufs + 1);
let stop_time = gst::ClockTime::from(end_timecode.nsec_since_daily_jam());
if Some(framerate) != state.framerate {
let caps = gst::Caps::builder("closedcaption/x-cea-608")
.field("format", &"raw")
.field("framerate", &framerate)
.build();
events.push(gst::event::Caps::new(&caps));
state.framerate = Some(framerate);
gst_trace!(
CAT,
obj: element,
"Checking inside of segment, line start {} line stop {} segment start {} num bufs {}",
start_time,
stop_time,
segment_start,
num_bufs,
);
if stop_time > segment_start {
state.seeking = false;
state.discont = true;
state.need_flush_stop = true;
}
if state.need_segment {
let segment = gst::FormattedSegment::<gst::format::Time>::new();
events.push(gst::event::Segment::new(&segment));
state.need_segment = false;
// Still need to scan lines to find the first buffer
if state.seeking {
drop(state);
return Ok(self.state.lock().unwrap());
}
events.extend(state.pending_events.drain(..));
true
} else {
false
};
let mut timecode = state.handle_timecode(tc, framerate, element)?;
let mut buffers = gst::BufferList::new_sized(data.len() / 2);
let mut buffers = if clip_buffers {
gst::BufferList::new()
} else {
gst::BufferList::new_sized(data.len() / 2)
};
let mut send_eos = false;
for d in data.chunks_exact(2) {
let mut buffer = gst::Buffer::with_size(d.len()).unwrap();
{
@ -286,13 +424,38 @@ impl SccParse {
state.add_buffer_metadata(&mut buffer, &timecode, framerate, element);
timecode.increment_frame();
if clip_buffers {
let end_time = buffer.get_pts() + buffer.get_duration();
if end_time < segment_start {
gst_trace!(
CAT,
obj: element,
"Skip segment clipped buffer {:?}",
buffer,
);
continue;
}
}
send_eos = state.segment.get_stop().is_some()
&& buffer.get_pts() + buffer.get_duration() >= state.segment.get_stop();
let buffers = buffers.get_mut().unwrap();
buffers.add(buffer);
// Terminate loop once we found EOS boundary buffer
if send_eos {
break;
}
}
// Update the last_timecode to the current one
state.last_timecode = Some(timecode);
let events = state.create_events(element, Some(framerate));
// Drop our state mutex while we push out buffers or events
drop(state);
@ -306,9 +469,295 @@ impl SccParse {
err
})?;
if send_eos {
return Err(gst::FlowError::Eos);
}
Ok(self.state.lock().unwrap())
}
fn sink_activate(
&self,
pad: &gst::Pad,
element: &super::SccParse,
) -> Result<(), gst::LoggableError> {
let mode = {
let mut query = gst::query::Scheduling::new();
let mut state = self.state.lock().unwrap();
state.pull = None;
if !pad.peer_query(&mut query) {
gst_debug!(CAT, obj: pad, "Scheduling query failed on peer");
gst::PadMode::Push
} else if query
.has_scheduling_mode_with_flags(gst::PadMode::Pull, gst::SchedulingFlags::SEEKABLE)
{
gst_debug!(CAT, obj: pad, "Activating in Pull mode");
state.pull = Some(PullState::new(element, &self.srcpad));
gst::PadMode::Pull
} else {
gst_debug!(CAT, obj: pad, "Activating in Push mode");
gst::PadMode::Push
}
};
pad.activate_mode(mode, true)?;
Ok(())
}
fn start_task(&self, element: &super::SccParse) -> Result<(), gst::LoggableError> {
let element_weak = element.downgrade();
let pad_weak = self.sinkpad.downgrade();
let res = self.sinkpad.start_task(move || {
let element = match element_weak.upgrade() {
Some(element) => element,
None => {
if let Some(pad) = pad_weak.upgrade() {
pad.pause_task().unwrap();
}
return;
}
};
let parse = Self::from_instance(&element);
parse.loop_fn(&element);
});
if res.is_err() {
return Err(gst_loggable_error!(CAT, "Failed to start pad task"));
}
Ok(())
}
fn sink_activatemode(
&self,
_pad: &gst::Pad,
element: &super::SccParse,
mode: gst::PadMode,
active: bool,
) -> Result<(), gst::LoggableError> {
if mode == gst::PadMode::Pull {
if active {
self.start_task(element)?;
} else {
let _ = self.sinkpad.stop_task();
}
}
Ok(())
}
fn scan_duration(
&self,
element: &super::SccParse,
) -> Result<Option<gst_video::ValidVideoTimeCode>, gst::LoggableError> {
gst_debug!(CAT, obj: element, "Scanning duration");
/* First let's query the bytes duration upstream */
let mut q = gst::query::Duration::new(gst::Format::Bytes);
if !self.sinkpad.peer_query(&mut q) {
return Err(gst_loggable_error!(
CAT,
"Failed to query upstream duration"
));
}
let size = match q.get_result().try_into().unwrap() {
gst::format::Bytes(Some(size)) => size,
gst::format::Bytes(None) => {
return Err(gst_loggable_error!(
CAT,
"Failed to query upstream duration"
));
}
};
let mut offset = size;
let mut buffers = Vec::new();
let mut last_tc = None;
loop {
let scan_size = cmp::min(offset, 4096);
offset -= scan_size;
match self.sinkpad.pull_range(offset, scan_size as u32) {
Ok(buffer) => {
buffers.push(buffer);
}
Err(flow) => {
return Err(gst_loggable_error!(
CAT,
"Failed to pull buffer while scanning duration: {:?}",
flow
));
}
}
let mut reader = LineReader::new();
let mut parser = SccParser::new_scan_captions();
for buf in buffers.iter().rev() {
let buf = buf
.clone()
.into_mapped_buffer_readable()
.map_err(|_| gst_loggable_error!(CAT, "Failed to map buffer readable"))?;
reader.push(buf);
}
while let Some(line) = reader.get_line_with_drain(true) {
if let Ok(SccLine::Caption(tc, data)) =
parser.parse_line(line).map_err(|err| (line, err))
{
let framerate = if tc.drop_frame {
gst::Fraction::new(30000, 1001)
} else {
gst::Fraction::new(30, 1)
};
if let Ok(mut timecode) = parse_timecode(framerate, &tc) {
/* We're looking for the total duration */
timecode.add_frames((data.len() / 2) as i64 + 1);
last_tc = Some(timecode);
}
}
}
if last_tc.is_some() || offset == 0 {
gst_debug!(
CAT,
obj: element,
"Duration scan done, last_tc: {:?}",
last_tc
);
break (Ok(last_tc));
}
}
}
fn push_eos(&self, element: &super::SccParse) {
let mut state = self.state.lock().unwrap();
if state.seeking {
state.need_flush_stop = true;
}
let mut events = state.create_events(element, None);
let mut eos_event = gst::event::Eos::builder();
if let Some(seek_seqnum) = state.seek_seqnum {
eos_event = eos_event.seqnum(seek_seqnum);
}
events.push(eos_event.build());
// Drop our state mutex while we push out events
drop(state);
for event in events {
gst_debug!(CAT, obj: element, "Pushing event {:?}", event);
self.srcpad.push_event(event);
}
}
fn loop_fn(&self, element: &super::SccParse) {
let mut state = self.state.lock().unwrap();
let State {
ref framerate,
ref mut pull,
..
} = *state;
let mut pull = pull.as_mut().unwrap();
let scan_duration = framerate.is_none() && pull.duration.is_none();
let offset = pull.offset;
pull.offset += 4096;
drop(state);
let buffer = match self.sinkpad.pull_range(offset, 4096) {
Ok(buffer) => Some(buffer),
Err(gst::FlowError::Eos) => None,
Err(gst::FlowError::Flushing) => {
gst_debug!(CAT, obj: &self.sinkpad, "Pausing after pulling buffer, reason: flushing");
self.sinkpad.pause_task().unwrap();
return;
}
Err(flow) => {
gst_error!(CAT, obj: &self.sinkpad, "Failed to pull, reason: {:?}", flow);
gst_element_error!(
element,
gst::StreamError::Failed,
["Streaming stopped, failed to pull buffer"]
);
self.sinkpad.pause_task().unwrap();
return;
}
};
match self.handle_buffer(element, buffer) {
Ok(_) => {
if scan_duration {
match self.scan_duration(element) {
Ok(Some(tc)) => {
let mut state = self.state.lock().unwrap();
let mut pull = state.pull.as_mut().unwrap();
pull.duration = tc.nsec_since_daily_jam().into();
}
Ok(None) => {
let mut state = self.state.lock().unwrap();
let mut pull = state.pull.as_mut().unwrap();
pull.duration = 0.into();
}
Err(err) => {
err.log();
gst_element_error!(
element,
gst::StreamError::Decode,
["Failed to scan duration"]
);
self.sinkpad.pause_task().unwrap();
}
}
}
}
Err(flow) => {
match flow {
gst::FlowError::Flushing => {
gst_debug!(CAT, obj: element, "Pausing after flow {:?}", flow);
}
gst::FlowError::Eos => {
self.push_eos(element);
gst_debug!(CAT, obj: element, "Pausing after flow {:?}", flow);
}
_ => {
self.push_eos(element);
gst_error!(CAT, obj: element, "Pausing after flow {:?}", flow);
gst_element_error!(
element,
gst::StreamError::Failed,
["Streaming stopped, reason: {:?}", flow]
);
}
}
self.sinkpad.pause_task().unwrap();
}
}
}
fn sink_chain(
&self,
pad: &gst::Pad,
@ -320,6 +769,23 @@ impl SccParse {
self.handle_buffer(element, Some(buffer))
}
fn flush(&self, mut state: MutexGuard<State>) -> MutexGuard<State> {
state.reader.clear();
state.parser.reset();
if let Some(pull) = &mut state.pull {
pull.offset = 0;
}
state.segment = gst::FormattedSegment::<gst::ClockTime>::new();
state.need_segment = true;
state.pending_events.clear();
state.last_position = 0.into();
state.last_timecode = None;
drop(state);
self.state.lock().unwrap()
}
fn sink_event(&self, pad: &gst::Pad, element: &super::SccParse, event: gst::Event) -> bool {
use gst::EventView;
@ -337,13 +803,9 @@ impl SccParse {
true
}
EventView::FlushStop(_) => {
let mut state = self.state.lock().unwrap();
state.reader.clear();
state.parser.reset();
state.need_segment = true;
state.pending_events.clear();
state.last_position = gst::ClockTime::from_seconds(0);
state.last_timecode = None;
let state = self.state.lock().unwrap();
let state = self.flush(state);
drop(state);
pad.event_default(Some(element), event)
}
@ -370,15 +832,105 @@ impl SccParse {
}
}
fn perform_seek(&self, event: &gst::event::Seek, element: &super::SccParse) -> bool {
let mut state = self.state.lock().unwrap();
if state.pull.is_none() {
gst_error!(CAT, obj: element, "seeking is only supported in pull mode");
return false;
}
let (rate, flags, start_type, start, stop_type, stop) = event.get();
let mut start: gst::ClockTime = match start.try_into() {
Ok(start) => start,
Err(_) => {
gst_error!(CAT, obj: element, "seek has invalid format");
return false;
}
};
let mut stop: gst::ClockTime = match stop.try_into() {
Ok(stop) => stop,
Err(_) => {
gst_error!(CAT, obj: element, "seek has invalid format");
return false;
}
};
if !flags.contains(gst::SeekFlags::FLUSH) {
gst_error!(CAT, obj: element, "only flushing seeks are supported");
return false;
}
if start_type == gst::SeekType::End || stop_type == gst::SeekType::End {
gst_error!(CAT, obj: element, "Relative seeks are not supported");
return false;
}
let pull = state.pull.as_ref().unwrap();
if start_type == gst::SeekType::Set {
start = start.min(pull.duration).unwrap_or(start);
}
if stop_type == gst::SeekType::Set {
stop = stop.min(pull.duration).unwrap_or(stop);
}
state.seeking = true;
let seek_seqnum = event.get_seqnum();
state.seek_seqnum = Some(seek_seqnum);
let event = gst::event::FlushStart::builder()
.seqnum(seek_seqnum)
.build();
gst_debug!(CAT, obj: element, "Sending event {:?} upstream", event);
self.sinkpad.push_event(event);
let event = gst::event::FlushStart::builder()
.seqnum(seek_seqnum)
.build();
gst_debug!(CAT, obj: element, "Pushing event {:?}", event);
self.srcpad.push_event(event);
self.sinkpad.pause_task().unwrap();
state = self.flush(state);
let event = gst::event::FlushStop::builder(true)
.seqnum(seek_seqnum)
.build();
/* Drop our state while we push a serialized event upstream */
drop(state);
gst_debug!(CAT, obj: element, "Sending event {:?} upstream", event);
self.sinkpad.push_event(event);
state = self.state.lock().unwrap();
state
.segment
.do_seek(rate, flags, start_type, start, stop_type, stop);
match self.start_task(element) {
Err(error) => {
error.log();
false
}
_ => true,
}
}
fn src_event(&self, pad: &gst::Pad, element: &super::SccParse, event: gst::Event) -> bool {
use gst::EventView;
gst_log!(CAT, obj: pad, "Handling event {:?}", event);
match event.view() {
EventView::Seek(_) => {
gst_log!(CAT, obj: pad, "Dropping seek event");
false
}
EventView::Seek(e) => self.perform_seek(&e, element),
_ => pad.event_default(Some(element), event),
}
}
@ -395,14 +947,24 @@ impl SccParse {
match query.view_mut() {
QueryView::Seeking(mut q) => {
// We don't support any seeking at all
let state = self.state.lock().unwrap();
let fmt = q.get_format();
if fmt == gst::Format::Time {
if let Some(pull) = state.pull.as_ref() {
q.set(
false,
gst::GenericFormattedValue::Other(fmt, -1),
gst::GenericFormattedValue::Other(fmt, -1),
true,
gst::GenericFormattedValue::Time(0.into()),
gst::GenericFormattedValue::Time(pull.duration),
);
true
} else {
false
}
} else {
false
}
}
QueryView::Position(ref mut q) => {
// For Time answer ourselfs, otherwise forward
@ -414,6 +976,24 @@ impl SccParse {
self.sinkpad.peer_query(query)
}
}
QueryView::Duration(ref mut q) => {
// For Time answer ourselfs, otherwise forward
let state = self.state.lock().unwrap();
if q.get_format() == gst::Format::Time {
if let Some(pull) = state.pull.as_ref() {
if pull.duration.is_some() {
q.set(state.pull.as_ref().unwrap().duration);
true
} else {
false
}
} else {
false
}
} else {
self.sinkpad.peer_query(query)
}
}
_ => pad.query_default(Some(element), query),
}
}
@ -431,6 +1011,25 @@ impl ObjectSubclass for SccParse {
fn with_class(klass: &Self::Class) -> Self {
let templ = klass.get_pad_template("sink").unwrap();
let sinkpad = gst::Pad::builder_with_template(&templ, Some("sink"))
.activate_function(|pad, parent| {
SccParse::catch_panic_pad_function(
parent,
|| Err(gst_loggable_error!(CAT, "Panic activating sink pad")),
|parse, element| parse.sink_activate(pad, element),
)
})
.activatemode_function(|pad, parent, mode, active| {
SccParse::catch_panic_pad_function(
parent,
|| {
Err(gst_loggable_error!(
CAT,
"Panic activating sink pad with mode"
))
},
|parse, element| parse.sink_activatemode(pad, element, mode, active),
)
})
.chain_function(|pad, parent, buffer| {
SccParse::catch_panic_pad_function(
parent,

View file

@ -141,6 +141,12 @@ impl SccParser {
}
}
pub fn new_scan_captions() -> Self {
Self {
state: State::CaptionOrEmpty,
}
}
pub fn reset(&mut self) {
self.state = State::Header;
}

View file

@ -17,10 +17,12 @@
// Boston, MA 02110-1335, USA.
use gst::prelude::*;
use gst::EventView;
use gst_video::{ValidVideoTimeCode, VideoTimeCode};
use pretty_assertions::assert_eq;
use rand::{Rng, SeedableRng};
use std::collections::VecDeque;
use std::path::PathBuf;
fn init() {
use std::sync::Once;
@ -199,3 +201,83 @@ fn test_timecodes() {
.build()
);
}
#[test]
fn test_pull() {
init();
let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
path.push("tests/dn2018-1217.scc");
let mut h = gst_check::Harness::new_parse(&format!("filesrc location={:?} ! sccparse", path));
h.play();
/* Let's first pull until EOS */
loop {
let mut done = false;
while h.events_in_queue() != 0 {
let event = h.pull_event();
if let Ok(event) = event {
match event.view() {
EventView::Eos(_) => {
done = true;
break;
}
_ => (),
}
}
}
while h.buffers_in_queue() != 0 {
let _ = h.pull();
}
if done {
break;
}
}
/* Now seek and check that we receive buffers with appropriate PTS */
h.push_upstream_event(gst::event::Seek::new(
1.0,
gst::SeekFlags::FLUSH,
gst::SeekType::Set,
gst::GenericFormattedValue::Time(18 * gst::SECOND),
gst::SeekType::Set,
gst::GenericFormattedValue::Time(19 * gst::SECOND),
));
loop {
let mut done = false;
while h.buffers_in_queue() != 0 {
if let Ok(buffer) = h.pull() {
let pts = buffer.get_pts();
let end_time = pts + buffer.get_duration();
assert!(end_time >= 18 * gst::SECOND && pts < 19 * gst::SECOND);
}
}
while h.events_in_queue() != 0 {
let event = h.pull_event();
if let Ok(event) = event {
match event.view() {
EventView::Eos(_) => {
done = true;
break;
}
_ => (),
}
}
}
if done {
break;
}
}
}