app: Handle panicking callbacks by converting into an error message

And never calling the callbacks again but instead just failing.

Fixes https://gitlab.freedesktop.org/gstreamer/gstreamer-rs/issues/241
This commit is contained in:
Sebastian Dröge 2020-03-05 10:31:48 +02:00
parent 4e30798ac7
commit 01da01c9e6
2 changed files with 136 additions and 6 deletions

View file

@ -12,12 +12,15 @@ use glib::signal::SignalHandlerId;
use glib::translate::*;
use glib_sys::gpointer;
use gst;
use gst::gst_element_error;
use gst_app_sys;
use gst_sys;
use std::boxed::Box as Box_;
use std::cell::RefCell;
use std::mem::transmute;
use std::panic;
use std::ptr;
use std::sync::atomic::{AtomicBool, Ordering};
use AppSink;
#[cfg(any(feature = "v1_10"))]
@ -48,6 +51,7 @@ pub struct AppSinkCallbacks {
Box<dyn FnMut(&AppSink) -> Result<gst::FlowSuccess, gst::FlowError> + Send + 'static>,
>,
>,
panicked: AtomicBool,
callbacks: gst_app_sys::GstAppSinkCallbacks,
}
@ -122,6 +126,7 @@ impl AppSinkCallbacksBuilder {
eos: self.eos,
new_preroll: self.new_preroll,
new_sample: self.new_sample,
panicked: AtomicBool::new(false),
callbacks: gst_app_sys::GstAppSinkCallbacks {
eos: if have_eos { Some(trampoline_eos) } else { None },
new_preroll: if have_new_preroll {
@ -145,11 +150,37 @@ impl AppSinkCallbacksBuilder {
}
}
fn post_panic_error_message(element: &AppSink, err: &dyn std::any::Any) {
if let Some(cause) = err.downcast_ref::<&str>() {
gst_element_error!(&element, gst::LibraryError::Failed, ["Panicked: {}", cause]);
} else if let Some(cause) = err.downcast_ref::<String>() {
gst_element_error!(&element, gst::LibraryError::Failed, ["Panicked: {}", cause]);
} else {
gst_element_error!(&element, gst::LibraryError::Failed, ["Panicked"]);
}
}
unsafe extern "C" fn trampoline_eos(appsink: *mut gst_app_sys::GstAppSink, callbacks: gpointer) {
let callbacks = &*(callbacks as *const AppSinkCallbacks);
let element: AppSink = from_glib_borrow(appsink);
if callbacks.panicked.load(Ordering::Relaxed) {
let element: AppSink = from_glib_borrow(appsink);
gst_element_error!(&element, gst::LibraryError::Failed, ["Panicked"]);
return;
}
if let Some(ref eos) = callbacks.eos {
(&mut *eos.borrow_mut())(&from_glib_borrow(appsink))
let result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
(&mut *eos.borrow_mut())(&element)
}));
match result {
Ok(result) => result,
Err(err) => {
callbacks.panicked.store(true, Ordering::Relaxed);
post_panic_error_message(&element, &err);
}
}
}
}
@ -158,9 +189,27 @@ unsafe extern "C" fn trampoline_new_preroll(
callbacks: gpointer,
) -> gst_sys::GstFlowReturn {
let callbacks = &*(callbacks as *const AppSinkCallbacks);
let element: AppSink = from_glib_borrow(appsink);
if callbacks.panicked.load(Ordering::Relaxed) {
let element: AppSink = from_glib_borrow(appsink);
gst_element_error!(&element, gst::LibraryError::Failed, ["Panicked"]);
return gst::FlowReturn::Error.to_glib();
}
let ret = if let Some(ref new_preroll) = callbacks.new_preroll {
(&mut *new_preroll.borrow_mut())(&from_glib_borrow(appsink)).into()
let result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
(&mut *new_preroll.borrow_mut())(&element).into()
}));
match result {
Ok(result) => result,
Err(err) => {
callbacks.panicked.store(true, Ordering::Relaxed);
post_panic_error_message(&element, &err);
gst::FlowReturn::Error
}
}
} else {
gst::FlowReturn::Error
};
@ -173,9 +222,27 @@ unsafe extern "C" fn trampoline_new_sample(
callbacks: gpointer,
) -> gst_sys::GstFlowReturn {
let callbacks = &*(callbacks as *const AppSinkCallbacks);
let element: AppSink = from_glib_borrow(appsink);
if callbacks.panicked.load(Ordering::Relaxed) {
let element: AppSink = from_glib_borrow(appsink);
gst_element_error!(&element, gst::LibraryError::Failed, ["Panicked"]);
return gst::FlowReturn::Error.to_glib();
}
let ret = if let Some(ref new_sample) = callbacks.new_sample {
(&mut *new_sample.borrow_mut())(&from_glib_borrow(appsink)).into()
let result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
(&mut *new_sample.borrow_mut())(&element).into()
}));
match result {
Ok(result) => result,
Err(err) => {
callbacks.panicked.store(true, Ordering::Relaxed);
post_panic_error_message(&element, &err);
gst::FlowReturn::Error
}
}
} else {
gst::FlowReturn::Error
};

View file

@ -10,11 +10,15 @@ use futures_sink::Sink;
use glib::translate::*;
use glib_sys::{gboolean, gpointer};
use gst;
use gst::gst_element_error;
use gst_app_sys;
use std::cell::RefCell;
use std::mem;
use std::panic;
use std::pin::Pin;
use std::ptr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Waker};
use AppSrc;
@ -29,6 +33,7 @@ pub struct AppSrcCallbacks {
need_data: Option<RefCell<Box<dyn FnMut(&AppSrc, u32) + Send + 'static>>>,
enough_data: Option<Box<dyn Fn(&AppSrc) + Send + Sync + 'static>>,
seek_data: Option<Box<dyn Fn(&AppSrc, u64) -> bool + Send + Sync + 'static>>,
panicked: AtomicBool,
callbacks: gst_app_sys::GstAppSrcCallbacks,
}
@ -89,6 +94,7 @@ impl AppSrcCallbacksBuilder {
need_data: self.need_data,
enough_data: self.enough_data,
seek_data: self.seek_data,
panicked: AtomicBool::new(false),
callbacks: gst_app_sys::GstAppSrcCallbacks {
need_data: if have_need_data {
Some(trampoline_need_data)
@ -116,15 +122,41 @@ impl AppSrcCallbacksBuilder {
}
}
fn post_panic_error_message(element: &AppSrc, err: &dyn std::any::Any) {
if let Some(cause) = err.downcast_ref::<&str>() {
gst_element_error!(&element, gst::LibraryError::Failed, ["Panicked: {}", cause]);
} else if let Some(cause) = err.downcast_ref::<String>() {
gst_element_error!(&element, gst::LibraryError::Failed, ["Panicked: {}", cause]);
} else {
gst_element_error!(&element, gst::LibraryError::Failed, ["Panicked"]);
}
}
unsafe extern "C" fn trampoline_need_data(
appsrc: *mut gst_app_sys::GstAppSrc,
length: u32,
callbacks: gpointer,
) {
let callbacks = &*(callbacks as *const AppSrcCallbacks);
let element: AppSrc = from_glib_borrow(appsrc);
if callbacks.panicked.load(Ordering::Relaxed) {
let element: AppSrc = from_glib_borrow(appsrc);
gst_element_error!(&element, gst::LibraryError::Failed, ["Panicked"]);
return;
}
if let Some(ref need_data) = callbacks.need_data {
(&mut *need_data.borrow_mut())(&from_glib_borrow(appsrc), length);
let result = panic::catch_unwind(panic::AssertUnwindSafe(|| {
(&mut *need_data.borrow_mut())(&element, length)
}));
match result {
Ok(result) => result,
Err(err) => {
callbacks.panicked.store(true, Ordering::Relaxed);
post_panic_error_message(&element, &err);
}
}
}
}
@ -133,9 +165,23 @@ unsafe extern "C" fn trampoline_enough_data(
callbacks: gpointer,
) {
let callbacks = &*(callbacks as *const AppSrcCallbacks);
let element: AppSrc = from_glib_borrow(appsrc);
if callbacks.panicked.load(Ordering::Relaxed) {
let element: AppSrc = from_glib_borrow(appsrc);
gst_element_error!(&element, gst::LibraryError::Failed, ["Panicked"]);
return;
}
if let Some(ref enough_data) = callbacks.enough_data {
(*enough_data)(&from_glib_borrow(appsrc));
let result = panic::catch_unwind(panic::AssertUnwindSafe(|| (*enough_data)(&element)));
match result {
Ok(result) => result,
Err(err) => {
callbacks.panicked.store(true, Ordering::Relaxed);
post_panic_error_message(&element, &err);
}
}
}
}
@ -145,9 +191,26 @@ unsafe extern "C" fn trampoline_seek_data(
callbacks: gpointer,
) -> gboolean {
let callbacks = &*(callbacks as *const AppSrcCallbacks);
let element: AppSrc = from_glib_borrow(appsrc);
if callbacks.panicked.load(Ordering::Relaxed) {
let element: AppSrc = from_glib_borrow(appsrc);
gst_element_error!(&element, gst::LibraryError::Failed, ["Panicked"]);
return false.to_glib();
}
let ret = if let Some(ref seek_data) = callbacks.seek_data {
(*seek_data)(&from_glib_borrow(appsrc), offset)
let result =
panic::catch_unwind(panic::AssertUnwindSafe(|| (*seek_data)(&element, offset)));
match result {
Ok(result) => result,
Err(err) => {
callbacks.panicked.store(true, Ordering::Relaxed);
post_panic_error_message(&element, &err);
false
}
}
} else {
false
};