appsrc/appsink: Make setting/replacing callbacks thread-safe

Previously we would simply use them without any locking at all, while
using the object lock for setting them. Nothing prevented new callbacks
to be set in the meantime, potentially calling a callback with already
freed user_data.

To prevent this move the callbacks into a reference counted struct and
use the appsrc/appsink mutex to protect access to it, which is used in
all functions calling the callbacks already anyway.

Fixes https://gitlab.freedesktop.org/gstreamer/gst-plugins-base/issues/729
This commit is contained in:
Sebastian Dröge 2020-02-12 13:11:43 +02:00 committed by GStreamer Merge Bot
parent 85e201fe30
commit 9a55945c0b
2 changed files with 162 additions and 81 deletions

View file

@ -80,6 +80,34 @@ typedef enum
APP_WAITING = 1 << 1, /* application thread is waiting for streaming thread */ APP_WAITING = 1 << 1, /* application thread is waiting for streaming thread */
} GstAppSinkWaitStatus; } GstAppSinkWaitStatus;
typedef struct
{
GstAppSinkCallbacks callbacks;
gpointer user_data;
GDestroyNotify destroy_notify;
gint ref_count;
} Callbacks;
static Callbacks *
callbacks_ref (Callbacks * callbacks)
{
g_atomic_int_inc (&callbacks->ref_count);
return callbacks;
}
static void
callbacks_unref (Callbacks * callbacks)
{
if (!g_atomic_int_dec_and_test (&callbacks->ref_count))
return;
if (callbacks->destroy_notify)
callbacks->destroy_notify (callbacks->user_data);
g_free (callbacks);
}
struct _GstAppSinkPrivate struct _GstAppSinkPrivate
{ {
GstCaps *caps; GstCaps *caps;
@ -104,9 +132,7 @@ struct _GstAppSinkPrivate
gboolean is_eos; gboolean is_eos;
gboolean buffer_lists_supported; gboolean buffer_lists_supported;
GstAppSinkCallbacks callbacks; Callbacks *callbacks;
gpointer user_data;
GDestroyNotify notify;
GstSample *sample; GstSample *sample;
}; };
@ -476,21 +502,18 @@ gst_app_sink_dispose (GObject * obj)
GstAppSink *appsink = GST_APP_SINK_CAST (obj); GstAppSink *appsink = GST_APP_SINK_CAST (obj);
GstAppSinkPrivate *priv = appsink->priv; GstAppSinkPrivate *priv = appsink->priv;
GstMiniObject *queue_obj; GstMiniObject *queue_obj;
Callbacks *callbacks = NULL;
GST_OBJECT_LOCK (appsink); GST_OBJECT_LOCK (appsink);
if (priv->caps) { if (priv->caps) {
gst_caps_unref (priv->caps); gst_caps_unref (priv->caps);
priv->caps = NULL; priv->caps = NULL;
} }
if (priv->notify) {
priv->notify (priv->user_data);
}
priv->user_data = NULL;
priv->notify = NULL;
GST_OBJECT_UNLOCK (appsink); GST_OBJECT_UNLOCK (appsink);
g_mutex_lock (&priv->mutex); g_mutex_lock (&priv->mutex);
if (priv->callbacks)
callbacks = g_steal_pointer (&priv->callbacks);
while ((queue_obj = gst_queue_array_pop_head (priv->queue))) while ((queue_obj = gst_queue_array_pop_head (priv->queue)))
gst_mini_object_unref (queue_obj); gst_mini_object_unref (queue_obj);
gst_buffer_replace (&priv->preroll_buffer, NULL); gst_buffer_replace (&priv->preroll_buffer, NULL);
@ -502,6 +525,8 @@ gst_app_sink_dispose (GObject * obj)
} }
g_mutex_unlock (&priv->mutex); g_mutex_unlock (&priv->mutex);
g_clear_pointer (&callbacks, callbacks_unref);
G_OBJECT_CLASS (parent_class)->dispose (obj); G_OBJECT_CLASS (parent_class)->dispose (obj);
} }
@ -715,6 +740,7 @@ gst_app_sink_event (GstBaseSink * sink, GstEvent * event)
break; break;
case GST_EVENT_EOS:{ case GST_EVENT_EOS:{
gboolean emit = TRUE; gboolean emit = TRUE;
Callbacks *callbacks = NULL;
g_mutex_lock (&priv->mutex); g_mutex_lock (&priv->mutex);
GST_DEBUG_OBJECT (appsink, "receiving EOS"); GST_DEBUG_OBJECT (appsink, "receiving EOS");
@ -748,14 +774,19 @@ gst_app_sink_event (GstBaseSink * sink, GstEvent * event)
} }
if (priv->flushing) if (priv->flushing)
emit = FALSE; emit = FALSE;
if (emit && priv->callbacks)
callbacks = callbacks_ref (priv->callbacks);
g_mutex_unlock (&priv->mutex); g_mutex_unlock (&priv->mutex);
if (emit) { if (emit) {
/* emit EOS now */ /* emit EOS now */
if (priv->callbacks.eos) if (callbacks && callbacks->callbacks.eos)
priv->callbacks.eos (appsink, priv->user_data); callbacks->callbacks.eos (appsink, callbacks->user_data);
else else
g_signal_emit (appsink, gst_app_sink_signals[SIGNAL_EOS], 0); g_signal_emit (appsink, gst_app_sink_signals[SIGNAL_EOS], 0);
g_clear_pointer (&callbacks, callbacks_unref);
} }
break; break;
@ -784,6 +815,7 @@ gst_app_sink_preroll (GstBaseSink * psink, GstBuffer * buffer)
GstAppSink *appsink = GST_APP_SINK_CAST (psink); GstAppSink *appsink = GST_APP_SINK_CAST (psink);
GstAppSinkPrivate *priv = appsink->priv; GstAppSinkPrivate *priv = appsink->priv;
gboolean emit; gboolean emit;
Callbacks *callbacks = NULL;
g_mutex_lock (&priv->mutex); g_mutex_lock (&priv->mutex);
if (priv->flushing) if (priv->flushing)
@ -796,10 +828,12 @@ gst_app_sink_preroll (GstBaseSink * psink, GstBuffer * buffer)
g_cond_signal (&priv->cond); g_cond_signal (&priv->cond);
emit = priv->emit_signals; emit = priv->emit_signals;
if (priv->callbacks)
callbacks = callbacks_ref (priv->callbacks);
g_mutex_unlock (&priv->mutex); g_mutex_unlock (&priv->mutex);
if (priv->callbacks.new_preroll) { if (callbacks && callbacks->callbacks.new_preroll) {
res = priv->callbacks.new_preroll (appsink, priv->user_data); res = callbacks->callbacks.new_preroll (appsink, callbacks->user_data);
} else { } else {
res = GST_FLOW_OK; res = GST_FLOW_OK;
if (emit) if (emit)
@ -807,6 +841,8 @@ gst_app_sink_preroll (GstBaseSink * psink, GstBuffer * buffer)
&res); &res);
} }
g_clear_pointer (&callbacks, callbacks_unref);
return res; return res;
flushing: flushing:
@ -870,6 +906,7 @@ gst_app_sink_render_common (GstBaseSink * psink, GstMiniObject * data,
GstAppSink *appsink = GST_APP_SINK_CAST (psink); GstAppSink *appsink = GST_APP_SINK_CAST (psink);
GstAppSinkPrivate *priv = appsink->priv; GstAppSinkPrivate *priv = appsink->priv;
gboolean emit; gboolean emit;
Callbacks *callbacks = NULL;
restart: restart:
g_mutex_lock (&priv->mutex); g_mutex_lock (&priv->mutex);
@ -929,15 +966,19 @@ restart:
g_cond_signal (&priv->cond); g_cond_signal (&priv->cond);
emit = priv->emit_signals; emit = priv->emit_signals;
if (priv->callbacks)
callbacks = callbacks_ref (priv->callbacks);
g_mutex_unlock (&priv->mutex); g_mutex_unlock (&priv->mutex);
if (priv->callbacks.new_sample) { if (callbacks && callbacks->callbacks.new_sample) {
ret = priv->callbacks.new_sample (appsink, priv->user_data); ret = callbacks->callbacks.new_sample (appsink, callbacks->user_data);
} else { } else {
ret = GST_FLOW_OK; ret = GST_FLOW_OK;
if (emit) if (emit)
g_signal_emit (appsink, gst_app_sink_signals[SIGNAL_NEW_SAMPLE], 0, &ret); g_signal_emit (appsink, gst_app_sink_signals[SIGNAL_NEW_SAMPLE], 0, &ret);
} }
g_clear_pointer (&callbacks, callbacks_unref);
return ret; return ret;
flushing: flushing:
@ -1721,12 +1762,15 @@ not_started:
* *
* If callbacks are installed, no signals will be emitted for performance * If callbacks are installed, no signals will be emitted for performance
* reasons. * reasons.
*
* Before 1.16.3 it was not possible to change the callbacks in a thread-safe
* way.
*/ */
void void
gst_app_sink_set_callbacks (GstAppSink * appsink, gst_app_sink_set_callbacks (GstAppSink * appsink,
GstAppSinkCallbacks * callbacks, gpointer user_data, GDestroyNotify notify) GstAppSinkCallbacks * callbacks, gpointer user_data, GDestroyNotify notify)
{ {
GDestroyNotify old_notify; Callbacks *old_callbacks, *new_callbacks = NULL;
GstAppSinkPrivate *priv; GstAppSinkPrivate *priv;
g_return_if_fail (GST_IS_APP_SINK (appsink)); g_return_if_fail (GST_IS_APP_SINK (appsink));
@ -1734,26 +1778,20 @@ gst_app_sink_set_callbacks (GstAppSink * appsink,
priv = appsink->priv; priv = appsink->priv;
GST_OBJECT_LOCK (appsink); if (callbacks) {
old_notify = priv->notify; new_callbacks = g_new0 (Callbacks, 1);
new_callbacks->callbacks = *callbacks;
if (old_notify) { new_callbacks->user_data = user_data;
gpointer old_data; new_callbacks->destroy_notify = notify;
new_callbacks->ref_count = 1;
old_data = priv->user_data;
priv->user_data = NULL;
priv->notify = NULL;
GST_OBJECT_UNLOCK (appsink);
old_notify (old_data);
GST_OBJECT_LOCK (appsink);
} }
priv->callbacks = *callbacks;
priv->user_data = user_data; g_mutex_lock (&priv->mutex);
priv->notify = notify; old_callbacks = g_steal_pointer (&priv->callbacks);
GST_OBJECT_UNLOCK (appsink); priv->callbacks = g_steal_pointer (&new_callbacks);
g_mutex_unlock (&priv->mutex);
g_clear_pointer (&old_callbacks, callbacks_unref);
} }
/*** GSTURIHANDLER INTERFACE *************************************************/ /*** GSTURIHANDLER INTERFACE *************************************************/

View file

@ -108,6 +108,35 @@ typedef enum
APP_WAITING = 1 << 1, /* application thread is waiting for streaming thread */ APP_WAITING = 1 << 1, /* application thread is waiting for streaming thread */
} GstAppSrcWaitStatus; } GstAppSrcWaitStatus;
typedef struct
{
GstAppSrcCallbacks callbacks;
gpointer user_data;
GDestroyNotify destroy_notify;
gint ref_count;
} Callbacks;
static Callbacks *
callbacks_ref (Callbacks * callbacks)
{
g_atomic_int_inc (&callbacks->ref_count);
return callbacks;
}
static void
callbacks_unref (Callbacks * callbacks)
{
if (!g_atomic_int_dec_and_test (&callbacks->ref_count))
return;
if (callbacks->destroy_notify)
callbacks->destroy_notify (callbacks->user_data);
g_free (callbacks);
}
struct _GstAppSrcPrivate struct _GstAppSrcPrivate
{ {
GCond cond; GCond cond;
@ -138,9 +167,7 @@ struct _GstAppSrcPrivate
gboolean emit_signals; gboolean emit_signals;
guint min_percent; guint min_percent;
GstAppSrcCallbacks callbacks; Callbacks *callbacks;
gpointer user_data;
GDestroyNotify notify;
}; };
GST_DEBUG_CATEGORY_STATIC (app_src_debug); GST_DEBUG_CATEGORY_STATIC (app_src_debug);
@ -621,6 +648,7 @@ gst_app_src_dispose (GObject * obj)
{ {
GstAppSrc *appsrc = GST_APP_SRC_CAST (obj); GstAppSrc *appsrc = GST_APP_SRC_CAST (obj);
GstAppSrcPrivate *priv = appsrc->priv; GstAppSrcPrivate *priv = appsrc->priv;
Callbacks *callbacks = NULL;
GST_OBJECT_LOCK (appsrc); GST_OBJECT_LOCK (appsrc);
if (priv->current_caps) { if (priv->current_caps) {
@ -631,18 +659,16 @@ gst_app_src_dispose (GObject * obj)
gst_caps_unref (priv->last_caps); gst_caps_unref (priv->last_caps);
priv->last_caps = NULL; priv->last_caps = NULL;
} }
if (priv->notify) {
priv->notify (priv->user_data);
}
priv->user_data = NULL;
priv->notify = NULL;
GST_OBJECT_UNLOCK (appsrc); GST_OBJECT_UNLOCK (appsrc);
g_mutex_lock (&priv->mutex); g_mutex_lock (&priv->mutex);
if (priv->callbacks)
callbacks = g_steal_pointer (&priv->callbacks);
gst_app_src_flush_queued (appsrc, FALSE); gst_app_src_flush_queued (appsrc, FALSE);
g_mutex_unlock (&priv->mutex); g_mutex_unlock (&priv->mutex);
g_clear_pointer (&callbacks, callbacks_unref);
G_OBJECT_CLASS (parent_class)->dispose (obj); G_OBJECT_CLASS (parent_class)->dispose (obj);
} }
@ -997,6 +1023,8 @@ gst_app_src_do_seek (GstBaseSrc * src, GstSegment * segment)
GstAppSrcPrivate *priv = appsrc->priv; GstAppSrcPrivate *priv = appsrc->priv;
gint64 desired_position; gint64 desired_position;
gboolean res = FALSE; gboolean res = FALSE;
gboolean emit;
Callbacks *callbacks = NULL;
desired_position = segment->position; desired_position = segment->position;
@ -1007,20 +1035,23 @@ gst_app_src_do_seek (GstBaseSrc * src, GstSegment * segment)
GST_DEBUG_OBJECT (appsrc, "seeking to %" G_GINT64_FORMAT ", format %s", GST_DEBUG_OBJECT (appsrc, "seeking to %" G_GINT64_FORMAT ", format %s",
desired_position, gst_format_get_name (segment->format)); desired_position, gst_format_get_name (segment->format));
if (priv->callbacks.seek_data) g_mutex_lock (&priv->mutex);
res = priv->callbacks.seek_data (appsrc, desired_position, priv->user_data); emit = priv->emit_signals;
else { if (priv->callbacks)
gboolean emit; callbacks = callbacks_ref (priv->callbacks);
g_mutex_unlock (&priv->mutex);
g_mutex_lock (&priv->mutex); if (callbacks && callbacks->callbacks.seek_data) {
emit = priv->emit_signals; res =
g_mutex_unlock (&priv->mutex); callbacks->callbacks.seek_data (appsrc, desired_position,
callbacks->user_data);
if (emit) } else if (emit) {
g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_SEEK_DATA], 0, g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_SEEK_DATA], 0,
desired_position, &res); desired_position, &res);
} }
g_clear_pointer (&callbacks, callbacks_unref);
if (res) { if (res) {
GST_DEBUG_OBJECT (appsrc, "flushing queue"); GST_DEBUG_OBJECT (appsrc, "flushing queue");
g_mutex_lock (&priv->mutex); g_mutex_lock (&priv->mutex);
@ -1041,20 +1072,25 @@ gst_app_src_emit_seek (GstAppSrc * appsrc, guint64 offset)
gboolean res = FALSE; gboolean res = FALSE;
gboolean emit; gboolean emit;
GstAppSrcPrivate *priv = appsrc->priv; GstAppSrcPrivate *priv = appsrc->priv;
Callbacks *callbacks = NULL;
emit = priv->emit_signals; emit = priv->emit_signals;
if (priv->callbacks)
callbacks = callbacks_ref (priv->callbacks);
g_mutex_unlock (&priv->mutex); g_mutex_unlock (&priv->mutex);
GST_DEBUG_OBJECT (appsrc, GST_DEBUG_OBJECT (appsrc,
"we are at %" G_GINT64_FORMAT ", seek to %" G_GINT64_FORMAT, "we are at %" G_GINT64_FORMAT ", seek to %" G_GINT64_FORMAT,
priv->offset, offset); priv->offset, offset);
if (priv->callbacks.seek_data) if (callbacks && callbacks->callbacks.seek_data)
res = priv->callbacks.seek_data (appsrc, offset, priv->user_data); res = callbacks->callbacks.seek_data (appsrc, offset, callbacks->user_data);
else if (emit) else if (emit)
g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_SEEK_DATA], 0, g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_SEEK_DATA], 0,
offset, &res); offset, &res);
g_clear_pointer (&callbacks, callbacks_unref);
g_mutex_lock (&priv->mutex); g_mutex_lock (&priv->mutex);
return res; return res;
@ -1067,17 +1103,22 @@ gst_app_src_emit_need_data (GstAppSrc * appsrc, guint size)
{ {
gboolean emit; gboolean emit;
GstAppSrcPrivate *priv = appsrc->priv; GstAppSrcPrivate *priv = appsrc->priv;
Callbacks *callbacks = NULL;
emit = priv->emit_signals; emit = priv->emit_signals;
if (priv->callbacks)
callbacks = callbacks_ref (priv->callbacks);
g_mutex_unlock (&priv->mutex); g_mutex_unlock (&priv->mutex);
/* we have no data, we need some. We fire the signal with the size hint. */ /* we have no data, we need some. We fire the signal with the size hint. */
if (priv->callbacks.need_data) if (callbacks && callbacks->callbacks.need_data)
priv->callbacks.need_data (appsrc, size, priv->user_data); callbacks->callbacks.need_data (appsrc, size, callbacks->user_data);
else if (emit) else if (emit)
g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_NEED_DATA], 0, size, g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_NEED_DATA], 0, size,
NULL); NULL);
g_clear_pointer (&callbacks, callbacks_unref);
g_mutex_lock (&priv->mutex); g_mutex_lock (&priv->mutex);
/* we can be flushing now because we released the lock */ /* we can be flushing now because we released the lock */
} }
@ -1817,18 +1858,23 @@ gst_app_src_push_internal (GstAppSrc * appsrc, GstBuffer * buffer,
priv->queued_bytes, priv->max_bytes); priv->queued_bytes, priv->max_bytes);
if (first) { if (first) {
Callbacks *callbacks = NULL;
gboolean emit; gboolean emit;
emit = priv->emit_signals; emit = priv->emit_signals;
if (priv->callbacks)
callbacks = callbacks_ref (priv->callbacks);
/* only signal on the first push */ /* only signal on the first push */
g_mutex_unlock (&priv->mutex); g_mutex_unlock (&priv->mutex);
if (priv->callbacks.enough_data) if (callbacks && callbacks->callbacks.enough_data)
priv->callbacks.enough_data (appsrc, priv->user_data); callbacks->callbacks.enough_data (appsrc, callbacks->user_data);
else if (emit) else if (emit)
g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_ENOUGH_DATA], 0, g_signal_emit (appsrc, gst_app_src_signals[SIGNAL_ENOUGH_DATA], 0,
NULL); NULL);
g_clear_pointer (&callbacks, callbacks_unref);
g_mutex_lock (&priv->mutex); g_mutex_lock (&priv->mutex);
/* continue to check for flushing/eos after releasing the lock */ /* continue to check for flushing/eos after releasing the lock */
first = FALSE; first = FALSE;
@ -2081,12 +2127,15 @@ flushing:
* *
* If callbacks are installed, no signals will be emitted for performance * If callbacks are installed, no signals will be emitted for performance
* reasons. * reasons.
*
* Before 1.16.3 it was not possible to change the callbacks in a thread-safe
* way.
*/ */
void void
gst_app_src_set_callbacks (GstAppSrc * appsrc, gst_app_src_set_callbacks (GstAppSrc * appsrc,
GstAppSrcCallbacks * callbacks, gpointer user_data, GDestroyNotify notify) GstAppSrcCallbacks * callbacks, gpointer user_data, GDestroyNotify notify)
{ {
GDestroyNotify old_notify; Callbacks *old_callbacks, *new_callbacks = NULL;
GstAppSrcPrivate *priv; GstAppSrcPrivate *priv;
g_return_if_fail (GST_IS_APP_SRC (appsrc)); g_return_if_fail (GST_IS_APP_SRC (appsrc));
@ -2094,26 +2143,20 @@ gst_app_src_set_callbacks (GstAppSrc * appsrc,
priv = appsrc->priv; priv = appsrc->priv;
GST_OBJECT_LOCK (appsrc); if (callbacks) {
old_notify = priv->notify; new_callbacks = g_new0 (Callbacks, 1);
new_callbacks->callbacks = *callbacks;
if (old_notify) { new_callbacks->user_data = user_data;
gpointer old_data; new_callbacks->destroy_notify = notify;
new_callbacks->ref_count = 1;
old_data = priv->user_data;
priv->user_data = NULL;
priv->notify = NULL;
GST_OBJECT_UNLOCK (appsrc);
old_notify (old_data);
GST_OBJECT_LOCK (appsrc);
} }
priv->callbacks = *callbacks;
priv->user_data = user_data; g_mutex_lock (&priv->mutex);
priv->notify = notify; old_callbacks = g_steal_pointer (&priv->callbacks);
GST_OBJECT_UNLOCK (appsrc); priv->callbacks = g_steal_pointer (&new_callbacks);
g_mutex_unlock (&priv->mutex);
g_clear_pointer (&old_callbacks, callbacks_unref);
} }
/*** GSTURIHANDLER INTERFACE *************************************************/ /*** GSTURIHANDLER INTERFACE *************************************************/