gst/gstpad.c: Implement pad blocking on events according to part-block.txt.

Original commit message from CVS:
* gst/gstpad.c: (handle_pad_block), (gst_pad_push_event):
Implement pad blocking on events according to part-block.txt.
More comments on behaviour.
* tests/check/gst/gstevent.c: (test_event):
Send event to peer pad of blocked pad (else it will block).
This commit is contained in:
Edward Hervey 2006-07-03 10:30:49 +00:00
parent 904f7041d0
commit 4999836f48
3 changed files with 112 additions and 9 deletions

View file

@ -1,3 +1,11 @@
2006-07-03 Edward Hervey <edward@fluendo.com>
* gst/gstpad.c: (handle_pad_block), (gst_pad_push_event):
Implement pad blocking on events according to part-block.txt.
More comments on behaviour.
* tests/check/gst/gstevent.c: (test_event):
Send event to peer pad of blocked pad (else it will block).
2006-07-03 Thomas Vander Stichele <thomas at apestaart dot org> 2006-07-03 Thomas Vander Stichele <thomas at apestaart dot org>
* libs/gst/check/gstcheck.c: (gst_check_message_error), * libs/gst/check/gstcheck.c: (gst_check_message_error),

View file

@ -3070,7 +3070,28 @@ gst_ghost_pad_save_thyself (GstPad * pad, xmlNodePtr parent)
#endif /* GST_DISABLE_LOADSAVE */ #endif /* GST_DISABLE_LOADSAVE */
/* /*
* should be called with pad lock held * should be called with pad OBJECT_LOCK and STREAM_LOCK helt.
* GST_PAD_IS_BLOCK (pad) == TRUE when this function is
* called.
*
* This function perform the pad blocking when an event, buffer push
* or buffer_alloc is performed on a _SRC_ pad. It blocks the
* streaming thread after informing the pad has been blocked.
*
* An application can with this method wait and block any streaming
* thread and perform operations such as seeking or linking.
*
* Two methods are available for notifying the application of the
* block:
* - the callback method, which happens in the STREAMING thread with
* the STREAM_LOCK helt. With this method, the most usefull way of
* dealing with the callback is to post a message to the main thread
* where the pad block can then be handled outside of the streaming
* thread. With the last method one can perform all operations such
* as doing a state change, linking, unblocking, seeking etc on the
* pad.
* - the GCond signal method, which makes any thread unblock when
* the pad block happens.
* *
* MT safe. * MT safe.
*/ */
@ -3084,36 +3105,67 @@ handle_pad_block (GstPad * pad)
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
"signal block taken on pad %s:%s", GST_DEBUG_PAD_NAME (pad)); "signal block taken on pad %s:%s", GST_DEBUG_PAD_NAME (pad));
/* need to grab extra ref for the callbacks */ /* flushing, don't bother trying to block and return WRONG_STATE
* right away */
if (GST_PAD_IS_FLUSHING (pad))
goto flushingnonref;
/* we grab an extra ref for the callbacks, this is probably not
* needed (callback code does not have a ref and cannot unref). I
* think this was done to make it possible to unref the element in
* the callback, which is in the end totally impossible as it
* requires grabbing the STREAM_LOCK and OBJECT_LOCK which are
* all taken when calling this function. */
gst_object_ref (pad); gst_object_ref (pad);
/* we either have a callback installed to notify the block or
* some other thread is doing a GCond wait. */
callback = pad->block_callback; callback = pad->block_callback;
if (callback) { if (callback) {
/* there is a callback installed, call it. We release the
* lock so that the callback can do something usefull with the
* pad */
user_data = pad->block_data; user_data = pad->block_data;
GST_OBJECT_UNLOCK (pad); GST_OBJECT_UNLOCK (pad);
callback (pad, TRUE, user_data); callback (pad, TRUE, user_data);
GST_OBJECT_LOCK (pad); GST_OBJECT_LOCK (pad);
} else { } else {
/* no callback, signal the thread that is doing a GCond wait
* if any. */
GST_PAD_BLOCK_SIGNAL (pad); GST_PAD_BLOCK_SIGNAL (pad);
} }
/* OBJECT_LOCK could have been released when we did the callback, which
* then could have made the pad unblock so we need to check the blocking
* condition again. */
while (GST_PAD_IS_BLOCKED (pad)) { while (GST_PAD_IS_BLOCKED (pad)) {
if (GST_PAD_IS_FLUSHING (pad)) if (GST_PAD_IS_FLUSHING (pad))
goto flushing; goto flushing;
/* now we block the streaming thread. It can be unlocked when we
* deactivate the pad (which will also set the FLUSHING flag) or
* when the pad is unblocked. A flushing event will also unblock
* the pad after setting the FLUSHING flag. */
GST_PAD_BLOCK_WAIT (pad); GST_PAD_BLOCK_WAIT (pad);
/* see if we got unlocked by a flush or not */
if (GST_PAD_IS_FLUSHING (pad)) if (GST_PAD_IS_FLUSHING (pad))
goto flushing; goto flushing;
} }
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "got unblocked"); GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "got unblocked");
/* when we get here, the pad is unblocked again and we perform
* the needed unblock code. */
callback = pad->block_callback; callback = pad->block_callback;
if (callback) { if (callback) {
/* we need to call the callback */
user_data = pad->block_data; user_data = pad->block_data;
GST_OBJECT_UNLOCK (pad); GST_OBJECT_UNLOCK (pad);
callback (pad, FALSE, user_data); callback (pad, FALSE, user_data);
GST_OBJECT_LOCK (pad); GST_OBJECT_LOCK (pad);
} else { } else {
/* we need to signal the thread waiting on the GCond */
GST_PAD_BLOCK_SIGNAL (pad); GST_PAD_BLOCK_SIGNAL (pad);
} }
@ -3121,8 +3173,16 @@ handle_pad_block (GstPad * pad)
return ret; return ret;
flushingnonref:
{
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
"pad %s:%s was flushing", GST_DEBUG_PAD_NAME (pad));
return GST_FLOW_WRONG_STATE;
}
flushing: flushing:
{ {
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
"pad %s:%s became flushing", GST_DEBUG_PAD_NAME (pad));
gst_object_unref (pad); gst_object_unref (pad);
return GST_FLOW_WRONG_STATE; return GST_FLOW_WRONG_STATE;
} }
@ -3634,22 +3694,41 @@ gst_pad_push_event (GstPad * pad, GstEvent * event)
g_return_val_if_fail (event != NULL, FALSE); g_return_val_if_fail (event != NULL, FALSE);
g_return_val_if_fail (GST_IS_EVENT (event), FALSE); g_return_val_if_fail (GST_IS_EVENT (event), FALSE);
GST_LOG_OBJECT (pad, "event:%s", GST_EVENT_TYPE_NAME (event));
GST_OBJECT_LOCK (pad); GST_OBJECT_LOCK (pad);
/* Two checks to be made:
* . (un)set the FLUSHING flag for flushing events,
* . handle pad blocking */
switch (GST_EVENT_TYPE (event)) { switch (GST_EVENT_TYPE (event)) {
case GST_EVENT_FLUSH_START: case GST_EVENT_FLUSH_START:
GST_PAD_SET_FLUSHING (pad); GST_PAD_SET_FLUSHING (pad);
if (G_UNLIKELY (GST_PAD_IS_BLOCKED (pad))) {
/* flush start will have set the FLUSHING flag and will then
* unlock all threads doing a GCond wait on the blocked pad. This
* will typically unblock the STREAMING thread blocked on a pad. */
GST_PAD_BLOCK_SIGNAL (pad);
goto flushed;
}
break; break;
case GST_EVENT_FLUSH_STOP: case GST_EVENT_FLUSH_STOP:
GST_PAD_UNSET_FLUSHING (pad); GST_PAD_UNSET_FLUSHING (pad);
if (G_UNLIKELY (GST_PAD_IS_BLOCKED (pad)))
goto flushed;
break; break;
default: default:
break;
}
if (G_UNLIKELY (GST_PAD_IS_BLOCKED (pad))) { if (G_UNLIKELY (GST_PAD_IS_BLOCKED (pad))) {
if (GST_EVENT_TYPE (event) == GST_EVENT_FLUSH_START) { if (GST_PAD_IS_FLUSHING (pad))
GST_PAD_BLOCK_SIGNAL (pad); goto flushed;
while (GST_PAD_IS_BLOCKED (pad))
/* else block the event as long as the pad is blocked */
if (handle_pad_block (pad) != GST_FLOW_OK)
goto flushed;
} }
break;
} }
if (G_UNLIKELY (GST_PAD_DO_EVENT_SIGNALS (pad) > 0)) { if (G_UNLIKELY (GST_PAD_DO_EVENT_SIGNALS (pad) > 0)) {
@ -3688,6 +3767,15 @@ not_linked:
GST_OBJECT_UNLOCK (pad); GST_OBJECT_UNLOCK (pad);
return FALSE; return FALSE;
} }
flushed:
{
GST_DEBUG_OBJECT (pad,
"Not forwarding event since we're flushing and blocking");
gst_event_unref (event);
GST_OBJECT_UNLOCK (pad);
return TRUE;
}
} }
/** /**

View file

@ -274,6 +274,7 @@ static void test_event
gboolean expect_before_q, GstPad * fake_srcpad) gboolean expect_before_q, GstPad * fake_srcpad)
{ {
GstEvent *event; GstEvent *event;
GstPad *peer;
gint i; gint i;
got_event_before_q = got_event_after_q = NULL; got_event_before_q = got_event_after_q = NULL;
@ -288,8 +289,14 @@ static void test_event
got_event_time.tv_sec = 0; got_event_time.tv_sec = 0;
got_event_time.tv_usec = 0; got_event_time.tv_usec = 0;
/* We block the pad so the stream lock is released and we can send the event */
fail_unless (gst_pad_set_blocked (fake_srcpad, TRUE) == TRUE); fail_unless (gst_pad_set_blocked (fake_srcpad, TRUE) == TRUE);
gst_pad_push_event (pad, event);
/* We send on the peer pad, since the pad is blocked */
fail_unless ((peer = gst_pad_get_peer (pad)) != NULL);
gst_pad_send_event (peer, event);
gst_object_unref (peer);
fail_unless (gst_pad_set_blocked (fake_srcpad, FALSE) == TRUE); fail_unless (gst_pad_set_blocked (fake_srcpad, FALSE) == TRUE);
/* Wait up to 5 seconds for the event to appear */ /* Wait up to 5 seconds for the event to appear */