From 8abc457a3b7c1455d759e52fc5164ef066f3fe6f Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Tue, 31 May 2011 19:16:09 +0200 Subject: [PATCH] pad: implement pad block with probes --- gst/gstpad.c | 711 ++++++++++++------------- gst/gstpad.h | 75 ++- gst/gstutils.c | 212 -------- gst/gstutils.h | 22 - libs/gst/check/gstbufferstraw.c | 8 +- libs/gst/check/gstconsistencychecker.c | 8 +- tests/check/gst/gstevent.c | 44 +- tests/check/gst/gstghostpad.c | 11 +- tests/check/gst/gstpad.c | 125 +++-- tests/check/gst/gstpipeline.c | 5 +- 10 files changed, 518 insertions(+), 703 deletions(-) diff --git a/gst/gstpad.c b/gst/gstpad.c index 5f540e88cb..e34540eb2c 100644 --- a/gst/gstpad.c +++ b/gst/gstpad.c @@ -112,8 +112,27 @@ struct _GstPadPrivate PadEvent events[GST_EVENT_MAX_STICKY]; gint using; + gint probe_cookie; }; +typedef struct +{ + GHook hook; + guint cookie; +} GstProbe; + +#define PROBE_COOKIE(h) (((GstProbe *)(h))->cookie) + +typedef struct +{ + GstPad *pad; + GstProbeType mask; + gpointer type_data; + GstProbeReturn ret; + gboolean pass; + guint cookie; +} ProbeMarshall; + static void gst_pad_dispose (GObject * object); static void gst_pad_finalize (GObject * object); static void gst_pad_set_property (GObject * object, guint prop_id, @@ -121,7 +140,6 @@ static void gst_pad_set_property (GObject * object, guint prop_id, static void gst_pad_get_property (GObject * object, guint prop_id, GValue * value, GParamSpec * pspec); -static GstFlowReturn handle_pad_block (GstPad * pad, GstBlockType type); static GstCaps *gst_pad_get_caps_unlocked (GstPad * pad, GstCaps * filter); static void gst_pad_set_pad_template (GstPad * pad, GstPadTemplate * templ); static gboolean gst_pad_activate_default (GstPad * pad); @@ -361,6 +379,8 @@ gst_pad_init (GstPad * pad) g_static_rec_mutex_init (pad->stream_rec_lock); pad->block_cond = g_cond_new (); + + g_hook_list_init (&pad->probes, sizeof (GstProbe)); } /* called when setting the pad inactive. It removes all sticky events from @@ -483,13 +503,10 @@ gst_pad_dispose (GObject * object) gst_pad_set_pad_template (pad, NULL); - if (pad->block_destroy_data && pad->block_data) { - pad->block_destroy_data (pad->block_data); - pad->block_data = NULL; - } - clear_events (pad->priv->events); + g_hook_list_clear (&pad->probes); + G_OBJECT_CLASS (parent_class)->dispose (object); } @@ -1070,53 +1087,68 @@ gst_pad_is_active (GstPad * pad) } /** - * gst_pad_block: - * @pad: the #GstPad to block or unblock - * @type: the different pad block states - * @callback: #GstPadBlockCallback that will be called with notifications of - * the pad block state + * gst_pad_add_probe: + * @pad: the #GstPad to add the probe to + * @mask: the probe mask + * @callback: #GstPadProbeCallback that will be called with notifications of + * the pad state * @user_data: (closure): user data passed to the callback * @destroy_data: #GDestroyNotify for user_data * - * Blocks the dataflow on a pad. The provided callback is called with - * notifications about the different stages in the pad block. + * Be notified of different states of pads. The provided callback is called for + * every state that matches @mask. * * - * Pad block handlers are only called for source pads in push mode + * Pad probe handlers are only called for source pads in push mode * and sink pads in pull mode. * * - * Returns: TRUE if the pad could be blocked. This function can fail if the - * wrong parameters were passed or the pad was already blocked. + * Returns: an id or 0 on error. The id can be used to remove the probe with + * gst_pad_remove_probe(). * * MT safe. */ -gboolean -gst_pad_block (GstPad * pad, GstBlockType type, - GstPadBlockCallback callback, gpointer user_data, +gulong +gst_pad_add_probe (GstPad * pad, GstProbeType mask, + GstPadProbeCallback callback, gpointer user_data, GDestroyNotify destroy_data) { - g_return_val_if_fail (GST_IS_PAD (pad), FALSE); - g_return_val_if_fail (callback != NULL, FALSE); + GHook *hook; + gulong res; + + g_return_val_if_fail (GST_IS_PAD (pad), 0); + g_return_val_if_fail (mask != 0, 0); + g_return_val_if_fail (callback != NULL, 0); GST_OBJECT_LOCK (pad); + /* make a new probe */ + hook = g_hook_alloc (&pad->probes); - if (G_UNLIKELY (GST_PAD_IS_BLOCKED (pad))) - goto was_blocked; + GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "adding probe for mask 0x%08x", + mask); - GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "blocking pad"); + /* store our flags and other fields */ + hook->flags |= (mask << G_HOOK_FLAG_USER_SHIFT); + hook->func = callback; + hook->data = user_data; + hook->destroy = destroy_data; + PROBE_COOKIE (hook) = 0; - GST_OBJECT_FLAG_SET (pad, GST_PAD_BLOCKED); + /* incremenent cookie so that the new hook get's called */ + pad->priv->probe_cookie++; - if (pad->block_destroy_data && pad->block_data) - pad->block_destroy_data (pad->block_data); + /* add the probe */ + g_hook_prepend (&pad->probes, hook); - /* always store the block_cb */ - pad->block_type = type; - pad->block_callback = callback; - pad->block_data = user_data; - pad->block_destroy_data = destroy_data; - pad->block_callback_called = FALSE; + res = hook->hook_id; + + GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "got probe id %lu", res); + + if (mask & (GST_PROBE_TYPE_IDLE | GST_PROBE_TYPE_BLOCK)) { + /* we have a block probe */ + pad->num_blocked++; + GST_OBJECT_FLAG_SET (pad, GST_PAD_BLOCKED); + } if (pad->priv->using > 0) { /* the pad is in use, we can't signal the idle callback yet. Since we set the @@ -1130,58 +1162,62 @@ gst_pad_block (GstPad * pad, GstBlockType type, GST_OBJECT_UNLOCK (pad); /* call the callback if we need to be called for idle callbacks */ - if (type & GST_BLOCK_TYPE_IDLE) - callback (pad, GST_BLOCK_TYPE_IDLE, user_data); + if (mask & GST_PROBE_TYPE_IDLE) + callback (pad, GST_PROBE_TYPE_IDLE, NULL, user_data); } return TRUE; +} -was_blocked: - { - GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "pad was blocked"); - GST_OBJECT_UNLOCK (pad); +static void +cleanup_hook (GstPad * pad, GHook * hook) +{ + GstProbeType type; - return FALSE; + type = (hook->flags) >> G_HOOK_FLAG_USER_SHIFT; + + if (type & (GST_PROBE_TYPE_IDLE | GST_PROBE_TYPE_BLOCK)) { + /* unblock when we remove the last blocking probe */ + pad->num_blocked--; + if (pad->num_blocked == 0) { + GST_OBJECT_FLAG_UNSET (pad, GST_PAD_BLOCKED); + GST_PAD_BLOCK_BROADCAST (pad); + } } + g_hook_destroy_link (&pad->probes, hook); } /** - * gst_pad_unblock: - * @pad: the #GstPad to unblock + * gst_pad_remove_probe: + * @pad: the #GstPad with the probe + * @id: the probe id to remove * - * Unblock @pad. All pending and current pad blocks, if any, are canceled. After - * this method, dataflow will continue on @pad. + * Remove the probe with @id from @pad. * * MT safe. */ void -gst_pad_unblock (GstPad * pad) +gst_pad_remove_probe (GstPad * pad, gulong id) { + GHook *hook; + g_return_if_fail (GST_IS_PAD (pad)); GST_OBJECT_LOCK (pad); - if (G_UNLIKELY (!GST_PAD_IS_BLOCKED (pad))) - goto had_right_state; + hook = g_hook_get (&pad->probes, id); + if (hook == NULL) + goto not_found; - /* cleanup */ - if (pad->block_destroy_data && pad->block_data) - pad->block_destroy_data (pad->block_data); - - pad->block_type = 0; - pad->block_callback = NULL; - pad->block_data = NULL; - pad->block_destroy_data = NULL; - - GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "unblocking pad"); - GST_OBJECT_FLAG_UNSET (pad, GST_PAD_BLOCKED); - GST_PAD_BLOCK_BROADCAST (pad); + GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "removing hook %ld", + hook->hook_id); + cleanup_hook (pad, hook); GST_OBJECT_UNLOCK (pad); return; -had_right_state: +not_found: { - GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "pad was unblocked"); + GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "no hook with id %d", id); GST_OBJECT_UNLOCK (pad); return; } @@ -3445,117 +3481,136 @@ gst_pad_query_default (GstPad * pad, GstQuery * query) } } -/* - * should be called with pad OBJECT_LOCK and STREAM_LOCK held. - * GST_PAD_IS_BLOCKED (pad) == TRUE when this function is - * called. - * - * This function performs the pad blocking when an event, buffer push - * is performed on a _SRC_ pad. It blocks the streaming thread after - * informing the pad has been blocked. - * - * An application can with this method wait and block any streaming - * thread and perform operations such as seeking or linking. - * - * Two methods are available for notifying the application of the - * block: - * - the callback method, which happens in the STREAMING thread with - * the STREAM_LOCK held. With this method, the most useful way of - * dealing with the callback is to post a message to the main thread - * where the pad block can then be handled outside of the streaming - * thread. With the last method one can perform all operations such - * as doing a state change, linking, unblocking, seeking etc on the - * pad. - * - the GCond signal method, which makes any thread unblock when - * the pad block happens. - * - * During the actual blocking state, the GST_PAD_BLOCKING flag is set. - * The GST_PAD_BLOCKING flag is unset when the pad was unblocked. - * - * MT safe. - */ -static GstFlowReturn -handle_pad_block (GstPad * pad, GstBlockType type) +static void +probe_hook_marshal (GHook * hook, ProbeMarshall * data) { - GstPadBlockCallback callback; - gpointer user_data; - GstFlowReturn ret = GST_FLOW_OK; + GstPad *pad = data->pad; + GstProbeType flags; - GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "signal block taken"); + /* if we have called this callback, do nothing */ + if (PROBE_COOKIE (hook) == data->cookie) + return; - /* flushing, don't bother trying to block and return WRONG_STATE - * right away */ - if (GST_PAD_IS_FLUSHING (pad)) - goto flushingnonref; + PROBE_COOKIE (hook) = data->cookie; - /* we grab an extra ref for the callbacks, this is probably not - * needed (callback code does not have a ref and cannot unref). I - * think this was done to make it possible to unref the element in - * the callback, which is in the end totally impossible as it - * requires grabbing the STREAM_LOCK and OBJECT_LOCK which are - * all taken when calling this function. */ - gst_object_ref (pad); + flags = hook->flags >> G_HOOK_FLAG_USER_SHIFT; - while (GST_PAD_IS_BLOCKED (pad)) { - do { - /* we either have a callback installed to notify the block or - * some other thread is doing a GCond wait. */ - callback = pad->block_callback; - pad->block_callback_called = TRUE; - if ((pad->block_type & type) == pad->block_type) { - user_data = pad->block_data; - GST_OBJECT_UNLOCK (pad); + GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, + "have hook %lu with flags 0x%08x", hook->hook_id, flags); - GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "calling block callback"); - /* Call the callback. We release the lock so that the callback can - * do something usefull with the pad */ - callback (pad, type, user_data); + if ((flags & data->mask) == flags) { + GstPadProbeCallback callback; - GST_OBJECT_LOCK (pad); - /* we released the lock, recheck flushing */ - if (GST_PAD_IS_FLUSHING (pad)) - goto flushing; + callback = (GstPadProbeCallback) hook->func; + if (callback) { + GstProbeReturn ret; + + GST_OBJECT_UNLOCK (pad); + + ret = callback (pad, data->mask, data->type_data, hook->data); + + GST_OBJECT_LOCK (pad); + + switch (ret) { + case GST_PROBE_REMOVE: + /* remove the probe */ + GST_DEBUG_OBJECT (pad, "asked to remove hook"); + cleanup_hook (pad, hook); + break; + case GST_PROBE_DROP: + /* need to drop the data, make sure other probes don't get called + * anymore */ + GST_DEBUG_OBJECT (pad, "asked to drop item"); + data->mask = GST_PROBE_TYPE_INVALID; + data->ret = GST_PROBE_DROP; + break; + case GST_PROBE_PASS: + /* inform the pad block to let things pass */ + GST_DEBUG_OBJECT (pad, "asked to pass item"); + data->pass = TRUE; + break; + default: + GST_DEBUG_OBJECT (pad, "probe returned %d", ret); + break; } - } while (pad->block_callback_called == FALSE && GST_PAD_IS_BLOCKED (pad)); + } + } +} - /* OBJECT_LOCK could have been released when we did the callback, which - * then could have made the pad unblock so we need to check the blocking - * condition again. */ - if (!GST_PAD_IS_BLOCKED (pad)) - break; +static GstFlowReturn +do_probe_callbacks (GstPad * pad, GstProbeType mask, gpointer type_data) +{ + ProbeMarshall data; + guint cookie; - /* now we block the streaming thread. It can be unlocked when we - * deactivate the pad (which will also set the FLUSHING flag) or - * when the pad is unblocked. A flushing event will also unblock - * the pad after setting the FLUSHING flag. */ - GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, - "Waiting to be unblocked or set flushing"); - GST_OBJECT_FLAG_SET (pad, GST_PAD_BLOCKING); - GST_PAD_BLOCK_WAIT (pad); - GST_OBJECT_FLAG_UNSET (pad, GST_PAD_BLOCKING); + if (G_UNLIKELY (GST_PAD_IS_FLUSHING (pad))) + goto flushing; - /* see if we got unblocked by a flush or not */ - if (GST_PAD_IS_FLUSHING (pad)) - goto flushing; + data.pad = pad; + data.mask = mask; + data.type_data = type_data; + data.ret = GST_PROBE_OK; + data.pass = FALSE; + data.cookie = pad->priv->probe_cookie++; + +again: + cookie = pad->priv->probe_cookie; + + GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, + "doing callbacks for probe 0x%08x", data.mask); + g_hook_list_marshal (&pad->probes, FALSE, + (GHookMarshaller) probe_hook_marshal, &data); + + /* if the list changed, call the new callbacks (they will not have their + * cookie set to data.cookie */ + if (cookie != pad->priv->probe_cookie) + goto again; + + /* we might have released the lock */ + if (G_UNLIKELY (GST_PAD_IS_FLUSHING (pad))) + goto flushing; + + if (data.ret == GST_PROBE_DROP) + goto dropped; + + if (data.pass) + goto passed; + + if (mask & (GST_PROBE_TYPE_BLOCK)) { + while (GST_PAD_IS_BLOCKING (pad)) { + /* now we block the streaming thread. It can be unlocked when we + * deactivate the pad (which will also set the FLUSHING flag) or + * when the pad is unblocked. A flushing event will also unblock + * the pad after setting the FLUSHING flag. */ + GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, + "Waiting to be unblocked or set flushing"); + GST_OBJECT_FLAG_SET (pad, GST_PAD_BLOCKING); + GST_PAD_BLOCK_WAIT (pad); + GST_OBJECT_FLAG_UNSET (pad, GST_PAD_BLOCKING); + + if (G_UNLIKELY (GST_PAD_IS_FLUSHING (pad))) + goto flushing; + } } - GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "got unblocked"); + return GST_FLOW_OK; - gst_object_unref (pad); - - return ret; - -flushingnonref: - { - GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "pad was flushing"); - return GST_FLOW_WRONG_STATE; - } + /* ERRORS */ flushing: { - GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "pad became flushing"); - gst_object_unref (pad); + GST_DEBUG_OBJECT (pad, "pad is flushing"); return GST_FLOW_WRONG_STATE; } +dropped: + { + GST_DEBUG_OBJECT (pad, "data is dropped"); + return GST_FLOW_CUSTOM_SUCCESS; + } +passed: + { + GST_DEBUG_OBJECT (pad, "data is passed"); + return GST_FLOW_OK; + } } /* pad offsets */ @@ -3651,6 +3706,7 @@ done: * Data passing functions */ +#if 0 static gboolean gst_pad_emit_have_data_signal (GstPad * pad, GstMiniObject * obj) { @@ -3689,6 +3745,7 @@ gst_pad_emit_have_data_signal (GstPad * pad, GstMiniObject * obj) return res; } +#endif static void gst_pad_data_unref (gboolean is_buffer, void *data) @@ -3700,12 +3757,14 @@ gst_pad_data_unref (gboolean is_buffer, void *data) } } -static GstFlowReturn -pad_pre_chain (GstPad * pad, gpointer data) +/* this is the chain function that does not perform the additional argument + * checking for that little extra speed. + */ +static inline GstFlowReturn +gst_pad_chain_data_unchecked (GstPad * pad, gboolean is_buffer, void *data) { GstFlowReturn ret; gboolean needs_events; - gboolean emit_signal; GST_PAD_STREAM_LOCK (pad); @@ -3722,57 +3781,15 @@ pad_pre_chain (GstPad * pad, gpointer data) if (G_UNLIKELY (ret != GST_FLOW_OK)) goto events_error; } - emit_signal = GST_PAD_DO_BUFFER_SIGNALS (pad) > 0; + + ret = + do_probe_callbacks (pad, GST_PROBE_TYPE_PUSH | GST_PROBE_TYPE_BUFFER, + data); + if (ret != GST_FLOW_OK) + goto probe_stopped; + GST_OBJECT_UNLOCK (pad); - /* see if the signal should be emited */ - if (G_UNLIKELY (emit_signal)) { - if (!gst_pad_emit_have_data_signal (pad, GST_MINI_OBJECT_CAST (data))) - goto dropping; - } - return GST_FLOW_OK; - - /* ERRORS */ -flushing: - { - GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, - "pushing, but pad was flushing"); - GST_OBJECT_UNLOCK (pad); - GST_PAD_STREAM_UNLOCK (pad); - return GST_FLOW_WRONG_STATE; - } -events_error: - { - GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "events were not accepted"); - GST_OBJECT_UNLOCK (pad); - GST_PAD_STREAM_UNLOCK (pad); - return ret; - } -dropping: - { - GST_DEBUG_OBJECT (pad, "Dropping buffer due to FALSE probe return"); - GST_PAD_STREAM_UNLOCK (pad); - return GST_FLOW_CUSTOM_SUCCESS; - } -} - -static void -pad_post_chain (GstPad * pad) -{ - GST_PAD_STREAM_UNLOCK (pad); -} - -/* this is the chain function that does not perform the additional argument - * checking for that little extra speed. - */ -static inline GstFlowReturn -gst_pad_chain_data_unchecked (GstPad * pad, gboolean is_buffer, void *data) -{ - GstFlowReturn ret; - - if (G_UNLIKELY ((ret = pad_pre_chain (pad, data)) != GST_FLOW_OK)) - goto error; - /* NOTE: we read the chainfunc unlocked. * we cannot hold the lock for the pad so we might send * the data to the wrong function. This is not really a @@ -3810,13 +3827,30 @@ gst_pad_chain_data_unchecked (GstPad * pad, gboolean is_buffer, void *data) GST_DEBUG_FUNCPTR_NAME (chainlistfunc), gst_flow_get_name (ret)); } - pad_post_chain (pad); + GST_PAD_STREAM_UNLOCK (pad); return ret; /* ERRORS */ -error: +flushing: { + GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, + "chaining, but pad was flushing"); + GST_OBJECT_UNLOCK (pad); + GST_PAD_STREAM_UNLOCK (pad); + return GST_FLOW_WRONG_STATE; + } +events_error: + { + GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "events were not accepted"); + GST_OBJECT_UNLOCK (pad); + GST_PAD_STREAM_UNLOCK (pad); + return ret; + } +probe_stopped: + { + GST_OBJECT_UNLOCK (pad); + GST_PAD_STREAM_UNLOCK (pad); gst_pad_data_unref (is_buffer, data); switch (ret) { @@ -3838,7 +3872,7 @@ no_function: GST_ELEMENT_ERROR (GST_PAD_PARENT (pad), CORE, PAD, (NULL), ("push on pad %s:%s but it has no chainfunction", GST_DEBUG_PAD_NAME (pad))); - pad_post_chain (pad); + GST_PAD_STREAM_UNLOCK (pad); return GST_FLOW_NOT_SUPPORTED; } } @@ -3942,91 +3976,14 @@ gst_pad_chain_list (GstPad * pad, GstBufferList * list) return gst_pad_chain_data_unchecked (pad, FALSE, list); } -static GstFlowReturn -pad_pre_push (GstPad * pad, GstPad ** peer, gpointer data) -{ - GstFlowReturn ret; - gboolean need_probes, do_probes = TRUE; - -again: - GST_OBJECT_LOCK (pad); - /* FIXME: this check can go away; pad_set_blocked could be implemented with - * probes completely or probes with an extended pad block. */ - while (G_UNLIKELY (GST_PAD_IS_BLOCKED (pad))) - if ((ret = - handle_pad_block (pad, - GST_BLOCK_TYPE_PUSH | GST_BLOCK_TYPE_DATA) != GST_FLOW_OK)) - goto flushed; - - need_probes = GST_PAD_DO_BUFFER_SIGNALS (pad) > 0; - - /* we emit signals on the pad arg, the peer will have a chance to - * emit in the _chain() function */ - if (G_UNLIKELY (need_probes && do_probes)) { - do_probes = FALSE; - /* unlock before emitting */ - GST_OBJECT_UNLOCK (pad); - - /* if the signal handler returned FALSE, it means we should just drop the - * buffer */ - if (!gst_pad_emit_have_data_signal (pad, GST_MINI_OBJECT_CAST (data))) - goto dropped; - - goto again; - } - - if (G_UNLIKELY ((*peer = GST_PAD_PEER (pad)) == NULL)) - goto not_linked; - - /* take ref to peer pad before releasing the lock */ - gst_object_ref (*peer); - pad->priv->using++; - GST_OBJECT_UNLOCK (pad); - - return GST_FLOW_OK; - - /* ERRORS */ -flushed: - { - GST_DEBUG_OBJECT (pad, "pad block stopped by flush"); - GST_OBJECT_UNLOCK (pad); - return ret; - } -dropped: - { - GST_DEBUG_OBJECT (pad, "Dropping buffer due to FALSE probe return"); - return GST_FLOW_CUSTOM_SUCCESS; - } -not_linked: - { - GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, - "pushing, but it was not linked"); - GST_OBJECT_UNLOCK (pad); - return GST_FLOW_NOT_LINKED; - } -} - static void pad_post_push (GstPad * pad) { GST_OBJECT_LOCK (pad); pad->priv->using--; if (pad->priv->using == 0) { - /* pad is not active anymore, check if we need to trigger the block */ - if (GST_PAD_IS_BLOCKED (pad)) { - GstPadBlockCallback callback; - gpointer user_data; - - callback = pad->block_callback; - user_data = pad->block_data; - GST_PAD_BLOCK_BROADCAST (pad); - GST_OBJECT_UNLOCK (pad); - - if (callback) - callback (pad, GST_BLOCK_TYPE_IDLE, user_data); - - return; - } + /* pad is not active anymore, trigger idle callbacks */ + do_probe_callbacks (pad, GST_PROBE_TYPE_IDLE, NULL); } GST_OBJECT_UNLOCK (pad); } @@ -4036,9 +3993,32 @@ gst_pad_push_data (GstPad * pad, gboolean is_buffer, void *data) { GstPad *peer; GstFlowReturn ret; + GstProbeType type; - if ((ret = pad_pre_push (pad, &peer, data)) != GST_FLOW_OK) - goto error; + type = is_buffer ? GST_PROBE_TYPE_BUFFER : GST_PROBE_TYPE_BUFFER_LIST; + type |= GST_PROBE_TYPE_PUSH; + + GST_OBJECT_LOCK (pad); + if (G_UNLIKELY (GST_PAD_IS_FLUSHING (pad))) + goto flushing; + + /* do block probes */ + ret = do_probe_callbacks (pad, type | GST_PROBE_TYPE_BLOCK, data); + if (ret != GST_FLOW_OK) + goto probe_stopped; + + /* do post-blocking probes */ + ret = do_probe_callbacks (pad, type, data); + if (ret != GST_FLOW_OK) + goto probe_stopped; + + if (G_UNLIKELY ((peer = GST_PAD_PEER (pad)) == NULL)) + goto not_linked; + + /* take ref to peer pad before releasing the lock */ + gst_object_ref (peer); + pad->priv->using++; + GST_OBJECT_UNLOCK (pad); ret = gst_pad_chain_data_unchecked (peer, is_buffer, data); @@ -4049,8 +4029,18 @@ gst_pad_push_data (GstPad * pad, gboolean is_buffer, void *data) return ret; /* ERROR recovery here */ -error: + /* ERRORS */ +flushing: { + GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, + "pushing, but pad was flushing"); + GST_OBJECT_UNLOCK (pad); + gst_pad_data_unref (is_buffer, data); + return GST_FLOW_WRONG_STATE; + } +probe_stopped: + { + GST_OBJECT_UNLOCK (pad); gst_pad_data_unref (is_buffer, data); switch (ret) { @@ -4064,6 +4054,14 @@ error: } return ret; } +not_linked: + { + GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, + "pushing, but it was not linked"); + GST_OBJECT_UNLOCK (pad); + gst_pad_data_unref (is_buffer, data); + return GST_FLOW_NOT_LINKED; + } } /** @@ -4155,15 +4153,12 @@ gst_pad_get_range_unchecked (GstPad * pad, guint64 offset, guint size, { GstFlowReturn ret; GstPadGetRangeFunction getrangefunc; - gboolean emit_signal; GST_PAD_STREAM_LOCK (pad); GST_OBJECT_LOCK (pad); if (G_UNLIKELY (GST_PAD_IS_FLUSHING (pad))) goto flushing; - - emit_signal = GST_PAD_DO_BUFFER_SIGNALS (pad) > 0; GST_OBJECT_UNLOCK (pad); if (G_UNLIKELY ((getrangefunc = GST_PAD_GETRANGEFUNC (pad)) == NULL)) @@ -4176,16 +4171,20 @@ gst_pad_get_range_unchecked (GstPad * pad, guint64 offset, guint size, ret = getrangefunc (pad, offset, size, buffer); - /* can only fire the signal if we have a valid buffer */ - if (G_UNLIKELY (emit_signal) && (ret == GST_FLOW_OK)) { - if (!gst_pad_emit_have_data_signal (pad, GST_MINI_OBJECT_CAST (*buffer))) - goto dropping; - } - GST_PAD_STREAM_UNLOCK (pad); - if (G_UNLIKELY (ret != GST_FLOW_OK)) goto get_range_failed; + /* can only fire the signal if we have a valid buffer */ + GST_OBJECT_LOCK (pad); + ret = + do_probe_callbacks (pad, GST_PROBE_TYPE_PULL | GST_PROBE_TYPE_BUFFER, + *buffer); + if (G_UNLIKELY (ret != GST_FLOW_OK)) + goto probe_stopped; + GST_OBJECT_LOCK (pad); + + GST_PAD_STREAM_UNLOCK (pad); + return ret; /* ERRORS */ @@ -4205,17 +4204,19 @@ no_function: GST_PAD_STREAM_UNLOCK (pad); return GST_FLOW_NOT_SUPPORTED; } -dropping: +probe_stopped: { GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, - "Dropping data after FALSE probe return"); + "probe returned %s", gst_flow_get_name (ret)); + GST_OBJECT_UNLOCK (pad); GST_PAD_STREAM_UNLOCK (pad); gst_buffer_unref (*buffer); *buffer = NULL; - return GST_FLOW_UNEXPECTED; + return ret; } get_range_failed: { + GST_PAD_STREAM_UNLOCK (pad); *buffer = NULL; GST_CAT_LEVEL_LOG (GST_CAT_SCHEDULING, (ret >= GST_FLOW_UNEXPECTED) ? GST_LEVEL_INFO : GST_LEVEL_WARNING, @@ -4293,7 +4294,6 @@ gst_pad_pull_range (GstPad * pad, guint64 offset, guint size, { GstPad *peer; GstFlowReturn ret; - gboolean emit_signal; gboolean needs_events; g_return_val_if_fail (GST_IS_PAD (pad), GST_FLOW_ERROR); @@ -4302,16 +4302,14 @@ gst_pad_pull_range (GstPad * pad, guint64 offset, guint size, GST_OBJECT_LOCK (pad); - while (G_UNLIKELY (GST_PAD_IS_BLOCKED (pad))) - if ((ret = handle_pad_block (pad, GST_BLOCK_TYPE_PULL) != GST_FLOW_OK)) - goto flushed; + ret = + do_probe_callbacks (pad, GST_PROBE_TYPE_PULL | GST_PROBE_TYPE_BLOCK, + NULL); + if (G_UNLIKELY (ret != GST_FLOW_OK)) + goto pre_probe_stopped; if (G_UNLIKELY ((peer = GST_PAD_PEER (pad)) == NULL)) - goto not_connected; - - /* signal emision for the pad, peer has chance to emit when - * we call _get_range() */ - emit_signal = GST_PAD_DO_BUFFER_SIGNALS (pad) > 0; + goto not_linked; gst_object_ref (peer); GST_OBJECT_UNLOCK (pad); @@ -4323,19 +4321,12 @@ gst_pad_pull_range (GstPad * pad, guint64 offset, guint size, if (G_UNLIKELY (ret != GST_FLOW_OK)) goto pull_range_failed; - while (G_UNLIKELY (GST_PAD_IS_BLOCKED (pad))) - if ((ret = - handle_pad_block (pad, - GST_BLOCK_TYPE_PULL | GST_BLOCK_TYPE_DATA) != GST_FLOW_OK)) - goto flushed; - - /* can only fire the signal if we have a valid buffer */ - if (G_UNLIKELY (emit_signal)) { - if (!gst_pad_emit_have_data_signal (pad, GST_MINI_OBJECT_CAST (*buffer))) - goto dropping; - } - GST_OBJECT_LOCK (pad); + ret = + do_probe_callbacks (pad, GST_PROBE_TYPE_PULL | GST_PROBE_TYPE_BUFFER, + buffer); + if (G_UNLIKELY (ret != GST_FLOW_OK)) + goto post_probe_stopped; needs_events = GST_PAD_NEEDS_EVENTS (pad); if (G_UNLIKELY (needs_events)) { @@ -4351,13 +4342,14 @@ gst_pad_pull_range (GstPad * pad, guint64 offset, guint size, return ret; /* ERROR recovery here */ -flushed: +pre_probe_stopped: { - GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "we are flushing"); + GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "pre probe returned %s", + gst_flow_get_name (ret)); GST_OBJECT_UNLOCK (pad); - return GST_FLOW_NOT_LINKED; + return ret; } -not_connected: +not_linked: { GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "pulling range, but it was not linked"); @@ -4372,13 +4364,14 @@ pull_range_failed: pad, "pullrange failed, flow: %s", gst_flow_get_name (ret)); return ret; } -dropping: +post_probe_stopped: { GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, - "Dropping data after FALSE probe return"); + "post probe returned %s", gst_flow_get_name (ret)); + GST_OBJECT_UNLOCK (pad); gst_buffer_unref (*buffer); *buffer = NULL; - return GST_FLOW_UNEXPECTED; + return ret; } events_error: { @@ -4410,6 +4403,7 @@ events_error: gboolean gst_pad_push_event (GstPad * pad, GstEvent * event) { + GstFlowReturn ret; GstPad *peerpad; gboolean result, need_probes, do_probes = TRUE, do_event_actions = TRUE; gboolean stored = FALSE; @@ -4514,27 +4508,24 @@ again: break; } } + if (G_UNLIKELY (GST_PAD_IS_FLUSHING (pad))) + goto flushed; - while (G_UNLIKELY (GST_PAD_IS_BLOCKED (pad))) - if (handle_pad_block (pad, - GST_BLOCK_TYPE_PUSH | GST_BLOCK_TYPE_DATA) != GST_FLOW_OK) - goto flushed; + ret = do_probe_callbacks (pad, GST_PROBE_TYPE_PUSH | GST_PROBE_TYPE_EVENT + | GST_PROBE_TYPE_BLOCK, event); + if (ret != GST_FLOW_OK) + goto probe_stopped; break; } } /* send probes after modifying the events above */ - if (G_UNLIKELY (need_probes)) { - do_probes = FALSE; - GST_OBJECT_UNLOCK (pad); - - if (!gst_pad_emit_have_data_signal (pad, GST_MINI_OBJECT_CAST (event))) - goto dropped; - - /* retry, we released the lock */ - goto again; - } + ret = + do_probe_callbacks (pad, GST_PROBE_TYPE_PUSH | GST_PROBE_TYPE_EVENT, + event); + if (ret != GST_FLOW_OK) + goto probe_stopped; /* now check the peer pad */ if (peerpad == NULL) @@ -4567,9 +4558,10 @@ flushed: gst_event_unref (event); return stored; } -dropped: +probe_stopped: { - GST_DEBUG_OBJECT (pad, "Dropping event after FALSE probe return"); + GST_DEBUG_OBJECT (pad, "Probe returned %s", gst_flow_get_name (ret)); + GST_OBJECT_UNLOCK (pad); gst_event_unref (event); return stored; } @@ -4613,6 +4605,7 @@ not_linked: gboolean gst_pad_send_event (GstPad * pad, GstEvent * event) { + GstFlowReturn ret; gboolean result = FALSE; gboolean serialized, need_unlock = FALSE, needs_events, sticky; @@ -4633,15 +4626,6 @@ gst_pad_send_event (GstPad * pad, GstEvent * event) } else goto unknown_direction; - /* pad signals */ - if (G_UNLIKELY (GST_PAD_DO_EVENT_SIGNALS (pad) > 0)) { - GST_OBJECT_UNLOCK (pad); - - if (!gst_pad_emit_have_data_signal (pad, GST_MINI_OBJECT_CAST (event))) - goto dropping; - - GST_OBJECT_LOCK (pad); - } /* get the flag first, we clear it when we have a FLUSH or a non-serialized * event. */ needs_events = GST_PAD_NEEDS_EVENTS (pad); @@ -4727,10 +4711,14 @@ gst_pad_send_event (GstPad * pad, GstEvent * event) needs_events = TRUE; } } - /* now check the flushing flag */ + /* now do the probe */ if (G_UNLIKELY (GST_PAD_IS_FLUSHING (pad))) goto flushing; + ret = do_probe_callbacks (pad, GST_PROBE_TYPE_EVENT, event); + if (ret != GST_FLOW_OK) + goto probe_stopped; + break; } @@ -4802,9 +4790,12 @@ flushing: gst_event_unref (event); return FALSE; } -dropping: +probe_stopped: { - GST_DEBUG_OBJECT (pad, "Dropping event after FALSE probe return"); + GST_DEBUG_OBJECT (pad, "probe returned %s", gst_flow_get_name (ret)); + GST_OBJECT_UNLOCK (pad); + if (need_unlock) + GST_PAD_STREAM_UNLOCK (pad); gst_event_unref (event); return FALSE; } diff --git a/gst/gstpad.h b/gst/gstpad.h index 850957fbe1..2d7088c30d 100644 --- a/gst/gstpad.h +++ b/gst/gstpad.h @@ -471,32 +471,62 @@ typedef void (*GstPadFixateCapsFunction) (GstPad *pad, GstCaps *caps); typedef gboolean (*GstPadDispatcherFunction) (GstPad *pad, gpointer data); /** - * GstBlockType: - * @GST_BLOCK_TYPE_IDLE: The pad is idle - * @GST_BLOCK_TYPE_DATA: Data is queued on the pad - * @GST_BLOCK_TYPE_PUSH: Blocked on a push operation - * @GST_BLOCK_TYPE_PULL: Blocked on a pull operation + * GstProbeType: + * @GST_PROBE_TYPE_INVALID: invalid probe type + * @GST_PROBE_TYPE_IDLE: probe idle pads and block + * @GST_PROBE_TYPE_BLOCK: probe and block pads + * @GST_PROBE_TYPE_BUFFER: probe buffers + * @GST_PROBE_TYPE_BUFFER_LIST: probe buffer lists + * @GST_PROBE_TYPE_EVENT: probe events + * @GST_PROBE_TYPE_PUSH: probe push + * @GST_PROBE_TYPE_PULL: probe pull * - * The different blocking types that can occur. + * The different probing types that can occur. */ typedef enum { - GST_BLOCK_TYPE_IDLE = (1 << 0), - GST_BLOCK_TYPE_DATA = (1 << 1), - GST_BLOCK_TYPE_PUSH = (1 << 2), - GST_BLOCK_TYPE_PULL = (1 << 3), -} GstBlockType; + GST_PROBE_TYPE_INVALID = (1 << 0), + GST_PROBE_TYPE_IDLE = (1 << 1), + GST_PROBE_TYPE_BLOCK = (1 << 2), + GST_PROBE_TYPE_BUFFER = (1 << 3), + GST_PROBE_TYPE_BUFFER_LIST = (1 << 4), + GST_PROBE_TYPE_EVENT = (1 << 5), + GST_PROBE_TYPE_PUSH = (1 << 6), + GST_PROBE_TYPE_PULL = (1 << 7), +} GstProbeType; + +#define GST_PROBE_TYPE_DATA (GST_PROBE_TYPE_BUFFER | GST_PROBE_TYPE_EVENT | GST_PROBE_TYPE_BUFFER_LIST) /** - * GstPadBlockCallback: + * GstProbeResult: + * @GST_PROBE_RESULT_OK: normal probe return value + * @GST_PROBE_RESULT_DROP: drop data in data probes + * @GST_PROBE_RESULT_REMOVE: remove probe + * @GST_PROBE_RESULT_PASS: pass the data item in the block probe and block on + * the next item + * + * Different return values for the GstPadProbeCallback. + */ +typedef enum +{ + GST_PROBE_OK, + GST_PROBE_DROP, + GST_PROBE_REMOVE, + GST_PROBE_PASS, +} GstProbeReturn; + +/** + * GstPadProbeCallback * @pad: the #GstPad that is blocked - * @type: the current blocking type + * @type: the current probe type + * @type_data: type specific data * @user_data: the gpointer to optional user data. * - * Callback used by gst_pad_block(). Gets called to notify about the current + * Callback used by gst_pad_add_probe(). Gets called to notify about the current * blocking type. */ -typedef void (*GstPadBlockCallback) (GstPad *pad, GstBlockType type, gpointer user_data); +typedef GstProbeReturn (*GstPadProbeCallback) (GstPad *pad, GstProbeType type, + gpointer type_data, gpointer user_data); /** * GstPadStickyEventsForeachFunction: @@ -617,11 +647,7 @@ struct _GstPad { /*< public >*/ /* with LOCK */ /* block cond, mutex is from the object */ GCond *block_cond; - GstBlockType block_type; - GstPadBlockCallback block_callback; - gpointer block_data; - GDestroyNotify block_destroy_data; - gboolean block_callback_called; + GHookList probes; /* the pad capabilities */ GstPadGetCapsFunction getcapsfunc; @@ -659,6 +685,7 @@ struct _GstPad { * of handlers attached. */ gint do_buffer_signals; gint do_event_signals; + gint num_blocked; /*< private >*/ GstPadPrivate *priv; @@ -822,12 +849,12 @@ gboolean gst_pad_is_active (GstPad *pad); gboolean gst_pad_activate_pull (GstPad *pad, gboolean active); gboolean gst_pad_activate_push (GstPad *pad, gboolean active); -gboolean gst_pad_block (GstPad *pad, - GstBlockType type, - GstPadBlockCallback callback, +gulong gst_pad_add_probe (GstPad *pad, + GstProbeType mask, + GstPadProbeCallback callback, gpointer user_data, GDestroyNotify destroy_data); -void gst_pad_unblock (GstPad *pad); +void gst_pad_remove_probe (GstPad *pad, gulong id); gboolean gst_pad_is_blocked (GstPad *pad); gboolean gst_pad_is_blocking (GstPad *pad); diff --git a/gst/gstutils.c b/gst/gstutils.c index 780bba948e..29df1d5839 100644 --- a/gst/gstutils.c +++ b/gst/gstutils.c @@ -3127,218 +3127,6 @@ gst_pad_query_peer_convert (GstPad * pad, GstFormat src_format, gint64 src_val, return ret; } -/** - * gst_pad_add_data_probe: - * @pad: pad to add the data probe handler to - * @handler: function to call when data is passed over pad - * @data: (closure): data to pass along with the handler - * @notify: (allow-none): function to call when the probe is disconnected, - * or NULL - * - * Adds a "data probe" to a pad. This function will be called whenever data - * passes through a pad. In this case data means both events and buffers. The - * probe will be called with the data as an argument, meaning @handler should - * have the same callback signature as the #GstPad::have-data signal. - * Note that the data will have a reference count greater than 1, so it will - * be immutable -- you must not change it. - * - * For source pads, the probe will be called after the blocking function, if any - * (see gst_pad_set_blocked_async()), but before looking up the peer to chain - * to. For sink pads, the probe function will be called before configuring the - * sink with new caps, if any, and before calling the pad's chain function. - * - * Your data probe should return TRUE to let the data continue to flow, or FALSE - * to drop it. Dropping data is rarely useful, but occasionally comes in handy - * with events. - * - * Although probes are implemented internally by connecting @handler to the - * have-data signal on the pad, if you want to remove a probe it is insufficient - * to only call g_signal_handler_disconnect on the returned handler id. To - * remove a probe, use the appropriate function, such as - * gst_pad_remove_data_probe(). - * - * The @notify function is called when the probe is disconnected and usually - * used to free @data. - * - * Returns: The handler id. - * - * Since: 0.10.20 - */ -gulong -gst_pad_add_data_probe (GstPad * pad, GCallback handler, - gpointer data, GDestroyNotify notify) -{ - gulong sigid; - - g_return_val_if_fail (GST_IS_PAD (pad), 0); - g_return_val_if_fail (handler != NULL, 0); - - GST_OBJECT_LOCK (pad); - - /* we only expose a GDestroyNotify in our API because that's less confusing */ - sigid = g_signal_connect_data (pad, "have-data", handler, data, - (GClosureNotify) notify, 0); - - GST_PAD_DO_EVENT_SIGNALS (pad)++; - GST_PAD_DO_BUFFER_SIGNALS (pad)++; - GST_CAT_DEBUG_OBJECT (GST_CAT_PADS, pad, - "adding data probe, now %d data, %d event probes", - GST_PAD_DO_BUFFER_SIGNALS (pad), GST_PAD_DO_EVENT_SIGNALS (pad)); - GST_OBJECT_UNLOCK (pad); - - return sigid; -} - -/** - * gst_pad_add_event_probe: - * @pad: pad to add the event probe handler to - * @handler: function to call when events are passed over pad - * @data: (closure): data to pass along with the handler, or NULL - * @notify: (allow-none): function to call when probe is disconnected, or NULL - * - * Adds a probe that will be called for all events passing through a pad. See - * gst_pad_add_data_probe() for more information. - * - * The @notify function is called when the probe is disconnected and usually - * used to free @data. - * - * Returns: The handler id - * - * Since: 0.10.20 - */ -gulong -gst_pad_add_event_probe (GstPad * pad, GCallback handler, - gpointer data, GDestroyNotify notify) -{ - gulong sigid; - - g_return_val_if_fail (GST_IS_PAD (pad), 0); - g_return_val_if_fail (handler != NULL, 0); - - GST_OBJECT_LOCK (pad); - - /* we only expose a GDestroyNotify in our API because that's less confusing */ - sigid = g_signal_connect_data (pad, "have-data::event", handler, data, - (GClosureNotify) notify, 0); - - GST_PAD_DO_EVENT_SIGNALS (pad)++; - GST_CAT_DEBUG_OBJECT (GST_CAT_PADS, pad, "adding event probe, now %d probes", - GST_PAD_DO_EVENT_SIGNALS (pad)); - GST_OBJECT_UNLOCK (pad); - - return sigid; -} - -/** - * gst_pad_add_buffer_probe: - * @pad: pad to add the buffer probe handler to - * @handler: function to call when buffer are passed over pad - * @data: (closure): data to pass along with the handler - * @notify: (allow-none): function to call when the probe is disconnected, - * or NULL - * - * Adds a probe that will be called for all buffers passing through a pad. See - * gst_pad_add_data_probe() for more information. - * - * The @notify function is called when the probe is disconnected and usually - * used to free @data. - * - * Returns: The handler id - * - * Since: 0.10.20 - */ -gulong -gst_pad_add_buffer_probe (GstPad * pad, GCallback handler, - gpointer data, GDestroyNotify notify) -{ - gulong sigid; - - g_return_val_if_fail (GST_IS_PAD (pad), 0); - g_return_val_if_fail (handler != NULL, 0); - - GST_OBJECT_LOCK (pad); - - /* we only expose a GDestroyNotify in our API because that's less confusing */ - sigid = g_signal_connect_data (pad, "have-data::buffer", handler, data, - (GClosureNotify) notify, 0); - - GST_PAD_DO_BUFFER_SIGNALS (pad)++; - GST_CAT_DEBUG_OBJECT (GST_CAT_PADS, pad, "adding buffer probe, now %d probes", - GST_PAD_DO_BUFFER_SIGNALS (pad)); - GST_OBJECT_UNLOCK (pad); - - return sigid; -} - -/** - * gst_pad_remove_data_probe: - * @pad: pad to remove the data probe handler from - * @handler_id: handler id returned from gst_pad_add_data_probe - * - * Removes a data probe from @pad. - */ -void -gst_pad_remove_data_probe (GstPad * pad, guint handler_id) -{ - g_return_if_fail (GST_IS_PAD (pad)); - g_return_if_fail (handler_id > 0); - - GST_OBJECT_LOCK (pad); - g_signal_handler_disconnect (pad, handler_id); - GST_PAD_DO_BUFFER_SIGNALS (pad)--; - GST_PAD_DO_EVENT_SIGNALS (pad)--; - GST_CAT_DEBUG_OBJECT (GST_CAT_PADS, pad, - "removed data probe, now %d event, %d buffer probes", - GST_PAD_DO_EVENT_SIGNALS (pad), GST_PAD_DO_BUFFER_SIGNALS (pad)); - GST_OBJECT_UNLOCK (pad); - -} - -/** - * gst_pad_remove_event_probe: - * @pad: pad to remove the event probe handler from - * @handler_id: handler id returned from gst_pad_add_event_probe - * - * Removes an event probe from @pad. - */ -void -gst_pad_remove_event_probe (GstPad * pad, guint handler_id) -{ - g_return_if_fail (GST_IS_PAD (pad)); - g_return_if_fail (handler_id > 0); - - GST_OBJECT_LOCK (pad); - g_signal_handler_disconnect (pad, handler_id); - GST_PAD_DO_EVENT_SIGNALS (pad)--; - GST_CAT_DEBUG_OBJECT (GST_CAT_PADS, pad, - "removed event probe, now %d event probes", - GST_PAD_DO_EVENT_SIGNALS (pad)); - GST_OBJECT_UNLOCK (pad); -} - -/** - * gst_pad_remove_buffer_probe: - * @pad: pad to remove the buffer probe handler from - * @handler_id: handler id returned from gst_pad_add_buffer_probe - * - * Removes a buffer probe from @pad. - */ -void -gst_pad_remove_buffer_probe (GstPad * pad, guint handler_id) -{ - g_return_if_fail (GST_IS_PAD (pad)); - g_return_if_fail (handler_id > 0); - - GST_OBJECT_LOCK (pad); - g_signal_handler_disconnect (pad, handler_id); - GST_PAD_DO_BUFFER_SIGNALS (pad)--; - GST_CAT_DEBUG_OBJECT (GST_CAT_PADS, pad, - "removed buffer probe, now %d buffer probes", - GST_PAD_DO_BUFFER_SIGNALS (pad)); - GST_OBJECT_UNLOCK (pad); - -} - /** * gst_element_found_tags_for_pad: * @element: element for which to post taglist to bus. diff --git a/gst/gstutils.h b/gst/gstutils.h index ff0378630f..a5931cafbb 100644 --- a/gst/gstutils.h +++ b/gst/gstutils.h @@ -936,28 +936,6 @@ GstPad * gst_bin_find_unlinked_pad (GstBin *bin, GstPadDire GstBuffer * gst_buffer_merge (GstBuffer * buf1, GstBuffer * buf2); GstBuffer * gst_buffer_join (GstBuffer * buf1, GstBuffer * buf2); -/* probes */ -gulong gst_pad_add_data_probe (GstPad * pad, - GCallback handler, - gpointer data, - GDestroyNotify notify); - -void gst_pad_remove_data_probe (GstPad * pad, guint handler_id); - -gulong gst_pad_add_event_probe (GstPad * pad, - GCallback handler, - gpointer data, - GDestroyNotify notify); - -void gst_pad_remove_event_probe (GstPad * pad, guint handler_id); - -gulong gst_pad_add_buffer_probe (GstPad * pad, - GCallback handler, - gpointer data, - GDestroyNotify notify); - -void gst_pad_remove_buffer_probe (GstPad * pad, guint handler_id); - /* tag emission utility functions */ void gst_element_found_tags_for_pad (GstElement * element, GstPad * pad, diff --git a/libs/gst/check/gstbufferstraw.c b/libs/gst/check/gstbufferstraw.c index df7227ec92..71504ff64f 100644 --- a/libs/gst/check/gstbufferstraw.c +++ b/libs/gst/check/gstbufferstraw.c @@ -38,7 +38,8 @@ static gulong id; /* called for every buffer. Waits until the global "buf" variable is unset, * then sets it to the buffer received, and signals. */ static gboolean -buffer_probe (GstPad * pad, GstBuffer * buffer, gpointer unused) +buffer_probe (GstPad * pad, GstProbeType type, GstBuffer * buffer, + gpointer unused) { g_mutex_lock (lock); @@ -81,7 +82,8 @@ gst_buffer_straw_start_pipeline (GstElement * bin, GstPad * pad) { GstStateChangeReturn ret; - id = gst_pad_add_buffer_probe (pad, G_CALLBACK (buffer_probe), NULL, NULL); + id = gst_pad_add_probe (pad, GST_PROBE_TYPE_BUFFER, + (GstPadProbeCallback) buffer_probe, NULL, NULL); cond = g_cond_new (); lock = g_mutex_new (); @@ -149,7 +151,7 @@ gst_buffer_straw_stop_pipeline (GstElement * bin, GstPad * pad) if (buf) gst_buffer_unref (buf); buf = NULL; - gst_pad_remove_buffer_probe (pad, (guint) id); + gst_pad_remove_probe (pad, (guint) id); id = 0; g_cond_signal (cond); g_mutex_unlock (lock); diff --git a/libs/gst/check/gstconsistencychecker.c b/libs/gst/check/gstconsistencychecker.c index ecbdc33836..731fc5a60a 100644 --- a/libs/gst/check/gstconsistencychecker.c +++ b/libs/gst/check/gstconsistencychecker.c @@ -42,7 +42,7 @@ struct _GstStreamConsistency }; static gboolean -source_pad_data_cb (GstPad * pad, GstMiniObject * data, +source_pad_data_cb (GstPad * pad, GstProbeType type, GstMiniObject * data, GstStreamConsistency * consist) { if (GST_IS_BUFFER (data)) { @@ -118,8 +118,8 @@ gst_consistency_checker_new (GstPad * pad) consist = g_new0 (GstStreamConsistency, 1); consist->pad = g_object_ref (pad); consist->probeid = - gst_pad_add_data_probe (pad, (GCallback) source_pad_data_cb, consist, - NULL); + gst_pad_add_probe (pad, GST_PROBE_TYPE_DATA, + (GstPadProbeCallback) source_pad_data_cb, consist, NULL); return consist; } @@ -154,7 +154,7 @@ void gst_consistency_checker_free (GstStreamConsistency * consist) { /* Remove the data probe */ - gst_pad_remove_data_probe (consist->pad, consist->probeid); + gst_pad_remove_probe (consist->pad, consist->probeid); g_object_unref (consist->pad); g_free (consist); } diff --git a/tests/check/gst/gstevent.c b/tests/check/gst/gstevent.c index bdec9823b0..4e3847fd20 100644 --- a/tests/check/gst/gstevent.c +++ b/tests/check/gst/gstevent.c @@ -264,14 +264,15 @@ static GTimeVal sent_event_time; static GstEvent *got_event_before_q, *got_event_after_q; static GTimeVal got_event_time; -static gboolean -event_probe (GstPad * pad, GstMiniObject ** data, gpointer user_data) +static GstProbeReturn +event_probe (GstPad * pad, GstProbeType type, GstMiniObject * data, + gpointer user_data) { gboolean before_q = (gboolean) GPOINTER_TO_INT (user_data); - fail_unless (GST_IS_EVENT (data)); + GST_DEBUG ("event probe called %p", data); - GST_DEBUG ("event probe called"); + fail_unless (GST_IS_EVENT (data)); if (before_q) { switch (GST_EVENT_TYPE (GST_EVENT (data))) { @@ -304,7 +305,7 @@ event_probe (GstPad * pad, GstMiniObject ** data, gpointer user_data) } } - return TRUE; + return GST_PROBE_OK; } @@ -318,6 +319,7 @@ typedef struct static void signal_data_init (SignalData * data) { + GST_DEBUG ("init %p", data); data->lock = g_mutex_new (); data->cond = g_cond_new (); data->signaled = FALSE; @@ -326,6 +328,7 @@ signal_data_init (SignalData * data) static void signal_data_cleanup (SignalData * data) { + GST_DEBUG ("free %p", data); g_mutex_free (data->lock); g_cond_free (data->cond); } @@ -336,6 +339,7 @@ signal_data_signal (SignalData * data) g_mutex_lock (data->lock); data->signaled = TRUE; g_cond_broadcast (data->cond); + GST_DEBUG ("signaling %p", data); g_mutex_unlock (data->lock); } @@ -343,17 +347,24 @@ static void signal_data_wait (SignalData * data) { g_mutex_lock (data->lock); + GST_DEBUG ("signal wait %p", data); while (!data->signaled) g_cond_wait (data->cond, data->lock); + GST_DEBUG ("signal wait done %p", data); g_mutex_unlock (data->lock); } -static void -signal_blocked (GstPad * pad, GstBlockType type, gpointer user_data) +static GstProbeReturn +signal_blocked (GstPad * pad, GstProbeType type, gpointer type_data, + gpointer user_data) { SignalData *data = (SignalData *) user_data; + GST_DEBUG ("signal called %p", data); signal_data_signal (data); + GST_DEBUG ("signal done %p", data); + + return GST_PROBE_OK; } static void test_event @@ -364,6 +375,7 @@ static void test_event GstPad *peer; gint i; SignalData data; + gulong id; got_event_before_q = got_event_after_q = NULL; @@ -382,8 +394,9 @@ static void test_event signal_data_init (&data); /* We block the pad so the stream lock is released and we can send the event */ - fail_unless (gst_pad_block (fake_srcpad, GST_BLOCK_TYPE_DATA, - signal_blocked, &data, NULL) == TRUE); + id = gst_pad_add_probe (fake_srcpad, GST_PROBE_TYPE_BLOCK, + signal_blocked, &data, NULL); + fail_unless (id != 0); signal_data_wait (&data); @@ -392,8 +405,7 @@ static void test_event gst_pad_send_event (peer, event); gst_object_unref (peer); - gst_pad_unblock (fake_srcpad); - signal_data_cleanup (&data); + gst_pad_remove_probe (fake_srcpad, id); if (expect_before_q) { /* Wait up to 5 seconds for the event to appear */ @@ -428,6 +440,8 @@ static void test_event gst_event_unref (got_event_after_q); got_event_before_q = got_event_after_q = NULL; + + signal_data_cleanup (&data); } static gint64 @@ -464,12 +478,12 @@ GST_START_TEST (send_custom_events) /* add pad-probes to faksrc.src and fakesink.sink */ fail_if ((srcpad = gst_element_get_static_pad (fakesrc, "src")) == NULL); - gst_pad_add_event_probe (srcpad, (GCallback) event_probe, - GINT_TO_POINTER (TRUE), NULL); + gst_pad_add_probe (srcpad, GST_PROBE_TYPE_EVENT, + (GstPadProbeCallback) event_probe, GINT_TO_POINTER (TRUE), NULL); fail_if ((sinkpad = gst_element_get_static_pad (fakesink, "sink")) == NULL); - gst_pad_add_event_probe (sinkpad, (GCallback) event_probe, - GINT_TO_POINTER (FALSE), NULL); + gst_pad_add_probe (sinkpad, GST_PROBE_TYPE_EVENT, + (GstPadProbeCallback) event_probe, GINT_TO_POINTER (FALSE), NULL); /* Upstream events */ test_event (pipeline, GST_EVENT_CUSTOM_UPSTREAM, sinkpad, TRUE, srcpad); diff --git a/tests/check/gst/gstghostpad.c b/tests/check/gst/gstghostpad.c index 9dfebb1b24..02e361411e 100644 --- a/tests/check/gst/gstghostpad.c +++ b/tests/check/gst/gstghostpad.c @@ -478,8 +478,9 @@ typedef struct GCond *cond; } BlockData; -static void -block_callback (GstPad * pad, GstBlockType type, gpointer user_data) +static GstProbeReturn +block_callback (GstPad * pad, GstProbeType type, gpointer type_data, + gpointer user_data) { BlockData *block_data = (BlockData *) user_data; @@ -487,6 +488,8 @@ block_callback (GstPad * pad, GstBlockType type, gpointer user_data) GST_DEBUG ("blocked\n"); g_cond_signal (block_data->cond); g_mutex_unlock (block_data->mutex); + + return GST_PROBE_OK; } GST_START_TEST (test_ghost_pads_block) @@ -514,7 +517,7 @@ GST_START_TEST (test_ghost_pads_block) block_data.cond = g_cond_new (); g_mutex_lock (block_data.mutex); - gst_pad_block (srcghost, GST_BLOCK_TYPE_DATA, block_callback, &block_data, + gst_pad_add_probe (srcghost, GST_PROBE_TYPE_DATA, block_callback, &block_data, NULL); gst_element_set_state (GST_ELEMENT (pipeline), GST_STATE_PLAYING); /* and wait now */ @@ -556,7 +559,7 @@ GST_START_TEST (test_ghost_pads_probes) block_data.cond = g_cond_new (); g_mutex_lock (block_data.mutex); - gst_pad_block (srcghost, GST_BLOCK_TYPE_DATA, block_callback, &block_data, + gst_pad_add_probe (srcghost, GST_PROBE_TYPE_DATA, block_callback, &block_data, NULL); gst_element_set_state (GST_ELEMENT (pipeline), GST_STATE_PLAYING); /* and wait now */ diff --git a/tests/check/gst/gstpad.c b/tests/check/gst/gstpad.c index 26303182a5..6b62b2589d 100644 --- a/tests/check/gst/gstpad.c +++ b/tests/check/gst/gstpad.c @@ -249,14 +249,16 @@ GST_START_TEST (test_name_is_valid) GST_END_TEST; -static gboolean -_probe_handler (GstPad * pad, GstBuffer * buffer, gpointer userdata) +static GstProbeReturn +_probe_handler (GstPad * pad, GstProbeType type, GstBuffer * buffer, + gpointer userdata) { gint ret = GPOINTER_TO_INT (userdata); if (ret == 1) - return TRUE; - return FALSE; + return GST_PROBE_OK; + + return GST_PROBE_DROP; } GST_START_TEST (test_push_unlinked) @@ -276,6 +278,15 @@ GST_START_TEST (test_push_unlinked) gst_pad_set_caps (src, caps); ASSERT_CAPS_REFCOUNT (caps, "caps", 2); + /* pushing on an inactive pad will return wrong state */ + buffer = gst_buffer_new (); + gst_buffer_ref (buffer); + fail_unless (gst_pad_push (src, buffer) == GST_FLOW_WRONG_STATE); + ASSERT_MINI_OBJECT_REFCOUNT (buffer, "buffer", 1); + gst_buffer_unref (buffer); + + gst_pad_set_active (src, TRUE); + /* pushing on an unlinked pad will drop the buffer */ buffer = gst_buffer_new (); gst_buffer_ref (buffer); @@ -285,25 +296,25 @@ GST_START_TEST (test_push_unlinked) /* adding a probe that returns FALSE will drop the buffer without trying * to chain */ - id = gst_pad_add_buffer_probe (src, (GCallback) _probe_handler, - GINT_TO_POINTER (0), NULL); + id = gst_pad_add_probe (src, GST_PROBE_TYPE_BUFFER, + (GstPadProbeCallback) _probe_handler, GINT_TO_POINTER (0), NULL); buffer = gst_buffer_new (); gst_buffer_ref (buffer); fail_unless (gst_pad_push (src, buffer) == GST_FLOW_OK); ASSERT_MINI_OBJECT_REFCOUNT (buffer, "buffer", 1); gst_buffer_unref (buffer); - gst_pad_remove_buffer_probe (src, id); + gst_pad_remove_probe (src, id); /* adding a probe that returns TRUE will still chain the buffer, * and hence drop because pad is unlinked */ - id = gst_pad_add_buffer_probe (src, (GCallback) _probe_handler, - GINT_TO_POINTER (1), NULL); + id = gst_pad_add_probe (src, GST_PROBE_TYPE_BUFFER, + (GstPadProbeCallback) _probe_handler, GINT_TO_POINTER (1), NULL); buffer = gst_buffer_new (); gst_buffer_ref (buffer); fail_unless (gst_pad_push (src, buffer) == GST_FLOW_NOT_LINKED); ASSERT_MINI_OBJECT_REFCOUNT (buffer, "buffer", 1); gst_buffer_unref (buffer); - gst_pad_remove_buffer_probe (src, id); + gst_pad_remove_probe (src, id); /* cleanup */ @@ -376,23 +387,23 @@ GST_START_TEST (test_push_linked) /* adding a probe that returns FALSE will drop the buffer without trying * to chain */ - id = gst_pad_add_buffer_probe (src, (GCallback) _probe_handler, - GINT_TO_POINTER (0), NULL); + id = gst_pad_add_probe (src, GST_PROBE_TYPE_BUFFER, + (GstPadProbeCallback) _probe_handler, GINT_TO_POINTER (0), NULL); buffer = gst_buffer_new (); gst_buffer_ref (buffer); fail_unless (gst_pad_push (src, buffer) == GST_FLOW_OK); ASSERT_MINI_OBJECT_REFCOUNT (buffer, "buffer", 1); gst_buffer_unref (buffer); - gst_pad_remove_buffer_probe (src, id); + gst_pad_remove_probe (src, id); fail_unless_equals_int (g_list_length (buffers), 0); /* adding a probe that returns TRUE will still chain the buffer */ - id = gst_pad_add_buffer_probe (src, (GCallback) _probe_handler, - GINT_TO_POINTER (1), NULL); + id = gst_pad_add_probe (src, GST_PROBE_TYPE_BUFFER, + (GstPadProbeCallback) _probe_handler, GINT_TO_POINTER (1), NULL); buffer = gst_buffer_new (); gst_buffer_ref (buffer); fail_unless (gst_pad_push (src, buffer) == GST_FLOW_OK); - gst_pad_remove_buffer_probe (src, id); + gst_pad_remove_probe (src, id); ASSERT_MINI_OBJECT_REFCOUNT (buffer, "buffer", 2); gst_buffer_unref (buffer); @@ -681,12 +692,15 @@ GST_START_TEST (test_sink_unref_unlink) GST_END_TEST; -static void -block_async_cb (GstPad * pad, GstBlockType type, gpointer user_data) +static gulong id; + +static GstProbeReturn +block_async_cb (GstPad * pad, GstProbeType type, gpointer type_data, + gpointer user_data) { gboolean *bool_user_data = (gboolean *) user_data; - fail_unless ((type & GST_BLOCK_TYPE_DATA) != 0); + fail_unless ((type & GST_PROBE_TYPE_BLOCK) != 0); /* here we should have blocked == 0 unblocked == 0 */ fail_unless (bool_user_data[0] == FALSE); @@ -694,8 +708,10 @@ block_async_cb (GstPad * pad, GstBlockType type, gpointer user_data) bool_user_data[0] = TRUE; - gst_pad_unblock (pad); + gst_pad_remove_probe (pad, id); bool_user_data[1] = TRUE; + + return GST_PROBE_OK; } GST_START_TEST (test_block_async) @@ -709,7 +725,8 @@ GST_START_TEST (test_block_async) fail_unless (pad != NULL); gst_pad_set_active (pad, TRUE); - gst_pad_block (pad, GST_BLOCK_TYPE_DATA, block_async_cb, &data, NULL); + id = gst_pad_add_probe (pad, GST_PROBE_TYPE_BLOCK, block_async_cb, &data, + NULL); fail_unless (data[0] == FALSE); fail_unless (data[1] == FALSE); @@ -786,13 +803,16 @@ block_async_full_destroy (gpointer user_data) *state = 2; } -static void -block_async_full_cb (GstPad * pad, GstBlockType type, gpointer user_data) +static GstProbeReturn +block_async_full_cb (GstPad * pad, GstProbeType type, gpointer type_data, + gpointer user_data) { *(gint *) user_data = (gint) TRUE; gst_pad_push_event (pad, gst_event_new_flush_start ()); GST_DEBUG ("setting state to 1"); + + return GST_PROBE_OK; } GST_START_TEST (test_block_async_full_destroy) @@ -800,12 +820,13 @@ GST_START_TEST (test_block_async_full_destroy) GstPad *pad; /* 0 = unblocked, 1 = blocked, 2 = destroyed */ gint state = 0; + gulong id; pad = gst_pad_new ("src", GST_PAD_SRC); fail_unless (pad != NULL); gst_pad_set_active (pad, TRUE); - gst_pad_block (pad, GST_BLOCK_TYPE_DATA, block_async_full_cb, + id = gst_pad_add_probe (pad, GST_PROBE_TYPE_BLOCK, block_async_full_cb, &state, block_async_full_destroy); fail_unless (state == 0); @@ -815,25 +836,8 @@ GST_START_TEST (test_block_async_full_destroy) fail_unless (state == 1); gst_pad_push_event (pad, gst_event_new_flush_stop ()); - /* pad was already blocked so nothing happens */ - gst_pad_block (pad, GST_BLOCK_TYPE_DATA, block_async_full_cb, - &state, block_async_full_destroy); - fail_unless (state == 1); - /* unblock callback is called */ - gst_pad_unblock (pad); - fail_unless (state == 2); - - /* block with the same data, nothing is called */ - state = 1; - gst_pad_block (pad, GST_BLOCK_TYPE_DATA, block_async_full_cb, - &state, block_async_full_destroy); - fail_unless (state == 1); - - /* now change user_data (to NULL in this case) so destroy_notify should be - * called */ - state = 1; - gst_pad_unblock (pad); + gst_pad_remove_probe (pad, id); fail_unless (state == 2); gst_object_unref (pad); @@ -846,12 +850,13 @@ GST_START_TEST (test_block_async_full_destroy_dispose) GstPad *pad; /* 0 = unblocked, 1 = blocked, 2 = destroyed */ gint state = 0; + gulong id; pad = gst_pad_new ("src", GST_PAD_SRC); fail_unless (pad != NULL); gst_pad_set_active (pad, TRUE); - gst_pad_block (pad, GST_BLOCK_TYPE_DATA, block_async_full_cb, + id = gst_pad_add_probe (pad, GST_PROBE_TYPE_BLOCK, block_async_full_cb, &state, block_async_full_destroy); gst_pad_push (pad, gst_buffer_new ()); @@ -860,7 +865,7 @@ GST_START_TEST (test_block_async_full_destroy_dispose) fail_unless_equals_int (state, 1); gst_pad_push_event (pad, gst_event_new_flush_stop ()); - /* gst_pad_dispose calls the destroy_notify function if necessary */ + /* gst_BLOCK calls the destroy_notify function if necessary */ gst_object_unref (pad); fail_unless_equals_int (state, 2); @@ -896,13 +901,13 @@ unblock_async_not_called (GstPad * pad, gboolean blocked, gpointer user_data) } #endif -static void -block_async_second_no_flush (GstPad * pad, GstBlockType type, - gpointer user_data) +static GstProbeReturn +block_async_second_no_flush (GstPad * pad, GstProbeType type, + gpointer type_data, gpointer user_data) { gboolean *bool_user_data = (gboolean *) user_data; - fail_unless (type & GST_BLOCK_TYPE_DATA); + fail_unless (type & GST_PROBE_TYPE_BLOCK); fail_unless (bool_user_data[0] == TRUE); fail_unless (bool_user_data[1] == FALSE); @@ -910,16 +915,19 @@ block_async_second_no_flush (GstPad * pad, GstBlockType type, bool_user_data[1] = TRUE; - gst_pad_unblock (pad); + gst_pad_remove_probe (pad, id); + + return GST_PROBE_OK; } -static void -block_async_first_no_flush (GstPad * pad, GstBlockType type, gpointer user_data) +static GstProbeReturn +block_async_first_no_flush (GstPad * pad, GstProbeType type, gpointer type_data, + gpointer user_data) { static int n_calls = 0; gboolean *bool_user_data = (gboolean *) user_data; - fail_unless (type & GST_BLOCK_TYPE_DATA); + fail_unless (type & GST_PROBE_TYPE_BLOCK); if (++n_calls > 1) /* we expect this callback to be called only once */ @@ -931,12 +939,14 @@ block_async_first_no_flush (GstPad * pad, GstBlockType type, gpointer user_data) fail_unless (bool_user_data[1] == FALSE); fail_unless (bool_user_data[2] == FALSE); - gst_pad_unblock (pad); + gst_pad_remove_probe (pad, id); /* replace block_async_first with block_async_second so next time the pad is * blocked the latter should be called */ - fail_unless (gst_pad_block (pad, GST_BLOCK_TYPE_DATA, - block_async_second_no_flush, user_data, NULL)); + id = gst_pad_add_probe (pad, GST_PROBE_TYPE_BLOCK, + block_async_second_no_flush, user_data, NULL); + + return GST_PROBE_OK; } GST_START_TEST (test_block_async_replace_callback_no_flush) @@ -948,8 +958,9 @@ GST_START_TEST (test_block_async_replace_callback_no_flush) fail_unless (pad != NULL); gst_pad_set_active (pad, TRUE); - fail_unless (gst_pad_block (pad, GST_BLOCK_TYPE_DATA, - block_async_first_no_flush, bool_user_data, NULL)); + id = gst_pad_add_probe (pad, GST_PROBE_TYPE_BLOCK, + block_async_first_no_flush, bool_user_data, NULL); + fail_if (id == 0); gst_pad_push (pad, gst_buffer_new ()); fail_unless (bool_user_data[0] == TRUE); diff --git a/tests/check/gst/gstpipeline.c b/tests/check/gst/gstpipeline.c index 3d7e4a315b..97de5417f7 100644 --- a/tests/check/gst/gstpipeline.c +++ b/tests/check/gst/gstpipeline.c @@ -238,7 +238,7 @@ static GMutex *probe_lock; static GCond *probe_cond; static gboolean -sink_pad_probe (GstPad * pad, GstBuffer * buffer, +sink_pad_probe (GstPad * pad, GstProbeType type, GstBuffer * buffer, GstClockTime * first_timestamp) { fail_if (GST_BUFFER_TIMESTAMP (buffer) == GST_CLOCK_TIME_NONE, @@ -274,7 +274,8 @@ GST_START_TEST (test_base_time) gst_element_link (fakesrc, fakesink); sink = gst_element_get_static_pad (fakesink, "sink"); - gst_pad_add_buffer_probe (sink, G_CALLBACK (sink_pad_probe), &observed, NULL); + gst_pad_add_probe (sink, GST_PROBE_TYPE_BUFFER, + (GstPadProbeCallback) sink_pad_probe, &observed, NULL); fail_unless (gst_element_set_state (pipeline, GST_STATE_PAUSED) == GST_STATE_CHANGE_NO_PREROLL, "expected no-preroll from live pipeline");