diff --git a/net/onvif/src/onvifmetadataparse/imp.rs b/net/onvif/src/onvifmetadataparse/imp.rs index c5d02bf0..9b6db23b 100644 --- a/net/onvif/src/onvifmetadataparse/imp.rs +++ b/net/onvif/src/onvifmetadataparse/imp.rs @@ -1232,14 +1232,86 @@ impl OnvifMetadataParse { fn src_loop(&self, element: &super::OnvifMetadataParse) -> Result<(), gst::FlowError> { let mut state = self.state.lock().unwrap(); - let mut clock_wait_time = None; + // Remember last clock wait time in case we got woken up slightly earlier than the timer + // and a bit more than up to the current clock time should be drained. + let mut last_clock_wait_time: Option = None; loop { // If flushing or any other error then just return here state.last_flow_ret?; - if !self.sinkpad.pad_flags().contains(gst::PadFlags::EOS) { - if let Some(clock_wait) = state.clock_wait.clone() { + // Calculate running time until which to drain now + let mut drain_running_time = None; + if self.sinkpad.pad_flags().contains(gst::PadFlags::EOS) { + // Drain completely + gst::debug!(CAT, obj: element, "Sink pad is EOS, draining"); + } else if let Some((true, min_latency)) = state.upstream_latency { + // Drain until the current clock running time minus the configured latency when + // live + if let Some((now, base_time)) = Option::zip( + element.clock().and_then(|clock| clock.time()), + element.base_time(), + ) { + gst::trace!( + CAT, + obj: element, + "Clock time now {}, last timer was at {} and current timer at {}", + now, + last_clock_wait_time.display(), + state + .clock_wait + .as_ref() + .map(|clock_wait| clock_wait.time()) + .display(), + ); + + let now = + (now - base_time).saturating_sub(min_latency + state.configured_latency); + let now = if let Some(last_clock_wait_time) = last_clock_wait_time { + let last_clock_wait_time = (last_clock_wait_time - base_time) + .saturating_sub(min_latency + state.configured_latency); + std::cmp::max(now, last_clock_wait_time) + } else { + now + }; + + drain_running_time = Some(now.into_positive()); + } + } else { + // Otherwise if not live drain up to the last input running time minus the + // configured latency, i.e. keep only up to the configured latency queued + let current_running_time = state + .in_segment + .to_running_time_full(state.in_segment.position()); + drain_running_time = current_running_time.and_then(|current_running_time| { + current_running_time.checked_sub_unsigned(state.configured_latency) + }); + } + + // And drain up to that running time now, or everything if EOS + let data = if self.sinkpad.pad_flags().contains(gst::PadFlags::EOS) { + self.drain(element, &mut state, None)? + } else if let Some(drain_utc_time) = + Option::zip(drain_running_time, state.utc_time_running_time_mapping).and_then( + |(drain_running_time, utc_time_running_time_mapping)| { + running_time_to_utc_time(utc_time_running_time_mapping, drain_running_time) + }, + ) + { + self.drain(element, &mut state, Some(drain_utc_time))? + } else { + vec![] + }; + + // If there's nothing to drain currently then wait + last_clock_wait_time = None; + if data.is_empty() { + if self.sinkpad.pad_flags().contains(gst::PadFlags::EOS) { + state.last_flow_ret = Err(gst::FlowError::Eos); + gst::debug!(CAT, obj: element, "EOS, waiting on cond"); + state = self.cond.wait(state).unwrap(); + gst::trace!(CAT, obj: element, "Woke up"); + } else if let Some(clock_wait) = state.clock_wait.clone() { gst::trace!( CAT, obj: element, @@ -1247,15 +1319,22 @@ impl OnvifMetadataParse { clock_wait.time(), clock_wait.clock().and_then(|clock| clock.time()).display(), ); - clock_wait_time = Some(clock_wait.time()); drop(state); let res = clock_wait.wait(); state = self.state.lock().unwrap(); + // Unset clock wait if it didn't change in the meantime: waiting again + // on it is going to return immediately and instead if there's nothing + // new to drain then we need to wait on the condvar + if state.clock_wait.as_ref() == Some(&clock_wait) { + state.clock_wait = None; + } + match res { (Ok(_), jitter) => { gst::trace!(CAT, obj: element, "Woke up after waiting for {}", jitter); + last_clock_wait_time = Some(clock_wait.time()); } (Err(err), jitter) => { gst::trace!( @@ -1265,90 +1344,16 @@ impl OnvifMetadataParse { err, jitter ); - - // If unscheduled wait again or return immediately above if flushing - if err == gst::ClockError::Unscheduled { - continue; - } } } } else { gst::debug!(CAT, obj: element, "Waiting on cond"); state = self.cond.wait(state).unwrap(); - - if state.clock_wait.is_some() { - gst::trace!(CAT, obj: element, "Got timer now, waiting again"); - continue; - } - gst::trace!(CAT, obj: element, "Woke up and checking for data to drain"); + gst::trace!(CAT, obj: element, "Woke up"); } - } - break; - } - - // If flushing or any other error then just return here - state.last_flow_ret?; - - let res = loop { - // Calculate running time until which to drain now - let mut drain_running_time = None; - if self.sinkpad.pad_flags().contains(gst::PadFlags::EOS) { - // Drain completely - gst::debug!(CAT, obj: element, "Sink pad is EOS, draining"); - } else if let Some((true, min_latency)) = state.upstream_latency { - if let Some((now, base_time)) = Option::zip( - element.clock().and_then(|clock| clock.time()), - element.base_time(), - ) { - gst::trace!( - CAT, - obj: element, - "Clock time now {}, timer at {}", - now, - clock_wait_time.display() - ); - - let now = - (now - base_time).saturating_sub(min_latency + state.configured_latency); - let now = if let Some(clock_wait_time) = clock_wait_time { - let clock_wait_time = (clock_wait_time - base_time) - .saturating_sub(min_latency + state.configured_latency); - std::cmp::max(now, clock_wait_time) - } else { - now - }; - - drain_running_time = Some(now.into_positive()); - } - } else { - let current_running_time = state - .in_segment - .to_running_time_full(state.in_segment.position()); - drain_running_time = current_running_time.and_then(|current_running_time| { - current_running_time.checked_sub_unsigned(state.configured_latency) - }); - } - - // And drain up to that running time now - let data = if self.sinkpad.pad_flags().contains(gst::PadFlags::EOS) { - self.drain(element, &mut state, None)? - } else if let Some((drain_running_time, utc_time_running_time_mapping)) = - Option::zip(drain_running_time, state.utc_time_running_time_mapping) - { - if let Some(drain_utc_time) = - running_time_to_utc_time(utc_time_running_time_mapping, drain_running_time) - { - self.drain(element, &mut state, Some(drain_utc_time))? - } else { - vec![] - } - } else { - vec![] - }; - - if data.is_empty() { - break state.last_flow_ret.map(|_| ()); + // And retry if there's anything to drain now. + continue; } drop(state); @@ -1374,15 +1379,17 @@ impl OnvifMetadataParse { gst::trace!(CAT, obj: element, "Pushing returned {:?}", res); state = self.state.lock().unwrap(); - // If flushing or any other error then just return here state.last_flow_ret?; + state.last_flow_ret = res.map(|_| gst::FlowSuccess::Ok); + // Schedule a new clock wait now that data was drained in case we have to wait some + // more time into the future on the next iteration self.reschedule_clock_wait(element, &mut state); - }; - res + // Loop and check if more data has to be drained now + } } }