threadshare: Try pushing pending items immediately if we did not schedule a future for it yet

It might not be necessary to first go through a future, we might
directly be able to push them now.
This commit is contained in:
Sebastian Dröge 2018-05-17 11:59:04 +03:00
parent 7ac9534322
commit c5d901609f
2 changed files with 46 additions and 11 deletions

View file

@ -377,14 +377,33 @@ impl ProxySink {
let mut queue = queue.0.lock().unwrap();
let item = if queue.pending_queue.is_none() {
// The source might not have started yet
match queue.queue {
Some(ref queue) => queue.push(item),
None => Err(item),
let item = {
let SharedQueueInner {
ref mut pending_queue,
ref queue,
..
} = *queue;
match (pending_queue, queue) {
(None, Some(ref queue)) => queue.push(item),
(Some((_, false, ref mut items)), Some(ref queue)) => {
let mut failed_item = None;
while let Some(item) = items.pop_front() {
if let Err(item) = queue.push(item) {
failed_item = Some(item);
}
}
if let Some(failed_item) = failed_item {
items.push_front(failed_item);
Err(item)
} else {
queue.push(item)
}
}
_ => Err(item),
}
} else {
Err(item)
};
if let Err(item) = item {

View file

@ -286,11 +286,27 @@ impl Queue {
Some(ref queue) => queue,
};
let item = if pending_queue.is_none() {
queue.push(item)
} else {
Err(item)
let item = match pending_queue {
None => queue.push(item),
Some((_, false, ref mut items)) => {
let mut failed_item = None;
while let Some(item) = items.pop_front() {
if let Err(item) = queue.push(item) {
failed_item = Some(item);
}
}
if let Some(failed_item) = failed_item {
items.push_front(failed_item);
Err(item)
} else {
queue.push(item)
}
}
_ => Err(item),
};
if let Err(item) = item {
if pending_queue
.as_ref()