threadshare: Only take the current queue levels into account instead of the future ones

Otherwise we might never ever enqueue a single buffer if it is already
by itself going over the limits.
This commit is contained in:
Sebastian Dröge 2018-03-26 18:19:02 +03:00
parent e03c27814b
commit 811893ccf9

View file

@ -154,21 +154,6 @@ impl DataQueueItem {
DataQueueItem::Event(_) => None, DataQueueItem::Event(_) => None,
} }
} }
fn timestamp_end(&self) -> Option<u64> {
match *self {
DataQueueItem::Buffer(ref buffer) => buffer
.get_dts_or_pts()
.map(|ts| ts + buffer.get_duration().unwrap_or(0)),
DataQueueItem::BufferList(ref list) => list.iter()
.filter_map(|b| {
b.get_dts_or_pts()
.map(|ts| (ts + b.get_duration().unwrap_or(0)))
})
.last(),
DataQueueItem::Event(_) => None,
}
}
} }
#[derive(PartialEq, Eq, Debug)] #[derive(PartialEq, Eq, Debug)]
@ -363,32 +348,26 @@ impl DataQueue {
gst_debug!(DATA_QUEUE_CAT, obj: &inner.element, "Pushing item {:?}", item); gst_debug!(DATA_QUEUE_CAT, obj: &inner.element, "Pushing item {:?}", item);
let (count, bytes) = item.size(); let (count, bytes) = item.size();
let ts_end = item.timestamp_end();
let ts = inner.queue.iter().filter_map(|i| i.timestamp()).next(); let ts = inner.queue.iter().filter_map(|i| i.timestamp()).next();
if let Some(max) = inner.max_size_buffers { if let Some(max) = inner.max_size_buffers {
if max <= inner.cur_size_buffers + count { if max <= inner.cur_size_buffers {
gst_debug!(DATA_QUEUE_CAT, obj: &inner.element, "Queue is full (buffers): {} <= {}", max, inner.cur_size_buffers + count); gst_debug!(DATA_QUEUE_CAT, obj: &inner.element, "Queue is full (buffers): {} <= {}", max, inner.cur_size_buffers);
return Err(item); return Err(item);
} }
} }
if let Some(max) = inner.max_size_bytes { if let Some(max) = inner.max_size_bytes {
if max <= inner.cur_size_bytes + bytes { if max <= inner.cur_size_bytes {
gst_debug!(DATA_QUEUE_CAT, obj: &inner.element, "Queue is full (bytes): {} <= {}", max, inner.cur_size_bytes + bytes); gst_debug!(DATA_QUEUE_CAT, obj: &inner.element, "Queue is full (bytes): {} <= {}", max, inner.cur_size_bytes);
return Err(item); return Err(item);
} }
} }
// FIXME: Use running time // FIXME: Use running time
if let (Some(max), Some(ts), Some(ts_end)) = (inner.max_size_time, ts, ts_end) { if let (Some(max), Some(ts)) = (inner.max_size_time, ts) {
let level = if ts > ts_end { if max <= ts {
ts - ts_end gst_debug!(DATA_QUEUE_CAT, obj: &inner.element, "Queue is full (time): {} <= {}", max, ts);
} else {
ts_end - ts
};
if max <= level {
gst_debug!(DATA_QUEUE_CAT, obj: &inner.element, "Queue is full (time): {} <= {}", max, level);
return Err(item); return Err(item);
} }
} }