diff --git a/Cargo.lock b/Cargo.lock index 5ef0e991..56ad963f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2359,6 +2359,7 @@ version = "0.14.0-alpha.1" dependencies = [ "anyhow", "atomic_refcell", + "bitstream-io", "byteorder", "cairo-rs", "cea608-types", diff --git a/video/closedcaption/Cargo.toml b/video/closedcaption/Cargo.toml index 262cd733..93b9e7b6 100644 --- a/video/closedcaption/Cargo.toml +++ b/video/closedcaption/Cargo.toml @@ -27,6 +27,7 @@ gst-base = { workspace = true, features = ["v1_18"]} gst-video = { workspace = true, features = ["v1_16"]} winnow = "0.6" smallvec = "1" +bitstream-io = "2.3" [dev-dependencies] pretty_assertions = "1" diff --git a/video/closedcaption/src/lib.rs b/video/closedcaption/src/lib.rs index 5a34a248..28be02e9 100644 --- a/video/closedcaption/src/lib.rs +++ b/video/closedcaption/src/lib.rs @@ -34,6 +34,8 @@ mod mcc_parse; mod parser_utils; mod scc_enc; mod scc_parse; +mod st2038anc_utils; +mod st2038ancdemux; mod transcriberbin; mod tttocea608; mod tttocea708; @@ -62,6 +64,7 @@ fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { cea708mux::register(plugin)?; tttocea708::register(plugin)?; cea708overlay::register(plugin)?; + st2038ancdemux::register(plugin)?; Ok(()) } diff --git a/video/closedcaption/src/st2038anc_utils.rs b/video/closedcaption/src/st2038anc_utils.rs new file mode 100644 index 00000000..b11d4343 --- /dev/null +++ b/video/closedcaption/src/st2038anc_utils.rs @@ -0,0 +1,47 @@ +// GStreamer SMPTE ST-2038 ancillary metadata utils +// +// Copyright (C) 2024 Tim-Philipp Müller +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +pub(crate) struct AncDataHeader { + pub(crate) c_not_y_channel_flag: bool, + pub(crate) did: u8, + pub(crate) sdid: u8, + pub(crate) line_number: u16, + pub(crate) horizontal_offset: u16, +} + +impl AncDataHeader { + pub(crate) fn from_buffer(buffer: &gst::Buffer) -> anyhow::Result { + use anyhow::Context; + use bitstream_io::{BigEndian, BitRead, BitReader}; + + let mut r = BitReader::endian(buffer.as_cursor_readable(), BigEndian); + + let zeroes = r.read::(6).context("zero bits")?; + if zeroes != 0 { + anyhow::bail!("Zero bits not zero!"); + } + let c_not_y_channel_flag = r.read_bit().context("c_not_y_channel_flag")?; + let line_number = r.read::(11).context("line number")?; + let horizontal_offset = r.read::(12).context("horizontal offset")?; + // Top two bits are parity bits and can be stripped off + let did = (r.read::(10).context("DID")? & 0xff) as u8; + let sdid = (r.read::(10).context("SDID")? & 0xff) as u8; + let _data_count = (r.read::(10).context("data count")? & 0xff) as u8; + + Ok(AncDataHeader { + c_not_y_channel_flag, + line_number, + horizontal_offset, + did, + sdid, + }) + } +} diff --git a/video/closedcaption/src/st2038ancdemux/imp.rs b/video/closedcaption/src/st2038ancdemux/imp.rs new file mode 100644 index 00000000..38130641 --- /dev/null +++ b/video/closedcaption/src/st2038ancdemux/imp.rs @@ -0,0 +1,271 @@ +// GStreamer SMPTE ST-2038 ancillary metadata demuxer +// +// Copyright (C) 2024 Tim-Philipp Müller +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +use gst::glib; +use gst::prelude::*; +use gst::subclass::prelude::*; +use gst_base::UniqueFlowCombiner; + +use atomic_refcell::AtomicRefCell; + +use once_cell::sync::Lazy; + +use std::collections::HashMap; + +use crate::st2038anc_utils::AncDataHeader; + +static CAT: Lazy = Lazy::new(|| { + gst::DebugCategory::new( + "st2038ancdemux", + gst::DebugColorFlags::empty(), + Some("SMPTE ST-2038 ancillary metadata demuxer"), + ) +}); + +pub struct St2038AncDemux { + srcpad: gst::Pad, + sinkpad: gst::Pad, + state: AtomicRefCell, +} + +#[derive(Default)] +struct State { + streams: HashMap, + flow_combiner: UniqueFlowCombiner, + segment: gst::FormattedSegment, + last_inactivity_check: Option, +} + +struct AncStream { + pad: gst::Pad, + last_used: Option, +} + +impl St2038AncDemux { + fn sink_chain( + &self, + pad: &gst::Pad, + buffer: gst::Buffer, + ) -> Result { + let mut state = self.state.borrow_mut(); + + let ts = buffer.dts_or_pts(); + let running_time = state.segment.to_running_time(ts); + + let anc_hdr = AncDataHeader::from_buffer(&buffer) + .map_err(|err| { + gst::debug!( + CAT, + imp = self, + "Failed to parse ancillary data header: {err:?}" + ); + // Just push it out on the combined pad and be done with it + return self.srcpad.push(buffer.clone()); + }) + .unwrap(); + + let stream = match state.streams.get_mut(&anc_hdr) { + Some(stream) => stream, + None => { + let pad_name = format!( + "anc_{:02x}_{:02x}_at_{}_{}", + anc_hdr.did, anc_hdr.sdid, anc_hdr.line_number, anc_hdr.horizontal_offset + ); + + gst::info!( + CAT, + imp = self, + "New ancillary data stream {pad_name}: {anc_hdr:?}" + ); + + let anc_templ = self.obj().pad_template("anc_%02x_%02x_at_%u_%u").unwrap(); + let anc_srcpad = gst::Pad::builder_from_template(&anc_templ) + .name(pad_name) + .build(); + + anc_srcpad.set_active(true).expect("set pad active"); + + // Forward sticky events from sink pad to new ancillary data source pad + // FIXME: do we want/need to modify the stream id here? caps? + pad.sticky_events_foreach(|event| { + anc_srcpad.push_event(event.clone()); + std::ops::ControlFlow::Continue(gst::EventForeachAction::Keep) + }); + + self.obj().add_pad(&anc_srcpad).expect("add pad"); + + state.flow_combiner.add_pad(&anc_srcpad); + + state.streams.insert( + anc_hdr.clone(), + AncStream { + pad: anc_srcpad, + last_used: running_time, + }, + ); + + state.streams.get_mut(&anc_hdr).expect("stream") + } + }; + + stream.last_used = running_time; + + // Clone pad, so the borrow on stream can be dropped, otherwise compiler will + // complain that stream and state are both borrowed mutably.. + let anc_pad = stream.pad.clone(); + + let anc_flow = anc_pad.push(buffer.clone()); + + let _ = state.flow_combiner.update_pad_flow(&anc_pad, anc_flow); + + // Todo: Check every now and then if any ancillary streams haven't seen any data for a while + if let Some((last_check, rt)) = Option::zip(state.last_inactivity_check, running_time) { + if gst::ClockTime::absdiff(rt, last_check) >= gst::ClockTime::from_seconds(10) { + // gst::fixme!(CAT, imp = self, "Check ancillary streams for inactivity"); + state.last_inactivity_check = running_time; + } + } + + let main_flow = self.srcpad.push(buffer); + + state.flow_combiner.update_pad_flow(&self.srcpad, main_flow) + } + + fn sink_event(&self, pad: &gst::Pad, event: gst::Event) -> bool { + use gst::EventView; + + gst::log!(CAT, obj = pad, "Handling event {:?}", event); + + // Todo: clear last_seen times on ancillary src pads on stream start/flush? + match event.view() { + EventView::StreamStart(_) => { + let mut state = self.state.borrow_mut(); + state.last_inactivity_check = gst::ClockTime::ZERO.into(); + } + EventView::Segment(ev) => { + let mut state = self.state.borrow_mut(); + state.segment = ev + .segment() + .clone() + .downcast::() + .unwrap(); + } + _ => {} + } + + gst::Pad::event_default(pad, Some(&*self.obj()), event) + } +} + +impl GstObjectImpl for St2038AncDemux {} + +impl ElementImpl for St2038AncDemux { + fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { + static ELEMENT_METADATA: Lazy = Lazy::new(|| { + gst::subclass::ElementMetadata::new( + "SMPTE ST-2038 ancillary metadata demuxer", + "Metadata/Video/Demuxer", + "Splits individual ancillary metadata streams from an SMPTE ST-2038 stream", + "Tim-Philipp Müller ", + ) + }); + + Some(&*ELEMENT_METADATA) + } + + fn pad_templates() -> &'static [gst::PadTemplate] { + static PAD_TEMPLATES: Lazy> = Lazy::new(|| { + let caps = gst::Caps::builder("meta/x-st-2038").build(); + let sink_pad_template = gst::PadTemplate::new( + "sink", + gst::PadDirection::Sink, + gst::PadPresence::Always, + &caps, + ) + .unwrap(); + + // One always pad that outputs the combined stream, and sometimes pads for each + // ancillary data type, so people can only splice off the ones they actually need. + let combined_src_pad_template = gst::PadTemplate::new( + "src", + gst::PadDirection::Src, + gst::PadPresence::Always, + &caps, + ) + .unwrap(); + + let individual_src_pad_template = gst::PadTemplate::new( + "anc_%02x_%02x_at_%u_%u", + gst::PadDirection::Src, + gst::PadPresence::Sometimes, + &caps, + ) + .unwrap(); + + vec![ + sink_pad_template, + combined_src_pad_template, + individual_src_pad_template, + ] + }); + + PAD_TEMPLATES.as_ref() + } +} + +#[glib::object_subclass] +impl ObjectSubclass for St2038AncDemux { + const NAME: &'static str = "GstSt2038AncDemux"; + type Type = super::St2038AncDemux; + type ParentType = gst::Element; + + fn with_class(klass: &Self::Class) -> Self { + let templ = klass.pad_template("sink").unwrap(); + let sinkpad = gst::Pad::builder_from_template(&templ) + .chain_function(|pad, parent, buffer| { + St2038AncDemux::catch_panic_pad_function( + parent, + || Err(gst::FlowError::Error), + |enc| enc.sink_chain(pad, buffer), + ) + }) + .event_function(|pad, parent, event| { + St2038AncDemux::catch_panic_pad_function( + parent, + || false, + |enc| enc.sink_event(pad, event), + ) + }) + .build(); + + let templ = klass.pad_template("src").unwrap(); + let srcpad = gst::Pad::from_template(&templ); + + Self { + srcpad, + sinkpad, + state: State::default().into(), + } + } +} + +impl ObjectImpl for St2038AncDemux { + fn constructed(&self) { + self.parent_constructed(); + + let mut state = self.state.borrow_mut(); + + let obj = self.obj(); + obj.add_pad(&self.sinkpad).unwrap(); + obj.add_pad(&self.srcpad).unwrap(); + + state.flow_combiner.add_pad(&self.srcpad); + } +} diff --git a/video/closedcaption/src/st2038ancdemux/mod.rs b/video/closedcaption/src/st2038ancdemux/mod.rs new file mode 100644 index 00000000..de8afcc8 --- /dev/null +++ b/video/closedcaption/src/st2038ancdemux/mod.rs @@ -0,0 +1,27 @@ +// GStreamer SMPTE ST-2038 ancillary metadata demuxer +// +// Copyright (C) 2024 Tim-Philipp Müller +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +use gst::glib; +use gst::prelude::*; + +mod imp; + +glib::wrapper! { + pub struct St2038AncDemux(ObjectSubclass) @extends gst::Element, gst::Object; +} + +pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + gst::Element::register( + Some(plugin), + "st2038ancdemux", + gst::Rank::NONE, + St2038AncDemux::static_type(), + ) +}