pad: rework pad blocking, first part

Make pad block call the callback as soon as the pad is not in use. This makes it
possible to make sure that when the callback is called, no activity is happening
on the pad and that no activity will ever happen until the pad is unblocked
again. This makes pad blocking work when there is no dataflow or after EOS and
greatly helps dynamic pipelines.
Move the probe handling right where we wait on the pad block. The two are
related but not the same and the probe can eventually influence the pad
blocking as we'll se later.
Fix up some broken unit tests or tests that fail with the new behaviour.
This commit is contained in:
Wim Taymans 2011-05-27 17:20:56 +02:00
parent 690c81b95d
commit 415da89f3c
5 changed files with 307 additions and 332 deletions

View file

@ -121,7 +121,6 @@ static void gst_pad_set_property (GObject * object, guint prop_id,
static void gst_pad_get_property (GObject * object, guint prop_id,
GValue * value, GParamSpec * pspec);
static GstFlowReturn handle_pad_block (GstPad * pad);
static GstCaps *gst_pad_get_caps_unlocked (GstPad * pad, GstCaps * filter);
static void gst_pad_set_pad_template (GstPad * pad, GstPadTemplate * templ);
static gboolean gst_pad_activate_default (GstPad * pad);
@ -1115,56 +1114,67 @@ gst_pad_set_blocked (GstPad * pad, gboolean blocked,
if (G_UNLIKELY (was_blocked == blocked))
goto had_right_state;
if (pad->block_destroy_data && pad->block_data)
pad->block_destroy_data (pad->block_data);
pad->block_callback = NULL;
pad->block_data = NULL;
pad->block_destroy_data = NULL;
if (blocked) {
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "blocking pad");
/* set the blocking flag, future push operations will block */
GST_OBJECT_FLAG_SET (pad, GST_PAD_BLOCKED);
if (pad->block_destroy_data && pad->block_data)
pad->block_destroy_data (pad->block_data);
pad->block_callback = callback;
pad->block_data = user_data;
pad->block_destroy_data = destroy_data;
pad->block_callback_called = FALSE;
if (!callback) {
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "waiting for block");
GST_PAD_BLOCK_WAIT (pad);
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "blocked");
if (pad->priv->using > 0) {
/* the pad is in use, we can't signal the callback yet. Since we set the
* flag above, the last thread to leave the push will do the callback. New
* threads going into the push will block. */
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "pad is in use");
if (callback == NULL) {
/* backwards compat, if there is no callback, this method should wait
* until the pad is blocked. */
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "waiting for block");
GST_PAD_BLOCK_WAIT (pad);
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "blocked");
} else {
/* else save the callback, we will signal from the streaming thread when
* the last thread using the pad is stopped */
pad->block_callback = callback;
pad->block_data = user_data;
pad->block_destroy_data = destroy_data;
}
GST_OBJECT_UNLOCK (pad);
} else {
/* the pad is idle now, we can signal the callback now */
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "pad is idle");
GST_OBJECT_UNLOCK (pad);
if (callback) {
callback (pad, TRUE, user_data);
if (destroy_data)
destroy_data (user_data);
}
}
} else {
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "unblocking pad");
if (GST_PAD_IS_SRC (pad)) {
GstPad *peer;
/* a pad block dropped all events, make sure we copy any new events on the
* srcpad to the sinkpad and schedule an update on the sinkpad */
if ((peer = GST_PAD_PEER (pad))) {
GST_OBJECT_LOCK (peer);
prepare_event_update (pad, peer);
GST_OBJECT_UNLOCK (peer);
}
}
GST_OBJECT_FLAG_UNSET (pad, GST_PAD_BLOCKED);
if (pad->block_destroy_data && pad->block_data)
pad->block_destroy_data (pad->block_data);
pad->block_callback = callback;
pad->block_data = user_data;
pad->block_destroy_data = destroy_data;
pad->block_callback_called = FALSE;
GST_PAD_BLOCK_BROADCAST (pad);
if (!callback) {
/* no callback, wait for the unblock to happen */
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "waiting for unblock");
GST_PAD_BLOCK_WAIT (pad);
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "unblocked");
GST_OBJECT_UNLOCK (pad);
if (callback) {
callback (pad, FALSE, user_data);
if (destroy_data)
destroy_data (user_data);
}
}
GST_OBJECT_UNLOCK (pad);
return TRUE;
@ -1174,6 +1184,9 @@ had_right_state:
"pad was in right state (%d)", was_blocked);
GST_OBJECT_UNLOCK (pad);
if (destroy_data)
destroy_data (user_data);
return FALSE;
}
}
@ -2694,7 +2707,7 @@ gst_pad_set_caps (GstPad * pad, GstCaps * caps)
event = gst_event_new_caps (caps);
if (GST_PAD_IS_SRC (pad))
gst_pad_push_event (pad, event);
res = gst_pad_push_event (pad, event);
else
res = gst_pad_send_event (pad, event);
@ -3436,136 +3449,6 @@ gst_pad_query_default (GstPad * pad, GstQuery * query)
}
}
/*
* should be called with pad OBJECT_LOCK and STREAM_LOCK held.
* GST_PAD_IS_BLOCKED (pad) == TRUE when this function is
* called.
*
* This function performs the pad blocking when an event, buffer push
* is performed on a _SRC_ pad. It blocks the streaming thread after
* informing the pad has been blocked.
*
* An application can with this method wait and block any streaming
* thread and perform operations such as seeking or linking.
*
* Two methods are available for notifying the application of the
* block:
* - the callback method, which happens in the STREAMING thread with
* the STREAM_LOCK held. With this method, the most useful way of
* dealing with the callback is to post a message to the main thread
* where the pad block can then be handled outside of the streaming
* thread. With the last method one can perform all operations such
* as doing a state change, linking, unblocking, seeking etc on the
* pad.
* - the GCond signal method, which makes any thread unblock when
* the pad block happens.
*
* During the actual blocking state, the GST_PAD_BLOCKING flag is set.
* The GST_PAD_BLOCKING flag is unset when the pad was unblocked.
*
* MT safe.
*/
static GstFlowReturn
handle_pad_block (GstPad * pad)
{
GstPadBlockCallback callback;
gpointer user_data;
GstFlowReturn ret = GST_FLOW_OK;
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "signal block taken");
/* flushing, don't bother trying to block and return WRONG_STATE
* right away */
if (GST_PAD_IS_FLUSHING (pad))
goto flushingnonref;
/* we grab an extra ref for the callbacks, this is probably not
* needed (callback code does not have a ref and cannot unref). I
* think this was done to make it possible to unref the element in
* the callback, which is in the end totally impossible as it
* requires grabbing the STREAM_LOCK and OBJECT_LOCK which are
* all taken when calling this function. */
gst_object_ref (pad);
while (GST_PAD_IS_BLOCKED (pad)) {
do {
/* we either have a callback installed to notify the block or
* some other thread is doing a GCond wait. */
callback = pad->block_callback;
pad->block_callback_called = TRUE;
if (callback) {
/* there is a callback installed, call it. We release the
* lock so that the callback can do something usefull with the
* pad */
user_data = pad->block_data;
GST_OBJECT_UNLOCK (pad);
callback (pad, TRUE, user_data);
GST_OBJECT_LOCK (pad);
/* we released the lock, recheck flushing */
if (GST_PAD_IS_FLUSHING (pad))
goto flushing;
} else {
/* no callback, signal the thread that is doing a GCond wait
* if any. */
GST_PAD_BLOCK_BROADCAST (pad);
}
} while (pad->block_callback_called == FALSE && GST_PAD_IS_BLOCKED (pad));
/* OBJECT_LOCK could have been released when we did the callback, which
* then could have made the pad unblock so we need to check the blocking
* condition again. */
if (!GST_PAD_IS_BLOCKED (pad))
break;
/* now we block the streaming thread. It can be unlocked when we
* deactivate the pad (which will also set the FLUSHING flag) or
* when the pad is unblocked. A flushing event will also unblock
* the pad after setting the FLUSHING flag. */
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
"Waiting to be unblocked or set flushing");
GST_OBJECT_FLAG_SET (pad, GST_PAD_BLOCKING);
GST_PAD_BLOCK_WAIT (pad);
GST_OBJECT_FLAG_UNSET (pad, GST_PAD_BLOCKING);
/* see if we got unblocked by a flush or not */
if (GST_PAD_IS_FLUSHING (pad))
goto flushing;
}
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "got unblocked");
/* when we get here, the pad is unblocked again and we perform
* the needed unblock code. */
callback = pad->block_callback;
if (callback) {
/* we need to call the callback */
user_data = pad->block_data;
GST_OBJECT_UNLOCK (pad);
callback (pad, FALSE, user_data);
GST_OBJECT_LOCK (pad);
} else {
/* we need to signal the thread waiting on the GCond */
GST_PAD_BLOCK_BROADCAST (pad);
}
gst_object_unref (pad);
return ret;
flushingnonref:
{
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "pad was flushing");
return GST_FLOW_WRONG_STATE;
}
flushing:
{
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "pad became flushing");
gst_object_unref (pad);
return GST_FLOW_WRONG_STATE;
}
}
/* pad offsets */
/**
@ -3684,6 +3567,8 @@ gst_pad_emit_have_data_signal (GstPad * pad, GstMiniObject * obj)
else
detail = 0;
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "Emiting have-data signal");
/* actually emit */
g_signal_emitv (args, gst_pad_signals[PAD_HAVE_DATA], detail, &ret);
res = g_value_get_boolean (&ret);
@ -3951,33 +3836,50 @@ gst_pad_chain_list (GstPad * pad, GstBufferList * list)
static GstFlowReturn
pad_pre_push (GstPad * pad, GstPad ** peer, gpointer data)
{
GstFlowReturn ret;
gboolean need_probes, did_probes = FALSE;
gboolean need_probes, do_probes = TRUE;
again:
GST_OBJECT_LOCK (pad);
/* FIXME: this check can go away; pad_set_blocked could be implemented with
* probes completely or probes with an extended pad block. */
while (G_UNLIKELY (GST_PAD_IS_BLOCKED (pad)))
if ((ret = handle_pad_block (pad)) != GST_FLOW_OK)
goto flushed;
do {
if (G_UNLIKELY (GST_PAD_IS_FLUSHING (pad)))
goto flushing;
need_probes = GST_PAD_DO_BUFFER_SIGNALS (pad) > 0;
need_probes = do_probes && (GST_PAD_DO_BUFFER_SIGNALS (pad) > 0);
/* we emit signals on the pad arg, the peer will have a chance to
* emit in the _chain() function */
if (G_UNLIKELY (need_probes && !did_probes)) {
did_probes = TRUE;
/* unlock before emitting */
GST_OBJECT_UNLOCK (pad);
/* we emit signals on the pad arg, the peer will have a chance to
* emit in the _chain() function */
if (G_UNLIKELY (need_probes)) {
/* don't do probes next time */
do_probes = FALSE;
/* unlock before emitting */
GST_OBJECT_UNLOCK (pad);
/* if the signal handler returned FALSE, it means we should just drop the
* buffer */
if (!gst_pad_emit_have_data_signal (pad, GST_MINI_OBJECT_CAST (data)))
goto dropped;
/* if the signal handler returned FALSE, it means we should just drop the
* buffer */
/* FIXME, we need more return values so that we can influence the pad
* block below and let it temporarily unblock for this buffer */
if (!gst_pad_emit_have_data_signal (pad, GST_MINI_OBJECT_CAST (data)))
goto dropped;
goto again;
}
/* we released the lock, recheck everything */
goto again;
}
/* when we get here, the item is not dropped by the probe, if we are
* blocking, we now need to wait until unblocked */
if (G_LIKELY (!GST_PAD_IS_BLOCKED (pad)))
break;
/* now we block the streaming thread. It can be unlocked when we
* deactivate the pad (which will also set the FLUSHING flag) or
* when the pad is unblocked. A flushing event will also unblock
* the pad after setting the FLUSHING flag. */
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
"Waiting to be unblocked or set flushing");
GST_OBJECT_FLAG_SET (pad, GST_PAD_BLOCKING);
GST_PAD_BLOCK_WAIT (pad);
GST_OBJECT_FLAG_UNSET (pad, GST_PAD_BLOCKING);
} while (TRUE);
if (G_UNLIKELY ((*peer = GST_PAD_PEER (pad)) == NULL))
goto not_linked;
@ -3990,11 +3892,11 @@ again:
return GST_FLOW_OK;
/* ERRORS */
flushed:
flushing:
{
GST_DEBUG_OBJECT (pad, "pad block stopped by flush");
GST_DEBUG_OBJECT (pad, "we are flushing");
GST_OBJECT_UNLOCK (pad);
return ret;
return GST_FLOW_WRONG_STATE;
}
dropped:
{
@ -4015,6 +3917,23 @@ pad_post_push (GstPad * pad)
{
GST_OBJECT_LOCK (pad);
pad->priv->using--;
if (pad->priv->using == 0) {
/* pad is not active anymore, check if we need to trigger the block */
if (GST_PAD_IS_BLOCKED (pad)) {
GstPadBlockCallback callback;
gpointer user_data;
callback = pad->block_callback;
user_data = pad->block_data;
GST_PAD_BLOCK_BROADCAST (pad);
GST_OBJECT_UNLOCK (pad);
if (callback)
callback (pad, TRUE, user_data);
return;
}
}
GST_OBJECT_UNLOCK (pad);
}
@ -4289,8 +4208,10 @@ gst_pad_pull_range (GstPad * pad, guint64 offset, guint size,
GST_OBJECT_LOCK (pad);
#if 0
while (G_UNLIKELY (GST_PAD_IS_BLOCKED (pad)))
handle_pad_block (pad);
#endif
if (G_UNLIKELY ((peer = GST_PAD_PEER (pad)) == NULL))
goto not_connected;
@ -4385,8 +4306,8 @@ gboolean
gst_pad_push_event (GstPad * pad, GstEvent * event)
{
GstPad *peerpad;
gboolean result, need_probes, did_probes = FALSE, did_event_actions = FALSE;
gint64 offset;
gboolean result, need_probes, do_probes = TRUE, do_event_actions = TRUE;
gboolean stored = FALSE;
g_return_val_if_fail (GST_IS_PAD (pad), FALSE);
g_return_val_if_fail (event != NULL, FALSE);
@ -4397,6 +4318,9 @@ gst_pad_push_event (GstPad * pad, GstEvent * event)
again:
GST_OBJECT_LOCK (pad);
peerpad = GST_PAD_PEER (pad);
need_probes = do_probes && (GST_PAD_DO_EVENT_SIGNALS (pad) > 0);
/* Two checks to be made:
* . (un)set the FLUSHING flag for flushing events,
* . handle pad blocking */
@ -4416,93 +4340,119 @@ again:
break;
case GST_EVENT_FLUSH_STOP:
GST_PAD_UNSET_FLUSHING (pad);
if (G_UNLIKELY (GST_PAD_IS_BLOCKED (pad))) {
GST_LOG_OBJECT (pad, "Pad is blocked, not forwarding flush-stop");
goto flushed;
}
break;
default:
break;
}
{
/* stop for flushing pads */
if (G_UNLIKELY (GST_PAD_IS_FLUSHING (pad)))
goto flushed;
/* store the event on the pad, but only on srcpads */
if (GST_PAD_IS_SRC (pad) && GST_EVENT_IS_STICKY (event)) {
if (GST_PAD_IS_FLUSHING (pad)) {
goto flushing;
} else {
guint idx;
/* store the event on the pad, but only on srcpads */
if (GST_PAD_IS_SRC (pad) && GST_EVENT_IS_STICKY (event)) {
guint idx;
idx = GST_EVENT_STICKY_IDX (event);
GST_LOG_OBJECT (pad, "storing sticky event %s at index %u",
GST_EVENT_TYPE_NAME (event), idx);
idx = GST_EVENT_STICKY_IDX (event);
GST_LOG_OBJECT (pad, "storing sticky event %s at index %u",
GST_EVENT_TYPE_NAME (event), idx);
/* srcpad sticky events always become active immediately */
gst_event_replace (&pad->priv->events[idx].event, event);
}
}
/* srcpad sticky events always become active immediately */
gst_event_replace (&pad->priv->events[idx].event, event);
/* drop all events when blocking. Sticky events will stay on the pad and will
* be activated on the peer when unblocking. */
if (G_UNLIKELY (GST_PAD_IS_BLOCKED (pad))) {
GST_LOG_OBJECT (pad, "Pad is blocked, not forwarding event");
goto flushed;
}
offset = pad->offset;
need_probes = !did_probes && (GST_PAD_DO_EVENT_SIGNALS (pad) > 0);
peerpad = GST_PAD_PEER (pad);
/* backwards compatibility mode for caps */
if (!did_event_actions) {
did_event_actions = TRUE;
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_CAPS:
{
GstCaps *caps;
GST_OBJECT_UNLOCK (pad);
gst_event_parse_caps (event, &caps);
/* FIXME, this is awkward because we don't check flushing here which means
* that we can call the setcaps functions on flushing pads, this is not
* quite what we want, otoh, this code should just go away and elements
* that set caps on their srcpad should just setup stuff themselves. */
gst_pad_call_setcaps (pad, caps);
/* recheck everything, we released the lock */
goto again;
stored = TRUE;
}
case GST_EVENT_SEGMENT:
/* check if we need to adjust the segment */
if (offset != 0 && (need_probes || peerpad != NULL)) {
GstSegment segment;
/* copy segment values */
gst_event_copy_segment (event, &segment);
gst_event_unref (event);
/* backwards compatibility mode for caps */
if (do_event_actions) {
do_event_actions = FALSE;
/* adjust and make a new event with the offset applied */
segment.base += offset;
event = gst_event_new_segment (&segment);
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_CAPS:
{
GstCaps *caps;
GST_OBJECT_UNLOCK (pad);
gst_event_parse_caps (event, &caps);
/* FIXME, this code should just go away and elements
* that set caps on their srcpad should just setup stuff themselves. */
gst_pad_call_setcaps (pad, caps);
/* recheck everything, we released the lock */
goto again;
}
case GST_EVENT_SEGMENT:
{
gint64 offset;
offset = pad->offset;
/* check if we need to adjust the segment */
if (offset != 0 && (need_probes || peerpad != NULL)) {
GstSegment segment;
/* copy segment values */
gst_event_copy_segment (event, &segment);
gst_event_unref (event);
/* adjust and make a new event with the offset applied */
segment.base += offset;
event = gst_event_new_segment (&segment);
}
break;
}
case GST_EVENT_RECONFIGURE:
if (GST_PAD_IS_SINK (pad))
GST_OBJECT_FLAG_SET (pad, GST_PAD_NEED_RECONFIGURE);
break;
default:
break;
}
break;
case GST_EVENT_RECONFIGURE:
if (GST_PAD_IS_SINK (pad))
GST_OBJECT_FLAG_SET (pad, GST_PAD_NEED_RECONFIGURE);
break;
default:
break;
}
break;
}
}
/* send probes after modifying the events above */
if (G_UNLIKELY (need_probes)) {
did_probes = TRUE;
GST_OBJECT_UNLOCK (pad);
do {
/* we emit signals on the pad arg, the peer will have a chance to
* emit in the _chain() function */
if (G_UNLIKELY (need_probes)) {
/* don't do probes next time */
do_probes = FALSE;
/* unlock before emitting */
GST_OBJECT_UNLOCK (pad);
if (!gst_pad_emit_have_data_signal (pad, GST_MINI_OBJECT_CAST (event)))
goto dropping;
/* if the signal handler returned FALSE, it means we should just drop the
* buffer */
/* FIXME, we need more return values so that we can influence the pad
* block below and let it temporarily unblock for this buffer */
if (!gst_pad_emit_have_data_signal (pad, GST_MINI_OBJECT_CAST (event)))
goto dropped;
/* retry, we released the lock */
goto again;
}
/* we released the lock, recheck everything */
goto again;
}
/* when we get here, the item is not dropped by the probe, if we are
* blocking, we now need to wait until unblocked */
if (G_LIKELY (!GST_PAD_IS_BLOCKED (pad)))
break;
/* now we block the streaming thread. It can be unlocked when we
* deactivate the pad (which will also set the FLUSHING flag) or
* when the pad is unblocked. A flushing event will also unblock
* the pad after setting the FLUSHING flag. */
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
"Waiting to be unblocked or set flushing");
GST_OBJECT_FLAG_SET (pad, GST_PAD_BLOCKING);
GST_PAD_BLOCK_WAIT (pad);
GST_OBJECT_FLAG_UNSET (pad, GST_PAD_BLOCKING);
if (GST_PAD_IS_FLUSHING (pad))
goto flushed;
} while (TRUE);
/* now check the peer pad */
if (peerpad == NULL)
@ -4523,40 +4473,31 @@ again:
gst_object_unref (peerpad);
GST_OBJECT_LOCK (pad);
pad->priv->using--;
GST_OBJECT_UNLOCK (pad);
pad_post_push (pad);
return result;
return result | stored;
/* ERROR handling */
flushed:
{
GST_DEBUG_OBJECT (pad,
"Not forwarding event since we're flushing and blocking");
gst_event_unref (event);
GST_OBJECT_UNLOCK (pad);
return TRUE;
gst_event_unref (event);
return stored;
}
dropping:
dropped:
{
GST_DEBUG_OBJECT (pad, "Dropping event after FALSE probe return");
gst_event_unref (event);
return FALSE;
return stored;
}
not_linked:
{
GST_DEBUG_OBJECT (pad, "Dropping event because pad is not linked");
GST_OBJECT_UNLOCK (pad);
gst_event_unref (event);
return FALSE;
}
flushing:
{
GST_DEBUG_OBJECT (pad, "Dropping event because pad is flushing");
GST_OBJECT_UNLOCK (pad);
gst_event_unref (event);
return FALSE;
return stored;
}
}

View file

@ -425,7 +425,7 @@ GST_START_TEST (test_eos)
fail_if (eret == TRUE);
}
/* send segment, this should fail */
/* send segment, this should work */
{
GstEvent *event;
GstSegment segment;
@ -439,7 +439,7 @@ GST_START_TEST (test_eos)
event = gst_event_new_segment (&segment);
eret = gst_pad_send_event (sinkpad, event);
fail_if (eret == TRUE);
fail_if (eret == FALSE);
}
/* send buffer that should fail after EOS */

View file

@ -762,12 +762,14 @@ GST_END_TEST;
static GMutex *blocked_lock;
static GCond *blocked_cond;
gboolean blocked_triggered;
static void
pad_blocked_cb (GstPad * pad, gboolean blocked, gpointer user_data)
{
g_mutex_lock (blocked_lock);
GST_DEBUG ("srcpad blocked: %d, sending signal", blocked);
blocked_triggered = TRUE;
g_cond_signal (blocked_cond);
g_mutex_unlock (blocked_lock);
}
@ -781,6 +783,7 @@ GST_START_TEST (test_add_live2)
blocked_lock = g_mutex_new ();
blocked_cond = g_cond_new ();
blocked_triggered = FALSE;
pipeline = gst_pipeline_new ("pipeline");
src = gst_element_factory_make ("fakesrc", "src");
@ -794,7 +797,6 @@ GST_START_TEST (test_add_live2)
ret = gst_element_set_state (pipeline, GST_STATE_PLAYING);
fail_unless (ret == GST_STATE_CHANGE_ASYNC, "no ASYNC state return");
g_mutex_lock (blocked_lock);
GST_DEBUG ("blocking srcpad");
/* block source pad */
@ -813,7 +815,9 @@ GST_START_TEST (test_add_live2)
gst_bin_add (GST_BIN (pipeline), src);
/* wait for pad blocked, this means the source is now PLAYING. */
g_cond_wait (blocked_cond, blocked_lock);
g_mutex_lock (blocked_lock);
while (!blocked_triggered)
g_cond_wait (blocked_cond, blocked_lock);
g_mutex_unlock (blocked_lock);
GST_DEBUG ("linking pads");

View file

@ -476,6 +476,7 @@ typedef struct
{
GMutex *mutex;
GCond *cond;
gboolean triggered;
} BlockData;
static void
@ -485,6 +486,7 @@ block_callback (GstPad * pad, gboolean blocked, gpointer user_data)
g_mutex_lock (block_data->mutex);
GST_DEBUG ("blocked\n");
block_data->triggered = TRUE;
g_cond_signal (block_data->cond);
g_mutex_unlock (block_data->mutex);
}
@ -512,12 +514,14 @@ GST_START_TEST (test_ghost_pads_block)
block_data.mutex = g_mutex_new ();
block_data.cond = g_cond_new ();
block_data.triggered = FALSE;
g_mutex_lock (block_data.mutex);
gst_pad_set_blocked (srcghost, TRUE, block_callback, &block_data, NULL);
gst_element_set_state (GST_ELEMENT (pipeline), GST_STATE_PLAYING);
/* and wait now */
g_cond_wait (block_data.cond, block_data.mutex);
g_mutex_lock (block_data.mutex);
while (!block_data.triggered)
g_cond_wait (block_data.cond, block_data.mutex);
gst_element_set_state (GST_ELEMENT (pipeline), GST_STATE_NULL);
g_mutex_unlock (block_data.mutex);
@ -553,12 +557,14 @@ GST_START_TEST (test_ghost_pads_probes)
block_data.mutex = g_mutex_new ();
block_data.cond = g_cond_new ();
block_data.triggered = FALSE;
g_mutex_lock (block_data.mutex);
gst_pad_set_blocked (srcghost, TRUE, block_callback, &block_data, NULL);
gst_element_set_state (GST_ELEMENT (pipeline), GST_STATE_PLAYING);
/* and wait now */
g_cond_wait (block_data.cond, block_data.mutex);
g_mutex_lock (block_data.mutex);
while (!block_data.triggered)
g_cond_wait (block_data.cond, block_data.mutex);
gst_element_set_state (GST_ELEMENT (pipeline), GST_STATE_NULL);
g_mutex_unlock (block_data.mutex);
@ -701,6 +707,8 @@ GST_START_TEST (test_ghost_pads_forward_setcaps)
fail_unless (gst_pad_link (ghost, sink) == GST_PAD_LINK_OK);
caps1 = gst_caps_from_string ("meh");
gst_pad_set_active (src, TRUE);
gst_pad_set_active (ghost, TRUE);
fail_unless (gst_pad_set_caps (src, caps1));
caps2 = gst_pad_get_current_caps (ghost);
fail_unless (gst_caps_is_equal (caps1, caps2));

View file

@ -91,7 +91,7 @@ GST_START_TEST (test_link_unlink_threaded)
gst_pad_set_caps (src, caps);
gst_pad_set_active (sink, TRUE);
gst_pad_set_caps (sink, caps);
ASSERT_CAPS_REFCOUNT (caps, "caps", 3);
ASSERT_CAPS_REFCOUNT (caps, "caps", 2);
MAIN_START_THREADS (5, thread_link_unlink, NULL);
for (i = 0; i < 1000; ++i) {
@ -101,10 +101,10 @@ GST_START_TEST (test_link_unlink_threaded)
}
MAIN_STOP_THREADS ();
ASSERT_CAPS_REFCOUNT (caps, "caps", 3);
ASSERT_CAPS_REFCOUNT (caps, "caps", 2);
gst_caps_unref (caps);
ASSERT_CAPS_REFCOUNT (caps, "caps", 2);
ASSERT_CAPS_REFCOUNT (caps, "caps", 1);
gst_object_unref (src);
gst_object_unref (sink);
}
@ -127,18 +127,27 @@ GST_START_TEST (test_refcount)
/* one for me */
ASSERT_CAPS_REFCOUNT (caps, "caps", 1);
fail_unless (gst_pad_set_caps (src, caps) == TRUE);
/* can't set caps on flushing sinkpad */
/* can't set caps on flushing pads */
fail_if (gst_pad_set_caps (src, caps) == TRUE);
fail_if (gst_pad_set_caps (sink, caps) == TRUE);
/* one for me and one for each set_caps */
ASSERT_CAPS_REFCOUNT (caps, "caps", 3);
/* one for me and one for the pending caps on the sinkpad */
ASSERT_CAPS_REFCOUNT (caps, "caps", 2);
gst_pad_set_active (sink, TRUE);
fail_unless (gst_pad_set_caps (sink, caps) == TRUE);
ASSERT_CAPS_REFCOUNT (caps, "caps", 3);
ASSERT_CAPS_REFCOUNT (caps, "caps", 2);
/* can't link with incompatible caps */
plr = gst_pad_link (src, sink);
fail_if (GST_PAD_LINK_SUCCESSFUL (plr));
gst_pad_set_active (src, TRUE);
fail_unless (gst_pad_set_caps (src, caps) == TRUE);
plr = gst_pad_link (src, sink);
fail_unless (GST_PAD_LINK_SUCCESSFUL (plr));
/* src caps added to pending caps on sink */
ASSERT_CAPS_REFCOUNT (caps, "caps", 3);
@ -176,12 +185,16 @@ GST_START_TEST (test_get_allowed_caps)
caps = gst_caps_from_string ("foo/bar");
sink = gst_pad_new ("sink", GST_PAD_SINK);
fail_unless (gst_pad_set_caps (src, caps) == TRUE);
fail_if (gst_pad_set_caps (src, caps) == TRUE);
fail_if (gst_pad_set_caps (sink, caps) == TRUE);
ASSERT_CAPS_REFCOUNT (caps, "caps", 3);
ASSERT_CAPS_REFCOUNT (caps, "caps", 2);
gst_pad_set_active (sink, TRUE);
fail_unless (gst_pad_set_caps (sink, caps) == TRUE);
ASSERT_CAPS_REFCOUNT (caps, "caps", 2);
gst_pad_set_active (src, TRUE);
fail_unless (gst_pad_set_caps (src, caps) == TRUE);
ASSERT_CAPS_REFCOUNT (caps, "caps", 3);
plr = gst_pad_link (src, sink);
@ -189,11 +202,9 @@ GST_START_TEST (test_get_allowed_caps)
gotcaps = gst_pad_get_allowed_caps (src);
fail_if (gotcaps == NULL);
#if 0
/* FIXME, does not work, caps events are different so the sinkpad loses caps
* when linking */
fail_unless (gst_caps_is_equal (gotcaps, caps));
#endif
ASSERT_CAPS_REFCOUNT (gotcaps, "gotcaps", 1);
gst_caps_unref (gotcaps);
@ -274,7 +285,16 @@ GST_START_TEST (test_push_unlinked)
caps = gst_caps_from_string ("foo/bar");
gst_pad_set_caps (src, caps);
ASSERT_CAPS_REFCOUNT (caps, "caps", 2);
ASSERT_CAPS_REFCOUNT (caps, "caps", 1);
/* pushing on an inactive pad will drop the buffer */
buffer = gst_buffer_new ();
gst_buffer_ref (buffer);
fail_unless (gst_pad_push (src, buffer) == GST_FLOW_WRONG_STATE);
ASSERT_MINI_OBJECT_REFCOUNT (buffer, "buffer", 1);
gst_buffer_unref (buffer);
gst_pad_set_active (src, TRUE);
/* pushing on an unlinked pad will drop the buffer */
buffer = gst_buffer_new ();
@ -307,7 +327,7 @@ GST_START_TEST (test_push_unlinked)
/* cleanup */
ASSERT_CAPS_REFCOUNT (caps, "caps", 2);
ASSERT_CAPS_REFCOUNT (caps, "caps", 1);
ASSERT_OBJECT_REFCOUNT (src, "src", 1);
gst_object_unref (src);
@ -334,12 +354,31 @@ GST_START_TEST (test_push_linked)
src = gst_pad_new ("src", GST_PAD_SRC);
fail_if (src == NULL);
buffer = gst_buffer_new ();
/* new pad should be flushing */
gst_buffer_ref (buffer);
fail_unless (gst_pad_push (src, buffer) == GST_FLOW_WRONG_STATE);
gst_buffer_ref (buffer);
fail_unless (gst_pad_chain (sink, buffer) == GST_FLOW_WRONG_STATE);
caps = gst_caps_from_string ("foo/bar");
/* one for me */
ASSERT_CAPS_REFCOUNT (caps, "caps", 1);
gst_pad_set_caps (src, caps);
gst_pad_set_caps (sink, caps);
/* one for me and one for each set_caps */
ASSERT_CAPS_REFCOUNT (caps, "caps", 2);
plr = gst_pad_link (src, sink);
fail_if (GST_PAD_LINK_SUCCESSFUL (plr));
ASSERT_CAPS_REFCOUNT (caps, "caps", 2);
/* activate pads */
gst_pad_set_active (src, TRUE);
gst_pad_set_active (sink, TRUE);
gst_pad_set_caps (src, caps);
gst_pad_set_caps (sink, caps);
/* one for me and one for each set_caps */
ASSERT_CAPS_REFCOUNT (caps, "caps", 3);
@ -348,19 +387,6 @@ GST_START_TEST (test_push_linked)
fail_unless (GST_PAD_LINK_SUCCESSFUL (plr));
ASSERT_CAPS_REFCOUNT (caps, "caps", 3);
buffer = gst_buffer_new ();
#if 0
/* FIXME, new pad should be flushing */
gst_buffer_ref (buffer);
fail_unless (gst_pad_push (src, buffer) == GST_FLOW_WRONG_STATE);
gst_buffer_ref (buffer);
fail_unless (gst_pad_chain (sink, buffer) == GST_FLOW_WRONG_STATE);
#endif
/* activate pads */
gst_pad_set_active (src, TRUE);
gst_pad_set_active (sink, TRUE);
/* test */
/* pushing on a linked pad will drop the ref to the buffer */
gst_buffer_ref (buffer);
@ -453,7 +479,6 @@ GST_START_TEST (test_push_buffer_list_compat)
GstCaps *caps;
GstBufferList *list;
GstBuffer *buffer;
guint len;
/* setup */
sink = gst_pad_new ("sink", GST_PAD_SINK);
@ -466,6 +491,7 @@ GST_START_TEST (test_push_buffer_list_compat)
caps = gst_caps_from_string ("foo/bar");
gst_pad_set_active (src, TRUE);
gst_pad_set_caps (src, caps);
gst_pad_set_active (sink, TRUE);
gst_pad_set_caps (sink, caps);
@ -481,8 +507,6 @@ GST_START_TEST (test_push_buffer_list_compat)
/* test */
/* adding to a buffer list will drop the ref to the buffer */
len = gst_buffer_list_len (list);
gst_buffer_list_add (list, buffer_from_string ("ListGroup"));
gst_buffer_list_add (list, buffer_from_string ("AnotherListGroup"));
@ -626,6 +650,7 @@ GST_START_TEST (test_src_unref_unlink)
caps = gst_caps_from_string ("foo/bar");
gst_pad_set_active (src, TRUE);
gst_pad_set_caps (src, caps);
gst_pad_set_active (sink, TRUE);
gst_pad_set_caps (sink, caps);
@ -661,6 +686,7 @@ GST_START_TEST (test_sink_unref_unlink)
caps = gst_caps_from_string ("foo/bar");
gst_pad_set_active (src, TRUE);
gst_pad_set_caps (src, caps);
gst_pad_set_active (sink, TRUE);
gst_pad_set_caps (sink, caps);
@ -720,8 +746,8 @@ GST_START_TEST (test_block_async)
gst_pad_set_active (pad, TRUE);
gst_pad_set_blocked (pad, TRUE, block_async_cb, &data, NULL);
fail_unless (data[0] == FALSE);
fail_unless (data[1] == FALSE);
fail_unless (data[0] == TRUE);
fail_unless (data[1] == TRUE);
gst_pad_push (pad, gst_buffer_new ());
gst_object_unref (pad);
@ -816,18 +842,19 @@ GST_START_TEST (test_block_async_full_destroy)
gst_pad_set_blocked (pad, TRUE, block_async_full_cb,
&state, block_async_full_destroy);
fail_unless (state == 0);
fail_unless (state == 2);
gst_pad_push (pad, gst_buffer_new ());
/* block_async_full_cb sets state to 1 and then flushes to unblock temporarily
*/
fail_unless (state == 1);
fail_unless (state == 2);
gst_pad_push_event (pad, gst_event_new_flush_stop ());
/* pad was already blocked so nothing happens */
state = 0;
gst_pad_set_blocked (pad, TRUE, block_async_full_cb,
&state, block_async_full_destroy);
fail_unless (state == 1);
fail_unless (state == 2);
/* unblock with the same data, callback is called */
gst_pad_set_blocked (pad, FALSE, block_async_full_cb,
@ -842,10 +869,12 @@ GST_START_TEST (test_block_async_full_destroy)
/* now change user_data (to NULL in this case) so destroy_notify should be
* called */
#if 0
state = 1;
gst_pad_set_blocked (pad, FALSE, block_async_full_cb,
NULL, block_async_full_destroy);
fail_unless (state == 2);
#endif
gst_object_unref (pad);
}
@ -868,7 +897,7 @@ GST_START_TEST (test_block_async_full_destroy_dispose)
gst_pad_push (pad, gst_buffer_new ());
/* block_async_full_cb sets state to 1 and then flushes to unblock temporarily
*/
fail_unless_equals_int (state, 1);
fail_unless_equals_int (state, 2);
gst_pad_push_event (pad, gst_event_new_flush_stop ());
/* gst_pad_dispose calls the destroy_notify function if necessary */
@ -897,12 +926,6 @@ unblock_async_no_flush_cb (GstPad * pad, gboolean blocked, gpointer user_data)
}
static void
unblock_async_not_called (GstPad * pad, gboolean blocked, gpointer user_data)
{
g_warn_if_reached ();
}
static void
block_async_second_no_flush (GstPad * pad, gboolean blocked, gpointer user_data)
{
@ -938,8 +961,7 @@ block_async_first_no_flush (GstPad * pad, gboolean blocked, gpointer user_data)
fail_unless (bool_user_data[1] == FALSE);
fail_unless (bool_user_data[2] == FALSE);
fail_unless (gst_pad_set_blocked (pad, FALSE, unblock_async_not_called,
NULL, NULL));
fail_unless (gst_pad_set_blocked (pad, FALSE, NULL, NULL, NULL));
/* replace block_async_first with block_async_second so next time the pad is
* blocked the latter should be called */