From 623667af03bfb1d95ca31de2c515bf606c648e62 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Tue, 17 Sep 2024 09:41:35 +0300 Subject: [PATCH] closedcaption: st2038: Handle different alignments correctly in muxer/demuxer Part-of: --- video/closedcaption/src/st2038anc_utils.rs | 25 +- video/closedcaption/src/st2038ancdemux/imp.rs | 225 ++++++++++++------ video/closedcaption/src/st2038ancmux/imp.rs | 193 +++++++++------ 3 files changed, 295 insertions(+), 148 deletions(-) diff --git a/video/closedcaption/src/st2038anc_utils.rs b/video/closedcaption/src/st2038anc_utils.rs index e8f6a9a0..918cb17b 100644 --- a/video/closedcaption/src/st2038anc_utils.rs +++ b/video/closedcaption/src/st2038anc_utils.rs @@ -16,14 +16,18 @@ pub(crate) struct AncDataHeader { pub(crate) line_number: u16, pub(crate) horizontal_offset: u16, pub(crate) data_count: u8, + #[allow(unused)] + pub(crate) checksum: u16, + pub(crate) len: usize, } impl AncDataHeader { - pub(crate) fn from_buffer(buffer: &gst::Buffer) -> anyhow::Result { + pub(crate) fn from_slice(slice: &[u8]) -> anyhow::Result { use anyhow::Context; use bitstream_io::{BigEndian, BitRead, BitReader}; + use std::io::Cursor; - let mut r = BitReader::endian(buffer.as_cursor_readable(), BigEndian); + let mut r = BitReader::endian(Cursor::new(slice), BigEndian); let zeroes = r.read::(6).context("zero bits")?; if zeroes != 0 { @@ -37,6 +41,21 @@ impl AncDataHeader { let sdid = (r.read::(10).context("SDID")? & 0xff) as u8; let data_count = (r.read::(10).context("data count")? & 0xff) as u8; + r.skip(data_count as u32 * 10).context("data")?; + + let checksum = r.read::(10).context("checksum")?; + + while !r.byte_aligned() { + let one = r.read::(1).context("alignment")?; + if one != 1 { + anyhow::bail!("Alignment bits are not ones!"); + } + } + + let len = r.position_in_bits().unwrap(); + assert!(len % 8 == 0); + let len = len as usize / 8; + Ok(AncDataHeader { c_not_y_channel_flag, line_number, @@ -44,6 +63,8 @@ impl AncDataHeader { did, sdid, data_count, + checksum, + len, }) } } diff --git a/video/closedcaption/src/st2038ancdemux/imp.rs b/video/closedcaption/src/st2038ancdemux/imp.rs index a6447c30..a8e66eac 100644 --- a/video/closedcaption/src/st2038ancdemux/imp.rs +++ b/video/closedcaption/src/st2038ancdemux/imp.rs @@ -43,7 +43,7 @@ struct State { last_inactivity_check: Option, } -#[derive(Clone, Debug, PartialEq, Eq, Hash)] +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] pub struct AncDataId { c_not_y_channel_flag: bool, did: u8, @@ -72,7 +72,7 @@ struct AncStream { impl St2038AncDemux { fn sink_chain( &self, - pad: &gst::Pad, + _pad: &gst::Pad, buffer: gst::Buffer, ) -> Result { let mut state = self.state.borrow_mut(); @@ -80,86 +80,130 @@ impl St2038AncDemux { let ts = buffer.dts_or_pts(); let running_time = state.segment.to_running_time(ts); - let anc_hdr = AncDataHeader::from_buffer(&buffer) - .map_err(|err| { - gst::debug!( - CAT, - imp = self, - "Failed to parse ancillary data header: {err:?}" - ); - // Just push it out on the combined pad and be done with it - return self.srcpad.push(buffer.clone()); - }) - .unwrap(); + let Ok(map) = buffer.map_readable() else { + gst::error!(CAT, imp = self, "Failed to map buffer",); - let stream = match state.streams.get_mut(&AncDataId::from(anc_hdr)) { - Some(stream) => stream, - None => { - let pad_name = format!( - "anc_{:02x}_{:02x}_at_{}_{}", - anc_hdr.did, anc_hdr.sdid, anc_hdr.line_number, anc_hdr.horizontal_offset - ); + // Just push it out on the combined pad and be done with it + drop(state); + let res = self.srcpad.push(buffer); + state = self.state.borrow_mut(); - gst::info!( - CAT, - imp = self, - "New ancillary data stream {pad_name}: {anc_hdr:?}" - ); - - let anc_templ = self.obj().pad_template("anc_%02x_%02x_at_%u_%u").unwrap(); - let anc_srcpad = gst::Pad::builder_from_template(&anc_templ) - .name(pad_name) - .build(); - - anc_srcpad.set_active(true).expect("set pad active"); - - // Forward sticky events from sink pad to new ancillary data source pad - // FIXME: do we want/need to modify the stream id here? caps? - pad.sticky_events_foreach(|event| { - anc_srcpad.push_event(event.clone()); - std::ops::ControlFlow::Continue(gst::EventForeachAction::Keep) - }); - - self.obj().add_pad(&anc_srcpad).expect("add pad"); - - state.flow_combiner.add_pad(&anc_srcpad); - - state.streams.insert( - AncDataId::from(anc_hdr), - AncStream { - pad: anc_srcpad, - last_used: running_time, - }, - ); - - state - .streams - .get_mut(&AncDataId::from(anc_hdr)) - .expect("stream") - } + return state.flow_combiner.update_pad_flow(&self.srcpad, res); }; - stream.last_used = running_time; + let mut slice = map.as_slice(); - // Clone pad, so the borrow on stream can be dropped, otherwise compiler will - // complain that stream and state are both borrowed mutably.. - let anc_pad = stream.pad.clone(); - - let anc_flow = anc_pad.push(buffer.clone()); - - let _ = state.flow_combiner.update_pad_flow(&anc_pad, anc_flow); - - // Todo: Check every now and then if any ancillary streams haven't seen any data for a while - if let Some((last_check, rt)) = Option::zip(state.last_inactivity_check, running_time) { - if gst::ClockTime::absdiff(rt, last_check) >= gst::ClockTime::from_seconds(10) { - // gst::fixme!(CAT, imp = self, "Check ancillary streams for inactivity"); - state.last_inactivity_check = running_time; + while !slice.is_empty() { + // Stop on stuffing bytes + if slice[0] == 0b1111_1111 { + break; } + + let start_offset = map.len() - slice.len(); + let anc_hdr = match AncDataHeader::from_slice(slice) { + Ok(anc_hdr) => anc_hdr, + Err(err) => { + gst::debug!( + CAT, + imp = self, + "Failed to parse ancillary data header: {err:?}" + ); + break; + } + }; + let end_offset = start_offset + anc_hdr.len; + + gst::trace!(CAT, imp = self, "Parsed ST2038 header {anc_hdr:?}"); + + let anc_id = AncDataId::from(anc_hdr); + + let stream = match state.streams.get_mut(&anc_id) { + Some(stream) => stream, + None => { + let pad_name = format!( + "anc_{:02x}_{:02x}_at_{}_{}", + anc_id.did, anc_id.sdid, anc_id.line_number, anc_id.horizontal_offset + ); + + gst::info!( + CAT, + imp = self, + "New ancillary data stream {pad_name}: {anc_hdr:?}" + ); + + let anc_templ = self.obj().pad_template("anc_%02x_%02x_at_%u_%u").unwrap(); + let anc_srcpad = gst::Pad::builder_from_template(&anc_templ) + .name(pad_name) + .build(); + + anc_srcpad.set_active(true).expect("set pad active"); + + // Forward sticky events from main source pad to new ancillary data source pad + // FIXME: do we want/need to modify the stream id here? caps? + self.srcpad.sticky_events_foreach(|event| { + let _ = anc_srcpad.store_sticky_event(event); + std::ops::ControlFlow::Continue(gst::EventForeachAction::Keep) + }); + + drop(state); + self.obj().add_pad(&anc_srcpad).expect("add pad"); + state = self.state.borrow_mut(); + + state.flow_combiner.add_pad(&anc_srcpad); + + state.streams.insert( + anc_id, + AncStream { + pad: anc_srcpad, + last_used: running_time, + }, + ); + + state.streams.get_mut(&anc_id).expect("stream") + } + }; + + stream.last_used = running_time; + + let Ok(mut sub_buffer) = + buffer.copy_region(gst::BufferCopyFlags::MEMORY, start_offset..end_offset) + else { + gst::error!(CAT, imp = self, "Failed to create sub-buffer"); + break; + }; + { + let sub_buffer = sub_buffer.make_mut(); + let _ = buffer.copy_into(sub_buffer, gst::BUFFER_COPY_METADATA, ..); + } + + let anc_pad = stream.pad.clone(); + + drop(state); + let anc_flow = anc_pad.push(sub_buffer.clone()); + state = self.state.borrow_mut(); + + state.flow_combiner.update_pad_flow(&anc_pad, anc_flow)?; + + // TODO: Check every now and then if any ancillary streams haven't seen any data for a while + if let Some((last_check, rt)) = Option::zip(state.last_inactivity_check, running_time) { + if gst::ClockTime::absdiff(rt, last_check) >= gst::ClockTime::from_seconds(10) { + // gst::fixme!(CAT, imp = self, "Check ancillary streams for inactivity"); + state.last_inactivity_check = running_time; + } + } + + drop(state); + let main_flow = self.srcpad.push(sub_buffer); + state = self.state.borrow_mut(); + + state + .flow_combiner + .update_pad_flow(&self.srcpad, main_flow)?; + + slice = &slice[anc_hdr.len..]; } - let main_flow = self.srcpad.push(buffer); - - state.flow_combiner.update_pad_flow(&self.srcpad, main_flow) + Ok(gst::FlowSuccess::Ok) } fn sink_event(&self, pad: &gst::Pad, event: gst::Event) -> bool { @@ -181,6 +225,32 @@ impl St2038AncDemux { .downcast::() .unwrap(); } + EventView::Caps(ev) => { + // Don't forward the caps event directly but set the alignment. + let mut caps = ev.caps_owned(); + { + let caps = caps.make_mut(); + caps.set("alignment", "packet"); + } + + let event = gst::event::Caps::builder(&caps) + .seqnum(event.seqnum()) + .build(); + + let mut ret = self.srcpad.push_event(event.clone()); + let state = self.state.borrow_mut(); + let pads = state + .streams + .values() + .map(|stream| stream.pad.clone()) + .collect::>(); + drop(state); + for pad in pads { + ret |= pad.push_event(event.clone()); + } + + return ret; + } _ => {} } @@ -207,6 +277,9 @@ impl ElementImpl for St2038AncDemux { fn pad_templates() -> &'static [gst::PadTemplate] { static PAD_TEMPLATES: Lazy> = Lazy::new(|| { let caps = gst::Caps::builder("meta/x-st-2038").build(); + let caps_aligned = gst::Caps::builder("meta/x-st-2038") + .field("alignment", "packet") + .build(); let sink_pad_template = gst::PadTemplate::new( "sink", gst::PadDirection::Sink, @@ -221,7 +294,7 @@ impl ElementImpl for St2038AncDemux { "src", gst::PadDirection::Src, gst::PadPresence::Always, - &caps, + &caps_aligned, ) .unwrap(); @@ -229,7 +302,7 @@ impl ElementImpl for St2038AncDemux { "anc_%02x_%02x_at_%u_%u", gst::PadDirection::Src, gst::PadPresence::Sometimes, - &caps, + &caps_aligned, ) .unwrap(); diff --git a/video/closedcaption/src/st2038ancmux/imp.rs b/video/closedcaption/src/st2038ancmux/imp.rs index b4a05a76..97fe7b91 100644 --- a/video/closedcaption/src/st2038ancmux/imp.rs +++ b/video/closedcaption/src/st2038ancmux/imp.rs @@ -20,9 +20,17 @@ use once_cell::sync::Lazy; use crate::st2038anc_utils::AncDataHeader; +#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)] +enum Alignment { + #[default] + Packet, + Line, +} + #[derive(Default)] struct State { downstream_framerate: Option, + alignment: Alignment, } #[derive(Default)] @@ -66,6 +74,7 @@ impl AggregatorImpl for St2038AncMux { gst::ClockTime::ZERO }; let end_running_time = start_running_time + duration; + let alignment = state.alignment; drop(state); gst::trace!( @@ -186,76 +195,105 @@ impl AggregatorImpl for St2038AncMux { continue; } - let header = match AncDataHeader::from_buffer(&buffer) { - Ok(header) => header, - Err(err) => { - gst::warning!( - CAT, - obj = pad, - "Dropping buffer with invalid ST2038 data ({err})" - ); - continue; - } + let Ok(map) = buffer.map_readable() else { + gst::trace!(CAT, obj = pad, "Dropping unmappable buffer"); + continue; }; - gst::trace!(CAT, obj = pad, "Parsed ST2038 header {header:?}"); + let mut slice = map.as_slice(); + while !slice.is_empty() { + // Stop on stuffing bytes + if slice[0] == 0b1111_1111 { + break; + } - // FIXME: One pixel per word of data? ADF header needs to be included in the - // calculation? Two words per pixel because 4:2:2 YUV? Nobody knows! - let buffer_clone = buffer.clone(); // FIXME: To appease the borrow checker - lines - .entry(header.line_number) - .and_modify(|line| { - let new_offset = header.horizontal_offset; - let new_offset_end = header.horizontal_offset + header.data_count as u16; - - for (offset, (offset_end, _pad, _buffer)) in &*line { - // If one of the range starts is between the start/end of the other - // then the two ranges are overlapping. - if (new_offset >= *offset && new_offset < *offset_end) - || (*offset >= new_offset && *offset < new_offset_end) - { - gst::trace!( - CAT, - obj = pad, - "Not including ST2038 packet at {}x{}", - header.line_number, - header.horizontal_offset - ); - return; - } + let start_offset = map.len() - slice.len(); + let header = match AncDataHeader::from_slice(slice) { + Ok(header) => header, + Err(err) => { + gst::warning!( + CAT, + obj = pad, + "Dropping buffer with invalid ST2038 data ({err})" + ); + continue; } + }; + let end_offset = start_offset + header.len; - gst::trace!( - CAT, - obj = pad, - "Including ST2038 packet at {}x{}", - header.line_number, - header.horizontal_offset - ); + gst::trace!(CAT, obj = pad, "Parsed ST2038 header {header:?}"); - line.insert(new_offset, (new_offset_end, pad.clone(), buffer)); - }) - .or_insert_with(|| { - gst::trace!( - CAT, - obj = pad, - "Including ST2038 packet at {}x{}", - header.line_number, - header.horizontal_offset - ); + let Ok(mut sub_buffer) = + buffer.copy_region(gst::BufferCopyFlags::MEMORY, start_offset..end_offset) + else { + gst::error!(CAT, imp = self, "Failed to create sub-buffer"); + break; + }; + { + let sub_buffer = sub_buffer.make_mut(); + let _ = buffer.copy_into(sub_buffer, gst::BUFFER_COPY_METADATA, ..); + } - let mut line = BTreeMap::new(); - line.insert( - header.horizontal_offset, - ( - header.horizontal_offset + header.data_count as u16, - pad.clone(), - buffer_clone, - ), - ); - line - }); + // FIXME: One pixel per word of data? ADF header needs to be included in the + // calculation? Two words per pixel because 4:2:2 YUV? Nobody knows! + let sub_buffer_clone = sub_buffer.clone(); // FIXME: To appease the borrow checker + lines + .entry(header.line_number) + .and_modify(|line| { + let new_offset = header.horizontal_offset; + let new_offset_end = + header.horizontal_offset + header.data_count as u16; + + for (offset, (offset_end, _pad, _buffer)) in &*line { + // If one of the range starts is between the start/end of the other + // then the two ranges are overlapping. + if (new_offset >= *offset && new_offset < *offset_end) + || (*offset >= new_offset && *offset < new_offset_end) + { + gst::trace!( + CAT, + obj = pad, + "Not including ST2038 packet at {}x{}", + header.line_number, + header.horizontal_offset + ); + return; + } + } + + gst::trace!( + CAT, + obj = pad, + "Including ST2038 packet at {}x{}", + header.line_number, + header.horizontal_offset + ); + + line.insert(new_offset, (new_offset_end, pad.clone(), sub_buffer)); + }) + .or_insert_with(|| { + gst::trace!( + CAT, + obj = pad, + "Including ST2038 packet at {}x{}", + header.line_number, + header.horizontal_offset + ); + + let mut line = BTreeMap::new(); + line.insert( + header.horizontal_offset, + ( + header.horizontal_offset + header.data_count as u16, + pad.clone(), + sub_buffer_clone, + ), + ); + line + }); + + slice = &slice[header.len..]; + } } } @@ -268,7 +306,8 @@ impl AggregatorImpl for St2038AncMux { for (line_idx, line) in lines { // If there are multiple buffers for a line then merge them into a single buffer - if line.len() == 1 { + // unless packet alignment is selected + if line.len() == 1 || alignment == Alignment::Packet { for (horizontal_offset, (_, _pad, buffer)) in line { gst::trace!( CAT, @@ -293,6 +332,7 @@ impl AggregatorImpl for St2038AncMux { } new_buffer.append(buffer); } + buffers_ref.add(new_buffer); } } @@ -435,13 +475,23 @@ impl AggregatorImpl for St2038AncMux { } peer_caps.fixate(); - let framerate = peer_caps - .structure(0) - .unwrap() - .get::("framerate") - .ok(); + + let s = peer_caps.structure(0).unwrap(); + let framerate = s.get::("framerate").ok(); + + let alignment = match s.get::<&str>("alignment").ok() { + Some("packet") => Alignment::Packet, + Some("line") => Alignment::Line, + _ => { + let peer_caps = peer_caps.make_mut(); + peer_caps.set("alignment", "packet"); + Alignment::Packet + } + }; let mut state = self.state.lock().unwrap(); + gst::debug!(CAT, imp = self, "Configuring alignment {alignment:?}"); + state.alignment = alignment; if let Some(framerate) = framerate { gst::debug!( CAT, @@ -536,7 +586,9 @@ impl ElementImpl for St2038AncMux { fn pad_templates() -> &'static [gst::PadTemplate] { static PAD_TEMPLATES: Lazy> = Lazy::new(|| { - let caps = gst::Caps::new_empty_simple("meta/x-st-2038"); + let caps = gst::Caps::builder("meta/x-st-2038") + .field("alignment", gst::List::new(["packet", "line"])) + .build(); let src_pad_template = gst::PadTemplate::builder( "src", gst::PadDirection::Src, @@ -547,6 +599,7 @@ impl ElementImpl for St2038AncMux { .build() .unwrap(); + let caps = gst::Caps::builder("meta/x-st-2038").build(); let sink_pad_template = gst::PadTemplate::builder( "sink_%u", gst::PadDirection::Sink,