onvifmetadataparse: Refactor clock/condvar waiting

Always first try draining queued data in the loop and only start waiting
if there's nothing to drain right now. Otherwise data might have to be
drained right now but we still wait and nothing is ever waking up the
source pad task again.

Also make sure to not wait multiple times on the same gst::ClockId but
instead unset it after waiting on it and no new one was scheduled in the
meantime. Future waits on the same ClockId will immediately return and
instead we should wait on the condvar if no new ClockId is available.
This commit is contained in:
Sebastian Dröge 2022-09-23 12:47:26 +03:00
parent c4d2f4a60a
commit 692a063528

View file

@ -1232,14 +1232,86 @@ impl OnvifMetadataParse {
fn src_loop(&self, element: &super::OnvifMetadataParse) -> Result<(), gst::FlowError> {
let mut state = self.state.lock().unwrap();
let mut clock_wait_time = None;
// Remember last clock wait time in case we got woken up slightly earlier than the timer
// and a bit more than up to the current clock time should be drained.
let mut last_clock_wait_time: Option<gst::ClockTime> = None;
loop {
// If flushing or any other error then just return here
state.last_flow_ret?;
if !self.sinkpad.pad_flags().contains(gst::PadFlags::EOS) {
if let Some(clock_wait) = state.clock_wait.clone() {
// Calculate running time until which to drain now
let mut drain_running_time = None;
if self.sinkpad.pad_flags().contains(gst::PadFlags::EOS) {
// Drain completely
gst::debug!(CAT, obj: element, "Sink pad is EOS, draining");
} else if let Some((true, min_latency)) = state.upstream_latency {
// Drain until the current clock running time minus the configured latency when
// live
if let Some((now, base_time)) = Option::zip(
element.clock().and_then(|clock| clock.time()),
element.base_time(),
) {
gst::trace!(
CAT,
obj: element,
"Clock time now {}, last timer was at {} and current timer at {}",
now,
last_clock_wait_time.display(),
state
.clock_wait
.as_ref()
.map(|clock_wait| clock_wait.time())
.display(),
);
let now =
(now - base_time).saturating_sub(min_latency + state.configured_latency);
let now = if let Some(last_clock_wait_time) = last_clock_wait_time {
let last_clock_wait_time = (last_clock_wait_time - base_time)
.saturating_sub(min_latency + state.configured_latency);
std::cmp::max(now, last_clock_wait_time)
} else {
now
};
drain_running_time = Some(now.into_positive());
}
} else {
// Otherwise if not live drain up to the last input running time minus the
// configured latency, i.e. keep only up to the configured latency queued
let current_running_time = state
.in_segment
.to_running_time_full(state.in_segment.position());
drain_running_time = current_running_time.and_then(|current_running_time| {
current_running_time.checked_sub_unsigned(state.configured_latency)
});
}
// And drain up to that running time now, or everything if EOS
let data = if self.sinkpad.pad_flags().contains(gst::PadFlags::EOS) {
self.drain(element, &mut state, None)?
} else if let Some(drain_utc_time) =
Option::zip(drain_running_time, state.utc_time_running_time_mapping).and_then(
|(drain_running_time, utc_time_running_time_mapping)| {
running_time_to_utc_time(utc_time_running_time_mapping, drain_running_time)
},
)
{
self.drain(element, &mut state, Some(drain_utc_time))?
} else {
vec![]
};
// If there's nothing to drain currently then wait
last_clock_wait_time = None;
if data.is_empty() {
if self.sinkpad.pad_flags().contains(gst::PadFlags::EOS) {
state.last_flow_ret = Err(gst::FlowError::Eos);
gst::debug!(CAT, obj: element, "EOS, waiting on cond");
state = self.cond.wait(state).unwrap();
gst::trace!(CAT, obj: element, "Woke up");
} else if let Some(clock_wait) = state.clock_wait.clone() {
gst::trace!(
CAT,
obj: element,
@ -1247,15 +1319,22 @@ impl OnvifMetadataParse {
clock_wait.time(),
clock_wait.clock().and_then(|clock| clock.time()).display(),
);
clock_wait_time = Some(clock_wait.time());
drop(state);
let res = clock_wait.wait();
state = self.state.lock().unwrap();
// Unset clock wait if it didn't change in the meantime: waiting again
// on it is going to return immediately and instead if there's nothing
// new to drain then we need to wait on the condvar
if state.clock_wait.as_ref() == Some(&clock_wait) {
state.clock_wait = None;
}
match res {
(Ok(_), jitter) => {
gst::trace!(CAT, obj: element, "Woke up after waiting for {}", jitter);
last_clock_wait_time = Some(clock_wait.time());
}
(Err(err), jitter) => {
gst::trace!(
@ -1265,91 +1344,17 @@ impl OnvifMetadataParse {
err,
jitter
);
// If unscheduled wait again or return immediately above if flushing
if err == gst::ClockError::Unscheduled {
continue;
}
}
}
} else {
gst::debug!(CAT, obj: element, "Waiting on cond");
state = self.cond.wait(state).unwrap();
gst::trace!(CAT, obj: element, "Woke up");
}
if state.clock_wait.is_some() {
gst::trace!(CAT, obj: element, "Got timer now, waiting again");
// And retry if there's anything to drain now.
continue;
}
gst::trace!(CAT, obj: element, "Woke up and checking for data to drain");
}
}
break;
}
// If flushing or any other error then just return here
state.last_flow_ret?;
let res = loop {
// Calculate running time until which to drain now
let mut drain_running_time = None;
if self.sinkpad.pad_flags().contains(gst::PadFlags::EOS) {
// Drain completely
gst::debug!(CAT, obj: element, "Sink pad is EOS, draining");
} else if let Some((true, min_latency)) = state.upstream_latency {
if let Some((now, base_time)) = Option::zip(
element.clock().and_then(|clock| clock.time()),
element.base_time(),
) {
gst::trace!(
CAT,
obj: element,
"Clock time now {}, timer at {}",
now,
clock_wait_time.display()
);
let now =
(now - base_time).saturating_sub(min_latency + state.configured_latency);
let now = if let Some(clock_wait_time) = clock_wait_time {
let clock_wait_time = (clock_wait_time - base_time)
.saturating_sub(min_latency + state.configured_latency);
std::cmp::max(now, clock_wait_time)
} else {
now
};
drain_running_time = Some(now.into_positive());
}
} else {
let current_running_time = state
.in_segment
.to_running_time_full(state.in_segment.position());
drain_running_time = current_running_time.and_then(|current_running_time| {
current_running_time.checked_sub_unsigned(state.configured_latency)
});
}
// And drain up to that running time now
let data = if self.sinkpad.pad_flags().contains(gst::PadFlags::EOS) {
self.drain(element, &mut state, None)?
} else if let Some((drain_running_time, utc_time_running_time_mapping)) =
Option::zip(drain_running_time, state.utc_time_running_time_mapping)
{
if let Some(drain_utc_time) =
running_time_to_utc_time(utc_time_running_time_mapping, drain_running_time)
{
self.drain(element, &mut state, Some(drain_utc_time))?
} else {
vec![]
}
} else {
vec![]
};
if data.is_empty() {
break state.last_flow_ret.map(|_| ());
}
drop(state);
@ -1374,15 +1379,17 @@ impl OnvifMetadataParse {
gst::trace!(CAT, obj: element, "Pushing returned {:?}", res);
state = self.state.lock().unwrap();
// If flushing or any other error then just return here
state.last_flow_ret?;
state.last_flow_ret = res.map(|_| gst::FlowSuccess::Ok);
// Schedule a new clock wait now that data was drained in case we have to wait some
// more time into the future on the next iteration
self.reschedule_clock_wait(element, &mut state);
};
res
// Loop and check if more data has to be drained now
}
}
}