From b2e37d3c982fbc00682e6c684b02552974f8ece3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sebastian=20Dr=C3=B6ge?= Date: Mon, 16 Sep 2024 13:54:54 +0300 Subject: [PATCH] closedcaption: Add ST2038 muxer element Part-of: --- video/closedcaption/Cargo.toml | 3 +- video/closedcaption/src/lib.rs | 2 + video/closedcaption/src/st2038anc_utils.rs | 6 +- video/closedcaption/src/st2038ancdemux/imp.rs | 32 +- video/closedcaption/src/st2038ancmux/imp.rs | 612 ++++++++++++++++++ video/closedcaption/src/st2038ancmux/mod.rs | 32 + 6 files changed, 680 insertions(+), 7 deletions(-) create mode 100644 video/closedcaption/src/st2038ancmux/imp.rs create mode 100644 video/closedcaption/src/st2038ancmux/mod.rs diff --git a/video/closedcaption/Cargo.toml b/video/closedcaption/Cargo.toml index 93b9e7b6..fd92b5cf 100644 --- a/video/closedcaption/Cargo.toml +++ b/video/closedcaption/Cargo.toml @@ -22,7 +22,7 @@ serde = { version = "1.0", features = ["derive"] } serde_json = { version = "1.0", features = ["raw_value"] } cea708-types = "0.3.2" cea608-types = "0.1.1" -gst = { workspace = true, features = ["v1_16"]} +gst = { workspace = true, features = ["v1_20"]} gst-base = { workspace = true, features = ["v1_18"]} gst-video = { workspace = true, features = ["v1_16"]} winnow = "0.6" @@ -47,6 +47,7 @@ gst-plugin-version-helper.workspace = true static = [] capi = [] doc = ["gst/v1_18"] +v1_26 = ["gst-base/v1_26"] [package.metadata.capi] min_version = "0.9.21" diff --git a/video/closedcaption/src/lib.rs b/video/closedcaption/src/lib.rs index 28be02e9..1c3c6ae7 100644 --- a/video/closedcaption/src/lib.rs +++ b/video/closedcaption/src/lib.rs @@ -36,6 +36,7 @@ mod scc_enc; mod scc_parse; mod st2038anc_utils; mod st2038ancdemux; +mod st2038ancmux; mod transcriberbin; mod tttocea608; mod tttocea708; @@ -65,6 +66,7 @@ fn plugin_init(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { tttocea708::register(plugin)?; cea708overlay::register(plugin)?; st2038ancdemux::register(plugin)?; + st2038ancmux::register(plugin)?; Ok(()) } diff --git a/video/closedcaption/src/st2038anc_utils.rs b/video/closedcaption/src/st2038anc_utils.rs index b11d4343..e8f6a9a0 100644 --- a/video/closedcaption/src/st2038anc_utils.rs +++ b/video/closedcaption/src/st2038anc_utils.rs @@ -8,13 +8,14 @@ // // SPDX-License-Identifier: MPL-2.0 -#[derive(Clone, Debug, PartialEq, Eq, Hash)] +#[derive(Clone, Copy, Debug)] pub(crate) struct AncDataHeader { pub(crate) c_not_y_channel_flag: bool, pub(crate) did: u8, pub(crate) sdid: u8, pub(crate) line_number: u16, pub(crate) horizontal_offset: u16, + pub(crate) data_count: u8, } impl AncDataHeader { @@ -34,7 +35,7 @@ impl AncDataHeader { // Top two bits are parity bits and can be stripped off let did = (r.read::(10).context("DID")? & 0xff) as u8; let sdid = (r.read::(10).context("SDID")? & 0xff) as u8; - let _data_count = (r.read::(10).context("data count")? & 0xff) as u8; + let data_count = (r.read::(10).context("data count")? & 0xff) as u8; Ok(AncDataHeader { c_not_y_channel_flag, @@ -42,6 +43,7 @@ impl AncDataHeader { horizontal_offset, did, sdid, + data_count, }) } } diff --git a/video/closedcaption/src/st2038ancdemux/imp.rs b/video/closedcaption/src/st2038ancdemux/imp.rs index 38130641..a6447c30 100644 --- a/video/closedcaption/src/st2038ancdemux/imp.rs +++ b/video/closedcaption/src/st2038ancdemux/imp.rs @@ -37,12 +37,33 @@ pub struct St2038AncDemux { #[derive(Default)] struct State { - streams: HashMap, + streams: HashMap, flow_combiner: UniqueFlowCombiner, segment: gst::FormattedSegment, last_inactivity_check: Option, } +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +pub struct AncDataId { + c_not_y_channel_flag: bool, + did: u8, + sdid: u8, + line_number: u16, + horizontal_offset: u16, +} + +impl From for AncDataId { + fn from(value: AncDataHeader) -> Self { + AncDataId { + c_not_y_channel_flag: value.c_not_y_channel_flag, + did: value.did, + sdid: value.sdid, + line_number: value.line_number, + horizontal_offset: value.horizontal_offset, + } + } +} + struct AncStream { pad: gst::Pad, last_used: Option, @@ -71,7 +92,7 @@ impl St2038AncDemux { }) .unwrap(); - let stream = match state.streams.get_mut(&anc_hdr) { + let stream = match state.streams.get_mut(&AncDataId::from(anc_hdr)) { Some(stream) => stream, None => { let pad_name = format!( @@ -104,14 +125,17 @@ impl St2038AncDemux { state.flow_combiner.add_pad(&anc_srcpad); state.streams.insert( - anc_hdr.clone(), + AncDataId::from(anc_hdr), AncStream { pad: anc_srcpad, last_used: running_time, }, ); - state.streams.get_mut(&anc_hdr).expect("stream") + state + .streams + .get_mut(&AncDataId::from(anc_hdr)) + .expect("stream") } }; diff --git a/video/closedcaption/src/st2038ancmux/imp.rs b/video/closedcaption/src/st2038ancmux/imp.rs new file mode 100644 index 00000000..b4a05a76 --- /dev/null +++ b/video/closedcaption/src/st2038ancmux/imp.rs @@ -0,0 +1,612 @@ +// Copyright (C) 2024 Sebastian Dröge +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +use std::collections::BTreeMap; +use std::ops::ControlFlow; +use std::sync::Mutex; + +use gst::glib; +use gst::prelude::*; +use gst::subclass::prelude::*; +use gst_base::prelude::*; +use gst_base::subclass::prelude::*; + +use once_cell::sync::Lazy; + +use crate::st2038anc_utils::AncDataHeader; + +#[derive(Default)] +struct State { + downstream_framerate: Option, +} + +#[derive(Default)] +pub struct St2038AncMux { + state: Mutex, +} + +pub(crate) static CAT: Lazy = Lazy::new(|| { + gst::DebugCategory::new( + "st2038ancmux", + gst::DebugColorFlags::empty(), + Some("ST2038 Anc Mux Element"), + ) +}); + +impl AggregatorImpl for St2038AncMux { + fn aggregate(&self, timeout: bool) -> Result { + let state = self.state.lock().unwrap(); + let src_segment = self + .obj() + .src_pad() + .segment() + .downcast::() + .expect("Non-TIME segment"); + + let start_running_time = + if src_segment.position().is_none() || src_segment.position() < src_segment.start() { + src_segment.start().unwrap() + } else { + src_segment.position().unwrap() + }; + + // Only if downstream framerate provided, otherwise we output as we go + let duration = if let Some(framerate) = state.downstream_framerate { + gst::ClockTime::SECOND + .nseconds() + .mul_div_round(framerate.denom() as u64, framerate.numer() as u64) + .unwrap() + .nseconds() + } else { + gst::ClockTime::ZERO + }; + let end_running_time = start_running_time + duration; + drop(state); + + gst::trace!( + CAT, + imp = self, + "Aggregating for start time {} end {} timeout {}", + start_running_time.display(), + end_running_time.display(), + timeout + ); + + let sinkpads = self.obj().sink_pads(); + + // Collect buffers from all pads. We can start outputting for this frame on timeout, + // or otherwise all pads are either EOS or have a buffer for a future frame. + let mut all_pads_done = true; + let mut all_pads_eos = true; + let mut min_next_buffer_running_time = None; + + for pad in sinkpads + .iter() + .map(|pad| pad.downcast_ref::().unwrap()) + { + let mut pad_state = pad.imp().pad_state.lock().unwrap(); + + if pad.is_eos() { + // This pad is done + gst::trace!(CAT, obj = pad, "Pad is EOS"); + if !pad_state.queued_buffers.is_empty() { + all_pads_eos = false; + } + continue; + } + + all_pads_eos = false; + + let buffer = if let Some(buffer) = pad.peek_buffer() { + buffer + } else { + all_pads_done = false; + continue; + }; + + let segment = pad.segment().downcast::().unwrap(); + let Some(buffer_start_ts) = segment.to_running_time(buffer.pts()) else { + gst::warning!(CAT, obj = pad, "Buffer without valid PTS, dropping"); + pad.drop_buffer(); + all_pads_done = false; + continue; + }; + + if buffer_start_ts > end_running_time + || (end_running_time > start_running_time && buffer_start_ts == end_running_time) + { + gst::trace!( + CAT, + obj = pad, + "Buffer starting at {buffer_start_ts} >= {end_running_time}" + ); + + if min_next_buffer_running_time.map_or(true, |next_buffer_min_running_time| { + next_buffer_min_running_time > buffer_start_ts + }) { + min_next_buffer_running_time = Some(buffer_start_ts); + } + + // buffer is not for this frame so we're not interested in it yet + // and this pad is done for this frame. + continue; + } + + // Store buffers on the pad + gst::trace!( + CAT, + obj = pad, + "Queueing buffer starting at {buffer_start_ts}" + ); + pad_state.queued_buffers.push(buffer); + pad.drop_buffer(); + + // Check again if there's another buffer on this pad for this frame + all_pads_done = false; + } + + if !all_pads_done && !timeout { + gst::trace!(CAT, imp = self, "Not all pads ready yet"); + return Err(gst_base::AGGREGATOR_FLOW_NEED_DATA); + } + + if all_pads_eos { + gst::debug!(CAT, imp = self, "All pads EOS"); + return Err(gst::FlowError::Eos); + } + + gst::trace!(CAT, imp = self, "Ready for outputting"); + + self.obj() + .selected_samples(start_running_time, None, duration, None); + + // Remove all overlapping anc buffers from the queued buffers. The latest pad, latest + // buffer of that pad wins. + let mut lines = + BTreeMap::>::new(); + for pad in sinkpads + .iter() + .rev() + .map(|pad| pad.downcast_ref::().unwrap()) + { + let mut pad_state = pad.imp().pad_state.lock().unwrap(); + + for buffer in pad_state.queued_buffers.drain(..).rev() { + if buffer.size() == 0 + && buffer.flags().contains(gst::BufferFlags::GAP) + && gst::meta::CustomMeta::from_buffer(&buffer, "GstAggregatorMissingDataMeta") + .is_ok() + { + gst::trace!(CAT, obj = pad, "Dropping gap buffer"); + continue; + } + + let header = match AncDataHeader::from_buffer(&buffer) { + Ok(header) => header, + Err(err) => { + gst::warning!( + CAT, + obj = pad, + "Dropping buffer with invalid ST2038 data ({err})" + ); + continue; + } + }; + + gst::trace!(CAT, obj = pad, "Parsed ST2038 header {header:?}"); + + // FIXME: One pixel per word of data? ADF header needs to be included in the + // calculation? Two words per pixel because 4:2:2 YUV? Nobody knows! + let buffer_clone = buffer.clone(); // FIXME: To appease the borrow checker + lines + .entry(header.line_number) + .and_modify(|line| { + let new_offset = header.horizontal_offset; + let new_offset_end = header.horizontal_offset + header.data_count as u16; + + for (offset, (offset_end, _pad, _buffer)) in &*line { + // If one of the range starts is between the start/end of the other + // then the two ranges are overlapping. + if (new_offset >= *offset && new_offset < *offset_end) + || (*offset >= new_offset && *offset < new_offset_end) + { + gst::trace!( + CAT, + obj = pad, + "Not including ST2038 packet at {}x{}", + header.line_number, + header.horizontal_offset + ); + return; + } + } + + gst::trace!( + CAT, + obj = pad, + "Including ST2038 packet at {}x{}", + header.line_number, + header.horizontal_offset + ); + + line.insert(new_offset, (new_offset_end, pad.clone(), buffer)); + }) + .or_insert_with(|| { + gst::trace!( + CAT, + obj = pad, + "Including ST2038 packet at {}x{}", + header.line_number, + header.horizontal_offset + ); + + let mut line = BTreeMap::new(); + line.insert( + header.horizontal_offset, + ( + header.horizontal_offset + header.data_count as u16, + pad.clone(), + buffer_clone, + ), + ); + line + }); + } + } + + // Collect all anc buffers for this frame and output them as a single buffer list, + // sorted by line. Multiple anc in a single line are merged into a single buffer. + let ret = if !lines.is_empty() { + let mut buffers = gst::BufferList::new(); + + let buffers_ref = buffers.get_mut().unwrap(); + + for (line_idx, line) in lines { + // If there are multiple buffers for a line then merge them into a single buffer + if line.len() == 1 { + for (horizontal_offset, (_, _pad, buffer)) in line { + gst::trace!( + CAT, + imp = self, + "Outputting ST2038 packet at {line_idx}x{horizontal_offset}" + ); + buffers_ref.add(buffer); + } + } else { + gst::trace!( + CAT, + imp = self, + "Outputting multiple ST2038 packets at line {line_idx}" + ); + let mut new_buffer = gst::Buffer::new(); + for (horizontal_offset, (_, _pad, buffer)) in line { + gst::trace!(CAT, imp = self, "Horizontal offset {horizontal_offset}"); + // Copy over metadata of the first buffer for this line + if new_buffer.size() == 0 { + let new_buffer_ref = new_buffer.get_mut().unwrap(); + let _ = buffer.copy_into(new_buffer_ref, gst::BUFFER_COPY_METADATA, ..); + } + new_buffer.append(buffer); + } + } + } + + gst::trace!(CAT, imp = self, "Outputting {} buffers", buffers_ref.len()); + + // Unset marker flag on all buffers, and set PTS/duration if there is a downstream + // framerate. Otherwise we leave them as-is. + if duration > gst::ClockTime::ZERO { + buffers_ref.foreach_mut(|mut buffer, _idx| { + let buffer_ref = buffer.make_mut(); + buffer_ref.set_pts(start_running_time); + buffer_ref.set_duration(duration); + buffer_ref.unset_flags(gst::BufferFlags::MARKER); + + ControlFlow::Continue(Some(buffer)) + }); + } else { + buffers_ref.foreach_mut(|mut buffer, _idx| { + if buffer.flags().contains(gst::BufferFlags::MARKER) { + let buffer_ref = buffer.make_mut(); + buffer_ref.unset_flags(gst::BufferFlags::MARKER); + } + + ControlFlow::Continue(Some(buffer)) + }); + } + + // Set marker flag on last buffer + { + let last = buffers_ref.get_mut(buffers_ref.len() - 1).unwrap(); + last.set_flags(gst::BufferFlags::MARKER); + } + + self.finish_buffer_list(buffers) + } else { + let mut duration = duration; + + if let Some(min_next_buffer_running_time) = min_next_buffer_running_time { + gst::trace!( + CAT, + imp = self, + "Next buffer at {min_next_buffer_running_time}" + ); + if duration == gst::ClockTime::ZERO { + duration = min_next_buffer_running_time - start_running_time; + } + } + + gst::trace!( + CAT, + imp = self, + "Outputting gap event at {start_running_time} with duration {duration}" + ); + + // Nothing to be output for this frame + + #[cfg(feature = "v1_26")] + { + self.obj().push_src_event( + gst::event::Gap::builder(start_running_time) + .duration(duration) + .build(), + ); + } + + #[cfg(not(feature = "v1_26"))] + { + self.obj().src_pad().push_event( + gst::event::Gap::builder(start_running_time) + .duration(duration) + .build(), + ); + } + + Ok(gst::FlowSuccess::Ok) + }; + + // Advance position to the next frame if there is a downstream framerate, or otherwise + // to the start of the next buffer. + if duration > gst::ClockTime::ZERO { + self.obj().set_position(end_running_time); + } else if let Some(min_next_buffer_running_time) = min_next_buffer_running_time { + self.obj().set_position(min_next_buffer_running_time); + } else { + self.obj() + .set_position(src_segment.position().opt_add(40.mseconds())); + } + + ret + } + + fn peek_next_sample(&self, pad: &gst_base::AggregatorPad) -> Option { + let pad = pad.downcast_ref::().unwrap(); + + let pad_state = pad.imp().pad_state.lock().unwrap(); + let caps = pad.current_caps()?; + if pad_state.queued_buffers.is_empty() { + return None; + } + + Some( + gst::Sample::builder() + .buffer_list( + &pad_state + .queued_buffers + .iter() + .cloned() + .collect::(), + ) + .segment(&pad.segment()) + .caps(&caps) + .build(), + ) + } + + fn next_time(&self) -> Option { + self.obj().simple_get_next_time() + } + + fn flush(&self) -> Result { + let mut state = self.state.lock().unwrap(); + *state = State::default(); + + self.obj() + .src_pad() + .segment() + .set_position(None::); + + Ok(gst::FlowSuccess::Ok) + } + + fn negotiate(&self) -> bool { + let templ_caps = self.obj().src_pad().pad_template_caps(); + let mut peer_caps = self.obj().src_pad().peer_query_caps(Some(&templ_caps)); + gst::debug!(CAT, imp = self, "Downstream caps {peer_caps:?}"); + + if peer_caps.is_empty() { + gst::warning!(CAT, imp = self, "Downstream returned EMPTY caps"); + return false; + } + + peer_caps.fixate(); + let framerate = peer_caps + .structure(0) + .unwrap() + .get::("framerate") + .ok(); + + let mut state = self.state.lock().unwrap(); + if let Some(framerate) = framerate { + gst::debug!( + CAT, + imp = self, + "Configuring downstream requested framerate {framerate}" + ); + state.downstream_framerate = Some(framerate); + drop(state); + + let duration = gst::ClockTime::SECOND + .nseconds() + .mul_div_round(framerate.denom() as u64, framerate.numer() as u64) + .unwrap() + .nseconds(); + + self.obj().set_latency(duration, duration); + } else { + gst::debug!(CAT, imp = self, "Downstream requested no framerate"); + state.downstream_framerate = None; + drop(state); + + // Assume 25fps as a worst case + self.obj().set_latency(40.mseconds(), None); + } + + self.obj().set_src_caps(&peer_caps); + + true + } + + fn sink_event(&self, aggregator_pad: &gst_base::AggregatorPad, event: gst::Event) -> bool { + #[allow(clippy::single_match)] + match event.view() { + gst::EventView::Segment(ev) => { + let segment = ev.segment(); + if segment.format() != gst::Format::Time { + gst::error!(CAT, imp = self, "Non-TIME segments not supported"); + return false; + } + } + _ => (), + } + + self.parent_sink_event(aggregator_pad, event) + } + + fn clip( + &self, + aggregator_pad: &gst_base::AggregatorPad, + buffer: gst::Buffer, + ) -> Option { + let Some(pts) = buffer.pts() else { + return Some(buffer); + }; + let segment = aggregator_pad.segment(); + segment + .downcast_ref::() + .map(|segment| segment.clip(pts, pts)) + .map(|_| buffer) + } + + fn start(&self) -> Result<(), gst::ErrorMessage> { + gst::trace!(CAT, imp = self, "Starting"); + let mut state = self.state.lock().unwrap(); + *state = State::default(); + + Ok(()) + } + + fn stop(&self) -> Result<(), gst::ErrorMessage> { + gst::trace!(CAT, imp = self, "Stopping"); + let mut state = self.state.lock().unwrap(); + *state = State::default(); + + Ok(()) + } +} + +impl ElementImpl for St2038AncMux { + fn metadata() -> Option<&'static gst::subclass::ElementMetadata> { + static ELEMENT_METADATA: Lazy = Lazy::new(|| { + gst::subclass::ElementMetadata::new( + "ST2038 Anc Mux", + "Muxer", + "Combines multiple ST2038 Anc streams", + "Sebastian Dröge ", + ) + }); + + Some(&*ELEMENT_METADATA) + } + + fn pad_templates() -> &'static [gst::PadTemplate] { + static PAD_TEMPLATES: Lazy> = Lazy::new(|| { + let caps = gst::Caps::new_empty_simple("meta/x-st-2038"); + let src_pad_template = gst::PadTemplate::builder( + "src", + gst::PadDirection::Src, + gst::PadPresence::Always, + &caps, + ) + .gtype(gst_base::AggregatorPad::static_type()) + .build() + .unwrap(); + + let sink_pad_template = gst::PadTemplate::builder( + "sink_%u", + gst::PadDirection::Sink, + gst::PadPresence::Request, + &caps, + ) + .gtype(super::St2038AncMuxSinkPad::static_type()) + .build() + .unwrap(); + + vec![src_pad_template, sink_pad_template] + }); + + PAD_TEMPLATES.as_ref() + } +} + +impl GstObjectImpl for St2038AncMux {} + +impl ObjectImpl for St2038AncMux {} + +#[glib::object_subclass] +impl ObjectSubclass for St2038AncMux { + const NAME: &'static str = "GstSt2038AncMux"; + type Type = super::St2038AncMux; + type ParentType = gst_base::Aggregator; +} + +#[derive(Default)] +struct PadState { + queued_buffers: Vec, +} + +#[derive(Default)] +pub struct St2038AncMuxSinkPad { + pad_state: Mutex, +} + +impl St2038AncMuxSinkPad {} + +impl AggregatorPadImpl for St2038AncMuxSinkPad { + fn flush( + &self, + _aggregator: &gst_base::Aggregator, + ) -> Result { + let mut state = self.pad_state.lock().unwrap(); + state.queued_buffers.clear(); + Ok(gst::FlowSuccess::Ok) + } +} + +impl PadImpl for St2038AncMuxSinkPad {} + +impl GstObjectImpl for St2038AncMuxSinkPad {} + +impl ObjectImpl for St2038AncMuxSinkPad {} + +#[glib::object_subclass] +impl ObjectSubclass for St2038AncMuxSinkPad { + const NAME: &'static str = "GstSt2038AncMuxSinkPad"; + type Type = super::St2038AncMuxSinkPad; + type ParentType = gst_base::AggregatorPad; +} diff --git a/video/closedcaption/src/st2038ancmux/mod.rs b/video/closedcaption/src/st2038ancmux/mod.rs new file mode 100644 index 00000000..c0cbe962 --- /dev/null +++ b/video/closedcaption/src/st2038ancmux/mod.rs @@ -0,0 +1,32 @@ +// Copyright (C) 2024 Sebastian Dröge +// +// This Source Code Form is subject to the terms of the Mozilla Public License, v2.0. +// If a copy of the MPL was not distributed with this file, You can obtain one at +// . +// +// SPDX-License-Identifier: MPL-2.0 + +use gst::glib; +use gst::prelude::*; + +mod imp; + +glib::wrapper! { + pub struct St2038AncMux(ObjectSubclass) @extends gst_base::Aggregator, gst::Element, gst::Object; +} + +glib::wrapper! { + pub struct St2038AncMuxSinkPad(ObjectSubclass) @extends gst_base::AggregatorPad, gst::Pad, gst::Object; +} + +pub fn register(plugin: &gst::Plugin) -> Result<(), glib::BoolError> { + #[cfg(feature = "doc")] + St2038AncMuxSinkPad::static_type().mark_as_plugin_api(gst::PluginAPIFlags::empty()); + + gst::Element::register( + Some(plugin), + "st2038ancmux", + gst::Rank::NONE, + St2038AncMux::static_type(), + ) +}