closedcaption: st2038: Handle different alignments correctly in muxer/demuxer

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1777>
This commit is contained in:
Sebastian Dröge 2024-09-17 09:41:35 +03:00 committed by GStreamer Marge Bot
parent b2e37d3c98
commit 623667af03
3 changed files with 295 additions and 148 deletions

View file

@ -16,14 +16,18 @@ pub(crate) struct AncDataHeader {
pub(crate) line_number: u16,
pub(crate) horizontal_offset: u16,
pub(crate) data_count: u8,
#[allow(unused)]
pub(crate) checksum: u16,
pub(crate) len: usize,
}
impl AncDataHeader {
pub(crate) fn from_buffer(buffer: &gst::Buffer) -> anyhow::Result<AncDataHeader> {
pub(crate) fn from_slice(slice: &[u8]) -> anyhow::Result<AncDataHeader> {
use anyhow::Context;
use bitstream_io::{BigEndian, BitRead, BitReader};
use std::io::Cursor;
let mut r = BitReader::endian(buffer.as_cursor_readable(), BigEndian);
let mut r = BitReader::endian(Cursor::new(slice), BigEndian);
let zeroes = r.read::<u8>(6).context("zero bits")?;
if zeroes != 0 {
@ -37,6 +41,21 @@ impl AncDataHeader {
let sdid = (r.read::<u16>(10).context("SDID")? & 0xff) as u8;
let data_count = (r.read::<u16>(10).context("data count")? & 0xff) as u8;
r.skip(data_count as u32 * 10).context("data")?;
let checksum = r.read::<u16>(10).context("checksum")?;
while !r.byte_aligned() {
let one = r.read::<u8>(1).context("alignment")?;
if one != 1 {
anyhow::bail!("Alignment bits are not ones!");
}
}
let len = r.position_in_bits().unwrap();
assert!(len % 8 == 0);
let len = len as usize / 8;
Ok(AncDataHeader {
c_not_y_channel_flag,
line_number,
@ -44,6 +63,8 @@ impl AncDataHeader {
did,
sdid,
data_count,
checksum,
len,
})
}
}

View file

@ -43,7 +43,7 @@ struct State {
last_inactivity_check: Option<gst::ClockTime>,
}
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
pub struct AncDataId {
c_not_y_channel_flag: bool,
did: u8,
@ -72,7 +72,7 @@ struct AncStream {
impl St2038AncDemux {
fn sink_chain(
&self,
pad: &gst::Pad,
_pad: &gst::Pad,
buffer: gst::Buffer,
) -> Result<gst::FlowSuccess, gst::FlowError> {
let mut state = self.state.borrow_mut();
@ -80,86 +80,130 @@ impl St2038AncDemux {
let ts = buffer.dts_or_pts();
let running_time = state.segment.to_running_time(ts);
let anc_hdr = AncDataHeader::from_buffer(&buffer)
.map_err(|err| {
gst::debug!(
CAT,
imp = self,
"Failed to parse ancillary data header: {err:?}"
);
// Just push it out on the combined pad and be done with it
return self.srcpad.push(buffer.clone());
})
.unwrap();
let Ok(map) = buffer.map_readable() else {
gst::error!(CAT, imp = self, "Failed to map buffer",);
let stream = match state.streams.get_mut(&AncDataId::from(anc_hdr)) {
Some(stream) => stream,
None => {
let pad_name = format!(
"anc_{:02x}_{:02x}_at_{}_{}",
anc_hdr.did, anc_hdr.sdid, anc_hdr.line_number, anc_hdr.horizontal_offset
);
// Just push it out on the combined pad and be done with it
drop(state);
let res = self.srcpad.push(buffer);
state = self.state.borrow_mut();
gst::info!(
CAT,
imp = self,
"New ancillary data stream {pad_name}: {anc_hdr:?}"
);
let anc_templ = self.obj().pad_template("anc_%02x_%02x_at_%u_%u").unwrap();
let anc_srcpad = gst::Pad::builder_from_template(&anc_templ)
.name(pad_name)
.build();
anc_srcpad.set_active(true).expect("set pad active");
// Forward sticky events from sink pad to new ancillary data source pad
// FIXME: do we want/need to modify the stream id here? caps?
pad.sticky_events_foreach(|event| {
anc_srcpad.push_event(event.clone());
std::ops::ControlFlow::Continue(gst::EventForeachAction::Keep)
});
self.obj().add_pad(&anc_srcpad).expect("add pad");
state.flow_combiner.add_pad(&anc_srcpad);
state.streams.insert(
AncDataId::from(anc_hdr),
AncStream {
pad: anc_srcpad,
last_used: running_time,
},
);
state
.streams
.get_mut(&AncDataId::from(anc_hdr))
.expect("stream")
}
return state.flow_combiner.update_pad_flow(&self.srcpad, res);
};
stream.last_used = running_time;
let mut slice = map.as_slice();
// Clone pad, so the borrow on stream can be dropped, otherwise compiler will
// complain that stream and state are both borrowed mutably..
let anc_pad = stream.pad.clone();
let anc_flow = anc_pad.push(buffer.clone());
let _ = state.flow_combiner.update_pad_flow(&anc_pad, anc_flow);
// Todo: Check every now and then if any ancillary streams haven't seen any data for a while
if let Some((last_check, rt)) = Option::zip(state.last_inactivity_check, running_time) {
if gst::ClockTime::absdiff(rt, last_check) >= gst::ClockTime::from_seconds(10) {
// gst::fixme!(CAT, imp = self, "Check ancillary streams for inactivity");
state.last_inactivity_check = running_time;
while !slice.is_empty() {
// Stop on stuffing bytes
if slice[0] == 0b1111_1111 {
break;
}
let start_offset = map.len() - slice.len();
let anc_hdr = match AncDataHeader::from_slice(slice) {
Ok(anc_hdr) => anc_hdr,
Err(err) => {
gst::debug!(
CAT,
imp = self,
"Failed to parse ancillary data header: {err:?}"
);
break;
}
};
let end_offset = start_offset + anc_hdr.len;
gst::trace!(CAT, imp = self, "Parsed ST2038 header {anc_hdr:?}");
let anc_id = AncDataId::from(anc_hdr);
let stream = match state.streams.get_mut(&anc_id) {
Some(stream) => stream,
None => {
let pad_name = format!(
"anc_{:02x}_{:02x}_at_{}_{}",
anc_id.did, anc_id.sdid, anc_id.line_number, anc_id.horizontal_offset
);
gst::info!(
CAT,
imp = self,
"New ancillary data stream {pad_name}: {anc_hdr:?}"
);
let anc_templ = self.obj().pad_template("anc_%02x_%02x_at_%u_%u").unwrap();
let anc_srcpad = gst::Pad::builder_from_template(&anc_templ)
.name(pad_name)
.build();
anc_srcpad.set_active(true).expect("set pad active");
// Forward sticky events from main source pad to new ancillary data source pad
// FIXME: do we want/need to modify the stream id here? caps?
self.srcpad.sticky_events_foreach(|event| {
let _ = anc_srcpad.store_sticky_event(event);
std::ops::ControlFlow::Continue(gst::EventForeachAction::Keep)
});
drop(state);
self.obj().add_pad(&anc_srcpad).expect("add pad");
state = self.state.borrow_mut();
state.flow_combiner.add_pad(&anc_srcpad);
state.streams.insert(
anc_id,
AncStream {
pad: anc_srcpad,
last_used: running_time,
},
);
state.streams.get_mut(&anc_id).expect("stream")
}
};
stream.last_used = running_time;
let Ok(mut sub_buffer) =
buffer.copy_region(gst::BufferCopyFlags::MEMORY, start_offset..end_offset)
else {
gst::error!(CAT, imp = self, "Failed to create sub-buffer");
break;
};
{
let sub_buffer = sub_buffer.make_mut();
let _ = buffer.copy_into(sub_buffer, gst::BUFFER_COPY_METADATA, ..);
}
let anc_pad = stream.pad.clone();
drop(state);
let anc_flow = anc_pad.push(sub_buffer.clone());
state = self.state.borrow_mut();
state.flow_combiner.update_pad_flow(&anc_pad, anc_flow)?;
// TODO: Check every now and then if any ancillary streams haven't seen any data for a while
if let Some((last_check, rt)) = Option::zip(state.last_inactivity_check, running_time) {
if gst::ClockTime::absdiff(rt, last_check) >= gst::ClockTime::from_seconds(10) {
// gst::fixme!(CAT, imp = self, "Check ancillary streams for inactivity");
state.last_inactivity_check = running_time;
}
}
drop(state);
let main_flow = self.srcpad.push(sub_buffer);
state = self.state.borrow_mut();
state
.flow_combiner
.update_pad_flow(&self.srcpad, main_flow)?;
slice = &slice[anc_hdr.len..];
}
let main_flow = self.srcpad.push(buffer);
state.flow_combiner.update_pad_flow(&self.srcpad, main_flow)
Ok(gst::FlowSuccess::Ok)
}
fn sink_event(&self, pad: &gst::Pad, event: gst::Event) -> bool {
@ -181,6 +225,32 @@ impl St2038AncDemux {
.downcast::<gst::format::Time>()
.unwrap();
}
EventView::Caps(ev) => {
// Don't forward the caps event directly but set the alignment.
let mut caps = ev.caps_owned();
{
let caps = caps.make_mut();
caps.set("alignment", "packet");
}
let event = gst::event::Caps::builder(&caps)
.seqnum(event.seqnum())
.build();
let mut ret = self.srcpad.push_event(event.clone());
let state = self.state.borrow_mut();
let pads = state
.streams
.values()
.map(|stream| stream.pad.clone())
.collect::<Vec<_>>();
drop(state);
for pad in pads {
ret |= pad.push_event(event.clone());
}
return ret;
}
_ => {}
}
@ -207,6 +277,9 @@ impl ElementImpl for St2038AncDemux {
fn pad_templates() -> &'static [gst::PadTemplate] {
static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
let caps = gst::Caps::builder("meta/x-st-2038").build();
let caps_aligned = gst::Caps::builder("meta/x-st-2038")
.field("alignment", "packet")
.build();
let sink_pad_template = gst::PadTemplate::new(
"sink",
gst::PadDirection::Sink,
@ -221,7 +294,7 @@ impl ElementImpl for St2038AncDemux {
"src",
gst::PadDirection::Src,
gst::PadPresence::Always,
&caps,
&caps_aligned,
)
.unwrap();
@ -229,7 +302,7 @@ impl ElementImpl for St2038AncDemux {
"anc_%02x_%02x_at_%u_%u",
gst::PadDirection::Src,
gst::PadPresence::Sometimes,
&caps,
&caps_aligned,
)
.unwrap();

View file

@ -20,9 +20,17 @@ use once_cell::sync::Lazy;
use crate::st2038anc_utils::AncDataHeader;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
enum Alignment {
#[default]
Packet,
Line,
}
#[derive(Default)]
struct State {
downstream_framerate: Option<gst::Fraction>,
alignment: Alignment,
}
#[derive(Default)]
@ -66,6 +74,7 @@ impl AggregatorImpl for St2038AncMux {
gst::ClockTime::ZERO
};
let end_running_time = start_running_time + duration;
let alignment = state.alignment;
drop(state);
gst::trace!(
@ -186,76 +195,105 @@ impl AggregatorImpl for St2038AncMux {
continue;
}
let header = match AncDataHeader::from_buffer(&buffer) {
Ok(header) => header,
Err(err) => {
gst::warning!(
CAT,
obj = pad,
"Dropping buffer with invalid ST2038 data ({err})"
);
continue;
}
let Ok(map) = buffer.map_readable() else {
gst::trace!(CAT, obj = pad, "Dropping unmappable buffer");
continue;
};
gst::trace!(CAT, obj = pad, "Parsed ST2038 header {header:?}");
let mut slice = map.as_slice();
while !slice.is_empty() {
// Stop on stuffing bytes
if slice[0] == 0b1111_1111 {
break;
}
// FIXME: One pixel per word of data? ADF header needs to be included in the
// calculation? Two words per pixel because 4:2:2 YUV? Nobody knows!
let buffer_clone = buffer.clone(); // FIXME: To appease the borrow checker
lines
.entry(header.line_number)
.and_modify(|line| {
let new_offset = header.horizontal_offset;
let new_offset_end = header.horizontal_offset + header.data_count as u16;
for (offset, (offset_end, _pad, _buffer)) in &*line {
// If one of the range starts is between the start/end of the other
// then the two ranges are overlapping.
if (new_offset >= *offset && new_offset < *offset_end)
|| (*offset >= new_offset && *offset < new_offset_end)
{
gst::trace!(
CAT,
obj = pad,
"Not including ST2038 packet at {}x{}",
header.line_number,
header.horizontal_offset
);
return;
}
let start_offset = map.len() - slice.len();
let header = match AncDataHeader::from_slice(slice) {
Ok(header) => header,
Err(err) => {
gst::warning!(
CAT,
obj = pad,
"Dropping buffer with invalid ST2038 data ({err})"
);
continue;
}
};
let end_offset = start_offset + header.len;
gst::trace!(
CAT,
obj = pad,
"Including ST2038 packet at {}x{}",
header.line_number,
header.horizontal_offset
);
gst::trace!(CAT, obj = pad, "Parsed ST2038 header {header:?}");
line.insert(new_offset, (new_offset_end, pad.clone(), buffer));
})
.or_insert_with(|| {
gst::trace!(
CAT,
obj = pad,
"Including ST2038 packet at {}x{}",
header.line_number,
header.horizontal_offset
);
let Ok(mut sub_buffer) =
buffer.copy_region(gst::BufferCopyFlags::MEMORY, start_offset..end_offset)
else {
gst::error!(CAT, imp = self, "Failed to create sub-buffer");
break;
};
{
let sub_buffer = sub_buffer.make_mut();
let _ = buffer.copy_into(sub_buffer, gst::BUFFER_COPY_METADATA, ..);
}
let mut line = BTreeMap::new();
line.insert(
header.horizontal_offset,
(
header.horizontal_offset + header.data_count as u16,
pad.clone(),
buffer_clone,
),
);
line
});
// FIXME: One pixel per word of data? ADF header needs to be included in the
// calculation? Two words per pixel because 4:2:2 YUV? Nobody knows!
let sub_buffer_clone = sub_buffer.clone(); // FIXME: To appease the borrow checker
lines
.entry(header.line_number)
.and_modify(|line| {
let new_offset = header.horizontal_offset;
let new_offset_end =
header.horizontal_offset + header.data_count as u16;
for (offset, (offset_end, _pad, _buffer)) in &*line {
// If one of the range starts is between the start/end of the other
// then the two ranges are overlapping.
if (new_offset >= *offset && new_offset < *offset_end)
|| (*offset >= new_offset && *offset < new_offset_end)
{
gst::trace!(
CAT,
obj = pad,
"Not including ST2038 packet at {}x{}",
header.line_number,
header.horizontal_offset
);
return;
}
}
gst::trace!(
CAT,
obj = pad,
"Including ST2038 packet at {}x{}",
header.line_number,
header.horizontal_offset
);
line.insert(new_offset, (new_offset_end, pad.clone(), sub_buffer));
})
.or_insert_with(|| {
gst::trace!(
CAT,
obj = pad,
"Including ST2038 packet at {}x{}",
header.line_number,
header.horizontal_offset
);
let mut line = BTreeMap::new();
line.insert(
header.horizontal_offset,
(
header.horizontal_offset + header.data_count as u16,
pad.clone(),
sub_buffer_clone,
),
);
line
});
slice = &slice[header.len..];
}
}
}
@ -268,7 +306,8 @@ impl AggregatorImpl for St2038AncMux {
for (line_idx, line) in lines {
// If there are multiple buffers for a line then merge them into a single buffer
if line.len() == 1 {
// unless packet alignment is selected
if line.len() == 1 || alignment == Alignment::Packet {
for (horizontal_offset, (_, _pad, buffer)) in line {
gst::trace!(
CAT,
@ -293,6 +332,7 @@ impl AggregatorImpl for St2038AncMux {
}
new_buffer.append(buffer);
}
buffers_ref.add(new_buffer);
}
}
@ -435,13 +475,23 @@ impl AggregatorImpl for St2038AncMux {
}
peer_caps.fixate();
let framerate = peer_caps
.structure(0)
.unwrap()
.get::<gst::Fraction>("framerate")
.ok();
let s = peer_caps.structure(0).unwrap();
let framerate = s.get::<gst::Fraction>("framerate").ok();
let alignment = match s.get::<&str>("alignment").ok() {
Some("packet") => Alignment::Packet,
Some("line") => Alignment::Line,
_ => {
let peer_caps = peer_caps.make_mut();
peer_caps.set("alignment", "packet");
Alignment::Packet
}
};
let mut state = self.state.lock().unwrap();
gst::debug!(CAT, imp = self, "Configuring alignment {alignment:?}");
state.alignment = alignment;
if let Some(framerate) = framerate {
gst::debug!(
CAT,
@ -536,7 +586,9 @@ impl ElementImpl for St2038AncMux {
fn pad_templates() -> &'static [gst::PadTemplate] {
static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
let caps = gst::Caps::new_empty_simple("meta/x-st-2038");
let caps = gst::Caps::builder("meta/x-st-2038")
.field("alignment", gst::List::new(["packet", "line"]))
.build();
let src_pad_template = gst::PadTemplate::builder(
"src",
gst::PadDirection::Src,
@ -547,6 +599,7 @@ impl ElementImpl for St2038AncMux {
.build()
.unwrap();
let caps = gst::Caps::builder("meta/x-st-2038").build();
let sink_pad_template = gst::PadTemplate::builder(
"sink_%u",
gst::PadDirection::Sink,