From 45ebb4c629b8903d4cb3ba04579b4b992a68ec87 Mon Sep 17 00:00:00 2001 From: Jordan Petridis Date: Mon, 21 Jan 2019 21:45:00 +0200 Subject: [PATCH] mccparse: Refactor huge line-handling function into smaller separate functions * The Debug category was moved to a lazy_static!. This allowed for a couple of methods, to be implemented directly against the State struct since the debug category was their only dependency from MccParse. * Log the Caps/Format change --- gst-plugin-closedcaption/Cargo.toml | 1 + gst-plugin-closedcaption/src/lib.rs | 2 + gst-plugin-closedcaption/src/mcc_parse.rs | 411 ++++++++++++---------- 3 files changed, 230 insertions(+), 184 deletions(-) diff --git a/gst-plugin-closedcaption/Cargo.toml b/gst-plugin-closedcaption/Cargo.toml index 81c1e31b..39a8b29c 100644 --- a/gst-plugin-closedcaption/Cargo.toml +++ b/gst-plugin-closedcaption/Cargo.toml @@ -11,6 +11,7 @@ combine = "3.6" either = "1" uuid = { version = "0.7", features = ["v4"] } chrono = "0.4" +lazy_static = "1.2" [dependencies.gst] git = "https://gitlab.freedesktop.org/gstreamer/gstreamer-rs" diff --git a/gst-plugin-closedcaption/src/lib.rs b/gst-plugin-closedcaption/src/lib.rs index 45e83a11..cbe62c03 100644 --- a/gst-plugin-closedcaption/src/lib.rs +++ b/gst-plugin-closedcaption/src/lib.rs @@ -26,6 +26,8 @@ extern crate glib; #[macro_use] extern crate gst; +#[macro_use] +extern crate lazy_static; #[cfg(test)] #[macro_use] diff --git a/gst-plugin-closedcaption/src/mcc_parse.rs b/gst-plugin-closedcaption/src/mcc_parse.rs index c9808eb2..3ec5fe9e 100644 --- a/gst-plugin-closedcaption/src/mcc_parse.rs +++ b/gst-plugin-closedcaption/src/mcc_parse.rs @@ -22,12 +22,22 @@ use glib::subclass::prelude::*; use gst; use gst::prelude::*; use gst::subclass::prelude::*; -use gst_video; +use gst_video::{self, ValidVideoTimeCode}; -use std::sync::Mutex; +use std::sync::{Mutex, MutexGuard}; use crate::line_reader::LineReader; -use crate::mcc_parser::{MccLine, MccParser}; +use crate::mcc_parser::{MccLine, MccParser, TimeCode}; + +lazy_static! { + static ref CAT: gst::DebugCategory = { + gst::DebugCategory::new( + "mccparse", + gst::DebugColorFlags::empty(), + "Mcc Parser Element", + ) + }; +} #[derive(Clone, Copy, Debug, PartialEq, Eq)] enum Format { @@ -87,10 +97,155 @@ impl State { .map(Option::Some) .map_err(|err| (line, err)) } + + fn handle_timecode( + &mut self, + element: &gst::Element, + tc: TimeCode, + framerate: gst::Fraction, + drop_frame: bool, + ) -> Result { + let timecode = gst_video::VideoTimeCode::new( + framerate, + None, + if drop_frame { + gst_video::VideoTimeCodeFlags::DROP_FRAME + } else { + gst_video::VideoTimeCodeFlags::empty() + }, + tc.hours, + tc.minutes, + tc.seconds, + tc.frames, + 0, + ); + + match timecode.try_into() { + Ok(timecode) => Ok(timecode), + Err(timecode) => { + let last_timecode = + self.last_timecode + .as_ref() + .map(Clone::clone) + .ok_or_else(|| { + gst_element_error!( + element, + gst::StreamError::Decode, + ["Invalid first timecode {:?}", timecode] + ); + + gst::FlowError::Error + })?; + + gst_warning!( + CAT, + obj: element, + "Invalid timecode {:?}, using previous {:?}", + timecode, + last_timecode + ); + + Ok(last_timecode) + } + } + } + + /// Calculate a timestamp from the timecode and make sure to + /// not produce timestamps jumping backwards + fn update_timestamp( + &mut self, + element: &gst::Element, + timecode: &gst_video::ValidVideoTimeCode, + ) { + let nsecs = gst::ClockTime::from(timecode.nsec_since_daily_jam()); + if self.start_position.is_none() { + self.start_position = nsecs; + } + + let nsecs = if nsecs < self.start_position { + gst_fixme!( + CAT, + obj: element, + "New position {} < start position {}", + nsecs, + self.start_position + ); + self.start_position + } else { + nsecs - self.start_position + }; + + if nsecs >= self.last_position { + self.last_position = nsecs; + } else { + gst_fixme!( + CAT, + obj: element, + "New position {} < last position {}", + nsecs, + self.last_position + ); + } + } + + fn add_buffer_metadata( + &mut self, + element: &gst::Element, + buffer: &mut gst::buffer::Buffer, + timecode: &gst_video::ValidVideoTimeCode, + framerate: &gst::Fraction, + ) { + let buffer = buffer.get_mut().unwrap(); + gst_video::VideoTimeCodeMeta::add(buffer, &timecode); + + self.update_timestamp(element, &timecode); + + buffer.set_pts(self.last_position); + buffer.set_duration( + gst::SECOND + .mul_div_ceil(*framerate.denom() as u64, *framerate.numer() as u64) + .unwrap_or(gst::CLOCK_TIME_NONE), + ); + } + + fn create_events( + &mut self, + element: &gst::Element, + format: Format, + framerate: &gst::Fraction, + ) -> Vec { + let mut events = Vec::new(); + + if self.format != Some(format) { + self.format = Some(format); + + let caps = match format { + Format::Cea708Cdp => gst::Caps::builder("closedcaption/x-cea-708") + .field("format", &"cdp") + .field("framerate", framerate) + .build(), + Format::Cea608 => gst::Caps::builder("closedcaption/x-cea-608") + .field("format", &"s334-1a") + .field("framerate", framerate) + .build(), + }; + + events.push(gst::Event::new_caps(&caps).build()); + gst_info!(CAT, obj: element, "Caps changed to {:?}", &caps); + } + + if self.need_segment { + let segment = gst::FormattedSegment::::new(); + events.push(gst::Event::new_segment(&segment).build()); + self.need_segment = false; + } + + events.extend(self.pending_events.drain(..)); + events + } } struct MccParse { - cat: gst::DebugCategory, srcpad: gst::Pad, sinkpad: gst::Pad, state: Mutex, @@ -178,7 +333,7 @@ impl MccParse { match line { Ok(Some(MccLine::Caption(tc, data))) => { gst_trace!( - self.cat, + CAT, obj: element, "Got caption buffer with timecode {:?} and size {}", tc, @@ -187,7 +342,7 @@ impl MccParse { if data.len() < 3 { gst_debug!( - self.cat, + CAT, obj: element, "Too small caption packet: {}", data.len(), @@ -199,13 +354,7 @@ impl MccParse { (0x61, 0x01) => Format::Cea708Cdp, (0x61, 0x02) => Format::Cea608, (did, sdid) => { - gst_debug!( - self.cat, - obj: element, - "Unknown DID {:x} SDID {:x}", - did, - sdid - ); + gst_debug!(CAT, obj: element, "Unknown DID {:x} SDID {:x}", did, sdid); continue; } }; @@ -213,7 +362,7 @@ impl MccParse { let len = data[2]; if data.len() < 3 + len as usize { gst_debug!( - self.cat, + CAT, obj: element, "Too small caption packet: {} < {}", data.len(), @@ -222,158 +371,11 @@ impl MccParse { continue; } - let (framerate, drop_frame) = match state.timecode_rate { - Some((rate, false)) => (gst::Fraction::new(rate as i32, 1), false), - Some((rate, true)) => (gst::Fraction::new(rate as i32 * 1000, 1001), true), - None => { - gst_element_error!( - element, - gst::StreamError::Decode, - ["Got caption before time code rate"] - ); - - break Err(gst::FlowError::Error); - } - }; - - let mut events = Vec::new(); - - if state.format != Some(format) { - state.format = Some(format); - - let caps = match format { - Format::Cea708Cdp => gst::Caps::builder("closedcaption/x-cea-708") - .field("format", &"cdp") - .field("framerate", &framerate) - .build(), - Format::Cea608 => gst::Caps::builder("closedcaption/x-cea-608") - .field("format", &"s334-1a") - .field("framerate", &framerate) - .build(), - }; - - events.push(gst::Event::new_caps(&caps).build()); - } - - if state.need_segment { - let segment = gst::FormattedSegment::::new(); - events.push(gst::Event::new_segment(&segment).build()); - state.need_segment = false; - } - - events.extend(state.pending_events.drain(..)); - - let timecode = gst_video::VideoTimeCode::new( - framerate, - None, - if drop_frame { - gst_video::VideoTimeCodeFlags::DROP_FRAME - } else { - gst_video::VideoTimeCodeFlags::empty() - }, - tc.hours, - tc.minutes, - tc.seconds, - tc.frames, - 0, - ); - - let timecode = match timecode.try_into() { - Ok(timecode) => timecode, - Err(timecode) => { - let last_timecode = - state.last_timecode.as_ref().map(Clone::clone).ok_or_else( - || { - gst_element_error!( - element, - gst::StreamError::Decode, - ["Invalid first timecode {:?}", timecode] - ); - - gst::FlowError::Error - }, - )?; - - gst_warning!( - self.cat, - obj: element, - "Invalid timecode {:?}, using previous {:?}", - timecode, - last_timecode - ); - - last_timecode - } - }; - - let mut buffer = gst::Buffer::from_mut_slice(OffsetVec { - vec: data, - offset: 3, - len: len as usize, - }); - - { - let buffer = buffer.get_mut().unwrap(); - gst_video::VideoTimeCodeMeta::add(buffer, &timecode); - - // Calculate a timestamp from the timecode and make sure to - // not produce timestamps jumping backwards - let nsecs = gst::ClockTime::from(timecode.nsec_since_daily_jam()); - if state.start_position.is_none() { - state.start_position = nsecs; - } - - let nsecs = if nsecs < state.start_position { - gst_fixme!( - self.cat, - obj: element, - "New position {} < start position {}", - nsecs, - state.start_position - ); - state.start_position - } else { - nsecs - state.start_position - }; - - if nsecs >= state.last_position { - state.last_position = nsecs; - } else { - gst_fixme!( - self.cat, - obj: element, - "New position {} < last position {}", - nsecs, - state.last_position - ); - } - - buffer.set_pts(state.last_position); - buffer.set_duration( - gst::SECOND - .mul_div_ceil(*framerate.denom() as u64, *framerate.numer() as u64) - .unwrap_or(gst::CLOCK_TIME_NONE), - ); - } - - // Drop our state mutex while we push out buffers or events - drop(state); - - for event in events { - gst_debug!(self.cat, obj: element, "Pushing event {:?}", event); - self.srcpad.push_event(event); - } - - self.srcpad.push(buffer).map_err(|err| { - gst_error!(self.cat, obj: element, "Pushing buffer returned {:?}", err); - err - })?; - - state = self.state.lock().unwrap(); + state = self.handle_line(element, tc, data, format, state)?; } Ok(Some(MccLine::TimeCodeRate(rate, df))) => { gst_debug!( - self.cat, + CAT, obj: element, "Got timecode rate {} (drop frame {})", rate, @@ -382,7 +384,7 @@ impl MccParse { state.timecode_rate = Some((rate, df)); } Ok(Some(line)) => { - gst_debug!(self.cat, obj: element, "Got line '{:?}'", line); + gst_debug!(CAT, obj: element, "Got line '{:?}'", line); } Err((line, err)) => { gst_element_error!( @@ -398,13 +400,63 @@ impl MccParse { } } + fn handle_line( + &self, + element: &gst::Element, + tc: TimeCode, + data: Vec, + format: Format, + mut state: MutexGuard, + ) -> Result, gst::FlowError> { + let (framerate, drop_frame) = match state.timecode_rate { + Some((rate, false)) => (gst::Fraction::new(rate as i32, 1), false), + Some((rate, true)) => (gst::Fraction::new(rate as i32 * 1000, 1001), true), + None => { + gst_element_error!( + element, + gst::StreamError::Decode, + ["Got caption before time code rate"] + ); + + return Err(gst::FlowError::Error); + } + }; + + let events = state.create_events(element, format, &framerate); + let timecode = state.handle_timecode(element, tc, framerate, drop_frame)?; + + let len = data[2] as usize; + let mut buffer = gst::Buffer::from_mut_slice(OffsetVec { + vec: data, + offset: 3, + len, + }); + + state.add_buffer_metadata(element, &mut buffer, &timecode, &framerate); + + // Drop our state mutex while we push out buffers or events + drop(state); + + for event in events { + gst_debug!(CAT, obj: element, "Pushing event {:?}", event); + self.srcpad.push_event(event); + } + + self.srcpad.push(buffer).map_err(|err| { + gst_error!(CAT, obj: element, "Pushing buffer returned {:?}", err); + err + })?; + + Ok(self.state.lock().unwrap()) + } + fn sink_chain( &self, pad: &gst::Pad, element: &gst::Element, buffer: gst::Buffer, ) -> Result { - gst_log!(self.cat, obj: pad, "Handling buffer {:?}", buffer); + gst_log!(CAT, obj: pad, "Handling buffer {:?}", buffer); self.handle_buffer(element, Some(buffer)) } @@ -412,17 +464,17 @@ impl MccParse { fn sink_event(&self, pad: &gst::Pad, element: &gst::Element, event: gst::Event) -> bool { use gst::EventView; - gst_log!(self.cat, obj: pad, "Handling event {:?}", event); + gst_log!(CAT, obj: pad, "Handling event {:?}", event); match event.view() { EventView::Caps(_) => { // We send a proper caps event from the chain function later - gst_log!(self.cat, obj: pad, "Dropping caps event"); + gst_log!(CAT, obj: pad, "Dropping caps event"); true } EventView::Segment(_) => { // We send a gst::Format::Time segment event later when needed - gst_log!(self.cat, obj: pad, "Dropping segment event"); + gst_log!(CAT, obj: pad, "Dropping segment event"); true } EventView::FlushStop(_) => { @@ -435,9 +487,9 @@ impl MccParse { pad.event_default(element, event) } EventView::Eos(_) => { - gst_log!(self.cat, obj: pad, "Draining"); + gst_log!(CAT, obj: pad, "Draining"); if let Err(err) = self.handle_buffer(element, None) { - gst_error!(self.cat, obj: pad, "Failed to drain parser: {:?}", err); + gst_error!(CAT, obj: pad, "Failed to drain parser: {:?}", err); } pad.event_default(element, event) } @@ -446,11 +498,7 @@ impl MccParse { && !self.srcpad.has_current_caps() && event.get_type() > gst::EventType::Caps { - gst_log!( - self.cat, - obj: pad, - "Deferring sticky event until we have caps" - ); + gst_log!(CAT, obj: pad, "Deferring sticky event until we have caps"); let mut state = self.state.lock().unwrap(); state.pending_events.push(event); true @@ -464,10 +512,10 @@ impl MccParse { fn src_event(&self, pad: &gst::Pad, element: &gst::Element, event: gst::Event) -> bool { use gst::EventView; - gst_log!(self.cat, obj: pad, "Handling event {:?}", event); + gst_log!(CAT, obj: pad, "Handling event {:?}", event); match event.view() { EventView::Seek(_) => { - gst_log!(self.cat, obj: pad, "Dropping seek event"); + gst_log!(CAT, obj: pad, "Dropping seek event"); false } _ => pad.event_default(element, event), @@ -477,7 +525,7 @@ impl MccParse { fn src_query(&self, pad: &gst::Pad, element: &gst::Element, query: &mut gst::QueryRef) -> bool { use gst::QueryView; - gst_log!(self.cat, obj: pad, "Handling query {:?}", query); + gst_log!(CAT, obj: pad, "Handling query {:?}", query); match query.view_mut() { QueryView::Seeking(mut q) => { @@ -522,11 +570,6 @@ impl ObjectSubclass for MccParse { MccParse::set_pad_functions(&sinkpad, &srcpad); Self { - cat: gst::DebugCategory::new( - "mccparse", - gst::DebugColorFlags::empty(), - "Mcc Parser Element", - ), srcpad, sinkpad, state: Mutex::new(State::default()), @@ -594,7 +637,7 @@ impl ElementImpl for MccParse { element: &gst::Element, transition: gst::StateChange, ) -> Result { - gst_trace!(self.cat, obj: element, "Changing state {:?}", transition); + gst_trace!(CAT, obj: element, "Changing state {:?}", transition); match transition { gst::StateChange::ReadyToPaused | gst::StateChange::PausedToReady => {