diff --git a/gst-plugin-closedcaption/src/mcc_parse.rs b/gst-plugin-closedcaption/src/mcc_parse.rs index 63a30d14..970c9a94 100644 --- a/gst-plugin-closedcaption/src/mcc_parse.rs +++ b/gst-plugin-closedcaption/src/mcc_parse.rs @@ -24,6 +24,7 @@ use gst::prelude::*; use gst::subclass::prelude::*; use gst_video::{self, ValidVideoTimeCode}; +use std::cmp; use std::sync::{Mutex, MutexGuard}; use crate::line_reader::LineReader; @@ -45,6 +46,25 @@ enum Format { Cea608, } +#[derive(Debug)] +struct PullState { + need_stream_start: bool, + stream_id: String, + offset: u64, + duration: gst::ClockTime, +} + +impl PullState { + fn new(element: &gst::Element, pad: &gst::Pad) -> Self { + Self { + need_stream_start: true, + stream_id: pad.create_stream_id(element, "src").unwrap().to_string(), + offset: 0, + duration: gst::CLOCK_TIME_NONE, + } + } +} + #[derive(Debug)] struct State { reader: LineReader>, @@ -56,6 +76,18 @@ struct State { last_position: gst::ClockTime, last_timecode: Option, timecode_rate: Option<(u8, bool)>, + segment: gst::FormattedSegment, + + // Pull mode + pull: Option, + + // seeking + seeking: bool, + discont: bool, + seek_seqnum: gst::Seqnum, + last_raw_line: Vec, + replay_last_line: bool, + need_flush_stop: bool, } impl Default for State { @@ -70,10 +102,55 @@ impl Default for State { last_position: gst::CLOCK_TIME_NONE, last_timecode: None, timecode_rate: None, + segment: gst::FormattedSegment::::new(), + pull: None, + seeking: false, + discont: false, + seek_seqnum: gst::event::SEQNUM_INVALID, + last_raw_line: Vec::new(), + replay_last_line: false, + need_flush_stop: false, } } } +fn parse_timecode( + framerate: gst::Fraction, + drop_frame: bool, + tc: TimeCode, +) -> Result { + let timecode = gst_video::VideoTimeCode::new( + framerate, + None, + if drop_frame { + gst_video::VideoTimeCodeFlags::DROP_FRAME + } else { + gst_video::VideoTimeCodeFlags::empty() + }, + tc.hours, + tc.minutes, + tc.seconds, + tc.frames, + 0, + ); + + timecode.try_into().map_err(|_| gst::FlowError::Error) +} + +fn parse_timecode_rate( + timecode_rate: Option<(u8, bool)>, +) -> Result<(gst::Fraction, bool), gst::FlowError> { + let (framerate, drop_frame) = match timecode_rate { + Some((rate, false)) => (gst::Fraction::new(rate as i32, 1), false), + Some((rate, true)) => (gst::Fraction::new(rate as i32 * 1000, 1001), true), + None => { + return Err(gst::FlowError::Error); + } + }; + + Ok((framerate, drop_frame)) +} + impl State { fn get_line( &mut self, @@ -85,15 +162,24 @@ impl State { combine::easy::Errors, ), > { - let line = match self.reader.get_line_with_drain(drain) { - None => { - return Ok(None); + let line = match self.replay_last_line { + true => { + self.replay_last_line = false; + &self.last_raw_line } - Some(line) => line, + false => match self.reader.get_line_with_drain(drain) { + None => { + return Ok(None); + } + Some(line) => { + self.last_raw_line = line.to_vec(); + line + } + }, }; self.parser - .parse_line(line) + .parse_line(line, !self.seeking) .map(Option::Some) .map_err(|err| (line, err)) } @@ -101,26 +187,11 @@ impl State { fn handle_timecode( &mut self, element: &gst::Element, - tc: TimeCode, framerate: gst::Fraction, drop_frame: bool, + tc: TimeCode, ) -> Result { - let timecode = gst_video::VideoTimeCode::new( - framerate, - None, - if drop_frame { - gst_video::VideoTimeCodeFlags::DROP_FRAME - } else { - gst_video::VideoTimeCodeFlags::empty() - }, - tc.hours, - tc.minutes, - tc.seconds, - tc.frames, - 0, - ); - - match timecode.try_into() { + match parse_timecode(framerate, drop_frame, tc) { Ok(timecode) => Ok(timecode), Err(timecode) => { let last_timecode = @@ -201,6 +272,12 @@ impl State { self.update_timestamp(element, &timecode); buffer.set_pts(self.last_position); + + if self.discont { + buffer.set_flags(gst::BufferFlags::DISCONT); + self.discont = false; + } + buffer.set_duration( gst::SECOND .mul_div_ceil(*framerate.denom() as u64, *framerate.numer() as u64) @@ -211,32 +288,57 @@ impl State { fn create_events( &mut self, element: &gst::Element, - format: Format, + format: Option, framerate: &gst::Fraction, ) -> Vec { let mut events = Vec::new(); - if self.format != Some(format) { - self.format = Some(format); + if self.need_flush_stop { + let mut b = gst::Event::new_flush_stop(true); - let caps = match format { - Format::Cea708Cdp => gst::Caps::builder("closedcaption/x-cea-708") - .field("format", &"cdp") - .field("framerate", framerate) - .build(), - Format::Cea608 => gst::Caps::builder("closedcaption/x-cea-608") - .field("format", &"s334-1a") - .field("framerate", framerate) - .build(), - }; + if self.seek_seqnum != gst::event::SEQNUM_INVALID { + b = b.seqnum(self.seek_seqnum); + } - events.push(gst::Event::new_caps(&caps).build()); - gst_info!(CAT, obj: element, "Caps changed to {:?}", &caps); + events.push(b.build()); + self.need_flush_stop = false; + } + + if let Some(pull) = &mut self.pull { + if pull.need_stream_start { + events.push(gst::Event::new_stream_start(&pull.stream_id).build()); + pull.need_stream_start = false; + } + } + + if let Some(format) = format { + if self.format != Some(format) { + self.format = Some(format); + + let caps = match format { + Format::Cea708Cdp => gst::Caps::builder("closedcaption/x-cea-708") + .field("format", &"cdp") + .field("framerate", framerate) + .build(), + Format::Cea608 => gst::Caps::builder("closedcaption/x-cea-608") + .field("format", &"s334-1a") + .field("framerate", framerate) + .build(), + }; + + events.push(gst::Event::new_caps(&caps).build()); + gst_info!(CAT, obj: element, "Caps changed to {:?}", &caps); + } } if self.need_segment { - let segment = gst::FormattedSegment::::new(); - events.push(gst::Event::new_segment(&segment).build()); + let mut b = gst::Event::new_segment(&self.segment); + + if self.seek_seqnum != gst::event::SEQNUM_INVALID { + b = b.seqnum(self.seek_seqnum); + } + + events.push(b.build()); self.need_segment = false; } @@ -272,6 +374,26 @@ impl AsMut<[u8]> for OffsetVec { impl MccParse { fn set_pad_functions(sinkpad: &gst::Pad, srcpad: &gst::Pad) { + sinkpad.set_activate_function(|pad, parent| { + MccParse::catch_panic_pad_function( + parent, + || Err(gst_loggable_error!(CAT, "Panic activating sink pad")), + |parse, element| parse.sink_activate(pad, element), + ) + }); + + sinkpad.set_activatemode_function(|pad, parent, mode, active| { + MccParse::catch_panic_pad_function( + parent, + || { + Err(gst_loggable_error!( + CAT, + "Panic activating sink pad with mode" + )) + }, + |parse, element| parse.sink_activatemode(pad, element, mode, active), + ) + }); sinkpad.set_chain_function(|pad, parent, buffer| { MccParse::catch_panic_pad_function( parent, @@ -307,6 +429,7 @@ impl MccParse { &self, element: &gst::Element, buffer: Option, + scan_tc_rate: bool, ) -> Result { let mut state = self.state.lock().unwrap(); @@ -331,8 +454,10 @@ impl MccParse { loop { let line = state.get_line(drain); match line { - Ok(Some(MccLine::Caption(tc, data))) => { - gst_trace!( + Ok(Some(MccLine::Caption(tc, Some(data)))) => { + assert!(!state.seeking); + + gst_debug!( CAT, obj: element, "Got caption buffer with timecode {:?} and size {}", @@ -340,6 +465,16 @@ impl MccParse { data.len() ); + if scan_tc_rate { + gst_element_error!( + element, + gst::StreamError::Decode, + ["Found caption line while scanning for timecode rate"] + ); + + break Err(gst::FlowError::Error); + } + if data.len() < 3 { gst_debug!( CAT, @@ -373,6 +508,21 @@ impl MccParse { state = self.handle_line(element, tc, data, format, state)?; } + Ok(Some(MccLine::Caption(tc, None))) => { + assert!(state.seeking); + + if scan_tc_rate { + gst_element_error!( + element, + gst::StreamError::Decode, + ["Found caption line while scanning for timecode rate"] + ); + + break Err(gst::FlowError::Error); + } + + state = self.handle_skipped_line(element, tc, state)?; + } Ok(Some(MccLine::TimeCodeRate(rate, df))) => { gst_debug!( CAT, @@ -382,6 +532,10 @@ impl MccParse { df ); state.timecode_rate = Some((rate, df)); + + if scan_tc_rate { + break Ok(gst::FlowSuccess::Ok); + } } Ok(Some(line)) => { gst_debug!(CAT, obj: element, "Got line '{:?}'", line); @@ -395,11 +549,50 @@ impl MccParse { break Err(gst::FlowError::Error); } - Ok(None) => break Ok(gst::FlowSuccess::Ok), + Ok(None) => { + if scan_tc_rate { + gst_element_error!( + element, + gst::StreamError::Decode, + ["Found end of input while scanning for timecode rate"] + ); + + break Err(gst::FlowError::Error); + } + + if drain && state.pull.is_some() { + break Err(gst::FlowError::Eos); + } + break Ok(gst::FlowSuccess::Ok); + } } } } + fn handle_skipped_line( + &self, + element: &gst::Element, + tc: TimeCode, + mut state: MutexGuard, + ) -> Result, gst::FlowError> { + let (framerate, drop_frame) = parse_timecode_rate(state.timecode_rate)?; + let timecode = state.handle_timecode(element, framerate, drop_frame, tc)?; + let nsecs = gst::ClockTime::from(timecode.nsec_since_daily_jam()); + + state.last_timecode = Some(timecode); + + if nsecs >= state.segment.get_start() { + state.seeking = false; + state.discont = true; + state.replay_last_line = true; + state.need_flush_stop = true; + } + + drop(state); + + Ok(self.state.lock().unwrap()) + } + fn handle_line( &self, element: &gst::Element, @@ -408,22 +601,9 @@ impl MccParse { format: Format, mut state: MutexGuard, ) -> Result, gst::FlowError> { - let (framerate, drop_frame) = match state.timecode_rate { - Some((rate, false)) => (gst::Fraction::new(rate as i32, 1), false), - Some((rate, true)) => (gst::Fraction::new(rate as i32 * 1000, 1001), true), - None => { - gst_element_error!( - element, - gst::StreamError::Decode, - ["Got caption before time code rate"] - ); - - return Err(gst::FlowError::Error); - } - }; - - let events = state.create_events(element, format, &framerate); - let timecode = state.handle_timecode(element, tc, framerate, drop_frame)?; + let (framerate, drop_frame) = parse_timecode_rate(state.timecode_rate)?; + let events = state.create_events(element, Some(format), &framerate); + let timecode = state.handle_timecode(element, framerate, drop_frame, tc)?; let len = data[2] as usize; let mut buffer = gst::Buffer::from_mut_slice(OffsetVec { @@ -437,6 +617,9 @@ impl MccParse { // Update the last_timecode to the current one state.last_timecode = Some(timecode); + let send_eos = state.segment.get_stop().is_some() + && buffer.get_pts() + buffer.get_duration() >= state.segment.get_stop(); + // Drop our state mutex while we push out buffers or events drop(state); @@ -446,13 +629,324 @@ impl MccParse { } self.srcpad.push(buffer).map_err(|err| { - gst_error!(CAT, obj: element, "Pushing buffer returned {:?}", err); + if err != gst::FlowError::Flushing { + gst_error!(CAT, obj: element, "Pushing buffer returned {:?}", err); + } err })?; + if send_eos { + return Err(gst::FlowError::Eos); + } + Ok(self.state.lock().unwrap()) } + fn sink_activate( + &self, + pad: &gst::Pad, + element: &gst::Element, + ) -> Result<(), gst::LoggableError> { + let mode = { + let mut query = gst::Query::new_scheduling(); + let mut state = self.state.lock().unwrap(); + + state.pull = None; + + if !pad.peer_query(&mut query) { + gst_debug!(CAT, obj: pad, "Scheduling query failed on peer"); + gst::PadMode::Push + } else if query + .has_scheduling_mode_with_flags(gst::PadMode::Pull, gst::SchedulingFlags::SEEKABLE) + { + gst_debug!(CAT, obj: pad, "Activating in Pull mode"); + + state.pull = Some(PullState::new(element, &self.srcpad)); + + gst::PadMode::Pull + } else { + gst_debug!(CAT, obj: pad, "Activating in Push mode"); + gst::PadMode::Push + } + }; + + pad.activate_mode(mode, true)?; + Ok(()) + } + + fn start_task(&self, element: &gst::Element) -> Result<(), gst::LoggableError> { + let element_weak = element.downgrade(); + let pad_weak = self.sinkpad.downgrade(); + if let Err(_) = self.sinkpad.start_task(move || { + let element = match element_weak.upgrade() { + Some(element) => element, + None => { + if let Some(pad) = pad_weak.upgrade() { + pad.pause_task().unwrap(); + } + return; + } + }; + + let parse = Self::from_instance(&element); + parse.loop_fn(&element); + }) { + return Err(gst_loggable_error!(CAT, "Failed to start pad task")); + } + Ok(()) + } + + fn sink_activatemode( + &self, + _pad: &gst::Pad, + element: &gst::Element, + mode: gst::PadMode, + active: bool, + ) -> Result<(), gst::LoggableError> { + if active { + if mode == gst::PadMode::Pull { + self.start_task(element)?; + } + } else { + if mode == gst::PadMode::Pull { + let _ = self.sinkpad.stop_task(); + } + } + + Ok(()) + } + + fn scan_duration( + &self, + element: &gst::Element, + ) -> Result, gst::LoggableError> { + gst_debug!(CAT, obj: element, "Scanning duration"); + + /* First let's query the bytes duration upstream */ + let mut q = gst::query::Query::new_duration(gst::Format::Bytes); + + if !self.sinkpad.peer_query(&mut q) { + return Err(gst_loggable_error!( + CAT, + "Failed to query upstream duration" + )); + } + + let size = match q.get_result().try_into_bytes().unwrap() { + gst::format::Bytes(Some(size)) => size, + gst::format::Bytes(None) => { + return Err(gst_loggable_error!( + CAT, + "Failed to query upstream duration" + )); + } + }; + + let mut offset = size; + let mut buffers = Vec::new(); + let mut last_tc = None; + + loop { + let scan_size = cmp::min(offset, 4096); + + offset -= scan_size; + + match self.sinkpad.pull_range(offset, scan_size as u32) { + Ok(buffer) => { + buffers.push(buffer); + } + Err(flow) => { + return Err(gst_loggable_error!( + CAT, + "Failed to pull buffer while scanning duration: {:?}", + flow + )); + } + } + + let mut reader = LineReader::new(); + let mut parser = MccParser::new_scan_captions(); + + for buf in buffers.iter().rev() { + let buf = buf + .clone() + .into_mapped_buffer_readable() + .map_err(|_| gst_loggable_error!(CAT, "Failed to map buffer readable"))?; + + reader.push(buf); + } + + loop { + let line = match reader.get_line_with_drain(true) { + Some(line) => line, + None => { + break; + } + }; + + match parser.parse_line(line, false).map_err(|err| (line, err)) { + Ok(MccLine::Caption(tc, None)) => { + let state = self.state.lock().unwrap(); + let (framerate, drop_frame) = parse_timecode_rate(state.timecode_rate) + .map_err(|_| { + gst_loggable_error!(CAT, "Failed to parse timecode rate") + })?; + last_tc = match parse_timecode(framerate, drop_frame, tc) { + Ok(mut timecode) => { + /* We're looking for the total duration */ + timecode.increment_frame(); + Some(timecode) + } + Err(_) => None, + } + } + _ => { /* We ignore everything else including errors */ } + } + } + + if last_tc.is_some() || offset == 0 { + gst_debug!( + CAT, + obj: element, + "Duration scan done, last_tc: {:?}", + last_tc + ); + break (Ok(last_tc)); + } + } + } + + fn push_eos(&self, element: &gst::Element) { + let mut state = self.state.lock().unwrap(); + + if state.seeking { + state.need_flush_stop = true; + } + + match parse_timecode_rate(state.timecode_rate) { + Ok((framerate, _)) => { + let mut events = state.create_events(element, None, &framerate); + let mut eos_event = gst::Event::new_eos(); + + if state.seek_seqnum != gst::event::SEQNUM_INVALID { + eos_event = eos_event.seqnum(state.seek_seqnum); + } + + events.push(eos_event.build()); + + // Drop our state mutex while we push out events + drop(state); + + for event in events { + gst_debug!(CAT, obj: element, "Pushing event {:?}", event); + self.srcpad.push_event(event); + } + } + Err(_) => { + gst_element_error!( + element, + gst::StreamError::Failed, + ["Streaming stopped, failed to parse timecode rate"] + ); + } + } + } + + fn loop_fn(&self, element: &gst::Element) { + let mut state = self.state.lock().unwrap(); + let State { + timecode_rate: ref tc_rate, + ref mut pull, + .. + } = *state; + let mut pull = pull.as_mut().unwrap(); + let scan_tc_rate = tc_rate.is_none() && pull.duration.is_none(); + let offset = pull.offset; + + pull.offset += 4096; + + drop(state); + + let buffer = match self.sinkpad.pull_range(offset, 4096) { + Ok(buffer) => Some(buffer), + Err(gst::FlowError::Eos) => None, + Err(gst::FlowError::Flushing) => { + gst_debug!(CAT, obj: &self.sinkpad, "Pausing after pulling buffer, reason: flushing"); + + self.sinkpad.pause_task().unwrap(); + return; + } + Err(flow) => { + gst_error!(CAT, obj: &self.sinkpad, "Failed to pull, reason: {:?}", flow); + + gst_element_error!( + element, + gst::StreamError::Failed, + ["Streaming stopped, failed to pull buffer"] + ); + + self.sinkpad.pause_task().unwrap(); + return; + } + }; + + match self.handle_buffer(element, buffer, scan_tc_rate) { + Ok(_) => { + let tc_rate = self.state.lock().unwrap().timecode_rate; + if scan_tc_rate && tc_rate.is_some() { + match self.scan_duration(element) { + Ok(Some(tc)) => { + let mut state = self.state.lock().unwrap(); + let mut pull = state.pull.as_mut().unwrap(); + pull.duration = tc.nsec_since_daily_jam().into(); + } + Ok(None) => { + let mut state = self.state.lock().unwrap(); + let mut pull = state.pull.as_mut().unwrap(); + pull.duration = 0.into(); + } + Err(err) => { + err.log(); + + gst_element_error!( + element, + gst::StreamError::Decode, + ["Failed to scan duration"] + ); + + self.sinkpad.pause_task().unwrap(); + } + } + } + } + Err(flow) => { + match flow { + gst::FlowError::Flushing => { + gst_debug!(CAT, obj: element, "Pausing after flow {:?}", flow); + } + gst::FlowError::Eos => { + self.push_eos(element); + + gst_debug!(CAT, obj: element, "Pausing after flow {:?}", flow); + } + _ => { + self.push_eos(element); + + gst_error!(CAT, obj: element, "Pausing after flow {:?}", flow); + + gst_element_error!( + element, + gst::StreamError::Failed, + ["Streaming stopped, reason: {:?}", flow] + ); + } + } + + self.sinkpad.pause_task().unwrap(); + } + } + } + fn sink_chain( &self, pad: &gst::Pad, @@ -461,7 +955,27 @@ impl MccParse { ) -> Result { gst_log!(CAT, obj: pad, "Handling buffer {:?}", buffer); - self.handle_buffer(element, Some(buffer)) + self.handle_buffer(element, Some(buffer), false) + } + + fn flush(&self, mut state: MutexGuard) -> MutexGuard { + state.reader.clear(); + state.parser.reset(); + if let Some(pull) = &mut state.pull { + pull.offset = 0; + } + state.segment = gst::FormattedSegment::::new(); + state.need_segment = true; + state.pending_events.clear(); + state.start_position = 0.into(); + state.last_position = 0.into(); + state.last_timecode = None; + state.timecode_rate = None; + state.last_raw_line = [].to_vec(); + + drop(state); + + self.state.lock().unwrap() } fn sink_event(&self, pad: &gst::Pad, element: &gst::Element, event: gst::Event) -> bool { @@ -481,21 +995,15 @@ impl MccParse { true } EventView::FlushStop(_) => { - let mut state = self.state.lock().unwrap(); - state.reader.clear(); - state.parser.reset(); - state.need_segment = true; - state.pending_events.clear(); - state.start_position = gst::ClockTime::from_seconds(0); - state.last_position = gst::ClockTime::from_seconds(0); - state.last_timecode = None; - state.timecode_rate = None; + let state = self.state.lock().unwrap(); + + let _ = self.flush(state); pad.event_default(element, event) } EventView::Eos(_) => { gst_log!(CAT, obj: pad, "Draining"); - if let Err(err) = self.handle_buffer(element, None) { + if let Err(err) = self.handle_buffer(element, None, false) { gst_error!(CAT, obj: pad, "Failed to drain parser: {:?}", err); } pad.event_default(element, event) @@ -516,15 +1024,104 @@ impl MccParse { } } + fn perform_seek(&self, event: &gst::event::Seek, element: &gst::Element) -> bool { + let mut state = self.state.lock().unwrap(); + + if state.pull.is_none() { + gst_error!(CAT, obj: element, "seeking is only supported in pull mode"); + return false; + } + + let (rate, flags, start_type, start, stop_type, stop) = event.get(); + + let mut start = match start.try_into_time() { + Ok(start) => start, + Err(_) => { + gst_error!(CAT, obj: element, "seek has invalid format"); + return false; + } + }; + + let mut stop = match stop.try_into_time() { + Ok(stop) => stop, + Err(_) => { + gst_error!(CAT, obj: element, "seek has invalid format"); + return false; + } + }; + + if !flags.contains(gst::SeekFlags::FLUSH) { + gst_error!(CAT, obj: element, "only flushing seeks are supported"); + return false; + } + + if start_type == gst::SeekType::End || stop_type == gst::SeekType::End { + gst_error!(CAT, obj: element, "Relative seeks are not supported"); + return false; + } + + let pull = state.pull.as_ref().unwrap(); + + if start_type == gst::SeekType::Set && pull.duration.is_some() { + start = cmp::min(start, pull.duration); + } + + if stop_type == gst::SeekType::Set && pull.duration.is_some() { + stop = cmp::min(stop, pull.duration); + } + + state.seeking = true; + state.seek_seqnum = event.get_seqnum(); + + let event = gst::Event::new_flush_start() + .seqnum(state.seek_seqnum) + .build(); + + gst_debug!(CAT, obj: element, "Sending event {:?} upstream", event); + self.sinkpad.push_event(event); + + let event = gst::Event::new_flush_start() + .seqnum(state.seek_seqnum) + .build(); + + gst_debug!(CAT, obj: element, "Pushing event {:?}", event); + self.srcpad.push_event(event); + + self.sinkpad.pause_task().unwrap(); + + state = self.flush(state); + + let event = gst::Event::new_flush_stop(true) + .seqnum(state.seek_seqnum) + .build(); + + /* Drop our state while we push a serialized event upstream */ + drop(state); + + gst_debug!(CAT, obj: element, "Sending event {:?} upstream", event); + self.sinkpad.push_event(event); + + state = self.state.lock().unwrap(); + + state + .segment + .do_seek(rate, flags, start_type, start, stop_type, stop); + + match self.start_task(element) { + Err(error) => { + error.log(); + false + } + _ => true, + } + } + fn src_event(&self, pad: &gst::Pad, element: &gst::Element, event: gst::Event) -> bool { use gst::EventView; gst_log!(CAT, obj: pad, "Handling event {:?}", event); match event.view() { - EventView::Seek(_) => { - gst_log!(CAT, obj: pad, "Dropping seek event"); - false - } + EventView::Seek(e) => self.perform_seek(&e, element), _ => pad.event_default(element, event), } } @@ -536,14 +1133,24 @@ impl MccParse { match query.view_mut() { QueryView::Seeking(mut q) => { - // We don't support any seeking at all + let state = self.state.lock().unwrap(); + let fmt = q.get_format(); - q.set( - false, - gst::GenericFormattedValue::Other(fmt, -1), - gst::GenericFormattedValue::Other(fmt, -1), - ); - true + + if fmt == gst::Format::Time { + if let Some(pull) = state.pull.as_ref() { + q.set( + true, + gst::GenericFormattedValue::Time(0.into()), + gst::GenericFormattedValue::Time(pull.duration), + ); + true + } else { + false + } + } else { + false + } } QueryView::Position(ref mut q) => { // For Time answer ourselfs, otherwise forward @@ -555,6 +1162,24 @@ impl MccParse { self.sinkpad.peer_query(query) } } + QueryView::Duration(ref mut q) => { + // For Time answer ourselfs, otherwise forward + let state = self.state.lock().unwrap(); + if q.get_format() == gst::Format::Time { + if let Some(pull) = state.pull.as_ref() { + if pull.duration.is_some() { + q.set(state.pull.as_ref().unwrap().duration); + true + } else { + false + } + } else { + false + } + } else { + self.sinkpad.peer_query(query) + } + } _ => pad.query_default(element, query), } } diff --git a/gst-plugin-closedcaption/src/mcc_parser.rs b/gst-plugin-closedcaption/src/mcc_parser.rs index fe9aca6a..5eebf6ab 100644 --- a/gst-plugin-closedcaption/src/mcc_parser.rs +++ b/gst-plugin-closedcaption/src/mcc_parser.rs @@ -41,7 +41,7 @@ pub enum MccLine<'a> { UUID(&'a [u8]), Metadata(&'a [u8], &'a [u8]), TimeCodeRate(u8, bool), - Caption(TimeCode, Vec), + Caption(TimeCode, Option>), } #[derive(Clone, Copy, Debug, PartialEq, Eq)] @@ -340,7 +340,7 @@ where } /// Parser for a MCC caption line in the form `timecode\tpayload`. -fn caption<'a, I: 'a>() -> impl Parser> +fn caption<'a, I: 'a>(parse_payload: bool) -> impl Parser> where I: RangeStream, I::Error: ParseError, @@ -353,7 +353,11 @@ where optional((token(b','), digits())), )), token(b'\t'), - mcc_payload(), + if parse_payload { + mcc_payload().map(|v| Some(v)).right() + } else { + skip_many(any()).map(|_| None).left() + }, end_of_line(), ) .map(|(tc, _, _, value, _)| MccLine::Caption(tc, value)) @@ -366,6 +370,12 @@ impl MccParser { Self { state: State::Init } } + pub fn new_scan_captions() -> Self { + Self { + state: State::Captions, + } + } + pub fn reset(&mut self) { self.state = State::Init; } @@ -373,6 +383,7 @@ impl MccParser { pub fn parse_line<'a>( &mut self, line: &'a [u8], + parse_payload: bool, ) -> Result, combine::easy::Errors> { match self.state { @@ -410,7 +421,7 @@ impl MccParser { v.0 }), - State::Captions => caption() + State::Captions => caption(parse_payload) .message("while in Captions state") .easy_parse(line) .map(|v| v.0), @@ -654,7 +665,7 @@ mod tests { #[test] fn test_caption() { - let mut parser = caption(); + let mut parser = caption(true); assert_eq!( parser.parse(b"00:00:00:00\tT52S524F67ZZ72F4QROO7391UC13FFF74ZZAEB4".as_ref()), Ok(( @@ -666,7 +677,7 @@ mod tests { frames: 0, drop_frame: false }, - vec![ + Some(vec![ 0x61, 0x01, 0x52, 0x96, 0x69, 0x52, 0x4F, 0x67, 0x00, 0x00, 0x72, 0xF4, 0xFC, 0x80, 0x80, 0xFD, 0x80, 0x80, 0xFA, 0x00, 0x00, 0xFA, 0x00, 0x00, 0xFA, 0x00, 0x00, 0xFA, 0x00, 0x00, 0xFA, 0x00, 0x00, 0xFA, 0x00, 0x00, @@ -675,7 +686,7 @@ mod tests { 0xFA, 0x00, 0x00, 0xFA, 0x00, 0x00, 0xFA, 0x00, 0x00, 0xFA, 0x00, 0x00, 0x73, 0x91, 0xE1, 0x00, 0x00, 0xC1, 0x3F, 0xFF, 0x74, 0x00, 0x00, 0xAE, 0xB4 - ] + ]) ), b"".as_ref() )) @@ -692,7 +703,7 @@ mod tests { frames: 0, drop_frame: false }, - vec![ + Some(vec![ 0x61, 0x01, 0x52, 0x96, 0x69, 0x52, 0x4F, 0x67, 0x00, 0x00, 0x72, 0xF4, 0xFC, 0x80, 0x80, 0xFD, 0x80, 0x80, 0xFA, 0x00, 0x00, 0xFA, 0x00, 0x00, 0xFA, 0x00, 0x00, 0xFA, 0x00, 0x00, 0xFA, 0x00, 0x00, 0xFA, 0x00, 0x00, @@ -701,7 +712,7 @@ mod tests { 0xFA, 0x00, 0x00, 0xFA, 0x00, 0x00, 0xFA, 0x00, 0x00, 0xFA, 0x00, 0x00, 0x73, 0x91, 0xE1, 0x00, 0x00, 0xC1, 0x3F, 0xFF, 0x74, 0x00, 0x00, 0xAE, 0xB4 - ] + ]) ), b"".as_ref() )) @@ -718,7 +729,7 @@ mod tests { frames: 0, drop_frame: false }, - vec![ + Some(vec![ 0x61, 0x01, 0x52, 0x96, 0x69, 0x52, 0x4F, 0x67, 0x00, 0x00, 0x72, 0xF4, 0xFC, 0x80, 0x80, 0xFD, 0x80, 0x80, 0xFA, 0x00, 0x00, 0xFA, 0x00, 0x00, 0xFA, 0x00, 0x00, 0xFA, 0x00, 0x00, 0xFA, 0x00, 0x00, 0xFA, 0x00, 0x00, @@ -727,7 +738,7 @@ mod tests { 0xFA, 0x00, 0x00, 0xFA, 0x00, 0x00, 0xFA, 0x00, 0x00, 0xFA, 0x00, 0x00, 0x73, 0x91, 0xE1, 0x00, 0x00, 0xC1, 0x3F, 0xFF, 0x74, 0x00, 0x00, 0xAE, 0xB4 - ] + ]) ), b"".as_ref() )) @@ -749,7 +760,7 @@ mod tests { reader.push(Vec::from(mcc_file.as_ref())); while let Some(line) = reader.get_line() { - let res = match parser.parse_line(line) { + let res = match parser.parse_line(line, true) { Ok(res) => res, Err(err) => panic!("Couldn't parse line {}: {:?}", line_cnt, err), }; diff --git a/gst-plugin-closedcaption/tests/mcc_parse.rs b/gst-plugin-closedcaption/tests/mcc_parse.rs index 984f77a9..8a251c3f 100644 --- a/gst-plugin-closedcaption/tests/mcc_parse.rs +++ b/gst-plugin-closedcaption/tests/mcc_parse.rs @@ -19,7 +19,9 @@ extern crate pretty_assertions; use gst::prelude::*; +use gst::EventView; use rand::{Rng, SeedableRng}; +use std::path::PathBuf; fn init() { use std::sync::{Once, ONCE_INIT}; @@ -127,3 +129,84 @@ fn test_parse() { .build() ); } + +#[test] +fn test_pull() { + init(); + + let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + path.push("tests/captions-test_708.mcc"); + + let mut h = gst_check::Harness::new_parse(&format!("filesrc location={:?} ! mccparse", path)); + + h.play(); + + /* Let's first pull until EOS */ + loop { + let mut done = false; + + while h.events_in_queue() != 0 { + let event = h.pull_event(); + + if let Some(event) = event { + match event.view() { + EventView::Eos(_) => { + done = true; + break; + } + _ => (), + } + } + } + + while h.buffers_in_queue() != 0 { + let _ = h.pull(); + } + + if done { + break; + } + } + + /* Now seek and check that we receive buffers with appropriate PTS */ + h.push_upstream_event( + gst::Event::new_seek( + 1.0, + gst::SeekFlags::FLUSH, + gst::SeekType::Set, + gst::GenericFormattedValue::Time(gst::SECOND.into()), + gst::SeekType::Set, + gst::GenericFormattedValue::Time((2 * gst::SECOND).into()), + ) + .build(), + ); + + loop { + let mut done = false; + + while h.buffers_in_queue() != 0 { + if let Some(buffer) = h.pull() { + let pts = buffer.get_pts(); + assert!(pts > gst::SECOND && pts < 2 * gst::SECOND); + } + } + + while h.events_in_queue() != 0 { + let event = h.pull_event(); + + if let Some(event) = event { + match event.view() { + EventView::Eos(_) => { + done = true; + break; + } + _ => (), + } + } + } + + if done { + break; + } + } +}