Revert "pad: rework pad blocking, first part"

This reverts commit 415da89f3c.

Conflicts:

	gst/gstpad.c
This commit is contained in:
Wim Taymans 2011-05-30 13:40:04 +02:00
parent 9f2424123f
commit 565f492af2
5 changed files with 335 additions and 310 deletions

View file

@ -121,6 +121,7 @@ static void gst_pad_set_property (GObject * object, guint prop_id,
static void gst_pad_get_property (GObject * object, guint prop_id,
GValue * value, GParamSpec * pspec);
static GstFlowReturn handle_pad_block (GstPad * pad);
static GstCaps *gst_pad_get_caps_unlocked (GstPad * pad, GstCaps * filter);
static void gst_pad_set_pad_template (GstPad * pad, GstPadTemplate * templ);
static gboolean gst_pad_activate_default (GstPad * pad);
@ -1114,67 +1115,56 @@ gst_pad_set_blocked (GstPad * pad, gboolean blocked,
if (G_UNLIKELY (was_blocked == blocked))
goto had_right_state;
if (pad->block_destroy_data && pad->block_data)
pad->block_destroy_data (pad->block_data);
pad->block_callback = NULL;
pad->block_data = NULL;
pad->block_destroy_data = NULL;
if (blocked) {
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "blocking pad");
/* set the blocking flag, future push operations will block */
GST_OBJECT_FLAG_SET (pad, GST_PAD_BLOCKED);
if (pad->block_destroy_data && pad->block_data)
pad->block_destroy_data (pad->block_data);
if (pad->priv->using > 0) {
/* the pad is in use, we can't signal the callback yet. Since we set the
* flag above, the last thread to leave the push will do the callback. New
* threads going into the push will block. */
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "pad is in use");
if (callback == NULL) {
/* backwards compat, if there is no callback, this method should wait
* until the pad is blocked. */
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "waiting for block");
GST_PAD_BLOCK_WAIT (pad);
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "blocked");
} else {
/* else save the callback, we will signal from the streaming thread when
* the last thread using the pad is stopped */
pad->block_callback = callback;
pad->block_data = user_data;
pad->block_destroy_data = destroy_data;
}
GST_OBJECT_UNLOCK (pad);
} else {
/* the pad is idle now, we can signal the callback now */
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "pad is idle");
GST_OBJECT_UNLOCK (pad);
if (callback) {
callback (pad, TRUE, user_data);
if (destroy_data)
destroy_data (user_data);
}
pad->block_callback = callback;
pad->block_data = user_data;
pad->block_destroy_data = destroy_data;
pad->block_callback_called = FALSE;
if (!callback) {
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "waiting for block");
GST_PAD_BLOCK_WAIT (pad);
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "blocked");
}
} else {
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "unblocking pad");
if (GST_PAD_IS_SRC (pad)) {
GstPad *peer;
/* a pad block dropped all events, make sure we copy any new events on the
* srcpad to the sinkpad and schedule an update on the sinkpad */
if ((peer = GST_PAD_PEER (pad))) {
GST_OBJECT_LOCK (peer);
prepare_event_update (pad, peer);
GST_OBJECT_UNLOCK (peer);
}
}
GST_OBJECT_FLAG_UNSET (pad, GST_PAD_BLOCKED);
if (pad->block_destroy_data && pad->block_data)
pad->block_destroy_data (pad->block_data);
pad->block_callback = callback;
pad->block_data = user_data;
pad->block_destroy_data = destroy_data;
pad->block_callback_called = FALSE;
GST_PAD_BLOCK_BROADCAST (pad);
GST_OBJECT_UNLOCK (pad);
if (callback) {
callback (pad, FALSE, user_data);
if (destroy_data)
destroy_data (user_data);
if (!callback) {
/* no callback, wait for the unblock to happen */
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "waiting for unblock");
GST_PAD_BLOCK_WAIT (pad);
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "unblocked");
}
}
GST_OBJECT_UNLOCK (pad);
return TRUE;
@ -1184,9 +1174,6 @@ had_right_state:
"pad was in right state (%d)", was_blocked);
GST_OBJECT_UNLOCK (pad);
if (destroy_data)
destroy_data (user_data);
return FALSE;
}
}
@ -2707,7 +2694,7 @@ gst_pad_set_caps (GstPad * pad, GstCaps * caps)
event = gst_event_new_caps (caps);
if (GST_PAD_IS_SRC (pad))
res = gst_pad_push_event (pad, event);
gst_pad_push_event (pad, event);
else
res = gst_pad_send_event (pad, event);
@ -3449,6 +3436,136 @@ gst_pad_query_default (GstPad * pad, GstQuery * query)
}
}
/*
* should be called with pad OBJECT_LOCK and STREAM_LOCK held.
* GST_PAD_IS_BLOCKED (pad) == TRUE when this function is
* called.
*
* This function performs the pad blocking when an event, buffer push
* is performed on a _SRC_ pad. It blocks the streaming thread after
* informing the pad has been blocked.
*
* An application can with this method wait and block any streaming
* thread and perform operations such as seeking or linking.
*
* Two methods are available for notifying the application of the
* block:
* - the callback method, which happens in the STREAMING thread with
* the STREAM_LOCK held. With this method, the most useful way of
* dealing with the callback is to post a message to the main thread
* where the pad block can then be handled outside of the streaming
* thread. With the last method one can perform all operations such
* as doing a state change, linking, unblocking, seeking etc on the
* pad.
* - the GCond signal method, which makes any thread unblock when
* the pad block happens.
*
* During the actual blocking state, the GST_PAD_BLOCKING flag is set.
* The GST_PAD_BLOCKING flag is unset when the pad was unblocked.
*
* MT safe.
*/
static GstFlowReturn
handle_pad_block (GstPad * pad)
{
GstPadBlockCallback callback;
gpointer user_data;
GstFlowReturn ret = GST_FLOW_OK;
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "signal block taken");
/* flushing, don't bother trying to block and return WRONG_STATE
* right away */
if (GST_PAD_IS_FLUSHING (pad))
goto flushingnonref;
/* we grab an extra ref for the callbacks, this is probably not
* needed (callback code does not have a ref and cannot unref). I
* think this was done to make it possible to unref the element in
* the callback, which is in the end totally impossible as it
* requires grabbing the STREAM_LOCK and OBJECT_LOCK which are
* all taken when calling this function. */
gst_object_ref (pad);
while (GST_PAD_IS_BLOCKED (pad)) {
do {
/* we either have a callback installed to notify the block or
* some other thread is doing a GCond wait. */
callback = pad->block_callback;
pad->block_callback_called = TRUE;
if (callback) {
/* there is a callback installed, call it. We release the
* lock so that the callback can do something usefull with the
* pad */
user_data = pad->block_data;
GST_OBJECT_UNLOCK (pad);
callback (pad, TRUE, user_data);
GST_OBJECT_LOCK (pad);
/* we released the lock, recheck flushing */
if (GST_PAD_IS_FLUSHING (pad))
goto flushing;
} else {
/* no callback, signal the thread that is doing a GCond wait
* if any. */
GST_PAD_BLOCK_BROADCAST (pad);
}
} while (pad->block_callback_called == FALSE && GST_PAD_IS_BLOCKED (pad));
/* OBJECT_LOCK could have been released when we did the callback, which
* then could have made the pad unblock so we need to check the blocking
* condition again. */
if (!GST_PAD_IS_BLOCKED (pad))
break;
/* now we block the streaming thread. It can be unlocked when we
* deactivate the pad (which will also set the FLUSHING flag) or
* when the pad is unblocked. A flushing event will also unblock
* the pad after setting the FLUSHING flag. */
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
"Waiting to be unblocked or set flushing");
GST_OBJECT_FLAG_SET (pad, GST_PAD_BLOCKING);
GST_PAD_BLOCK_WAIT (pad);
GST_OBJECT_FLAG_UNSET (pad, GST_PAD_BLOCKING);
/* see if we got unblocked by a flush or not */
if (GST_PAD_IS_FLUSHING (pad))
goto flushing;
}
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "got unblocked");
/* when we get here, the pad is unblocked again and we perform
* the needed unblock code. */
callback = pad->block_callback;
if (callback) {
/* we need to call the callback */
user_data = pad->block_data;
GST_OBJECT_UNLOCK (pad);
callback (pad, FALSE, user_data);
GST_OBJECT_LOCK (pad);
} else {
/* we need to signal the thread waiting on the GCond */
GST_PAD_BLOCK_BROADCAST (pad);
}
gst_object_unref (pad);
return ret;
flushingnonref:
{
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "pad was flushing");
return GST_FLOW_WRONG_STATE;
}
flushing:
{
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "pad became flushing");
gst_object_unref (pad);
return GST_FLOW_WRONG_STATE;
}
}
/* pad offsets */
/**
@ -3567,8 +3684,6 @@ gst_pad_emit_have_data_signal (GstPad * pad, GstMiniObject * obj)
else
detail = 0;
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "Emiting have-data signal");
/* actually emit */
g_signal_emitv (args, gst_pad_signals[PAD_HAVE_DATA], detail, &ret);
res = g_value_get_boolean (&ret);
@ -3836,50 +3951,33 @@ gst_pad_chain_list (GstPad * pad, GstBufferList * list)
static GstFlowReturn
pad_pre_push (GstPad * pad, GstPad ** peer, gpointer data)
{
gboolean need_probes, do_probes = TRUE;
GstFlowReturn ret;
gboolean need_probes, did_probes = FALSE;
again:
GST_OBJECT_LOCK (pad);
do {
if (G_UNLIKELY (GST_PAD_IS_FLUSHING (pad)))
goto flushing;
/* FIXME: this check can go away; pad_set_blocked could be implemented with
* probes completely or probes with an extended pad block. */
while (G_UNLIKELY (GST_PAD_IS_BLOCKED (pad)))
if ((ret = handle_pad_block (pad)) != GST_FLOW_OK)
goto flushed;
need_probes = do_probes && (GST_PAD_DO_BUFFER_SIGNALS (pad) > 0);
need_probes = GST_PAD_DO_BUFFER_SIGNALS (pad) > 0;
/* we emit signals on the pad arg, the peer will have a chance to
* emit in the _chain() function */
if (G_UNLIKELY (need_probes)) {
/* don't do probes next time */
do_probes = FALSE;
/* unlock before emitting */
GST_OBJECT_UNLOCK (pad);
/* we emit signals on the pad arg, the peer will have a chance to
* emit in the _chain() function */
if (G_UNLIKELY (need_probes && !did_probes)) {
did_probes = TRUE;
/* unlock before emitting */
GST_OBJECT_UNLOCK (pad);
/* if the signal handler returned FALSE, it means we should just drop the
* buffer */
/* FIXME, we need more return values so that we can influence the pad
* block below and let it temporarily unblock for this buffer */
if (!gst_pad_emit_have_data_signal (pad, GST_MINI_OBJECT_CAST (data)))
goto dropped;
/* if the signal handler returned FALSE, it means we should just drop the
* buffer */
if (!gst_pad_emit_have_data_signal (pad, GST_MINI_OBJECT_CAST (data)))
goto dropped;
/* we released the lock, recheck everything */
goto again;
}
/* when we get here, the item is not dropped by the probe, if we are
* blocking, we now need to wait until unblocked */
if (G_LIKELY (!GST_PAD_IS_BLOCKED (pad)))
break;
/* now we block the streaming thread. It can be unlocked when we
* deactivate the pad (which will also set the FLUSHING flag) or
* when the pad is unblocked. A flushing event will also unblock
* the pad after setting the FLUSHING flag. */
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
"Waiting to be unblocked or set flushing");
GST_OBJECT_FLAG_SET (pad, GST_PAD_BLOCKING);
GST_PAD_BLOCK_WAIT (pad);
GST_OBJECT_FLAG_UNSET (pad, GST_PAD_BLOCKING);
} while (TRUE);
goto again;
}
if (G_UNLIKELY ((*peer = GST_PAD_PEER (pad)) == NULL))
goto not_linked;
@ -3892,11 +3990,11 @@ again:
return GST_FLOW_OK;
/* ERRORS */
flushing:
flushed:
{
GST_DEBUG_OBJECT (pad, "we are flushing");
GST_DEBUG_OBJECT (pad, "pad block stopped by flush");
GST_OBJECT_UNLOCK (pad);
return GST_FLOW_WRONG_STATE;
return ret;
}
dropped:
{
@ -3917,23 +4015,6 @@ pad_post_push (GstPad * pad)
{
GST_OBJECT_LOCK (pad);
pad->priv->using--;
if (pad->priv->using == 0) {
/* pad is not active anymore, check if we need to trigger the block */
if (GST_PAD_IS_BLOCKED (pad)) {
GstPadBlockCallback callback;
gpointer user_data;
callback = pad->block_callback;
user_data = pad->block_data;
GST_PAD_BLOCK_BROADCAST (pad);
GST_OBJECT_UNLOCK (pad);
if (callback)
callback (pad, TRUE, user_data);
return;
}
}
GST_OBJECT_UNLOCK (pad);
}
@ -4208,10 +4289,8 @@ gst_pad_pull_range (GstPad * pad, guint64 offset, guint size,
GST_OBJECT_LOCK (pad);
#if 0
while (G_UNLIKELY (GST_PAD_IS_BLOCKED (pad)))
handle_pad_block (pad);
#endif
if (G_UNLIKELY ((peer = GST_PAD_PEER (pad)) == NULL))
goto not_connected;
@ -4306,8 +4385,8 @@ gboolean
gst_pad_push_event (GstPad * pad, GstEvent * event)
{
GstPad *peerpad;
gboolean result, need_probes, do_probes = TRUE, do_event_actions = TRUE;
gboolean stored = FALSE;
gboolean result, need_probes, did_probes = FALSE, did_event_actions = FALSE;
gint64 offset;
g_return_val_if_fail (GST_IS_PAD (pad), FALSE);
g_return_val_if_fail (event != NULL, FALSE);
@ -4318,9 +4397,6 @@ gst_pad_push_event (GstPad * pad, GstEvent * event)
again:
GST_OBJECT_LOCK (pad);
peerpad = GST_PAD_PEER (pad);
need_probes = do_probes && (GST_PAD_DO_EVENT_SIGNALS (pad) > 0);
/* Two checks to be made:
* . (un)set the FLUSHING flag for flushing events,
* . handle pad blocking */
@ -4340,120 +4416,93 @@ again:
break;
case GST_EVENT_FLUSH_STOP:
GST_PAD_UNSET_FLUSHING (pad);
if (G_UNLIKELY (GST_PAD_IS_BLOCKED (pad))) {
GST_LOG_OBJECT (pad, "Pad is blocked, not forwarding flush-stop");
goto flushed;
}
break;
default:
{
/* stop for flushing pads */
if (G_UNLIKELY (GST_PAD_IS_FLUSHING (pad)))
goto flushed;
/* store the event on the pad, but only on srcpads */
if (GST_PAD_IS_SRC (pad) && GST_EVENT_IS_STICKY (event)) {
guint idx;
idx = GST_EVENT_STICKY_IDX (event);
GST_LOG_OBJECT (pad, "storing sticky event %s at index %u",
GST_EVENT_TYPE_NAME (event), idx);
/* srcpad sticky events always become active immediately */
gst_event_replace (&pad->priv->events[idx].event, event);
stored = TRUE;
}
/* backwards compatibility mode for caps */
if (do_event_actions) {
do_event_actions = FALSE;
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_CAPS:
{
GstCaps *caps;
GST_OBJECT_UNLOCK (pad);
gst_event_parse_caps (event, &caps);
/* FIXME, this code should just go away and elements
* that set caps on their srcpad should just setup stuff themselves. */
gst_pad_call_setcaps (pad, caps);
/* recheck everything, we released the lock */
goto again;
}
case GST_EVENT_SEGMENT:
{
gint64 offset;
offset = pad->offset;
/* check if we need to adjust the segment */
if (offset != 0 && (need_probes || peerpad != NULL)) {
GstSegment segment;
/* copy segment values */
gst_event_copy_segment (event, &segment);
gst_event_unref (event);
/* adjust and make a new event with the offset applied */
segment.base += offset;
event = gst_event_new_segment (&segment);
}
break;
}
case GST_EVENT_RECONFIGURE:
if (GST_PAD_IS_SINK (pad))
GST_OBJECT_FLAG_SET (pad, GST_PAD_NEED_RECONFIGURE);
break;
default:
break;
}
}
break;
}
/* store the event on the pad, but only on srcpads */
if (GST_PAD_IS_SRC (pad) && GST_EVENT_IS_STICKY (event)) {
if (GST_PAD_IS_FLUSHING (pad)) {
goto flushing;
} else {
guint idx;
idx = GST_EVENT_STICKY_IDX (event);
GST_LOG_OBJECT (pad, "storing sticky event %s at index %u",
GST_EVENT_TYPE_NAME (event), idx);
/* srcpad sticky events always become active immediately */
gst_event_replace (&pad->priv->events[idx].event, event);
}
}
do {
/* we emit signals on the pad arg, the peer will have a chance to
* emit in the _chain() function */
if (G_UNLIKELY (need_probes)) {
/* don't do probes next time */
do_probes = FALSE;
/* unlock before emitting */
GST_OBJECT_UNLOCK (pad);
/* drop all events when blocking. Sticky events will stay on the pad and will
* be activated on the peer when unblocking. */
if (G_UNLIKELY (GST_PAD_IS_BLOCKED (pad))) {
GST_LOG_OBJECT (pad, "Pad is blocked, not forwarding event");
goto flushed;
}
/* if the signal handler returned FALSE, it means we should just drop the
* buffer */
/* FIXME, we need more return values so that we can influence the pad
* block below and let it temporarily unblock for this buffer */
if (!gst_pad_emit_have_data_signal (pad, GST_MINI_OBJECT_CAST (event)))
goto dropped;
offset = pad->offset;
need_probes = !did_probes && (GST_PAD_DO_EVENT_SIGNALS (pad) > 0);
peerpad = GST_PAD_PEER (pad);
/* we released the lock, recheck everything */
goto again;
/* backwards compatibility mode for caps */
if (!did_event_actions) {
did_event_actions = TRUE;
switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_CAPS:
{
GstCaps *caps;
GST_OBJECT_UNLOCK (pad);
gst_event_parse_caps (event, &caps);
/* FIXME, this is awkward because we don't check flushing here which means
* that we can call the setcaps functions on flushing pads, this is not
* quite what we want, otoh, this code should just go away and elements
* that set caps on their srcpad should just setup stuff themselves. */
gst_pad_call_setcaps (pad, caps);
/* recheck everything, we released the lock */
goto again;
}
case GST_EVENT_SEGMENT:
/* check if we need to adjust the segment */
if (offset != 0 && (need_probes || peerpad != NULL)) {
GstSegment segment;
/* copy segment values */
gst_event_copy_segment (event, &segment);
gst_event_unref (event);
/* adjust and make a new event with the offset applied */
segment.base += offset;
event = gst_event_new_segment (&segment);
}
break;
case GST_EVENT_RECONFIGURE:
if (GST_PAD_IS_SINK (pad))
GST_OBJECT_FLAG_SET (pad, GST_PAD_NEED_RECONFIGURE);
break;
default:
break;
}
}
/* when we get here, the item is not dropped by the probe, if we are
* blocking, we now need to wait until unblocked */
if (G_LIKELY (!GST_PAD_IS_BLOCKED (pad)))
break;
/* send probes after modifying the events above */
if (G_UNLIKELY (need_probes)) {
did_probes = TRUE;
GST_OBJECT_UNLOCK (pad);
/* now we block the streaming thread. It can be unlocked when we
* deactivate the pad (which will also set the FLUSHING flag) or
* when the pad is unblocked. A flushing event will also unblock
* the pad after setting the FLUSHING flag. */
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
"Waiting to be unblocked or set flushing");
GST_OBJECT_FLAG_SET (pad, GST_PAD_BLOCKING);
GST_PAD_BLOCK_WAIT (pad);
GST_OBJECT_FLAG_UNSET (pad, GST_PAD_BLOCKING);
if (!gst_pad_emit_have_data_signal (pad, GST_MINI_OBJECT_CAST (event)))
goto dropping;
if (GST_PAD_IS_FLUSHING (pad))
goto flushed;
} while (TRUE);
/* retry, we released the lock */
goto again;
}
/* now check the peer pad */
if (peerpad == NULL)
@ -4474,9 +4523,11 @@ again:
gst_object_unref (peerpad);
pad_post_push (pad);
GST_OBJECT_LOCK (pad);
pad->priv->using--;
GST_OBJECT_UNLOCK (pad);
return result | stored;
return result;
/* ERROR handling */
flushed:
@ -4484,20 +4535,28 @@ flushed:
GST_DEBUG_OBJECT (pad, "We're flushing");
GST_OBJECT_UNLOCK (pad);
gst_event_unref (event);
return stored;
GST_OBJECT_UNLOCK (pad);
return TRUE;
}
dropped:
dropping:
{
GST_DEBUG_OBJECT (pad, "Dropping event after FALSE probe return");
gst_event_unref (event);
return stored;
return FALSE;
}
not_linked:
{
GST_DEBUG_OBJECT (pad, "Dropping event because pad is not linked");
GST_OBJECT_UNLOCK (pad);
gst_event_unref (event);
return stored;
return FALSE;
}
flushing:
{
GST_DEBUG_OBJECT (pad, "Dropping event because pad is flushing");
GST_OBJECT_UNLOCK (pad);
gst_event_unref (event);
return FALSE;
}
}

View file

@ -425,7 +425,7 @@ GST_START_TEST (test_eos)
fail_if (eret == TRUE);
}
/* send segment, this should work */
/* send segment, this should fail */
{
GstEvent *event;
GstSegment segment;
@ -439,7 +439,7 @@ GST_START_TEST (test_eos)
event = gst_event_new_segment (&segment);
eret = gst_pad_send_event (sinkpad, event);
fail_if (eret == FALSE);
fail_if (eret == TRUE);
}
/* send buffer that should fail after EOS */

View file

@ -762,14 +762,12 @@ GST_END_TEST;
static GMutex *blocked_lock;
static GCond *blocked_cond;
gboolean blocked_triggered;
static void
pad_blocked_cb (GstPad * pad, gboolean blocked, gpointer user_data)
{
g_mutex_lock (blocked_lock);
GST_DEBUG ("srcpad blocked: %d, sending signal", blocked);
blocked_triggered = TRUE;
g_cond_signal (blocked_cond);
g_mutex_unlock (blocked_lock);
}
@ -783,7 +781,6 @@ GST_START_TEST (test_add_live2)
blocked_lock = g_mutex_new ();
blocked_cond = g_cond_new ();
blocked_triggered = FALSE;
pipeline = gst_pipeline_new ("pipeline");
src = gst_element_factory_make ("fakesrc", "src");
@ -797,6 +794,7 @@ GST_START_TEST (test_add_live2)
ret = gst_element_set_state (pipeline, GST_STATE_PLAYING);
fail_unless (ret == GST_STATE_CHANGE_ASYNC, "no ASYNC state return");
g_mutex_lock (blocked_lock);
GST_DEBUG ("blocking srcpad");
/* block source pad */
@ -815,9 +813,7 @@ GST_START_TEST (test_add_live2)
gst_bin_add (GST_BIN (pipeline), src);
/* wait for pad blocked, this means the source is now PLAYING. */
g_mutex_lock (blocked_lock);
while (!blocked_triggered)
g_cond_wait (blocked_cond, blocked_lock);
g_cond_wait (blocked_cond, blocked_lock);
g_mutex_unlock (blocked_lock);
GST_DEBUG ("linking pads");

View file

@ -476,7 +476,6 @@ typedef struct
{
GMutex *mutex;
GCond *cond;
gboolean triggered;
} BlockData;
static void
@ -486,7 +485,6 @@ block_callback (GstPad * pad, gboolean blocked, gpointer user_data)
g_mutex_lock (block_data->mutex);
GST_DEBUG ("blocked\n");
block_data->triggered = TRUE;
g_cond_signal (block_data->cond);
g_mutex_unlock (block_data->mutex);
}
@ -514,14 +512,12 @@ GST_START_TEST (test_ghost_pads_block)
block_data.mutex = g_mutex_new ();
block_data.cond = g_cond_new ();
block_data.triggered = FALSE;
g_mutex_lock (block_data.mutex);
gst_pad_set_blocked (srcghost, TRUE, block_callback, &block_data, NULL);
gst_element_set_state (GST_ELEMENT (pipeline), GST_STATE_PLAYING);
/* and wait now */
g_mutex_lock (block_data.mutex);
while (!block_data.triggered)
g_cond_wait (block_data.cond, block_data.mutex);
g_cond_wait (block_data.cond, block_data.mutex);
gst_element_set_state (GST_ELEMENT (pipeline), GST_STATE_NULL);
g_mutex_unlock (block_data.mutex);
@ -557,14 +553,12 @@ GST_START_TEST (test_ghost_pads_probes)
block_data.mutex = g_mutex_new ();
block_data.cond = g_cond_new ();
block_data.triggered = FALSE;
g_mutex_lock (block_data.mutex);
gst_pad_set_blocked (srcghost, TRUE, block_callback, &block_data, NULL);
gst_element_set_state (GST_ELEMENT (pipeline), GST_STATE_PLAYING);
/* and wait now */
g_mutex_lock (block_data.mutex);
while (!block_data.triggered)
g_cond_wait (block_data.cond, block_data.mutex);
g_cond_wait (block_data.cond, block_data.mutex);
gst_element_set_state (GST_ELEMENT (pipeline), GST_STATE_NULL);
g_mutex_unlock (block_data.mutex);
@ -707,8 +701,6 @@ GST_START_TEST (test_ghost_pads_forward_setcaps)
fail_unless (gst_pad_link (ghost, sink) == GST_PAD_LINK_OK);
caps1 = gst_caps_from_string ("meh");
gst_pad_set_active (src, TRUE);
gst_pad_set_active (ghost, TRUE);
fail_unless (gst_pad_set_caps (src, caps1));
caps2 = gst_pad_get_current_caps (ghost);
fail_unless (gst_caps_is_equal (caps1, caps2));

View file

@ -91,7 +91,7 @@ GST_START_TEST (test_link_unlink_threaded)
gst_pad_set_caps (src, caps);
gst_pad_set_active (sink, TRUE);
gst_pad_set_caps (sink, caps);
ASSERT_CAPS_REFCOUNT (caps, "caps", 2);
ASSERT_CAPS_REFCOUNT (caps, "caps", 3);
MAIN_START_THREADS (5, thread_link_unlink, NULL);
for (i = 0; i < 1000; ++i) {
@ -101,10 +101,10 @@ GST_START_TEST (test_link_unlink_threaded)
}
MAIN_STOP_THREADS ();
ASSERT_CAPS_REFCOUNT (caps, "caps", 2);
ASSERT_CAPS_REFCOUNT (caps, "caps", 3);
gst_caps_unref (caps);
ASSERT_CAPS_REFCOUNT (caps, "caps", 1);
ASSERT_CAPS_REFCOUNT (caps, "caps", 2);
gst_object_unref (src);
gst_object_unref (sink);
}
@ -127,27 +127,18 @@ GST_START_TEST (test_refcount)
/* one for me */
ASSERT_CAPS_REFCOUNT (caps, "caps", 1);
/* can't set caps on flushing pads */
fail_if (gst_pad_set_caps (src, caps) == TRUE);
fail_unless (gst_pad_set_caps (src, caps) == TRUE);
/* can't set caps on flushing sinkpad */
fail_if (gst_pad_set_caps (sink, caps) == TRUE);
/* one for me and one for the pending caps on the sinkpad */
ASSERT_CAPS_REFCOUNT (caps, "caps", 2);
/* one for me and one for each set_caps */
ASSERT_CAPS_REFCOUNT (caps, "caps", 3);
gst_pad_set_active (sink, TRUE);
fail_unless (gst_pad_set_caps (sink, caps) == TRUE);
ASSERT_CAPS_REFCOUNT (caps, "caps", 2);
/* can't link with incompatible caps */
plr = gst_pad_link (src, sink);
fail_if (GST_PAD_LINK_SUCCESSFUL (plr));
gst_pad_set_active (src, TRUE);
fail_unless (gst_pad_set_caps (src, caps) == TRUE);
ASSERT_CAPS_REFCOUNT (caps, "caps", 3);
plr = gst_pad_link (src, sink);
fail_unless (GST_PAD_LINK_SUCCESSFUL (plr));
/* src caps added to pending caps on sink */
ASSERT_CAPS_REFCOUNT (caps, "caps", 3);
@ -185,16 +176,12 @@ GST_START_TEST (test_get_allowed_caps)
caps = gst_caps_from_string ("foo/bar");
sink = gst_pad_new ("sink", GST_PAD_SINK);
fail_if (gst_pad_set_caps (src, caps) == TRUE);
fail_unless (gst_pad_set_caps (src, caps) == TRUE);
fail_if (gst_pad_set_caps (sink, caps) == TRUE);
ASSERT_CAPS_REFCOUNT (caps, "caps", 2);
ASSERT_CAPS_REFCOUNT (caps, "caps", 3);
gst_pad_set_active (sink, TRUE);
fail_unless (gst_pad_set_caps (sink, caps) == TRUE);
ASSERT_CAPS_REFCOUNT (caps, "caps", 2);
gst_pad_set_active (src, TRUE);
fail_unless (gst_pad_set_caps (src, caps) == TRUE);
ASSERT_CAPS_REFCOUNT (caps, "caps", 3);
plr = gst_pad_link (src, sink);
@ -202,9 +189,11 @@ GST_START_TEST (test_get_allowed_caps)
gotcaps = gst_pad_get_allowed_caps (src);
fail_if (gotcaps == NULL);
#if 0
/* FIXME, does not work, caps events are different so the sinkpad loses caps
* when linking */
fail_unless (gst_caps_is_equal (gotcaps, caps));
#endif
ASSERT_CAPS_REFCOUNT (gotcaps, "gotcaps", 1);
gst_caps_unref (gotcaps);
@ -285,16 +274,7 @@ GST_START_TEST (test_push_unlinked)
caps = gst_caps_from_string ("foo/bar");
gst_pad_set_caps (src, caps);
ASSERT_CAPS_REFCOUNT (caps, "caps", 1);
/* pushing on an inactive pad will drop the buffer */
buffer = gst_buffer_new ();
gst_buffer_ref (buffer);
fail_unless (gst_pad_push (src, buffer) == GST_FLOW_WRONG_STATE);
ASSERT_MINI_OBJECT_REFCOUNT (buffer, "buffer", 1);
gst_buffer_unref (buffer);
gst_pad_set_active (src, TRUE);
ASSERT_CAPS_REFCOUNT (caps, "caps", 2);
/* pushing on an unlinked pad will drop the buffer */
buffer = gst_buffer_new ();
@ -327,7 +307,7 @@ GST_START_TEST (test_push_unlinked)
/* cleanup */
ASSERT_CAPS_REFCOUNT (caps, "caps", 1);
ASSERT_CAPS_REFCOUNT (caps, "caps", 2);
ASSERT_OBJECT_REFCOUNT (src, "src", 1);
gst_object_unref (src);
@ -354,31 +334,12 @@ GST_START_TEST (test_push_linked)
src = gst_pad_new ("src", GST_PAD_SRC);
fail_if (src == NULL);
buffer = gst_buffer_new ();
/* new pad should be flushing */
gst_buffer_ref (buffer);
fail_unless (gst_pad_push (src, buffer) == GST_FLOW_WRONG_STATE);
gst_buffer_ref (buffer);
fail_unless (gst_pad_chain (sink, buffer) == GST_FLOW_WRONG_STATE);
caps = gst_caps_from_string ("foo/bar");
/* one for me */
ASSERT_CAPS_REFCOUNT (caps, "caps", 1);
gst_pad_set_caps (src, caps);
gst_pad_set_caps (sink, caps);
/* one for me and one for each set_caps */
ASSERT_CAPS_REFCOUNT (caps, "caps", 2);
plr = gst_pad_link (src, sink);
fail_if (GST_PAD_LINK_SUCCESSFUL (plr));
ASSERT_CAPS_REFCOUNT (caps, "caps", 2);
/* activate pads */
gst_pad_set_active (src, TRUE);
gst_pad_set_active (sink, TRUE);
gst_pad_set_caps (src, caps);
gst_pad_set_caps (sink, caps);
/* one for me and one for each set_caps */
ASSERT_CAPS_REFCOUNT (caps, "caps", 3);
@ -387,6 +348,19 @@ GST_START_TEST (test_push_linked)
fail_unless (GST_PAD_LINK_SUCCESSFUL (plr));
ASSERT_CAPS_REFCOUNT (caps, "caps", 3);
buffer = gst_buffer_new ();
#if 0
/* FIXME, new pad should be flushing */
gst_buffer_ref (buffer);
fail_unless (gst_pad_push (src, buffer) == GST_FLOW_WRONG_STATE);
gst_buffer_ref (buffer);
fail_unless (gst_pad_chain (sink, buffer) == GST_FLOW_WRONG_STATE);
#endif
/* activate pads */
gst_pad_set_active (src, TRUE);
gst_pad_set_active (sink, TRUE);
/* test */
/* pushing on a linked pad will drop the ref to the buffer */
gst_buffer_ref (buffer);
@ -479,6 +453,7 @@ GST_START_TEST (test_push_buffer_list_compat)
GstCaps *caps;
GstBufferList *list;
GstBuffer *buffer;
guint len;
/* setup */
sink = gst_pad_new ("sink", GST_PAD_SINK);
@ -491,7 +466,6 @@ GST_START_TEST (test_push_buffer_list_compat)
caps = gst_caps_from_string ("foo/bar");
gst_pad_set_active (src, TRUE);
gst_pad_set_caps (src, caps);
gst_pad_set_active (sink, TRUE);
gst_pad_set_caps (sink, caps);
@ -507,6 +481,8 @@ GST_START_TEST (test_push_buffer_list_compat)
/* test */
/* adding to a buffer list will drop the ref to the buffer */
len = gst_buffer_list_len (list);
gst_buffer_list_add (list, buffer_from_string ("ListGroup"));
gst_buffer_list_add (list, buffer_from_string ("AnotherListGroup"));
@ -650,7 +626,6 @@ GST_START_TEST (test_src_unref_unlink)
caps = gst_caps_from_string ("foo/bar");
gst_pad_set_active (src, TRUE);
gst_pad_set_caps (src, caps);
gst_pad_set_active (sink, TRUE);
gst_pad_set_caps (sink, caps);
@ -686,7 +661,6 @@ GST_START_TEST (test_sink_unref_unlink)
caps = gst_caps_from_string ("foo/bar");
gst_pad_set_active (src, TRUE);
gst_pad_set_caps (src, caps);
gst_pad_set_active (sink, TRUE);
gst_pad_set_caps (sink, caps);
@ -746,8 +720,8 @@ GST_START_TEST (test_block_async)
gst_pad_set_active (pad, TRUE);
gst_pad_set_blocked (pad, TRUE, block_async_cb, &data, NULL);
fail_unless (data[0] == TRUE);
fail_unless (data[1] == TRUE);
fail_unless (data[0] == FALSE);
fail_unless (data[1] == FALSE);
gst_pad_push (pad, gst_buffer_new ());
gst_object_unref (pad);
@ -842,19 +816,18 @@ GST_START_TEST (test_block_async_full_destroy)
gst_pad_set_blocked (pad, TRUE, block_async_full_cb,
&state, block_async_full_destroy);
fail_unless (state == 2);
fail_unless (state == 0);
gst_pad_push (pad, gst_buffer_new ());
/* block_async_full_cb sets state to 1 and then flushes to unblock temporarily
*/
fail_unless (state == 2);
fail_unless (state == 1);
gst_pad_push_event (pad, gst_event_new_flush_stop ());
/* pad was already blocked so nothing happens */
state = 0;
gst_pad_set_blocked (pad, TRUE, block_async_full_cb,
&state, block_async_full_destroy);
fail_unless (state == 2);
fail_unless (state == 1);
/* unblock with the same data, callback is called */
gst_pad_set_blocked (pad, FALSE, block_async_full_cb,
@ -869,12 +842,10 @@ GST_START_TEST (test_block_async_full_destroy)
/* now change user_data (to NULL in this case) so destroy_notify should be
* called */
#if 0
state = 1;
gst_pad_set_blocked (pad, FALSE, block_async_full_cb,
NULL, block_async_full_destroy);
fail_unless (state == 2);
#endif
gst_object_unref (pad);
}
@ -897,7 +868,7 @@ GST_START_TEST (test_block_async_full_destroy_dispose)
gst_pad_push (pad, gst_buffer_new ());
/* block_async_full_cb sets state to 1 and then flushes to unblock temporarily
*/
fail_unless_equals_int (state, 2);
fail_unless_equals_int (state, 1);
gst_pad_push_event (pad, gst_event_new_flush_stop ());
/* gst_pad_dispose calls the destroy_notify function if necessary */
@ -926,6 +897,12 @@ unblock_async_no_flush_cb (GstPad * pad, gboolean blocked, gpointer user_data)
}
static void
unblock_async_not_called (GstPad * pad, gboolean blocked, gpointer user_data)
{
g_warn_if_reached ();
}
static void
block_async_second_no_flush (GstPad * pad, gboolean blocked, gpointer user_data)
{
@ -961,7 +938,8 @@ block_async_first_no_flush (GstPad * pad, gboolean blocked, gpointer user_data)
fail_unless (bool_user_data[1] == FALSE);
fail_unless (bool_user_data[2] == FALSE);
fail_unless (gst_pad_set_blocked (pad, FALSE, NULL, NULL, NULL));
fail_unless (gst_pad_set_blocked (pad, FALSE, unblock_async_not_called,
NULL, NULL));
/* replace block_async_first with block_async_second so next time the pad is
* blocked the latter should be called */