closedcaption: Add ST2038 muxer element

Part-of: <https://gitlab.freedesktop.org/gstreamer/gst-plugins-rs/-/merge_requests/1777>
This commit is contained in:
Sebastian Dröge 2024-09-16 13:54:54 +03:00 committed by GStreamer Marge Bot
parent a4dcb52ca7
commit b2e37d3c98
6 changed files with 680 additions and 7 deletions

View file

@ -22,7 +22,7 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = { version = "1.0", features = ["raw_value"] }
cea708-types = "0.3.2"
cea608-types = "0.1.1"
gst = { workspace = true, features = ["v1_16"]}
gst = { workspace = true, features = ["v1_20"]}
gst-base = { workspace = true, features = ["v1_18"]}
gst-video = { workspace = true, features = ["v1_16"]}
winnow = "0.6"
@ -47,6 +47,7 @@ gst-plugin-version-helper.workspace = true
static = []
capi = []
doc = ["gst/v1_18"]
v1_26 = ["gst-base/v1_26"]
[package.metadata.capi]
min_version = "0.9.21"

View file

@ -36,6 +36,7 @@ mod scc_enc;
mod scc_parse;
mod st2038anc_utils;
mod st2038ancdemux;
mod st2038ancmux;
mod transcriberbin;
mod tttocea608;
mod tttocea708;
@ -65,6 +66,7 @@ fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
tttocea708::register(plugin)?;
cea708overlay::register(plugin)?;
st2038ancdemux::register(plugin)?;
st2038ancmux::register(plugin)?;
Ok(())
}

View file

@ -8,13 +8,14 @@
//
// SPDX-License-Identifier: MPL-2.0
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
#[derive(Clone, Copy, Debug)]
pub(crate) struct AncDataHeader {
pub(crate) c_not_y_channel_flag: bool,
pub(crate) did: u8,
pub(crate) sdid: u8,
pub(crate) line_number: u16,
pub(crate) horizontal_offset: u16,
pub(crate) data_count: u8,
}
impl AncDataHeader {
@ -34,7 +35,7 @@ impl AncDataHeader {
// Top two bits are parity bits and can be stripped off
let did = (r.read::<u16>(10).context("DID")? & 0xff) as u8;
let sdid = (r.read::<u16>(10).context("SDID")? & 0xff) as u8;
let _data_count = (r.read::<u16>(10).context("data count")? & 0xff) as u8;
let data_count = (r.read::<u16>(10).context("data count")? & 0xff) as u8;
Ok(AncDataHeader {
c_not_y_channel_flag,
@ -42,6 +43,7 @@ impl AncDataHeader {
horizontal_offset,
did,
sdid,
data_count,
})
}
}

View file

@ -37,12 +37,33 @@ pub struct St2038AncDemux {
#[derive(Default)]
struct State {
streams: HashMap<AncDataHeader, AncStream>,
streams: HashMap<AncDataId, AncStream>,
flow_combiner: UniqueFlowCombiner,
segment: gst::FormattedSegment<gst::ClockTime>,
last_inactivity_check: Option<gst::ClockTime>,
}
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct AncDataId {
c_not_y_channel_flag: bool,
did: u8,
sdid: u8,
line_number: u16,
horizontal_offset: u16,
}
impl From<AncDataHeader> for AncDataId {
fn from(value: AncDataHeader) -> Self {
AncDataId {
c_not_y_channel_flag: value.c_not_y_channel_flag,
did: value.did,
sdid: value.sdid,
line_number: value.line_number,
horizontal_offset: value.horizontal_offset,
}
}
}
struct AncStream {
pad: gst::Pad,
last_used: Option<gst::ClockTime>,
@ -71,7 +92,7 @@ impl St2038AncDemux {
})
.unwrap();
let stream = match state.streams.get_mut(&anc_hdr) {
let stream = match state.streams.get_mut(&AncDataId::from(anc_hdr)) {
Some(stream) => stream,
None => {
let pad_name = format!(
@ -104,14 +125,17 @@ impl St2038AncDemux {
state.flow_combiner.add_pad(&anc_srcpad);
state.streams.insert(
anc_hdr.clone(),
AncDataId::from(anc_hdr),
AncStream {
pad: anc_srcpad,
last_used: running_time,
},
);
state.streams.get_mut(&anc_hdr).expect("stream")
state
.streams
.get_mut(&AncDataId::from(anc_hdr))
.expect("stream")
}
};

View file

@ -0,0 +1,612 @@
// Copyright (C) 2024 Sebastian Dröge <sebastian@centricular.com>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
use std::collections::BTreeMap;
use std::ops::ControlFlow;
use std::sync::Mutex;
use gst::glib;
use gst::prelude::*;
use gst::subclass::prelude::*;
use gst_base::prelude::*;
use gst_base::subclass::prelude::*;
use once_cell::sync::Lazy;
use crate::st2038anc_utils::AncDataHeader;
#[derive(Default)]
struct State {
downstream_framerate: Option<gst::Fraction>,
}
#[derive(Default)]
pub struct St2038AncMux {
state: Mutex<State>,
}
pub(crate) static CAT: Lazy<gst::DebugCategory> = Lazy::new(|| {
gst::DebugCategory::new(
"st2038ancmux",
gst::DebugColorFlags::empty(),
Some("ST2038 Anc Mux Element"),
)
});
impl AggregatorImpl for St2038AncMux {
fn aggregate(&self, timeout: bool) -> Result<gst::FlowSuccess, gst::FlowError> {
let state = self.state.lock().unwrap();
let src_segment = self
.obj()
.src_pad()
.segment()
.downcast::<gst::ClockTime>()
.expect("Non-TIME segment");
let start_running_time =
if src_segment.position().is_none() || src_segment.position() < src_segment.start() {
src_segment.start().unwrap()
} else {
src_segment.position().unwrap()
};
// Only if downstream framerate provided, otherwise we output as we go
let duration = if let Some(framerate) = state.downstream_framerate {
gst::ClockTime::SECOND
.nseconds()
.mul_div_round(framerate.denom() as u64, framerate.numer() as u64)
.unwrap()
.nseconds()
} else {
gst::ClockTime::ZERO
};
let end_running_time = start_running_time + duration;
drop(state);
gst::trace!(
CAT,
imp = self,
"Aggregating for start time {} end {} timeout {}",
start_running_time.display(),
end_running_time.display(),
timeout
);
let sinkpads = self.obj().sink_pads();
// Collect buffers from all pads. We can start outputting for this frame on timeout,
// or otherwise all pads are either EOS or have a buffer for a future frame.
let mut all_pads_done = true;
let mut all_pads_eos = true;
let mut min_next_buffer_running_time = None;
for pad in sinkpads
.iter()
.map(|pad| pad.downcast_ref::<super::St2038AncMuxSinkPad>().unwrap())
{
let mut pad_state = pad.imp().pad_state.lock().unwrap();
if pad.is_eos() {
// This pad is done
gst::trace!(CAT, obj = pad, "Pad is EOS");
if !pad_state.queued_buffers.is_empty() {
all_pads_eos = false;
}
continue;
}
all_pads_eos = false;
let buffer = if let Some(buffer) = pad.peek_buffer() {
buffer
} else {
all_pads_done = false;
continue;
};
let segment = pad.segment().downcast::<gst::ClockTime>().unwrap();
let Some(buffer_start_ts) = segment.to_running_time(buffer.pts()) else {
gst::warning!(CAT, obj = pad, "Buffer without valid PTS, dropping");
pad.drop_buffer();
all_pads_done = false;
continue;
};
if buffer_start_ts > end_running_time
|| (end_running_time > start_running_time && buffer_start_ts == end_running_time)
{
gst::trace!(
CAT,
obj = pad,
"Buffer starting at {buffer_start_ts} >= {end_running_time}"
);
if min_next_buffer_running_time.map_or(true, |next_buffer_min_running_time| {
next_buffer_min_running_time > buffer_start_ts
}) {
min_next_buffer_running_time = Some(buffer_start_ts);
}
// buffer is not for this frame so we're not interested in it yet
// and this pad is done for this frame.
continue;
}
// Store buffers on the pad
gst::trace!(
CAT,
obj = pad,
"Queueing buffer starting at {buffer_start_ts}"
);
pad_state.queued_buffers.push(buffer);
pad.drop_buffer();
// Check again if there's another buffer on this pad for this frame
all_pads_done = false;
}
if !all_pads_done && !timeout {
gst::trace!(CAT, imp = self, "Not all pads ready yet");
return Err(gst_base::AGGREGATOR_FLOW_NEED_DATA);
}
if all_pads_eos {
gst::debug!(CAT, imp = self, "All pads EOS");
return Err(gst::FlowError::Eos);
}
gst::trace!(CAT, imp = self, "Ready for outputting");
self.obj()
.selected_samples(start_running_time, None, duration, None);
// Remove all overlapping anc buffers from the queued buffers. The latest pad, latest
// buffer of that pad wins.
let mut lines =
BTreeMap::<u16, BTreeMap<u16, (u16, super::St2038AncMuxSinkPad, gst::Buffer)>>::new();
for pad in sinkpads
.iter()
.rev()
.map(|pad| pad.downcast_ref::<super::St2038AncMuxSinkPad>().unwrap())
{
let mut pad_state = pad.imp().pad_state.lock().unwrap();
for buffer in pad_state.queued_buffers.drain(..).rev() {
if buffer.size() == 0
&& buffer.flags().contains(gst::BufferFlags::GAP)
&& gst::meta::CustomMeta::from_buffer(&buffer, "GstAggregatorMissingDataMeta")
.is_ok()
{
gst::trace!(CAT, obj = pad, "Dropping gap buffer");
continue;
}
let header = match AncDataHeader::from_buffer(&buffer) {
Ok(header) => header,
Err(err) => {
gst::warning!(
CAT,
obj = pad,
"Dropping buffer with invalid ST2038 data ({err})"
);
continue;
}
};
gst::trace!(CAT, obj = pad, "Parsed ST2038 header {header:?}");
// FIXME: One pixel per word of data? ADF header needs to be included in the
// calculation? Two words per pixel because 4:2:2 YUV? Nobody knows!
let buffer_clone = buffer.clone(); // FIXME: To appease the borrow checker
lines
.entry(header.line_number)
.and_modify(|line| {
let new_offset = header.horizontal_offset;
let new_offset_end = header.horizontal_offset + header.data_count as u16;
for (offset, (offset_end, _pad, _buffer)) in &*line {
// If one of the range starts is between the start/end of the other
// then the two ranges are overlapping.
if (new_offset >= *offset && new_offset < *offset_end)
|| (*offset >= new_offset && *offset < new_offset_end)
{
gst::trace!(
CAT,
obj = pad,
"Not including ST2038 packet at {}x{}",
header.line_number,
header.horizontal_offset
);
return;
}
}
gst::trace!(
CAT,
obj = pad,
"Including ST2038 packet at {}x{}",
header.line_number,
header.horizontal_offset
);
line.insert(new_offset, (new_offset_end, pad.clone(), buffer));
})
.or_insert_with(|| {
gst::trace!(
CAT,
obj = pad,
"Including ST2038 packet at {}x{}",
header.line_number,
header.horizontal_offset
);
let mut line = BTreeMap::new();
line.insert(
header.horizontal_offset,
(
header.horizontal_offset + header.data_count as u16,
pad.clone(),
buffer_clone,
),
);
line
});
}
}
// Collect all anc buffers for this frame and output them as a single buffer list,
// sorted by line. Multiple anc in a single line are merged into a single buffer.
let ret = if !lines.is_empty() {
let mut buffers = gst::BufferList::new();
let buffers_ref = buffers.get_mut().unwrap();
for (line_idx, line) in lines {
// If there are multiple buffers for a line then merge them into a single buffer
if line.len() == 1 {
for (horizontal_offset, (_, _pad, buffer)) in line {
gst::trace!(
CAT,
imp = self,
"Outputting ST2038 packet at {line_idx}x{horizontal_offset}"
);
buffers_ref.add(buffer);
}
} else {
gst::trace!(
CAT,
imp = self,
"Outputting multiple ST2038 packets at line {line_idx}"
);
let mut new_buffer = gst::Buffer::new();
for (horizontal_offset, (_, _pad, buffer)) in line {
gst::trace!(CAT, imp = self, "Horizontal offset {horizontal_offset}");
// Copy over metadata of the first buffer for this line
if new_buffer.size() == 0 {
let new_buffer_ref = new_buffer.get_mut().unwrap();
let _ = buffer.copy_into(new_buffer_ref, gst::BUFFER_COPY_METADATA, ..);
}
new_buffer.append(buffer);
}
}
}
gst::trace!(CAT, imp = self, "Outputting {} buffers", buffers_ref.len());
// Unset marker flag on all buffers, and set PTS/duration if there is a downstream
// framerate. Otherwise we leave them as-is.
if duration > gst::ClockTime::ZERO {
buffers_ref.foreach_mut(|mut buffer, _idx| {
let buffer_ref = buffer.make_mut();
buffer_ref.set_pts(start_running_time);
buffer_ref.set_duration(duration);
buffer_ref.unset_flags(gst::BufferFlags::MARKER);
ControlFlow::Continue(Some(buffer))
});
} else {
buffers_ref.foreach_mut(|mut buffer, _idx| {
if buffer.flags().contains(gst::BufferFlags::MARKER) {
let buffer_ref = buffer.make_mut();
buffer_ref.unset_flags(gst::BufferFlags::MARKER);
}
ControlFlow::Continue(Some(buffer))
});
}
// Set marker flag on last buffer
{
let last = buffers_ref.get_mut(buffers_ref.len() - 1).unwrap();
last.set_flags(gst::BufferFlags::MARKER);
}
self.finish_buffer_list(buffers)
} else {
let mut duration = duration;
if let Some(min_next_buffer_running_time) = min_next_buffer_running_time {
gst::trace!(
CAT,
imp = self,
"Next buffer at {min_next_buffer_running_time}"
);
if duration == gst::ClockTime::ZERO {
duration = min_next_buffer_running_time - start_running_time;
}
}
gst::trace!(
CAT,
imp = self,
"Outputting gap event at {start_running_time} with duration {duration}"
);
// Nothing to be output for this frame
#[cfg(feature = "v1_26")]
{
self.obj().push_src_event(
gst::event::Gap::builder(start_running_time)
.duration(duration)
.build(),
);
}
#[cfg(not(feature = "v1_26"))]
{
self.obj().src_pad().push_event(
gst::event::Gap::builder(start_running_time)
.duration(duration)
.build(),
);
}
Ok(gst::FlowSuccess::Ok)
};
// Advance position to the next frame if there is a downstream framerate, or otherwise
// to the start of the next buffer.
if duration > gst::ClockTime::ZERO {
self.obj().set_position(end_running_time);
} else if let Some(min_next_buffer_running_time) = min_next_buffer_running_time {
self.obj().set_position(min_next_buffer_running_time);
} else {
self.obj()
.set_position(src_segment.position().opt_add(40.mseconds()));
}
ret
}
fn peek_next_sample(&self, pad: &gst_base::AggregatorPad) -> Option<gst::Sample> {
let pad = pad.downcast_ref::<super::St2038AncMuxSinkPad>().unwrap();
let pad_state = pad.imp().pad_state.lock().unwrap();
let caps = pad.current_caps()?;
if pad_state.queued_buffers.is_empty() {
return None;
}
Some(
gst::Sample::builder()
.buffer_list(
&pad_state
.queued_buffers
.iter()
.cloned()
.collect::<gst::BufferList>(),
)
.segment(&pad.segment())
.caps(&caps)
.build(),
)
}
fn next_time(&self) -> Option<gst::ClockTime> {
self.obj().simple_get_next_time()
}
fn flush(&self) -> Result<gst::FlowSuccess, gst::FlowError> {
let mut state = self.state.lock().unwrap();
*state = State::default();
self.obj()
.src_pad()
.segment()
.set_position(None::<gst::ClockTime>);
Ok(gst::FlowSuccess::Ok)
}
fn negotiate(&self) -> bool {
let templ_caps = self.obj().src_pad().pad_template_caps();
let mut peer_caps = self.obj().src_pad().peer_query_caps(Some(&templ_caps));
gst::debug!(CAT, imp = self, "Downstream caps {peer_caps:?}");
if peer_caps.is_empty() {
gst::warning!(CAT, imp = self, "Downstream returned EMPTY caps");
return false;
}
peer_caps.fixate();
let framerate = peer_caps
.structure(0)
.unwrap()
.get::<gst::Fraction>("framerate")
.ok();
let mut state = self.state.lock().unwrap();
if let Some(framerate) = framerate {
gst::debug!(
CAT,
imp = self,
"Configuring downstream requested framerate {framerate}"
);
state.downstream_framerate = Some(framerate);
drop(state);
let duration = gst::ClockTime::SECOND
.nseconds()
.mul_div_round(framerate.denom() as u64, framerate.numer() as u64)
.unwrap()
.nseconds();
self.obj().set_latency(duration, duration);
} else {
gst::debug!(CAT, imp = self, "Downstream requested no framerate");
state.downstream_framerate = None;
drop(state);
// Assume 25fps as a worst case
self.obj().set_latency(40.mseconds(), None);
}
self.obj().set_src_caps(&peer_caps);
true
}
fn sink_event(&self, aggregator_pad: &gst_base::AggregatorPad, event: gst::Event) -> bool {
#[allow(clippy::single_match)]
match event.view() {
gst::EventView::Segment(ev) => {
let segment = ev.segment();
if segment.format() != gst::Format::Time {
gst::error!(CAT, imp = self, "Non-TIME segments not supported");
return false;
}
}
_ => (),
}
self.parent_sink_event(aggregator_pad, event)
}
fn clip(
&self,
aggregator_pad: &gst_base::AggregatorPad,
buffer: gst::Buffer,
) -> Option<gst::Buffer> {
let Some(pts) = buffer.pts() else {
return Some(buffer);
};
let segment = aggregator_pad.segment();
segment
.downcast_ref::<gst::ClockTime>()
.map(|segment| segment.clip(pts, pts))
.map(|_| buffer)
}
fn start(&self) -> Result<(), gst::ErrorMessage> {
gst::trace!(CAT, imp = self, "Starting");
let mut state = self.state.lock().unwrap();
*state = State::default();
Ok(())
}
fn stop(&self) -> Result<(), gst::ErrorMessage> {
gst::trace!(CAT, imp = self, "Stopping");
let mut state = self.state.lock().unwrap();
*state = State::default();
Ok(())
}
}
impl ElementImpl for St2038AncMux {
fn metadata() -> Option<&'static gst::subclass::ElementMetadata> {
static ELEMENT_METADATA: Lazy<gst::subclass::ElementMetadata> = Lazy::new(|| {
gst::subclass::ElementMetadata::new(
"ST2038 Anc Mux",
"Muxer",
"Combines multiple ST2038 Anc streams",
"Sebastian Dröge <sebastian@centricular.com>",
)
});
Some(&*ELEMENT_METADATA)
}
fn pad_templates() -> &'static [gst::PadTemplate] {
static PAD_TEMPLATES: Lazy<Vec<gst::PadTemplate>> = Lazy::new(|| {
let caps = gst::Caps::new_empty_simple("meta/x-st-2038");
let src_pad_template = gst::PadTemplate::builder(
"src",
gst::PadDirection::Src,
gst::PadPresence::Always,
&caps,
)
.gtype(gst_base::AggregatorPad::static_type())
.build()
.unwrap();
let sink_pad_template = gst::PadTemplate::builder(
"sink_%u",
gst::PadDirection::Sink,
gst::PadPresence::Request,
&caps,
)
.gtype(super::St2038AncMuxSinkPad::static_type())
.build()
.unwrap();
vec![src_pad_template, sink_pad_template]
});
PAD_TEMPLATES.as_ref()
}
}
impl GstObjectImpl for St2038AncMux {}
impl ObjectImpl for St2038AncMux {}
#[glib::object_subclass]
impl ObjectSubclass for St2038AncMux {
const NAME: &'static str = "GstSt2038AncMux";
type Type = super::St2038AncMux;
type ParentType = gst_base::Aggregator;
}
#[derive(Default)]
struct PadState {
queued_buffers: Vec<gst::Buffer>,
}
#[derive(Default)]
pub struct St2038AncMuxSinkPad {
pad_state: Mutex<PadState>,
}
impl St2038AncMuxSinkPad {}
impl AggregatorPadImpl for St2038AncMuxSinkPad {
fn flush(
&self,
_aggregator: &gst_base::Aggregator,
) -> Result<gst::FlowSuccess, gst::FlowError> {
let mut state = self.pad_state.lock().unwrap();
state.queued_buffers.clear();
Ok(gst::FlowSuccess::Ok)
}
}
impl PadImpl for St2038AncMuxSinkPad {}
impl GstObjectImpl for St2038AncMuxSinkPad {}
impl ObjectImpl for St2038AncMuxSinkPad {}
#[glib::object_subclass]
impl ObjectSubclass for St2038AncMuxSinkPad {
const NAME: &'static str = "GstSt2038AncMuxSinkPad";
type Type = super::St2038AncMuxSinkPad;
type ParentType = gst_base::AggregatorPad;
}

View file

@ -0,0 +1,32 @@
// Copyright (C) 2024 Sebastian Dröge <sebastian@centricular.com>
//
// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0.
// If a copy of the MPL was not distributed with this file, You can obtain one at
// <https://mozilla.org/MPL/2.0/>.
//
// SPDX-License-Identifier: MPL-2.0
use gst::glib;
use gst::prelude::*;
mod imp;
glib::wrapper! {
pub struct St2038AncMux(ObjectSubclass<imp::St2038AncMux>) @extends gst_base::Aggregator, gst::Element, gst::Object;
}
glib::wrapper! {
pub struct St2038AncMuxSinkPad(ObjectSubclass<imp::St2038AncMuxSinkPad>) @extends gst_base::AggregatorPad, gst::Pad, gst::Object;
}
pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> {
#[cfg(feature = "doc")]
St2038AncMuxSinkPad::static_type().mark_as_plugin_api(gst::PluginAPIFlags::empty());
gst::Element::register(
Some(plugin),
"st2038ancmux",
gst::Rank::NONE,
St2038AncMux::static_type(),
)
}