threadshare: Make sure to shutdown sockets/queues without any mutexes locked

And make sure that the IOContext stays alive until they are fully done.
This commit is contained in:
Sebastian Dröge 2018-03-28 12:29:29 +03:00
parent b4d1145490
commit 6ebc8988b2
3 changed files with 74 additions and 33 deletions

View file

@ -426,7 +426,13 @@ impl ProxySink {
"Trying to empty pending queue" "Trying to empty pending queue"
); );
let mut queue = state.queue.as_ref().unwrap().0.lock().unwrap(); let mut queue = match state.queue {
Some(ref queue) => queue.0.lock().unwrap(),
None => {
return Ok(Async::Ready(()));
}
};
let SharedQueueInner { let SharedQueueInner {
ref mut pending_queue, ref mut pending_queue,
ref queue, ref queue,
@ -1161,24 +1167,38 @@ impl ProxySrc {
fn unprepare(&self, element: &Element) -> Result<(), ()> { fn unprepare(&self, element: &Element) -> Result<(), ()> {
gst_debug!(self.cat, obj: element, "Unpreparing"); gst_debug!(self.cat, obj: element, "Unpreparing");
let mut state = self.state.lock().unwrap(); // FIXME: The IO Context has to be alive longer than the queue,
if let Some(ref queue) = state.queue { // otherwise the queue can't finish any remaining work
let queue = queue.0.lock().unwrap(); let (mut queue, io_context) = {
let mut state = self.state.lock().unwrap();
if let Some(ref queue) = queue.queue { if let (&Some(ref pending_future_id), &Some(ref io_context)) =
queue.shutdown(); (&state.pending_future_id, &state.io_context)
{
io_context.release_pending_future_id(*pending_future_id);
} }
}
if let (&Some(ref pending_future_id), &Some(ref io_context)) = let queue = if let Some(ref queue) = state.queue.take() {
(&state.pending_future_id, &state.io_context) let mut queue = queue.0.lock().unwrap();
{ queue.queue.take()
io_context.release_pending_future_id(*pending_future_id); } else {
} None
};
*state = StateSrc::default(); let io_context = state.io_context.take();
*state = StateSrc::default();
(queue, io_context)
};
if let Some(ref queue) = queue.take() {
queue.shutdown();
}
drop(io_context);
gst_debug!(self.cat, obj: element, "Unprepared"); gst_debug!(self.cat, obj: element, "Unprepared");
Ok(()) Ok(())
} }

View file

@ -329,6 +329,10 @@ impl Queue {
.. ..
} = *state; } = *state;
if dq.is_none() {
return Ok(Async::Ready(()));
}
gst_log!( gst_log!(
queue.cat, queue.cat,
obj: &element_clone, obj: &element_clone,
@ -779,19 +783,28 @@ impl Queue {
fn unprepare(&self, element: &Element) -> Result<(), ()> { fn unprepare(&self, element: &Element) -> Result<(), ()> {
gst_debug!(self.cat, obj: element, "Unpreparing"); gst_debug!(self.cat, obj: element, "Unpreparing");
let mut state = self.state.lock().unwrap(); // FIXME: The IO Context has to be alive longer than the queue,
// otherwise the queue can't finish any remaining work
let (mut queue, io_context) = {
let mut state = self.state.lock().unwrap();
if let (&Some(ref pending_future_id), &Some(ref io_context)) =
(&state.pending_future_id, &state.io_context)
{
io_context.release_pending_future_id(*pending_future_id);
}
if let Some(ref queue) = state.queue { let queue = state.queue.take();
let io_context = state.io_context.take();
*state = State::default();
(queue, io_context)
};
if let Some(ref queue) = queue.take() {
queue.shutdown(); queue.shutdown();
} }
drop(io_context);
if let (&Some(ref pending_future_id), &Some(ref io_context)) =
(&state.pending_future_id, &state.io_context)
{
io_context.release_pending_future_id(*pending_future_id);
}
*state = State::default();
gst_debug!(self.cat, obj: element, "Unprepared"); gst_debug!(self.cat, obj: element, "Unprepared");
Ok(()) Ok(())

View file

@ -607,19 +607,27 @@ impl UdpSrc {
fn unprepare(&self, element: &Element) -> Result<(), ()> { fn unprepare(&self, element: &Element) -> Result<(), ()> {
gst_debug!(self.cat, obj: element, "Unpreparing"); gst_debug!(self.cat, obj: element, "Unpreparing");
let mut state = self.state.lock().unwrap(); // FIXME: The IO Context has to be alive longer than the queue,
// otherwise the queue can't finish any remaining work
let (mut socket, io_context) = {
let mut state = self.state.lock().unwrap();
if let Some(ref socket) = state.socket { if let (&Some(ref pending_future_id), &Some(ref io_context)) =
(&state.pending_future_id, &state.io_context)
{
io_context.release_pending_future_id(*pending_future_id);
}
let socket = state.socket.take();
let io_context = state.io_context.take();
*state = State::default();
(socket, io_context)
};
if let Some(ref socket) = socket.take() {
socket.shutdown(); socket.shutdown();
} }
drop(io_context);
if let (&Some(ref pending_future_id), &Some(ref io_context)) =
(&state.pending_future_id, &state.io_context)
{
io_context.release_pending_future_id(*pending_future_id);
}
*state = State::default();
gst_debug!(self.cat, obj: element, "Unprepared"); gst_debug!(self.cat, obj: element, "Unprepared");
Ok(()) Ok(())