reqwesthttpsrc: Drop the receiver before stopping the element

Otherwise there might still some thread waiting for the receiver to
consume a message and we would deadlock here. Dropping the receiver
drops all messages and unblocks them all immediately.

Also don't panic when sending messages to a dropped receiver.
This commit is contained in:
Sebastian Dröge 2019-07-04 18:12:01 +03:00
parent 2df6a5e049
commit 0a008354a4

View file

@ -26,7 +26,7 @@ fn init() {
struct Harness { struct Harness {
src: gst::Element, src: gst::Element,
pad: gst::Pad, pad: gst::Pad,
receiver: mpsc::Receiver<Message>, receiver: Option<mpsc::Receiver<Message>>,
rt: Option<tokio::runtime::Runtime>, rt: Option<tokio::runtime::Runtime>,
} }
@ -73,19 +73,20 @@ impl Harness {
// Collect all buffers, events and messages sent from the source // Collect all buffers, events and messages sent from the source
let sender_clone = sender.clone(); let sender_clone = sender.clone();
pad.set_chain_function(move |_pad, _parent, buffer| { pad.set_chain_function(move |_pad, _parent, buffer| {
sender_clone.send(Message::Buffer(buffer)).unwrap(); let _ = sender_clone.send(Message::Buffer(buffer));
Ok(gst::FlowSuccess::Ok) Ok(gst::FlowSuccess::Ok)
}); });
let sender_clone = sender.clone(); let sender_clone = sender.clone();
pad.set_event_function(move |_pad, _parent, event| { pad.set_event_function(move |_pad, _parent, event| {
sender_clone.send(Message::Event(event)).unwrap(); let _ = sender_clone.send(Message::Event(event));
true true
}); });
let bus = gst::Bus::new(); let bus = gst::Bus::new();
bus.set_flushing(false);
src.set_bus(Some(&bus)); src.set_bus(Some(&bus));
let sender_clone = sender.clone(); let sender_clone = sender.clone();
bus.set_sync_handler(move |_bus, msg| { bus.set_sync_handler(move |_bus, msg| {
sender_clone.send(Message::Message(msg.clone())).unwrap(); let _ = sender_clone.send(Message::Message(msg.clone()));
gst::BusSyncReply::Drop gst::BusSyncReply::Drop
}); });
@ -119,9 +120,7 @@ impl Harness {
// Spawn the server in the background so that it can handle requests // Spawn the server in the background so that it can handle requests
rt.spawn(server.map_err(move |e| { rt.spawn(server.map_err(move |e| {
sender let _ = sender.send(Message::ServerError(format!("{:?}", e)));
.send(Message::ServerError(format!("{:?}", e)))
.unwrap();
})); }));
// Let the test setup anything needed on the HTTP source now // Let the test setup anything needed on the HTTP source now
@ -130,7 +129,7 @@ impl Harness {
Harness { Harness {
src, src,
pad, pad,
receiver, receiver: Some(receiver),
rt: Some(rt), rt: Some(rt),
} }
} }
@ -140,7 +139,7 @@ impl Harness {
/// This function will panic on errors. /// This function will panic on errors.
fn wait_buffer_or_eos(&mut self) -> Option<gst::Buffer> { fn wait_buffer_or_eos(&mut self) -> Option<gst::Buffer> {
loop { loop {
match self.receiver.recv().unwrap() { match self.receiver.as_mut().unwrap().recv().unwrap() {
Message::ServerError(err) => { Message::ServerError(err) => {
panic!("Got server error: {}", err); panic!("Got server error: {}", err);
} }
@ -187,6 +186,12 @@ impl Drop for Harness {
// and wait until the tokio runtime exited // and wait until the tokio runtime exited
let bus = self.src.get_bus().unwrap(); let bus = self.src.get_bus().unwrap();
bus.set_flushing(true); bus.set_flushing(true);
// Drop the receiver first before setting the state so that
// any threads that might still be blocked on the sender
// are immediately unblocked
self.receiver.take().unwrap();
self.pad.set_active(false).unwrap(); self.pad.set_active(false).unwrap();
self.src.set_state(gst::State::Null).unwrap(); self.src.set_state(gst::State::Null).unwrap();