pad: Fix race condition causing the same probe to be called multiple times

Probes were remembering a cookie that was used to check if the probe was
already called this time before the probes list changed. However the
same probes could've been called by another thread in between and thus
gotten a new cookie, and would then be called a second time.

https://bugzilla.gnome.org/show_bug.cgi?id=795987
This commit is contained in:
Sebastian Dröge 2018-05-10 00:05:51 +03:00
parent 80dfb7bb3f
commit 13d5957fd7

View file

@ -143,7 +143,6 @@ struct _GstPadPrivate
gint using; gint using;
guint probe_list_cookie; guint probe_list_cookie;
guint probe_cookie;
/* counter of how many idle probes are running directly from the add_probe /* counter of how many idle probes are running directly from the add_probe
* call. Used to block any data flowing in the pad while the idle callback * call. Used to block any data flowing in the pad while the idle callback
@ -159,10 +158,8 @@ struct _GstPadPrivate
typedef struct typedef struct
{ {
GHook hook; GHook hook;
guint cookie;
} GstProbe; } GstProbe;
#define PROBE_COOKIE(h) (((GstProbe *)(h))->cookie)
#define GST_PAD_IS_RUNNING_IDLE_PROBE(p) \ #define GST_PAD_IS_RUNNING_IDLE_PROBE(p) \
(((GstPad *)(p))->priv->idle_running > 0) (((GstPad *)(p))->priv->idle_running > 0)
@ -174,7 +171,11 @@ typedef struct
gboolean pass; gboolean pass;
gboolean handled; gboolean handled;
gboolean marshalled; gboolean marshalled;
guint cookie;
GHook **called_probes;
guint n_called_probes;
guint called_probes_size;
gboolean retry;
} ProbeMarshall; } ProbeMarshall;
static void gst_pad_dispose (GObject * object); static void gst_pad_dispose (GObject * object);
@ -1456,7 +1457,6 @@ gst_pad_add_probe (GstPad * pad, GstPadProbeType mask,
hook->func = callback; hook->func = callback;
hook->data = user_data; hook->data = user_data;
hook->destroy = destroy_data; hook->destroy = destroy_data;
PROBE_COOKIE (hook) = (pad->priv->probe_cookie - 1);
/* add the probe */ /* add the probe */
g_hook_append (&pad->probes, hook); g_hook_append (&pad->probes, hook);
@ -3460,6 +3460,8 @@ gst_pad_query_default (GstPad * pad, GstObject * parent, GstQuery * query)
return ret; return ret;
} }
#define N_STACK_ALLOCATE_PROBES (16)
static void static void
probe_hook_marshal (GHook * hook, ProbeMarshall * data) probe_hook_marshal (GHook * hook, ProbeMarshall * data)
{ {
@ -3469,16 +3471,36 @@ probe_hook_marshal (GHook * hook, ProbeMarshall * data)
GstPadProbeCallback callback; GstPadProbeCallback callback;
GstPadProbeReturn ret; GstPadProbeReturn ret;
gpointer original_data; gpointer original_data;
guint i;
/* if we have called this callback, do nothing */ /* if we have called this callback, do nothing. But only check
if (PROBE_COOKIE (hook) == data->cookie) { * if we're actually calling probes a second time */
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, if (data->retry) {
"hook %lu, cookie %u already called", hook->hook_id, for (i = 0; i < data->n_called_probes; i++) {
PROBE_COOKIE (hook)); if (data->called_probes[i] == hook) {
return; GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
"hook %lu already called", hook->hook_id);
return;
}
}
} }
PROBE_COOKIE (hook) = data->cookie; /* reallocate on the heap if we had more than 16 probes */
if (data->n_called_probes == data->called_probes_size) {
if (data->called_probes_size > N_STACK_ALLOCATE_PROBES) {
data->called_probes_size *= 2;
data->called_probes =
g_renew (GHook *, data->called_probes, data->called_probes_size);
} else {
GHook **tmp = data->called_probes;
data->called_probes_size *= 2;
data->called_probes = g_new (GHook *, data->called_probes_size);
memcpy (data->called_probes, tmp,
N_STACK_ALLOCATE_PROBES * sizeof (GHook *));
}
}
data->called_probes[data->n_called_probes++] = hook;
flags = hook->flags >> G_HOOK_FLAG_USER_SHIFT; flags = hook->flags >> G_HOOK_FLAG_USER_SHIFT;
type = info->type; type = info->type;
@ -3526,8 +3548,7 @@ probe_hook_marshal (GHook * hook, ProbeMarshall * data)
goto no_match; goto no_match;
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
"hook %lu, cookie %u with flags 0x%08x matches", hook->hook_id, "hook %lu with flags 0x%08x matches", hook->hook_id, flags);
PROBE_COOKIE (hook), flags);
data->marshalled = TRUE; data->marshalled = TRUE;
@ -3583,8 +3604,8 @@ probe_hook_marshal (GHook * hook, ProbeMarshall * data)
no_match: no_match:
{ {
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
"hook %lu, cookie %u with flags 0x%08x does not match %08x", "hook %lu with flags 0x%08x does not match %08x",
hook->hook_id, PROBE_COOKIE (hook), flags, info->type); hook->hook_id, flags, info->type);
return; return;
} }
} }
@ -3669,6 +3690,7 @@ do_probe_callbacks (GstPad * pad, GstPadProbeInfo * info,
ProbeMarshall data; ProbeMarshall data;
guint cookie; guint cookie;
gboolean is_block; gboolean is_block;
GHook *called_probes[N_STACK_ALLOCATE_PROBES];
data.pad = pad; data.pad = pad;
data.info = info; data.info = info;
@ -3676,7 +3698,14 @@ do_probe_callbacks (GstPad * pad, GstPadProbeInfo * info,
data.handled = FALSE; data.handled = FALSE;
data.marshalled = FALSE; data.marshalled = FALSE;
data.dropped = FALSE; data.dropped = FALSE;
data.cookie = ++pad->priv->probe_cookie;
/* We stack-allocate for N_STACK_ALLOCATE_PROBES hooks as a first step. If more are needed,
* we will re-allocate with g_malloc(). This should usually never be needed
*/
data.called_probes = called_probes;
data.n_called_probes = 0;
data.called_probes_size = N_STACK_ALLOCATE_PROBES;
data.retry = FALSE;
is_block = is_block =
(info->type & GST_PAD_PROBE_TYPE_BLOCK) == GST_PAD_PROBE_TYPE_BLOCK; (info->type & GST_PAD_PROBE_TYPE_BLOCK) == GST_PAD_PROBE_TYPE_BLOCK;
@ -3687,18 +3716,18 @@ do_probe_callbacks (GstPad * pad, GstPadProbeInfo * info,
} }
again: again:
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "do probes");
"do probes cookie %u", data.cookie);
cookie = pad->priv->probe_list_cookie; cookie = pad->priv->probe_list_cookie;
g_hook_list_marshal (&pad->probes, TRUE, g_hook_list_marshal (&pad->probes, TRUE,
(GHookMarshaller) probe_hook_marshal, &data); (GHookMarshaller) probe_hook_marshal, &data);
/* if the list changed, call the new callbacks (they will not have their /* if the list changed, call the new callbacks (they will not be in
* cookie set to data.cookie */ * called_probes yet) */
if (cookie != pad->priv->probe_list_cookie) { if (cookie != pad->priv->probe_list_cookie) {
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
"probe list changed, restarting"); "probe list changed, restarting");
data.retry = TRUE;
goto again; goto again;
} }
@ -3740,11 +3769,12 @@ again:
GST_OBJECT_FLAG_UNSET (pad, GST_PAD_FLAG_BLOCKING); GST_OBJECT_FLAG_UNSET (pad, GST_PAD_FLAG_BLOCKING);
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "We got unblocked"); GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "We got unblocked");
/* if the list changed, call the new callbacks (they will not have their /* if the list changed, call the new callbacks (they will not be in
* cookie set to data.cookie */ * called_probes yet) */
if (cookie != pad->priv->probe_list_cookie) { if (cookie != pad->priv->probe_list_cookie) {
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
"probe list changed, restarting"); "probe list changed, restarting");
data.retry = TRUE;
goto again; goto again;
} }
@ -3753,28 +3783,39 @@ again:
} }
} }
if (data.called_probes_size > N_STACK_ALLOCATE_PROBES)
g_free (data.called_probes);
return defaultval; return defaultval;
/* ERRORS */ /* ERRORS */
flushing: flushing:
{ {
GST_DEBUG_OBJECT (pad, "pad is flushing"); GST_DEBUG_OBJECT (pad, "pad is flushing");
if (data.called_probes_size > N_STACK_ALLOCATE_PROBES)
g_free (data.called_probes);
return GST_FLOW_FLUSHING; return GST_FLOW_FLUSHING;
} }
dropped: dropped:
{ {
GST_DEBUG_OBJECT (pad, "data is dropped"); GST_DEBUG_OBJECT (pad, "data is dropped");
if (data.called_probes_size > N_STACK_ALLOCATE_PROBES)
g_free (data.called_probes);
return GST_FLOW_CUSTOM_SUCCESS; return GST_FLOW_CUSTOM_SUCCESS;
} }
passed: passed:
{ {
/* FIXME : Should we return FLOW_OK or the defaultval ?? */ /* FIXME : Should we return FLOW_OK or the defaultval ?? */
GST_DEBUG_OBJECT (pad, "data is passed"); GST_DEBUG_OBJECT (pad, "data is passed");
if (data.called_probes_size > N_STACK_ALLOCATE_PROBES)
g_free (data.called_probes);
return GST_FLOW_OK; return GST_FLOW_OK;
} }
handled: handled:
{ {
GST_DEBUG_OBJECT (pad, "data was handled"); GST_DEBUG_OBJECT (pad, "data was handled");
if (data.called_probes_size > N_STACK_ALLOCATE_PROBES)
g_free (data.called_probes);
return GST_FLOW_CUSTOM_SUCCESS_1; return GST_FLOW_CUSTOM_SUCCESS_1;
} }
} }