threadshare: Schedule the pending queue in queue/proxysink only on EOS or the first buffer/buffer list

We will get the custom sticky downstream event with the IO context only
after stream-start and others, so would potentially block the current
thread from another futures executor, which then panics. Instead let's
just queue up those events for the time being until a later time.
This commit is contained in:
Sebastian Dröge 2018-05-16 17:32:35 +03:00
parent 3a8ce35e60
commit 51aa06d013
2 changed files with 184 additions and 134 deletions

View file

@ -215,7 +215,7 @@ impl Drop for SharedQueue {
let mut inner = self.0.lock().unwrap(); let mut inner = self.0.lock().unwrap();
assert!(inner.have_sink); assert!(inner.have_sink);
inner.have_sink = false; inner.have_sink = false;
if let Some((Some(task), _)) = inner.pending_queue.take() { if let Some((Some(task), _, _)) = inner.pending_queue.take() {
task.notify(); task.notify();
} }
} else { } else {
@ -232,7 +232,7 @@ struct SharedQueueInner {
name: String, name: String,
queue: Option<DataQueue>, queue: Option<DataQueue>,
last_ret: gst::FlowReturn, last_ret: gst::FlowReturn,
pending_queue: Option<(Option<task::Task>, VecDeque<DataQueueItem>)>, pending_queue: Option<(Option<task::Task>, bool, VecDeque<DataQueueItem>)>,
pending_future_cancel: Option<futures::sync::oneshot::Sender<()>>, pending_future_cancel: Option<futures::sync::oneshot::Sender<()>>,
have_sink: bool, have_sink: bool,
have_src: bool, have_src: bool,
@ -388,9 +388,24 @@ impl ProxySink {
}; };
if let Err(item) = item { if let Err(item) = item {
if queue.pending_queue.is_none() { if queue
queue.pending_queue = Some((None, VecDeque::new())); .pending_queue
queue.pending_queue.as_mut().unwrap().1.push_back(item); .as_ref()
.map(|(_, scheduled, _)| !scheduled)
.unwrap_or(true)
{
if queue.pending_queue.is_none() {
queue.pending_queue = Some((None, false, VecDeque::new()));
}
let schedule_now = match item {
DataQueueItem::Event(ref ev) if ev.get_type() != gst::EventType::Eos => {
false
}
_ => true,
};
queue.pending_queue.as_mut().unwrap().2.push_back(item);
gst_log!( gst_log!(
self.cat, self.cat,
@ -398,95 +413,105 @@ impl ProxySink {
"Proxy is full - Pushing first item on pending queue" "Proxy is full - Pushing first item on pending queue"
); );
let element_clone = element.clone(); if schedule_now {
let future = future::poll_fn(move || { gst_log!(self.cat, obj: element, "Scheduling pending queue now");
let sink = element_clone
.get_impl()
.downcast_ref::<ProxySink>()
.unwrap();
let state = sink.state.lock().unwrap();
gst_log!( queue.pending_queue.as_mut().unwrap().1 = true;
sink.cat,
obj: &element_clone,
"Trying to empty pending queue"
);
let mut queue = match state.queue { let element_clone = element.clone();
Some(ref queue) => queue.0.lock().unwrap(), let future = future::poll_fn(move || {
None => { let sink = element_clone
return Ok(Async::Ready(())); .get_impl()
} .downcast_ref::<ProxySink>()
}; .unwrap();
let state = sink.state.lock().unwrap();
let SharedQueueInner { gst_log!(
ref mut pending_queue, sink.cat,
ref queue, obj: &element_clone,
.. "Trying to empty pending queue"
} = *queue; );
let res = if let Some((ref mut task, ref mut items)) = *pending_queue { let mut queue = match state.queue {
if let &Some(ref queue) = queue { Some(ref queue) => queue.0.lock().unwrap(),
let mut failed_item = None; None => {
for item in items.drain(..) { return Ok(Async::Ready(()));
if let Err(item) = queue.push(item) {
failed_item = Some(item);
break;
}
} }
};
if let Some(item) = failed_item { let SharedQueueInner {
items.push_front(item); ref mut pending_queue,
*task = Some(task::current()); ref queue,
gst_log!( ..
sink.cat, } = *queue;
obj: &element_clone,
"Waiting for more queue space" let res = if let Some((ref mut task, _, ref mut items)) = *pending_queue
); {
Ok(Async::NotReady) if let &Some(ref queue) = queue {
let mut failed_item = None;
for item in items.drain(..) {
if let Err(item) = queue.push(item) {
failed_item = Some(item);
break;
}
}
if let Some(item) = failed_item {
items.push_front(item);
*task = Some(task::current());
gst_log!(
sink.cat,
obj: &element_clone,
"Waiting for more queue space"
);
Ok(Async::NotReady)
} else {
gst_log!(
sink.cat,
obj: &element_clone,
"Pending queue is empty now"
);
Ok(Async::Ready(()))
}
} else { } else {
gst_log!( gst_log!(
sink.cat, sink.cat,
obj: &element_clone, obj: &element_clone,
"Pending queue is empty now" "Waiting for queue to be allocated"
); );
Ok(Async::Ready(())) Ok(Async::NotReady)
} }
} else { } else {
gst_log!( gst_log!(
sink.cat, sink.cat,
obj: &element_clone, obj: &element_clone,
"Waiting for queue to be allocated" "Flushing, dropping pending queue"
); );
Ok(Async::NotReady) Ok(Async::Ready(()))
};
if res == Ok(Async::Ready(())) {
*pending_queue = None;
} }
res
});
if let (Some(io_context), Some(pending_future_id)) =
(io_context.as_ref(), pending_future_id.as_ref())
{
io_context.add_pending_future(*pending_future_id, future);
None
} else { } else {
gst_log!( Some(future)
sink.cat,
obj: &element_clone,
"Flushing, dropping pending queue"
);
Ok(Async::Ready(()))
};
if res == Ok(Async::Ready(())) {
*pending_queue = None;
} }
res
});
if let (Some(io_context), Some(pending_future_id)) =
(io_context.as_ref(), pending_future_id.as_ref())
{
io_context.add_pending_future(*pending_future_id, future);
None
} else { } else {
Some(future) gst_log!(self.cat, obj: element, "Scheduling pending queue later");
None
} }
} else { } else {
assert!(io_context.is_some()); queue.pending_queue.as_mut().unwrap().2.push_back(item);
queue.pending_queue.as_mut().unwrap().1.push_back(item);
None None
} }
@ -645,7 +670,7 @@ impl ProxySink {
let mut queue = state.queue.as_ref().unwrap().0.lock().unwrap(); let mut queue = state.queue.as_ref().unwrap().0.lock().unwrap();
if let Some((Some(task), _)) = queue.pending_queue.take() { if let Some((Some(task), _, _)) = queue.pending_queue.take() {
task.notify(); task.notify();
} }
queue.last_ret = gst::FlowReturn::Flushing; queue.last_ret = gst::FlowReturn::Flushing;
@ -908,7 +933,7 @@ impl ProxySrc {
let event = { let event = {
let state = self.state.lock().unwrap(); let state = self.state.lock().unwrap();
let queue = state.queue.as_ref().unwrap().0.lock().unwrap(); let queue = state.queue.as_ref().unwrap().0.lock().unwrap();
if let Some((Some(ref task), _)) = queue.pending_queue { if let Some((Some(ref task), _, _)) = queue.pending_queue {
task.notify(); task.notify();
} }

View file

@ -123,7 +123,7 @@ struct State {
io_context_in: Option<IOContext>, io_context_in: Option<IOContext>,
pending_future_id_in: Option<PendingFutureId>, pending_future_id_in: Option<PendingFutureId>,
queue: Option<DataQueue>, queue: Option<DataQueue>,
pending_queue: Option<(Option<task::Task>, VecDeque<DataQueueItem>)>, pending_queue: Option<(Option<task::Task>, bool, VecDeque<DataQueueItem>)>,
last_ret: gst::FlowReturn, last_ret: gst::FlowReturn,
pending_future_cancel: Option<futures::sync::oneshot::Sender<()>>, pending_future_cancel: Option<futures::sync::oneshot::Sender<()>>,
} }
@ -292,9 +292,23 @@ impl Queue {
Err(item) Err(item)
}; };
if let Err(item) = item { if let Err(item) = item {
if pending_queue.is_none() { if pending_queue
*pending_queue = Some((None, VecDeque::new())); .as_ref()
pending_queue.as_mut().unwrap().1.push_back(item); .map(|(_, scheduled, _)| !scheduled)
.unwrap_or(true)
{
if pending_queue.is_none() {
*pending_queue = Some((None, false, VecDeque::new()));
}
let schedule_now = match item {
DataQueueItem::Event(ref ev) if ev.get_type() != gst::EventType::Eos => {
false
}
_ => true,
};
pending_queue.as_mut().unwrap().2.push_back(item);
gst_log!( gst_log!(
self.cat, self.cat,
@ -302,79 +316,90 @@ impl Queue {
"Queue is full - Pushing first item on pending queue" "Queue is full - Pushing first item on pending queue"
); );
let element_clone = element.clone(); if schedule_now {
let future = future::poll_fn(move || { gst_log!(self.cat, obj: element, "Scheduling pending queue now");
let queue = element_clone.get_impl().downcast_ref::<Queue>().unwrap();
let mut state = queue.state.lock().unwrap();
let State { pending_queue.as_mut().unwrap().1 = true;
queue: ref dq,
ref mut pending_queue,
..
} = *state;
if dq.is_none() { let element_clone = element.clone();
return Ok(Async::Ready(())); let future = future::poll_fn(move || {
} let queue = element_clone.get_impl().downcast_ref::<Queue>().unwrap();
let mut state = queue.state.lock().unwrap();
gst_log!( let State {
queue.cat, queue: ref dq,
obj: &element_clone, ref mut pending_queue,
"Trying to empty pending queue" ..
); } = *state;
let res = if let Some((ref mut task, ref mut items)) = *pending_queue {
let mut failed_item = None; if dq.is_none() {
for item in items.drain(..) { return Ok(Async::Ready(()));
if let Err(item) = dq.as_ref().unwrap().push(item) {
failed_item = Some(item);
break;
}
} }
if let Some(item) = failed_item { gst_log!(
items.push_front(item); queue.cat,
*task = Some(task::current()); obj: &element_clone,
gst_log!( "Trying to empty pending queue"
queue.cat, );
obj: &element_clone, let res = if let Some((ref mut task, _, ref mut items)) = *pending_queue
"Waiting for more queue space" {
); let mut failed_item = None;
Ok(Async::NotReady) for item in items.drain(..) {
if let Err(item) = dq.as_ref().unwrap().push(item) {
failed_item = Some(item);
break;
}
}
if let Some(item) = failed_item {
items.push_front(item);
*task = Some(task::current());
gst_log!(
queue.cat,
obj: &element_clone,
"Waiting for more queue space"
);
Ok(Async::NotReady)
} else {
gst_log!(
queue.cat,
obj: &element_clone,
"Pending queue is empty now"
);
Ok(Async::Ready(()))
}
} else { } else {
gst_log!( gst_log!(
queue.cat, queue.cat,
obj: &element_clone, obj: &element_clone,
"Pending queue is empty now" "Flushing, dropping pending queue"
); );
Ok(Async::Ready(())) Ok(Async::Ready(()))
};
if res == Ok(Async::Ready(())) {
*pending_queue = None;
} }
res
});
if let (Some(io_context_in), Some(pending_future_id_in)) =
(io_context_in.as_ref(), pending_future_id_in.as_ref())
{
io_context_in.add_pending_future(*pending_future_id_in, future);
None
} else { } else {
gst_log!( Some(future)
queue.cat,
obj: &element_clone,
"Flushing, dropping pending queue"
);
Ok(Async::Ready(()))
};
if res == Ok(Async::Ready(())) {
*pending_queue = None;
} }
res
});
if let (Some(io_context_in), Some(pending_future_id_in)) =
(io_context_in.as_ref(), pending_future_id_in.as_ref())
{
io_context_in.add_pending_future(*pending_future_id_in, future);
None
} else { } else {
Some(future) gst_log!(self.cat, obj: element, "Scheduling pending queue later");
None
} }
} else { } else {
assert!(io_context_in.is_some()); assert!(io_context_in.is_some());
pending_queue.as_mut().unwrap().1.push_back(item); pending_queue.as_mut().unwrap().2.push_back(item);
None None
} }
@ -562,7 +587,7 @@ impl Queue {
> { > {
let event = { let event = {
let state = self.state.lock().unwrap(); let state = self.state.lock().unwrap();
if let Some((Some(ref task), _)) = state.pending_queue { if let Some((Some(ref task), _, _)) = state.pending_queue {
task.notify(); task.notify();
} }
@ -793,7 +818,7 @@ impl Queue {
queue.pause(); queue.pause();
queue.clear(&self.src_pad); queue.clear(&self.src_pad);
} }
if let Some((Some(task), _)) = state.pending_queue.take() { if let Some((Some(task), _, _)) = state.pending_queue.take() {
task.notify(); task.notify();
} }
let _ = state.pending_future_cancel.take(); let _ = state.pending_future_cancel.take();