From 58786fa0b5f122f2ccd60277739fb1450b3a71f7 Mon Sep 17 00:00:00 2001 From: Seungha Yang Date: Fri, 27 Nov 2020 21:40:40 +0900 Subject: [PATCH] sccparse: Add support for seeking Only pull mode can support seeking for now and reverse playback is not implemented yet. Note that this restriction is the same as that of mccparse. --- video/closedcaption/src/scc_parse/imp.rs | 711 ++++++++++++++++++-- video/closedcaption/src/scc_parse/parser.rs | 6 + video/closedcaption/tests/scc_parse.rs | 82 +++ 3 files changed, 743 insertions(+), 56 deletions(-) diff --git a/video/closedcaption/src/scc_parse/imp.rs b/video/closedcaption/src/scc_parse/imp.rs index 8fc00054..ebe3c08f 100644 --- a/video/closedcaption/src/scc_parse/imp.rs +++ b/video/closedcaption/src/scc_parse/imp.rs @@ -20,8 +20,13 @@ use glib::subclass; use glib::subclass::prelude::*; use gst::prelude::*; use gst::subclass::prelude::*; -use gst::{gst_debug, gst_element_error, gst_error, gst_fixme, gst_log, gst_trace, gst_warning}; +use gst::{ + gst_debug, gst_element_error, gst_error, gst_fixme, gst_info, gst_log, gst_loggable_error, + gst_trace, gst_warning, +}; +use std::cmp; +use std::convert::TryInto; use std::sync::{Mutex, MutexGuard}; use once_cell::sync::Lazy; @@ -38,6 +43,28 @@ static CAT: Lazy = Lazy::new(|| { ) }); +#[derive(Debug)] +struct PullState { + need_stream_start: bool, + stream_id: String, + offset: u64, + duration: gst::ClockTime, +} + +impl PullState { + fn new(element: &super::SccParse, pad: &gst::Pad) -> Self { + Self { + need_stream_start: true, + stream_id: pad + .create_stream_id(element, Some("src")) + .unwrap() + .to_string(), + offset: 0, + duration: gst::CLOCK_TIME_NONE, + } + } +} + #[derive(Debug)] struct State { reader: LineReader>, @@ -47,6 +74,16 @@ struct State { framerate: Option, last_position: gst::ClockTime, last_timecode: Option, + segment: gst::FormattedSegment, + + // Pull mode + pull: Option, + + // seeking + seeking: bool, + discont: bool, + seek_seqnum: Option, + need_flush_stop: bool, } impl Default for State { @@ -59,10 +96,43 @@ impl Default for State { framerate: None, last_position: gst::CLOCK_TIME_NONE, last_timecode: None, + segment: gst::FormattedSegment::::new(), + pull: None, + seeking: false, + discont: false, + seek_seqnum: None, + need_flush_stop: false, } } } +fn parse_timecode( + framerate: gst::Fraction, + tc: &TimeCode, +) -> Result { + use std::convert::TryFrom; + + let timecode = gst_video::VideoTimeCode::new( + framerate, + None, + if tc.drop_frame { + gst_video::VideoTimeCodeFlags::DROP_FRAME + } else { + gst_video::VideoTimeCodeFlags::empty() + }, + tc.hours, + tc.minutes, + tc.seconds, + tc.frames, + 0, + ); + + match gst_video::ValidVideoTimeCode::try_from(timecode).map_err(|_| gst::FlowError::Error) { + Ok(timecode) => Ok(timecode), + Err(timecode) => Err(timecode), + } +} + impl State { #[allow(clippy::type_complexity)] fn get_line( @@ -84,28 +154,11 @@ impl State { fn handle_timecode( &mut self, - tc: TimeCode, + tc: &TimeCode, framerate: gst::Fraction, element: &super::SccParse, ) -> Result { - use std::convert::TryFrom; - - let timecode = gst_video::VideoTimeCode::new( - framerate, - None, - if tc.drop_frame { - gst_video::VideoTimeCodeFlags::DROP_FRAME - } else { - gst_video::VideoTimeCodeFlags::empty() - }, - tc.hours, - tc.minutes, - tc.seconds, - tc.frames, - 0, - ); - - match gst_video::ValidVideoTimeCode::try_from(timecode) { + match parse_timecode(framerate, &tc) { Ok(timecode) => Ok(timecode), Err(timecode) => { let last_timecode = @@ -176,6 +229,61 @@ impl State { .unwrap_or(gst::CLOCK_TIME_NONE), ); } + + fn create_events( + &mut self, + element: &super::SccParse, + framerate: Option, + ) -> Vec { + let mut events = Vec::new(); + + if self.need_flush_stop { + let mut b = gst::event::FlushStop::builder(true); + + if let Some(seek_seqnum) = self.seek_seqnum { + b = b.seqnum(seek_seqnum); + } + + events.push(b.build()); + self.need_flush_stop = false; + } + + if let Some(pull) = &mut self.pull { + if pull.need_stream_start { + events.push(gst::event::StreamStart::new(&pull.stream_id)); + pull.need_stream_start = false; + } + } + + if let Some(framerate) = framerate { + if self.framerate != Some(framerate) { + self.framerate = Some(framerate); + + let caps = gst::Caps::builder("closedcaption/x-cea-608") + .field("format", &"raw") + .field("framerate", &framerate) + .build(); + self.framerate = Some(framerate); + + events.push(gst::event::Caps::new(&caps)); + gst_info!(CAT, obj: element, "Caps changed to {:?}", &caps); + } + } + + if self.need_segment { + let mut b = gst::event::Segment::builder(&self.segment); + + if let Some(seek_seqnum) = self.seek_seqnum { + b = b.seqnum(seek_seqnum); + } + + events.push(b.build()); + self.need_segment = false; + } + + events.extend(self.pending_events.drain(..)); + events + } } pub struct SccParse { @@ -228,7 +336,12 @@ impl SccParse { break Err(gst::FlowError::Error); } - Ok(None) => break Ok(gst::FlowSuccess::Ok), + Ok(None) => { + if drain && state.pull.is_some() { + break Err(gst::FlowError::Eos); + } + break Ok(gst::FlowSuccess::Ok); + } } } } @@ -256,27 +369,52 @@ impl SccParse { gst::Fraction::new(30, 1) }; - let mut events = Vec::new(); + let mut timecode = state.handle_timecode(&tc, framerate, element)?; + let start_time = gst::ClockTime::from(timecode.nsec_since_daily_jam()); + let segment_start = state.segment.get_start(); + let clip_buffers = if state.seeking { + // If we are in the middle of seeking, check whether this line + // contains start frame, and if so, unset seeking flag + let num_bufs = (data.len() / 2) as i64; + let mut end_timecode = parse_timecode(framerate, &tc).unwrap(); + // add one more frame here so that add duration of the last frame + end_timecode.add_frames(num_bufs + 1); + let stop_time = gst::ClockTime::from(end_timecode.nsec_since_daily_jam()); - if Some(framerate) != state.framerate { - let caps = gst::Caps::builder("closedcaption/x-cea-608") - .field("format", &"raw") - .field("framerate", &framerate) - .build(); - events.push(gst::event::Caps::new(&caps)); - state.framerate = Some(framerate); - } + gst_trace!( + CAT, + obj: element, + "Checking inside of segment, line start {} line stop {} segment start {} num bufs {}", + start_time, + stop_time, + segment_start, + num_bufs, + ); - if state.need_segment { - let segment = gst::FormattedSegment::::new(); - events.push(gst::event::Segment::new(&segment)); - state.need_segment = false; - } + if stop_time > segment_start { + state.seeking = false; + state.discont = true; + state.need_flush_stop = true; + } - events.extend(state.pending_events.drain(..)); + // Still need to scan lines to find the first buffer + if state.seeking { + drop(state); + return Ok(self.state.lock().unwrap()); + } - let mut timecode = state.handle_timecode(tc, framerate, element)?; - let mut buffers = gst::BufferList::new_sized(data.len() / 2); + true + } else { + false + }; + + let mut buffers = if clip_buffers { + gst::BufferList::new() + } else { + gst::BufferList::new_sized(data.len() / 2) + }; + + let mut send_eos = false; for d in data.chunks_exact(2) { let mut buffer = gst::Buffer::with_size(d.len()).unwrap(); { @@ -286,13 +424,38 @@ impl SccParse { state.add_buffer_metadata(&mut buffer, &timecode, framerate, element); timecode.increment_frame(); + + if clip_buffers { + let end_time = buffer.get_pts() + buffer.get_duration(); + if end_time < segment_start { + gst_trace!( + CAT, + obj: element, + "Skip segment clipped buffer {:?}", + buffer, + ); + + continue; + } + } + + send_eos = state.segment.get_stop().is_some() + && buffer.get_pts() + buffer.get_duration() >= state.segment.get_stop(); + let buffers = buffers.get_mut().unwrap(); buffers.add(buffer); + + // Terminate loop once we found EOS boundary buffer + if send_eos { + break; + } } // Update the last_timecode to the current one state.last_timecode = Some(timecode); + let events = state.create_events(element, Some(framerate)); + // Drop our state mutex while we push out buffers or events drop(state); @@ -306,9 +469,295 @@ impl SccParse { err })?; + if send_eos { + return Err(gst::FlowError::Eos); + } + Ok(self.state.lock().unwrap()) } + fn sink_activate( + &self, + pad: &gst::Pad, + element: &super::SccParse, + ) -> Result<(), gst::LoggableError> { + let mode = { + let mut query = gst::query::Scheduling::new(); + let mut state = self.state.lock().unwrap(); + + state.pull = None; + + if !pad.peer_query(&mut query) { + gst_debug!(CAT, obj: pad, "Scheduling query failed on peer"); + gst::PadMode::Push + } else if query + .has_scheduling_mode_with_flags(gst::PadMode::Pull, gst::SchedulingFlags::SEEKABLE) + { + gst_debug!(CAT, obj: pad, "Activating in Pull mode"); + + state.pull = Some(PullState::new(element, &self.srcpad)); + + gst::PadMode::Pull + } else { + gst_debug!(CAT, obj: pad, "Activating in Push mode"); + gst::PadMode::Push + } + }; + + pad.activate_mode(mode, true)?; + Ok(()) + } + + fn start_task(&self, element: &super::SccParse) -> Result<(), gst::LoggableError> { + let element_weak = element.downgrade(); + let pad_weak = self.sinkpad.downgrade(); + let res = self.sinkpad.start_task(move || { + let element = match element_weak.upgrade() { + Some(element) => element, + None => { + if let Some(pad) = pad_weak.upgrade() { + pad.pause_task().unwrap(); + } + return; + } + }; + + let parse = Self::from_instance(&element); + parse.loop_fn(&element); + }); + if res.is_err() { + return Err(gst_loggable_error!(CAT, "Failed to start pad task")); + } + Ok(()) + } + + fn sink_activatemode( + &self, + _pad: &gst::Pad, + element: &super::SccParse, + mode: gst::PadMode, + active: bool, + ) -> Result<(), gst::LoggableError> { + if mode == gst::PadMode::Pull { + if active { + self.start_task(element)?; + } else { + let _ = self.sinkpad.stop_task(); + } + } + + Ok(()) + } + + fn scan_duration( + &self, + element: &super::SccParse, + ) -> Result, gst::LoggableError> { + gst_debug!(CAT, obj: element, "Scanning duration"); + + /* First let's query the bytes duration upstream */ + let mut q = gst::query::Duration::new(gst::Format::Bytes); + + if !self.sinkpad.peer_query(&mut q) { + return Err(gst_loggable_error!( + CAT, + "Failed to query upstream duration" + )); + } + + let size = match q.get_result().try_into().unwrap() { + gst::format::Bytes(Some(size)) => size, + gst::format::Bytes(None) => { + return Err(gst_loggable_error!( + CAT, + "Failed to query upstream duration" + )); + } + }; + + let mut offset = size; + let mut buffers = Vec::new(); + let mut last_tc = None; + + loop { + let scan_size = cmp::min(offset, 4096); + + offset -= scan_size; + + match self.sinkpad.pull_range(offset, scan_size as u32) { + Ok(buffer) => { + buffers.push(buffer); + } + Err(flow) => { + return Err(gst_loggable_error!( + CAT, + "Failed to pull buffer while scanning duration: {:?}", + flow + )); + } + } + + let mut reader = LineReader::new(); + let mut parser = SccParser::new_scan_captions(); + + for buf in buffers.iter().rev() { + let buf = buf + .clone() + .into_mapped_buffer_readable() + .map_err(|_| gst_loggable_error!(CAT, "Failed to map buffer readable"))?; + + reader.push(buf); + } + + while let Some(line) = reader.get_line_with_drain(true) { + if let Ok(SccLine::Caption(tc, data)) = + parser.parse_line(line).map_err(|err| (line, err)) + { + let framerate = if tc.drop_frame { + gst::Fraction::new(30000, 1001) + } else { + gst::Fraction::new(30, 1) + }; + + if let Ok(mut timecode) = parse_timecode(framerate, &tc) { + /* We're looking for the total duration */ + timecode.add_frames((data.len() / 2) as i64 + 1); + last_tc = Some(timecode); + } + } + } + + if last_tc.is_some() || offset == 0 { + gst_debug!( + CAT, + obj: element, + "Duration scan done, last_tc: {:?}", + last_tc + ); + break (Ok(last_tc)); + } + } + } + + fn push_eos(&self, element: &super::SccParse) { + let mut state = self.state.lock().unwrap(); + + if state.seeking { + state.need_flush_stop = true; + } + + let mut events = state.create_events(element, None); + let mut eos_event = gst::event::Eos::builder(); + + if let Some(seek_seqnum) = state.seek_seqnum { + eos_event = eos_event.seqnum(seek_seqnum); + } + + events.push(eos_event.build()); + + // Drop our state mutex while we push out events + drop(state); + + for event in events { + gst_debug!(CAT, obj: element, "Pushing event {:?}", event); + self.srcpad.push_event(event); + } + } + + fn loop_fn(&self, element: &super::SccParse) { + let mut state = self.state.lock().unwrap(); + let State { + ref framerate, + ref mut pull, + .. + } = *state; + let mut pull = pull.as_mut().unwrap(); + let scan_duration = framerate.is_none() && pull.duration.is_none(); + let offset = pull.offset; + + pull.offset += 4096; + + drop(state); + + let buffer = match self.sinkpad.pull_range(offset, 4096) { + Ok(buffer) => Some(buffer), + Err(gst::FlowError::Eos) => None, + Err(gst::FlowError::Flushing) => { + gst_debug!(CAT, obj: &self.sinkpad, "Pausing after pulling buffer, reason: flushing"); + + self.sinkpad.pause_task().unwrap(); + return; + } + Err(flow) => { + gst_error!(CAT, obj: &self.sinkpad, "Failed to pull, reason: {:?}", flow); + + gst_element_error!( + element, + gst::StreamError::Failed, + ["Streaming stopped, failed to pull buffer"] + ); + + self.sinkpad.pause_task().unwrap(); + return; + } + }; + + match self.handle_buffer(element, buffer) { + Ok(_) => { + if scan_duration { + match self.scan_duration(element) { + Ok(Some(tc)) => { + let mut state = self.state.lock().unwrap(); + let mut pull = state.pull.as_mut().unwrap(); + pull.duration = tc.nsec_since_daily_jam().into(); + } + Ok(None) => { + let mut state = self.state.lock().unwrap(); + let mut pull = state.pull.as_mut().unwrap(); + pull.duration = 0.into(); + } + Err(err) => { + err.log(); + + gst_element_error!( + element, + gst::StreamError::Decode, + ["Failed to scan duration"] + ); + + self.sinkpad.pause_task().unwrap(); + } + } + } + } + Err(flow) => { + match flow { + gst::FlowError::Flushing => { + gst_debug!(CAT, obj: element, "Pausing after flow {:?}", flow); + } + gst::FlowError::Eos => { + self.push_eos(element); + + gst_debug!(CAT, obj: element, "Pausing after flow {:?}", flow); + } + _ => { + self.push_eos(element); + + gst_error!(CAT, obj: element, "Pausing after flow {:?}", flow); + + gst_element_error!( + element, + gst::StreamError::Failed, + ["Streaming stopped, reason: {:?}", flow] + ); + } + } + + self.sinkpad.pause_task().unwrap(); + } + } + } + fn sink_chain( &self, pad: &gst::Pad, @@ -320,6 +769,23 @@ impl SccParse { self.handle_buffer(element, Some(buffer)) } + fn flush(&self, mut state: MutexGuard) -> MutexGuard { + state.reader.clear(); + state.parser.reset(); + if let Some(pull) = &mut state.pull { + pull.offset = 0; + } + state.segment = gst::FormattedSegment::::new(); + state.need_segment = true; + state.pending_events.clear(); + state.last_position = 0.into(); + state.last_timecode = None; + + drop(state); + + self.state.lock().unwrap() + } + fn sink_event(&self, pad: &gst::Pad, element: &super::SccParse, event: gst::Event) -> bool { use gst::EventView; @@ -337,13 +803,9 @@ impl SccParse { true } EventView::FlushStop(_) => { - let mut state = self.state.lock().unwrap(); - state.reader.clear(); - state.parser.reset(); - state.need_segment = true; - state.pending_events.clear(); - state.last_position = gst::ClockTime::from_seconds(0); - state.last_timecode = None; + let state = self.state.lock().unwrap(); + let state = self.flush(state); + drop(state); pad.event_default(Some(element), event) } @@ -370,15 +832,105 @@ impl SccParse { } } + fn perform_seek(&self, event: &gst::event::Seek, element: &super::SccParse) -> bool { + let mut state = self.state.lock().unwrap(); + + if state.pull.is_none() { + gst_error!(CAT, obj: element, "seeking is only supported in pull mode"); + return false; + } + + let (rate, flags, start_type, start, stop_type, stop) = event.get(); + + let mut start: gst::ClockTime = match start.try_into() { + Ok(start) => start, + Err(_) => { + gst_error!(CAT, obj: element, "seek has invalid format"); + return false; + } + }; + + let mut stop: gst::ClockTime = match stop.try_into() { + Ok(stop) => stop, + Err(_) => { + gst_error!(CAT, obj: element, "seek has invalid format"); + return false; + } + }; + + if !flags.contains(gst::SeekFlags::FLUSH) { + gst_error!(CAT, obj: element, "only flushing seeks are supported"); + return false; + } + + if start_type == gst::SeekType::End || stop_type == gst::SeekType::End { + gst_error!(CAT, obj: element, "Relative seeks are not supported"); + return false; + } + + let pull = state.pull.as_ref().unwrap(); + + if start_type == gst::SeekType::Set { + start = start.min(pull.duration).unwrap_or(start); + } + + if stop_type == gst::SeekType::Set { + stop = stop.min(pull.duration).unwrap_or(stop); + } + + state.seeking = true; + let seek_seqnum = event.get_seqnum(); + state.seek_seqnum = Some(seek_seqnum); + + let event = gst::event::FlushStart::builder() + .seqnum(seek_seqnum) + .build(); + + gst_debug!(CAT, obj: element, "Sending event {:?} upstream", event); + self.sinkpad.push_event(event); + + let event = gst::event::FlushStart::builder() + .seqnum(seek_seqnum) + .build(); + + gst_debug!(CAT, obj: element, "Pushing event {:?}", event); + self.srcpad.push_event(event); + + self.sinkpad.pause_task().unwrap(); + + state = self.flush(state); + + let event = gst::event::FlushStop::builder(true) + .seqnum(seek_seqnum) + .build(); + + /* Drop our state while we push a serialized event upstream */ + drop(state); + + gst_debug!(CAT, obj: element, "Sending event {:?} upstream", event); + self.sinkpad.push_event(event); + + state = self.state.lock().unwrap(); + + state + .segment + .do_seek(rate, flags, start_type, start, stop_type, stop); + + match self.start_task(element) { + Err(error) => { + error.log(); + false + } + _ => true, + } + } + fn src_event(&self, pad: &gst::Pad, element: &super::SccParse, event: gst::Event) -> bool { use gst::EventView; gst_log!(CAT, obj: pad, "Handling event {:?}", event); match event.view() { - EventView::Seek(_) => { - gst_log!(CAT, obj: pad, "Dropping seek event"); - false - } + EventView::Seek(e) => self.perform_seek(&e, element), _ => pad.event_default(Some(element), event), } } @@ -395,14 +947,24 @@ impl SccParse { match query.view_mut() { QueryView::Seeking(mut q) => { - // We don't support any seeking at all + let state = self.state.lock().unwrap(); + let fmt = q.get_format(); - q.set( - false, - gst::GenericFormattedValue::Other(fmt, -1), - gst::GenericFormattedValue::Other(fmt, -1), - ); - true + + if fmt == gst::Format::Time { + if let Some(pull) = state.pull.as_ref() { + q.set( + true, + gst::GenericFormattedValue::Time(0.into()), + gst::GenericFormattedValue::Time(pull.duration), + ); + true + } else { + false + } + } else { + false + } } QueryView::Position(ref mut q) => { // For Time answer ourselfs, otherwise forward @@ -414,6 +976,24 @@ impl SccParse { self.sinkpad.peer_query(query) } } + QueryView::Duration(ref mut q) => { + // For Time answer ourselfs, otherwise forward + let state = self.state.lock().unwrap(); + if q.get_format() == gst::Format::Time { + if let Some(pull) = state.pull.as_ref() { + if pull.duration.is_some() { + q.set(state.pull.as_ref().unwrap().duration); + true + } else { + false + } + } else { + false + } + } else { + self.sinkpad.peer_query(query) + } + } _ => pad.query_default(Some(element), query), } } @@ -431,6 +1011,25 @@ impl ObjectSubclass for SccParse { fn with_class(klass: &Self::Class) -> Self { let templ = klass.get_pad_template("sink").unwrap(); let sinkpad = gst::Pad::builder_with_template(&templ, Some("sink")) + .activate_function(|pad, parent| { + SccParse::catch_panic_pad_function( + parent, + || Err(gst_loggable_error!(CAT, "Panic activating sink pad")), + |parse, element| parse.sink_activate(pad, element), + ) + }) + .activatemode_function(|pad, parent, mode, active| { + SccParse::catch_panic_pad_function( + parent, + || { + Err(gst_loggable_error!( + CAT, + "Panic activating sink pad with mode" + )) + }, + |parse, element| parse.sink_activatemode(pad, element, mode, active), + ) + }) .chain_function(|pad, parent, buffer| { SccParse::catch_panic_pad_function( parent, diff --git a/video/closedcaption/src/scc_parse/parser.rs b/video/closedcaption/src/scc_parse/parser.rs index b1b77cd7..53ccd4e8 100644 --- a/video/closedcaption/src/scc_parse/parser.rs +++ b/video/closedcaption/src/scc_parse/parser.rs @@ -141,6 +141,12 @@ impl SccParser { } } + pub fn new_scan_captions() -> Self { + Self { + state: State::CaptionOrEmpty, + } + } + pub fn reset(&mut self) { self.state = State::Header; } diff --git a/video/closedcaption/tests/scc_parse.rs b/video/closedcaption/tests/scc_parse.rs index 3e0a449f..b960866e 100644 --- a/video/closedcaption/tests/scc_parse.rs +++ b/video/closedcaption/tests/scc_parse.rs @@ -17,10 +17,12 @@ // Boston, MA 02110-1335, USA. use gst::prelude::*; +use gst::EventView; use gst_video::{ValidVideoTimeCode, VideoTimeCode}; use pretty_assertions::assert_eq; use rand::{Rng, SeedableRng}; use std::collections::VecDeque; +use std::path::PathBuf; fn init() { use std::sync::Once; @@ -199,3 +201,83 @@ fn test_timecodes() { .build() ); } + +#[test] +fn test_pull() { + init(); + + let mut path = PathBuf::from(env!("CARGO_MANIFEST_DIR")); + path.push("tests/dn2018-1217.scc"); + + let mut h = gst_check::Harness::new_parse(&format!("filesrc location={:?} ! sccparse", path)); + + h.play(); + + /* Let's first pull until EOS */ + loop { + let mut done = false; + + while h.events_in_queue() != 0 { + let event = h.pull_event(); + + if let Ok(event) = event { + match event.view() { + EventView::Eos(_) => { + done = true; + break; + } + _ => (), + } + } + } + + while h.buffers_in_queue() != 0 { + let _ = h.pull(); + } + + if done { + break; + } + } + + /* Now seek and check that we receive buffers with appropriate PTS */ + h.push_upstream_event(gst::event::Seek::new( + 1.0, + gst::SeekFlags::FLUSH, + gst::SeekType::Set, + gst::GenericFormattedValue::Time(18 * gst::SECOND), + gst::SeekType::Set, + gst::GenericFormattedValue::Time(19 * gst::SECOND), + )); + + loop { + let mut done = false; + + while h.buffers_in_queue() != 0 { + if let Ok(buffer) = h.pull() { + let pts = buffer.get_pts(); + let end_time = pts + buffer.get_duration(); + + assert!(end_time >= 18 * gst::SECOND && pts < 19 * gst::SECOND); + } + } + + while h.events_in_queue() != 0 { + let event = h.pull_event(); + + if let Ok(event) = event { + match event.view() { + EventView::Eos(_) => { + done = true; + break; + } + _ => (), + } + } + } + + if done { + break; + } + } +}