diff --git a/video/closedcaption/src/transcriberbin/imp.rs b/video/closedcaption/src/transcriberbin/imp.rs index 8053fc4f..79d9be7b 100644 --- a/video/closedcaption/src/transcriberbin/imp.rs +++ b/video/closedcaption/src/transcriberbin/imp.rs @@ -38,6 +38,7 @@ const DEFAULT_INPUT_LANG_CODE: &str = "en-US"; const DEFAULT_MUX_METHOD: MuxMethod = MuxMethod::Cea608; const CEAX08MUX_LATENCY: gst::ClockTime = gst::ClockTime::from_mseconds(100); +const CCCOMBINER_LATENCY: gst::ClockTime = gst::ClockTime::from_mseconds(100); #[derive(Debug)] enum TargetPassthroughState { @@ -421,7 +422,7 @@ impl TranscriberBin { self.obj().add(&state.internal_bin)?; - state.cccombiner.set_property("latency", 100.mseconds()); + state.cccombiner.set_property("latency", CCCOMBINER_LATENCY); self.video_sinkpad .set_target(Some(&state.internal_bin.static_pad("video_sink").unwrap()))?; @@ -1108,39 +1109,31 @@ impl TranscriberBin { ); } - fn any_sink_is_translating(&self, state: &State) -> bool { - for pad in state.audio_sink_pads.values() { - let ps = pad.imp().state.lock().unwrap(); - let pad_state = ps.as_ref().unwrap(); - if pad_state - .transcription_channels - .values() - .any(|c| c.language != "transcript") - { - return true; - } - } - false - } + fn query_upstream_latency(&self, state: &State) -> gst::ClockTime { + let mut min = gst::ClockTime::from_seconds(0); - fn any_sink_is_rollup(&self, state: &State) -> bool { - for pad in state.audio_sink_pads.values() { - let pad_settings = pad.imp().settings.lock().unwrap(); - if pad_settings.mode.is_rollup() { - return true; - } - } - false - } + for pad in state + .audio_sink_pads + .values() + .map(|p| p.upcast_ref::()) + .chain( + [&self.video_sinkpad] + .iter() + .map(|p| p.upcast_ref::()), + ) + { + let mut upstream_query = gst::query::Latency::new(); - fn all_sinks_are_passthrough(&self, state: &State) -> bool { - for pad in state.audio_sink_pads.values() { - let pad_settings = pad.imp().settings.lock().unwrap(); - if !pad_settings.passthrough { - return false; + if pad.query(&mut upstream_query) { + let (_, upstream_min, _) = upstream_query.result(); + + if min < upstream_min { + min = upstream_min; + } } } - true + + min } #[allow(clippy::single_match)] @@ -1151,49 +1144,31 @@ impl TranscriberBin { match query.view_mut() { QueryViewMut::Latency(q) => { - let mut upstream_query = gst::query::Latency::new(); - - let ret = gst::Pad::query_default(pad, Some(&*self.obj()), &mut upstream_query); - - if ret { - let (_, mut min, _) = upstream_query.result(); - let state = self.state.lock().unwrap(); - let (received_framerate, translating, all_passthrough) = { - if let Some(state) = state.as_ref() { - ( - state.framerate, - self.any_sink_is_translating(state), - self.all_sinks_are_passthrough(state), - ) - } else { - (None, false, true) - } - }; - + let state = self.state.lock().unwrap(); + if let Some(state) = state.as_ref() { + let upstream_min = self.query_upstream_latency(state); let settings = self.settings.lock().unwrap(); - if all_passthrough || received_framerate.is_none() { - min += settings.latency + settings.accumulate_time + CEAX08MUX_LATENCY; + let min = upstream_min + + settings.latency + + settings.accumulate_time + + CEAX08MUX_LATENCY + + settings.translate_latency + + CCCOMBINER_LATENCY + + state + .framerate + .map(|f| { + 2 * gst::ClockTime::SECOND + .mul_div_floor(f.denom() as u64, f.numer() as u64) + .unwrap() + }) + .unwrap_or(gst::ClockTime::from_seconds(0)); - if translating { - min += settings.translate_latency; - } - - /* The sub latency introduced by ceax08mux */ - if let Some(framerate) = received_framerate { - min += gst::ClockTime::SECOND - .mul_div_floor(framerate.denom() as u64, framerate.numer() as u64) - .unwrap(); - } - } else if let Some(state) = state.as_ref() { - if self.any_sink_is_rollup(state) { - min += settings.accumulate_time; - } - } + gst::debug!(CAT, imp = self, "calculated latency: {}", min); q.set(true, min, gst::ClockTime::NONE); } - ret + true } _ => gst::Pad::query_default(pad, Some(&*self.obj()), query), }