mccparse: implement seeking

This commit is contained in:
Mathieu Duponchelle 2019-03-22 14:08:54 +00:00 committed by Sebastian Dröge
parent 2f3139dea2
commit 50b9654af5
3 changed files with 812 additions and 93 deletions

View file

@ -24,6 +24,7 @@ use gst::prelude::*;
use gst::subclass::prelude::*;
use gst_video::{self, ValidVideoTimeCode};
use std::cmp;
use std::sync::{Mutex, MutexGuard};
use crate::line_reader::LineReader;
@ -45,6 +46,25 @@ enum Format {
Cea608,
}
#[derive(Debug)]
struct PullState {
need_stream_start: bool,
stream_id: String,
offset: u64,
duration: gst::ClockTime,
}
impl PullState {
fn new(element: &gst::Element, pad: &gst::Pad) -> Self {
Self {
need_stream_start: true,
stream_id: pad.create_stream_id(element, "src").unwrap().to_string(),
offset: 0,
duration: gst::CLOCK_TIME_NONE,
}
}
}
#[derive(Debug)]
struct State {
reader: LineReader<gst::MappedBuffer<gst::buffer::Readable>>,
@ -56,6 +76,18 @@ struct State {
last_position: gst::ClockTime,
last_timecode: Option<gst_video::ValidVideoTimeCode>,
timecode_rate: Option<(u8, bool)>,
segment: gst::FormattedSegment<gst::ClockTime>,
// Pull mode
pull: Option<PullState>,
// seeking
seeking: bool,
discont: bool,
seek_seqnum: gst::Seqnum,
last_raw_line: Vec<u8>,
replay_last_line: bool,
need_flush_stop: bool,
}
impl Default for State {
@ -70,10 +102,55 @@ impl Default for State {
last_position: gst::CLOCK_TIME_NONE,
last_timecode: None,
timecode_rate: None,
segment: gst::FormattedSegment::<gst::ClockTime>::new(),
pull: None,
seeking: false,
discont: false,
seek_seqnum: gst::event::SEQNUM_INVALID,
last_raw_line: Vec::new(),
replay_last_line: false,
need_flush_stop: false,
}
}
}
fn parse_timecode(
framerate: gst::Fraction,
drop_frame: bool,
tc: TimeCode,
) -> Result<ValidVideoTimeCode, gst::FlowError> {
let timecode = gst_video::VideoTimeCode::new(
framerate,
None,
if drop_frame {
gst_video::VideoTimeCodeFlags::DROP_FRAME
} else {
gst_video::VideoTimeCodeFlags::empty()
},
tc.hours,
tc.minutes,
tc.seconds,
tc.frames,
0,
);
timecode.try_into().map_err(|_| gst::FlowError::Error)
}
fn parse_timecode_rate(
timecode_rate: Option<(u8, bool)>,
) -> Result<(gst::Fraction, bool), gst::FlowError> {
let (framerate, drop_frame) = match timecode_rate {
Some((rate, false)) => (gst::Fraction::new(rate as i32, 1), false),
Some((rate, true)) => (gst::Fraction::new(rate as i32 * 1000, 1001), true),
None => {
return Err(gst::FlowError::Error);
}
};
Ok((framerate, drop_frame))
}
impl State {
fn get_line(
&mut self,
@ -85,15 +162,24 @@ impl State {
combine::easy::Errors<u8, &[u8], combine::stream::PointerOffset>,
),
> {
let line = match self.reader.get_line_with_drain(drain) {
None => {
return Ok(None);
let line = match self.replay_last_line {
true => {
self.replay_last_line = false;
&self.last_raw_line
}
Some(line) => line,
false => match self.reader.get_line_with_drain(drain) {
None => {
return Ok(None);
}
Some(line) => {
self.last_raw_line = line.to_vec();
line
}
},
};
self.parser
.parse_line(line)
.parse_line(line, !self.seeking)
.map(Option::Some)
.map_err(|err| (line, err))
}
@ -101,26 +187,11 @@ impl State {
fn handle_timecode(
&mut self,
element: &gst::Element,
tc: TimeCode,
framerate: gst::Fraction,
drop_frame: bool,
tc: TimeCode,
) -> Result<ValidVideoTimeCode, gst::FlowError> {
let timecode = gst_video::VideoTimeCode::new(
framerate,
None,
if drop_frame {
gst_video::VideoTimeCodeFlags::DROP_FRAME
} else {
gst_video::VideoTimeCodeFlags::empty()
},
tc.hours,
tc.minutes,
tc.seconds,
tc.frames,
0,
);
match timecode.try_into() {
match parse_timecode(framerate, drop_frame, tc) {
Ok(timecode) => Ok(timecode),
Err(timecode) => {
let last_timecode =
@ -201,6 +272,12 @@ impl State {
self.update_timestamp(element, &timecode);
buffer.set_pts(self.last_position);
if self.discont {
buffer.set_flags(gst::BufferFlags::DISCONT);
self.discont = false;
}
buffer.set_duration(
gst::SECOND
.mul_div_ceil(*framerate.denom() as u64, *framerate.numer() as u64)
@ -211,32 +288,57 @@ impl State {
fn create_events(
&mut self,
element: &gst::Element,
format: Format,
format: Option<Format>,
framerate: &gst::Fraction,
) -> Vec<gst::Event> {
let mut events = Vec::new();
if self.format != Some(format) {
self.format = Some(format);
if self.need_flush_stop {
let mut b = gst::Event::new_flush_stop(true);
let caps = match format {
Format::Cea708Cdp => gst::Caps::builder("closedcaption/x-cea-708")
.field("format", &"cdp")
.field("framerate", framerate)
.build(),
Format::Cea608 => gst::Caps::builder("closedcaption/x-cea-608")
.field("format", &"s334-1a")
.field("framerate", framerate)
.build(),
};
if self.seek_seqnum != gst::event::SEQNUM_INVALID {
b = b.seqnum(self.seek_seqnum);
}
events.push(gst::Event::new_caps(&caps).build());
gst_info!(CAT, obj: element, "Caps changed to {:?}", &caps);
events.push(b.build());
self.need_flush_stop = false;
}
if let Some(pull) = &mut self.pull {
if pull.need_stream_start {
events.push(gst::Event::new_stream_start(&pull.stream_id).build());
pull.need_stream_start = false;
}
}
if let Some(format) = format {
if self.format != Some(format) {
self.format = Some(format);
let caps = match format {
Format::Cea708Cdp => gst::Caps::builder("closedcaption/x-cea-708")
.field("format", &"cdp")
.field("framerate", framerate)
.build(),
Format::Cea608 => gst::Caps::builder("closedcaption/x-cea-608")
.field("format", &"s334-1a")
.field("framerate", framerate)
.build(),
};
events.push(gst::Event::new_caps(&caps).build());
gst_info!(CAT, obj: element, "Caps changed to {:?}", &caps);
}
}
if self.need_segment {
let segment = gst::FormattedSegment::<gst::format::Time>::new();
events.push(gst::Event::new_segment(&segment).build());
let mut b = gst::Event::new_segment(&self.segment);
if self.seek_seqnum != gst::event::SEQNUM_INVALID {
b = b.seqnum(self.seek_seqnum);
}
events.push(b.build());
self.need_segment = false;
}
@ -272,6 +374,26 @@ impl AsMut<[u8]> for OffsetVec {
impl MccParse {
fn set_pad_functions(sinkpad: &gst::Pad, srcpad: &gst::Pad) {
sinkpad.set_activate_function(|pad, parent| {
MccParse::catch_panic_pad_function(
parent,
|| Err(gst_loggable_error!(CAT, "Panic activating sink pad")),
|parse, element| parse.sink_activate(pad, element),
)
});
sinkpad.set_activatemode_function(|pad, parent, mode, active| {
MccParse::catch_panic_pad_function(
parent,
|| {
Err(gst_loggable_error!(
CAT,
"Panic activating sink pad with mode"
))
},
|parse, element| parse.sink_activatemode(pad, element, mode, active),
)
});
sinkpad.set_chain_function(|pad, parent, buffer| {
MccParse::catch_panic_pad_function(
parent,
@ -307,6 +429,7 @@ impl MccParse {
&self,
element: &gst::Element,
buffer: Option<gst::Buffer>,
scan_tc_rate: bool,
) -> Result<gst::FlowSuccess, gst::FlowError> {
let mut state = self.state.lock().unwrap();
@ -331,8 +454,10 @@ impl MccParse {
loop {
let line = state.get_line(drain);
match line {
Ok(Some(MccLine::Caption(tc, data))) => {
gst_trace!(
Ok(Some(MccLine::Caption(tc, Some(data)))) => {
assert!(!state.seeking);
gst_debug!(
CAT,
obj: element,
"Got caption buffer with timecode {:?} and size {}",
@ -340,6 +465,16 @@ impl MccParse {
data.len()
);
if scan_tc_rate {
gst_element_error!(
element,
gst::StreamError::Decode,
["Found caption line while scanning for timecode rate"]
);
break Err(gst::FlowError::Error);
}
if data.len() < 3 {
gst_debug!(
CAT,
@ -373,6 +508,21 @@ impl MccParse {
state = self.handle_line(element, tc, data, format, state)?;
}
Ok(Some(MccLine::Caption(tc, None))) => {
assert!(state.seeking);
if scan_tc_rate {
gst_element_error!(
element,
gst::StreamError::Decode,
["Found caption line while scanning for timecode rate"]
);
break Err(gst::FlowError::Error);
}
state = self.handle_skipped_line(element, tc, state)?;
}
Ok(Some(MccLine::TimeCodeRate(rate, df))) => {
gst_debug!(
CAT,
@ -382,6 +532,10 @@ impl MccParse {
df
);
state.timecode_rate = Some((rate, df));
if scan_tc_rate {
break Ok(gst::FlowSuccess::Ok);
}
}
Ok(Some(line)) => {
gst_debug!(CAT, obj: element, "Got line '{:?}'", line);
@ -395,11 +549,50 @@ impl MccParse {
break Err(gst::FlowError::Error);
}
Ok(None) => break Ok(gst::FlowSuccess::Ok),
Ok(None) => {
if scan_tc_rate {
gst_element_error!(
element,
gst::StreamError::Decode,
["Found end of input while scanning for timecode rate"]
);
break Err(gst::FlowError::Error);
}
if drain && state.pull.is_some() {
break Err(gst::FlowError::Eos);
}
break Ok(gst::FlowSuccess::Ok);
}
}
}
}
fn handle_skipped_line(
&self,
element: &gst::Element,
tc: TimeCode,
mut state: MutexGuard<State>,
) -> Result<MutexGuard<State>, gst::FlowError> {
let (framerate, drop_frame) = parse_timecode_rate(state.timecode_rate)?;
let timecode = state.handle_timecode(element, framerate, drop_frame, tc)?;
let nsecs = gst::ClockTime::from(timecode.nsec_since_daily_jam());
state.last_timecode = Some(timecode);
if nsecs >= state.segment.get_start() {
state.seeking = false;
state.discont = true;
state.replay_last_line = true;
state.need_flush_stop = true;
}
drop(state);
Ok(self.state.lock().unwrap())
}
fn handle_line(
&self,
element: &gst::Element,
@ -408,22 +601,9 @@ impl MccParse {
format: Format,
mut state: MutexGuard<State>,
) -> Result<MutexGuard<State>, gst::FlowError> {
let (framerate, drop_frame) = match state.timecode_rate {
Some((rate, false)) => (gst::Fraction::new(rate as i32, 1), false),
Some((rate, true)) => (gst::Fraction::new(rate as i32 * 1000, 1001), true),
None => {
gst_element_error!(
element,
gst::StreamError::Decode,
["Got caption before time code rate"]
);
return Err(gst::FlowError::Error);
}
};
let events = state.create_events(element, format, &framerate);
let timecode = state.handle_timecode(element, tc, framerate, drop_frame)?;
let (framerate, drop_frame) = parse_timecode_rate(state.timecode_rate)?;
let events = state.create_events(element, Some(format), &framerate);
let timecode = state.handle_timecode(element, framerate, drop_frame, tc)?;
let len = data[2] as usize;
let mut buffer = gst::Buffer::from_mut_slice(OffsetVec {
@ -437,6 +617,9 @@ impl MccParse {
// Update the last_timecode to the current one
state.last_timecode = Some(timecode);
let send_eos = state.segment.get_stop().is_some()
&& buffer.get_pts() + buffer.get_duration() >= state.segment.get_stop();
// Drop our state mutex while we push out buffers or events
drop(state);
@ -446,13 +629,324 @@ impl MccParse {
}
self.srcpad.push(buffer).map_err(|err| {
gst_error!(CAT, obj: element, "Pushing buffer returned {:?}", err);
if err != gst::FlowError::Flushing {
gst_error!(CAT, obj: element, "Pushing buffer returned {:?}", err);
}
err
})?;
if send_eos {
return Err(gst::FlowError::Eos);
}
Ok(self.state.lock().unwrap())
}
fn sink_activate(
&self,
pad: &gst::Pad,
element: &gst::Element,
) -> Result<(), gst::LoggableError> {
let mode = {
let mut query = gst::Query::new_scheduling();
let mut state = self.state.lock().unwrap();
state.pull = None;
if !pad.peer_query(&mut query) {
gst_debug!(CAT, obj: pad, "Scheduling query failed on peer");
gst::PadMode::Push
} else if query
.has_scheduling_mode_with_flags(gst::PadMode::Pull, gst::SchedulingFlags::SEEKABLE)
{
gst_debug!(CAT, obj: pad, "Activating in Pull mode");
state.pull = Some(PullState::new(element, &self.srcpad));
gst::PadMode::Pull
} else {
gst_debug!(CAT, obj: pad, "Activating in Push mode");
gst::PadMode::Push
}
};
pad.activate_mode(mode, true)?;
Ok(())
}
fn start_task(&self, element: &gst::Element) -> Result<(), gst::LoggableError> {
let element_weak = element.downgrade();
let pad_weak = self.sinkpad.downgrade();
if let Err(_) = self.sinkpad.start_task(move || {
let element = match element_weak.upgrade() {
Some(element) => element,
None => {
if let Some(pad) = pad_weak.upgrade() {
pad.pause_task().unwrap();
}
return;
}
};
let parse = Self::from_instance(&element);
parse.loop_fn(&element);
}) {
return Err(gst_loggable_error!(CAT, "Failed to start pad task"));
}
Ok(())
}
fn sink_activatemode(
&self,
_pad: &gst::Pad,
element: &gst::Element,
mode: gst::PadMode,
active: bool,
) -> Result<(), gst::LoggableError> {
if active {
if mode == gst::PadMode::Pull {
self.start_task(element)?;
}
} else {
if mode == gst::PadMode::Pull {
let _ = self.sinkpad.stop_task();
}
}
Ok(())
}
fn scan_duration(
&self,
element: &gst::Element,
) -> Result<Option<ValidVideoTimeCode>, gst::LoggableError> {
gst_debug!(CAT, obj: element, "Scanning duration");
/* First let's query the bytes duration upstream */
let mut q = gst::query::Query::new_duration(gst::Format::Bytes);
if !self.sinkpad.peer_query(&mut q) {
return Err(gst_loggable_error!(
CAT,
"Failed to query upstream duration"
));
}
let size = match q.get_result().try_into_bytes().unwrap() {
gst::format::Bytes(Some(size)) => size,
gst::format::Bytes(None) => {
return Err(gst_loggable_error!(
CAT,
"Failed to query upstream duration"
));
}
};
let mut offset = size;
let mut buffers = Vec::new();
let mut last_tc = None;
loop {
let scan_size = cmp::min(offset, 4096);
offset -= scan_size;
match self.sinkpad.pull_range(offset, scan_size as u32) {
Ok(buffer) => {
buffers.push(buffer);
}
Err(flow) => {
return Err(gst_loggable_error!(
CAT,
"Failed to pull buffer while scanning duration: {:?}",
flow
));
}
}
let mut reader = LineReader::new();
let mut parser = MccParser::new_scan_captions();
for buf in buffers.iter().rev() {
let buf = buf
.clone()
.into_mapped_buffer_readable()
.map_err(|_| gst_loggable_error!(CAT, "Failed to map buffer readable"))?;
reader.push(buf);
}
loop {
let line = match reader.get_line_with_drain(true) {
Some(line) => line,
None => {
break;
}
};
match parser.parse_line(line, false).map_err(|err| (line, err)) {
Ok(MccLine::Caption(tc, None)) => {
let state = self.state.lock().unwrap();
let (framerate, drop_frame) = parse_timecode_rate(state.timecode_rate)
.map_err(|_| {
gst_loggable_error!(CAT, "Failed to parse timecode rate")
})?;
last_tc = match parse_timecode(framerate, drop_frame, tc) {
Ok(mut timecode) => {
/* We're looking for the total duration */
timecode.increment_frame();
Some(timecode)
}
Err(_) => None,
}
}
_ => { /* We ignore everything else including errors */ }
}
}
if last_tc.is_some() || offset == 0 {
gst_debug!(
CAT,
obj: element,
"Duration scan done, last_tc: {:?}",
last_tc
);
break (Ok(last_tc));
}
}
}
fn push_eos(&self, element: &gst::Element) {
let mut state = self.state.lock().unwrap();
if state.seeking {
state.need_flush_stop = true;
}
match parse_timecode_rate(state.timecode_rate) {
Ok((framerate, _)) => {
let mut events = state.create_events(element, None, &framerate);
let mut eos_event = gst::Event::new_eos();
if state.seek_seqnum != gst::event::SEQNUM_INVALID {
eos_event = eos_event.seqnum(state.seek_seqnum);
}
events.push(eos_event.build());
// Drop our state mutex while we push out events
drop(state);
for event in events {
gst_debug!(CAT, obj: element, "Pushing event {:?}", event);
self.srcpad.push_event(event);
}
}
Err(_) => {
gst_element_error!(
element,
gst::StreamError::Failed,
["Streaming stopped, failed to parse timecode rate"]
);
}
}
}
fn loop_fn(&self, element: &gst::Element) {
let mut state = self.state.lock().unwrap();
let State {
timecode_rate: ref tc_rate,
ref mut pull,
..
} = *state;
let mut pull = pull.as_mut().unwrap();
let scan_tc_rate = tc_rate.is_none() && pull.duration.is_none();
let offset = pull.offset;
pull.offset += 4096;
drop(state);
let buffer = match self.sinkpad.pull_range(offset, 4096) {
Ok(buffer) => Some(buffer),
Err(gst::FlowError::Eos) => None,
Err(gst::FlowError::Flushing) => {
gst_debug!(CAT, obj: &self.sinkpad, "Pausing after pulling buffer, reason: flushing");
self.sinkpad.pause_task().unwrap();
return;
}
Err(flow) => {
gst_error!(CAT, obj: &self.sinkpad, "Failed to pull, reason: {:?}", flow);
gst_element_error!(
element,
gst::StreamError::Failed,
["Streaming stopped, failed to pull buffer"]
);
self.sinkpad.pause_task().unwrap();
return;
}
};
match self.handle_buffer(element, buffer, scan_tc_rate) {
Ok(_) => {
let tc_rate = self.state.lock().unwrap().timecode_rate;
if scan_tc_rate && tc_rate.is_some() {
match self.scan_duration(element) {
Ok(Some(tc)) => {
let mut state = self.state.lock().unwrap();
let mut pull = state.pull.as_mut().unwrap();
pull.duration = tc.nsec_since_daily_jam().into();
}
Ok(None) => {
let mut state = self.state.lock().unwrap();
let mut pull = state.pull.as_mut().unwrap();
pull.duration = 0.into();
}
Err(err) => {
err.log();
gst_element_error!(
element,
gst::StreamError::Decode,
["Failed to scan duration"]
);
self.sinkpad.pause_task().unwrap();
}
}
}
}
Err(flow) => {
match flow {
gst::FlowError::Flushing => {
gst_debug!(CAT, obj: element, "Pausing after flow {:?}", flow);
}
gst::FlowError::Eos => {
self.push_eos(element);
gst_debug!(CAT, obj: element, "Pausing after flow {:?}", flow);
}
_ => {
self.push_eos(element);
gst_error!(CAT, obj: element, "Pausing after flow {:?}", flow);
gst_element_error!(
element,
gst::StreamError::Failed,
["Streaming stopped, reason: {:?}", flow]
);
}
}
self.sinkpad.pause_task().unwrap();
}
}
}
fn sink_chain(
&self,
pad: &gst::Pad,
@ -461,7 +955,27 @@ impl MccParse {
) -> Result<gst::FlowSuccess, gst::FlowError> {
gst_log!(CAT, obj: pad, "Handling buffer {:?}", buffer);
self.handle_buffer(element, Some(buffer))
self.handle_buffer(element, Some(buffer), false)
}
fn flush(&self, mut state: MutexGuard<State>) -> MutexGuard<State> {
state.reader.clear();
state.parser.reset();
if let Some(pull) = &mut state.pull {
pull.offset = 0;
}
state.segment = gst::FormattedSegment::<gst::ClockTime>::new();
state.need_segment = true;
state.pending_events.clear();
state.start_position = 0.into();
state.last_position = 0.into();
state.last_timecode = None;
state.timecode_rate = None;
state.last_raw_line = [].to_vec();
drop(state);
self.state.lock().unwrap()
}
fn sink_event(&self, pad: &gst::Pad, element: &gst::Element, event: gst::Event) -> bool {
@ -481,21 +995,15 @@ impl MccParse {
true
}
EventView::FlushStop(_) => {
let mut state = self.state.lock().unwrap();
state.reader.clear();
state.parser.reset();
state.need_segment = true;
state.pending_events.clear();
state.start_position = gst::ClockTime::from_seconds(0);
state.last_position = gst::ClockTime::from_seconds(0);
state.last_timecode = None;
state.timecode_rate = None;
let state = self.state.lock().unwrap();
let _ = self.flush(state);
pad.event_default(element, event)
}
EventView::Eos(_) => {
gst_log!(CAT, obj: pad, "Draining");
if let Err(err) = self.handle_buffer(element, None) {
if let Err(err) = self.handle_buffer(element, None, false) {
gst_error!(CAT, obj: pad, "Failed to drain parser: {:?}", err);
}
pad.event_default(element, event)
@ -516,15 +1024,104 @@ impl MccParse {
}
}
fn perform_seek(&self, event: &gst::event::Seek, element: &gst::Element) -> bool {
let mut state = self.state.lock().unwrap();
if state.pull.is_none() {
gst_error!(CAT, obj: element, "seeking is only supported in pull mode");
return false;
}
let (rate, flags, start_type, start, stop_type, stop) = event.get();
let mut start = match start.try_into_time() {
Ok(start) => start,
Err(_) => {
gst_error!(CAT, obj: element, "seek has invalid format");
return false;
}
};
let mut stop = match stop.try_into_time() {
Ok(stop) => stop,
Err(_) => {
gst_error!(CAT, obj: element, "seek has invalid format");
return false;
}
};
if !flags.contains(gst::SeekFlags::FLUSH) {
gst_error!(CAT, obj: element, "only flushing seeks are supported");
return false;
}
if start_type == gst::SeekType::End || stop_type == gst::SeekType::End {
gst_error!(CAT, obj: element, "Relative seeks are not supported");
return false;
}
let pull = state.pull.as_ref().unwrap();
if start_type == gst::SeekType::Set && pull.duration.is_some() {
start = cmp::min(start, pull.duration);
}
if stop_type == gst::SeekType::Set && pull.duration.is_some() {
stop = cmp::min(stop, pull.duration);
}
state.seeking = true;
state.seek_seqnum = event.get_seqnum();
let event = gst::Event::new_flush_start()
.seqnum(state.seek_seqnum)
.build();
gst_debug!(CAT, obj: element, "Sending event {:?} upstream", event);
self.sinkpad.push_event(event);
let event = gst::Event::new_flush_start()
.seqnum(state.seek_seqnum)
.build();
gst_debug!(CAT, obj: element, "Pushing event {:?}", event);
self.srcpad.push_event(event);
self.sinkpad.pause_task().unwrap();
state = self.flush(state);
let event = gst::Event::new_flush_stop(true)
.seqnum(state.seek_seqnum)
.build();
/* Drop our state while we push a serialized event upstream */
drop(state);
gst_debug!(CAT, obj: element, "Sending event {:?} upstream", event);
self.sinkpad.push_event(event);
state = self.state.lock().unwrap();
state
.segment
.do_seek(rate, flags, start_type, start, stop_type, stop);
match self.start_task(element) {
Err(error) => {
error.log();
false
}
_ => true,
}
}
fn src_event(&self, pad: &gst::Pad, element: &gst::Element, event: gst::Event) -> bool {
use gst::EventView;
gst_log!(CAT, obj: pad, "Handling event {:?}", event);
match event.view() {
EventView::Seek(_) => {
gst_log!(CAT, obj: pad, "Dropping seek event");
false
}
EventView::Seek(e) => self.perform_seek(&e, element),
_ => pad.event_default(element, event),
}
}
@ -536,14 +1133,24 @@ impl MccParse {
match query.view_mut() {
QueryView::Seeking(mut q) => {
// We don't support any seeking at all
let state = self.state.lock().unwrap();
let fmt = q.get_format();
q.set(
false,
gst::GenericFormattedValue::Other(fmt, -1),
gst::GenericFormattedValue::Other(fmt, -1),
);
true
if fmt == gst::Format::Time {
if let Some(pull) = state.pull.as_ref() {
q.set(
true,
gst::GenericFormattedValue::Time(0.into()),
gst::GenericFormattedValue::Time(pull.duration),
);
true
} else {
false
}
} else {
false
}
}
QueryView::Position(ref mut q) => {
// For Time answer ourselfs, otherwise forward
@ -555,6 +1162,24 @@ impl MccParse {
self.sinkpad.peer_query(query)
}
}
QueryView::Duration(ref mut q) => {
// For Time answer ourselfs, otherwise forward
let state = self.state.lock().unwrap();
if q.get_format() == gst::Format::Time {
if let Some(pull) = state.pull.as_ref() {
if pull.duration.is_some() {
q.set(state.pull.as_ref().unwrap().duration);
true
} else {
false
}
} else {
false
}
} else {
self.sinkpad.peer_query(query)
}
}
_ => pad.query_default(element, query),
}
}

View file

@ -41,7 +41,7 @@ pub enum MccLine<'a> {
UUID(&'a [u8]),
Metadata(&'a [u8], &'a [u8]),
TimeCodeRate(u8, bool),
Caption(TimeCode, Vec<u8>),
Caption(TimeCode, Option<Vec<u8>>),
}
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
@ -340,7 +340,7 @@ where
}
/// Parser for a MCC caption line in the form `timecode\tpayload`.
fn caption<'a, I: 'a>() -> impl Parser<Input = I, Output = MccLine<'a>>
fn caption<'a, I: 'a>(parse_payload: bool) -> impl Parser<Input = I, Output = MccLine<'a>>
where
I: RangeStream<Item = u8, Range = &'a [u8]>,
I::Error: ParseError<I::Item, I::Range, I::Position>,
@ -353,7 +353,11 @@ where
optional((token(b','), digits())),
)),
token(b'\t'),
mcc_payload(),
if parse_payload {
mcc_payload().map(|v| Some(v)).right()
} else {
skip_many(any()).map(|_| None).left()
},
end_of_line(),
)
.map(|(tc, _, _, value, _)| MccLine::Caption(tc, value))
@ -366,6 +370,12 @@ impl MccParser {
Self { state: State::Init }
}
pub fn new_scan_captions() -> Self {
Self {
state: State::Captions,
}
}
pub fn reset(&mut self) {
self.state = State::Init;
}
@ -373,6 +383,7 @@ impl MccParser {
pub fn parse_line<'a>(
&mut self,
line: &'a [u8],
parse_payload: bool,
) -> Result<MccLine<'a>, combine::easy::Errors<u8, &'a [u8], combine::stream::PointerOffset>>
{
match self.state {
@ -410,7 +421,7 @@ impl MccParser {
v.0
}),
State::Captions => caption()
State::Captions => caption(parse_payload)
.message("while in Captions state")
.easy_parse(line)
.map(|v| v.0),
@ -654,7 +665,7 @@ mod tests {
#[test]
fn test_caption() {
let mut parser = caption();
let mut parser = caption(true);
assert_eq!(
parser.parse(b"00:00:00:00\tT52S524F67ZZ72F4QROO7391UC13FFF74ZZAEB4".as_ref()),
Ok((
@ -666,7 +677,7 @@ mod tests {
frames: 0,
drop_frame: false
},
vec![
Some(vec![
0x61, 0x01, 0x52, 0x96, 0x69, 0x52, 0x4F, 0x67, 0x00, 0x00, 0x72, 0xF4,
0xFC, 0x80, 0x80, 0xFD, 0x80, 0x80, 0xFA, 0x00, 0x00, 0xFA, 0x00, 0x00,
0xFA, 0x00, 0x00, 0xFA, 0x00, 0x00, 0xFA, 0x00, 0x00, 0xFA, 0x00, 0x00,
@ -675,7 +686,7 @@ mod tests {
0xFA, 0x00, 0x00, 0xFA, 0x00, 0x00, 0xFA, 0x00, 0x00, 0xFA, 0x00, 0x00,
0x73, 0x91, 0xE1, 0x00, 0x00, 0xC1, 0x3F, 0xFF, 0x74, 0x00, 0x00, 0xAE,
0xB4
]
])
),
b"".as_ref()
))
@ -692,7 +703,7 @@ mod tests {
frames: 0,
drop_frame: false
},
vec![
Some(vec![
0x61, 0x01, 0x52, 0x96, 0x69, 0x52, 0x4F, 0x67, 0x00, 0x00, 0x72, 0xF4,
0xFC, 0x80, 0x80, 0xFD, 0x80, 0x80, 0xFA, 0x00, 0x00, 0xFA, 0x00, 0x00,
0xFA, 0x00, 0x00, 0xFA, 0x00, 0x00, 0xFA, 0x00, 0x00, 0xFA, 0x00, 0x00,
@ -701,7 +712,7 @@ mod tests {
0xFA, 0x00, 0x00, 0xFA, 0x00, 0x00, 0xFA, 0x00, 0x00, 0xFA, 0x00, 0x00,
0x73, 0x91, 0xE1, 0x00, 0x00, 0xC1, 0x3F, 0xFF, 0x74, 0x00, 0x00, 0xAE,
0xB4
]
])
),
b"".as_ref()
))
@ -718,7 +729,7 @@ mod tests {
frames: 0,
drop_frame: false
},
vec![
Some(vec![
0x61, 0x01, 0x52, 0x96, 0x69, 0x52, 0x4F, 0x67, 0x00, 0x00, 0x72, 0xF4,
0xFC, 0x80, 0x80, 0xFD, 0x80, 0x80, 0xFA, 0x00, 0x00, 0xFA, 0x00, 0x00,
0xFA, 0x00, 0x00, 0xFA, 0x00, 0x00, 0xFA, 0x00, 0x00, 0xFA, 0x00, 0x00,
@ -727,7 +738,7 @@ mod tests {
0xFA, 0x00, 0x00, 0xFA, 0x00, 0x00, 0xFA, 0x00, 0x00, 0xFA, 0x00, 0x00,
0x73, 0x91, 0xE1, 0x00, 0x00, 0xC1, 0x3F, 0xFF, 0x74, 0x00, 0x00, 0xAE,
0xB4
]
])
),
b"".as_ref()
))
@ -749,7 +760,7 @@ mod tests {
reader.push(Vec::from(mcc_file.as_ref()));
while let Some(line) = reader.get_line() {
let res = match parser.parse_line(line) {
let res = match parser.parse_line(line, true) {
Ok(res) => res,
Err(err) => panic!("Couldn't parse line {}: {:?}", line_cnt, err),
};

View file

@ -19,7 +19,9 @@
extern crate pretty_assertions;
use gst::prelude::*;
use gst::EventView;
use rand::{Rng, SeedableRng};
use std::path::PathBuf;
fn init() {
use std::sync::{Once, ONCE_INIT};
@ -127,3 +129,84 @@ fn test_parse() {
.build()
);
}
#[test]
fn test_pull() {
init();
let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
path.push("tests/captions-test_708.mcc");
let mut h = gst_check::Harness::new_parse(&format!("filesrc location={:?} ! mccparse", path));
h.play();
/* Let's first pull until EOS */
loop {
let mut done = false;
while h.events_in_queue() != 0 {
let event = h.pull_event();
if let Some(event) = event {
match event.view() {
EventView::Eos(_) => {
done = true;
break;
}
_ => (),
}
}
}
while h.buffers_in_queue() != 0 {
let _ = h.pull();
}
if done {
break;
}
}
/* Now seek and check that we receive buffers with appropriate PTS */
h.push_upstream_event(
gst::Event::new_seek(
1.0,
gst::SeekFlags::FLUSH,
gst::SeekType::Set,
gst::GenericFormattedValue::Time(gst::SECOND.into()),
gst::SeekType::Set,
gst::GenericFormattedValue::Time((2 * gst::SECOND).into()),
)
.build(),
);
loop {
let mut done = false;
while h.buffers_in_queue() != 0 {
if let Some(buffer) = h.pull() {
let pts = buffer.get_pts();
assert!(pts > gst::SECOND && pts < 2 * gst::SECOND);
}
}
while h.events_in_queue() != 0 {
let event = h.pull_event();
if let Some(event) = event {
match event.view() {
EventView::Eos(_) => {
done = true;
break;
}
_ => (),
}
}
}
if done {
break;
}
}
}