gst/gstpad.c: emit have-data before checking for peers. This allows for probe handlers to connect elements. This he...

Original commit message from CVS:
* gst/gstpad.c: (gst_pad_push), (gst_pad_push_event):
emit have-data before checking for peers.  This allows
for probe handlers to connect elements.  This helps autopluggers.
* check/gst/gstpad.c: (GST_START_TEST), (_probe_handler),
(gst_pad_suite):
add six checks, linked/unlinked with no/true/false probe
This commit is contained in:
Thomas Vander Stichele 2005-10-05 16:16:58 +00:00
parent 593cdc30dd
commit 06c7f2231b
4 changed files with 358 additions and 34 deletions

View file

@ -1,3 +1,12 @@
2005-10-05 Thomas Vander Stichele <thomas at apestaart dot org>
* gst/gstpad.c: (gst_pad_push), (gst_pad_push_event):
emit have-data before checking for peers. This allows
for probe handlers to connect elements. This helps autopluggers.
* check/gst/gstpad.c: (GST_START_TEST), (_probe_handler),
(gst_pad_suite):
add six checks, linked/unlinked with no/true/false probe
2005-10-04 Wim Taymans <wim@fluendo.com>
* gst/elements/gstfakesink.c: (gst_fake_sink_get_property),

View file

@ -227,6 +227,157 @@ GST_START_TEST (test_name_is_valid)
GST_END_TEST;
static gboolean
_probe_handler (GstPad * pad, GstBuffer * buffer, gpointer userdata)
{
gint ret = GPOINTER_TO_INT (userdata);
if (ret == 1)
return TRUE;
return FALSE;
}
GST_START_TEST (test_push_unlinked)
{
GstPad *src;
GstCaps *caps;
GstBuffer *buffer;
gulong id;
src = gst_pad_new ("src", GST_PAD_SRC);
fail_if (src == NULL);
caps = gst_pad_get_allowed_caps (src);
fail_unless (caps == NULL);
caps = gst_caps_from_string ("foo/bar");
gst_pad_set_caps (src, caps);
ASSERT_CAPS_REFCOUNT (caps, "caps", 2);
/* pushing on an unlinked pad will drop the buffer */
buffer = gst_buffer_new ();
gst_buffer_ref (buffer);
fail_unless (gst_pad_push (src, buffer) == GST_FLOW_NOT_LINKED);
ASSERT_MINI_OBJECT_REFCOUNT (buffer, "buffer", 1);
gst_buffer_unref (buffer);
/* adding a probe that returns FALSE will drop the buffer without trying
* to chain */
id = gst_pad_add_buffer_probe (src, (GCallback) _probe_handler,
GINT_TO_POINTER (0));
buffer = gst_buffer_new ();
gst_buffer_ref (buffer);
fail_unless (gst_pad_push (src, buffer) == GST_FLOW_OK);
ASSERT_MINI_OBJECT_REFCOUNT (buffer, "buffer", 1);
gst_buffer_unref (buffer);
gst_pad_remove_buffer_probe (src, id);
/* adding a probe that returns TRUE will still chain the buffer,
* and hence drop because pad is unlinked */
id = gst_pad_add_buffer_probe (src, (GCallback) _probe_handler,
GINT_TO_POINTER (1));
buffer = gst_buffer_new ();
gst_buffer_ref (buffer);
fail_unless (gst_pad_push (src, buffer) == GST_FLOW_NOT_LINKED);
ASSERT_MINI_OBJECT_REFCOUNT (buffer, "buffer", 1);
gst_buffer_unref (buffer);
gst_pad_remove_buffer_probe (src, id);
/* cleanup */
ASSERT_CAPS_REFCOUNT (caps, "caps", 2);
ASSERT_OBJECT_REFCOUNT (src, "src", 1);
gst_object_unref (src);
ASSERT_CAPS_REFCOUNT (caps, "caps", 1);
gst_caps_unref (caps);
}
GST_END_TEST;
GST_START_TEST (test_push_linked)
{
GstPad *src, *sink;
GstPadLinkReturn plr;
GstCaps *caps;
GstBuffer *buffer;
ulong id;
/* setup */
sink = gst_pad_new ("sink", GST_PAD_SINK);
fail_if (sink == NULL);
gst_pad_set_chain_function (sink, gst_check_chain_func);
src = gst_pad_new ("src", GST_PAD_SRC);
fail_if (src == NULL);
caps = gst_caps_from_string ("foo/bar");
/* one for me */
ASSERT_CAPS_REFCOUNT (caps, "caps", 1);
gst_pad_set_caps (src, caps);
gst_pad_set_caps (sink, caps);
/* one for me and one for each set_caps */
ASSERT_CAPS_REFCOUNT (caps, "caps", 3);
plr = gst_pad_link (src, sink);
fail_unless (GST_PAD_LINK_SUCCESSFUL (plr));
ASSERT_CAPS_REFCOUNT (caps, "caps", 3);
/* test */
/* pushing on a linked pad will drop the ref to the buffer */
buffer = gst_buffer_new ();
gst_buffer_ref (buffer);
fail_unless (gst_pad_push (src, buffer) == GST_FLOW_OK);
ASSERT_MINI_OBJECT_REFCOUNT (buffer, "buffer", 2);
gst_buffer_unref (buffer);
fail_unless_equals_int (g_list_length (buffers), 1);
buffer = GST_BUFFER (buffers->data);
ASSERT_MINI_OBJECT_REFCOUNT (buffer, "buffer", 1);
g_list_free (buffers);
buffers = NULL;
/* adding a probe that returns FALSE will drop the buffer without trying
* to chain */
id = gst_pad_add_buffer_probe (src, (GCallback) _probe_handler,
GINT_TO_POINTER (0));
buffer = gst_buffer_new ();
gst_buffer_ref (buffer);
fail_unless (gst_pad_push (src, buffer) == GST_FLOW_OK);
ASSERT_MINI_OBJECT_REFCOUNT (buffer, "buffer", 1);
gst_buffer_unref (buffer);
gst_pad_remove_buffer_probe (src, id);
fail_unless_equals_int (g_list_length (buffers), 0);
/* adding a probe that returns TRUE will still chain the buffer */
id = gst_pad_add_buffer_probe (src, (GCallback) _probe_handler,
GINT_TO_POINTER (1));
buffer = gst_buffer_new ();
gst_buffer_ref (buffer);
fail_unless (gst_pad_push (src, buffer) == GST_FLOW_OK);
gst_pad_remove_buffer_probe (src, id);
ASSERT_MINI_OBJECT_REFCOUNT (buffer, "buffer", 2);
gst_buffer_unref (buffer);
fail_unless_equals_int (g_list_length (buffers), 1);
buffer = GST_BUFFER (buffers->data);
ASSERT_MINI_OBJECT_REFCOUNT (buffer, "buffer", 1);
g_list_free (buffers);
buffers = NULL;
/* teardown */
gst_pad_unlink (src, sink);
ASSERT_CAPS_REFCOUNT (caps, "caps", 3);
gst_object_unref (src);
gst_object_unref (sink);
ASSERT_CAPS_REFCOUNT (caps, "caps", 1);
gst_caps_unref (caps);
}
GST_END_TEST;
Suite *
gst_pad_suite (void)
@ -243,6 +394,8 @@ gst_pad_suite (void)
tcase_add_test (tc_chain, test_get_allowed_caps);
tcase_add_test (tc_chain, test_link_unlink_threaded);
tcase_add_test (tc_chain, test_name_is_valid);
tcase_add_test (tc_chain, test_push_unlinked);
tcase_add_test (tc_chain, test_push_linked);
return s;
}

View file

@ -766,13 +766,16 @@ gst_pad_is_active (GstPad * pad)
/**
* gst_pad_set_blocked_async:
* @pad: the #GstPad to block or unblock
* @blocked: boolean indicating we should block or unblock
* @blocked: boolean indicating whether the pad should be blocked or unblocked
* @callback: #GstPadBlockCallback that will be called when the
* operation succeeds.
* operation succeeds
* @user_data: user data passed to the callback
*
* Blocks or unblocks the dataflow on a pad. The provided callback
* is called when the operation succeeds. This can take a while as
* is called when the operation succeeds; this happens right before the next
* attempt at pushing a buffer on the pad.
*
* This can take a while as
* the pad can only become blocked when real dataflow is happening.
* When the pipeline is stalled, for example in PAUSED, this can
* take an indeterminate amount of time.
@ -3075,7 +3078,8 @@ no_function:
* @pad: a source #GstPad.
* @buffer: the #GstBuffer to push.
*
* Pushes a buffer to the peer of @pad. @pad must be linked.
* Pushes a buffer to the peer of @pad.
* buffer probes will be triggered before the buffer gets pushed.
*
* Returns: a #GstFlowReturn from the peer pad.
*
@ -3087,6 +3091,7 @@ gst_pad_push (GstPad * pad, GstBuffer * buffer)
GstPad *peer;
GstFlowReturn ret;
gboolean emit_signal;
gboolean signal_ret = TRUE;
g_return_val_if_fail (GST_IS_PAD (pad), GST_FLOW_ERROR);
g_return_val_if_fail (GST_PAD_DIRECTION (pad) == GST_PAD_SRC, GST_FLOW_ERROR);
@ -3094,24 +3099,36 @@ gst_pad_push (GstPad * pad, GstBuffer * buffer)
g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
GST_LOCK (pad);
/* FIXME: this check can go away; pad_set_blocked could be implemented with
* probes completely */
while (G_UNLIKELY (GST_PAD_IS_BLOCKED (pad)))
handle_pad_block (pad);
if (G_UNLIKELY ((peer = GST_PAD_PEER (pad)) == NULL))
goto not_linked;
/* we emit signals on the pad areg, the peer will have a chance to
/* we emit signals on the pad arg, the peer will have a chance to
* emit in the _chain() function */
emit_signal = GST_PAD_DO_BUFFER_SIGNALS (pad) > 0;
gst_object_ref (peer);
GST_UNLOCK (pad);
if (G_UNLIKELY (emit_signal)) {
if (!gst_pad_emit_have_data_signal (pad, GST_MINI_OBJECT (buffer)))
goto dropping;
signal_ret = gst_pad_emit_have_data_signal (pad, GST_MINI_OBJECT (buffer));
}
/* if the signal handler returned FALSE, it means we should just drop the
* buffer */
if (signal_ret == FALSE) {
gst_buffer_unref (buffer);
GST_DEBUG_OBJECT (pad, "Dropping buffer due to FALSE probe return");
return GST_FLOW_OK;
}
GST_LOCK (pad);
if (G_UNLIKELY ((peer = GST_PAD_PEER (pad)) == NULL))
goto not_linked;
gst_object_ref (peer);
GST_UNLOCK (pad);
ret = gst_pad_chain (peer, buffer);
gst_object_unref (peer);
@ -3127,13 +3144,6 @@ not_linked:
GST_UNLOCK (pad);
return GST_FLOW_NOT_LINKED;
}
dropping:
{
gst_buffer_unref (buffer);
gst_object_unref (peer);
GST_DEBUG ("Dropping buffer due to FALSE probe return");
return GST_FLOW_OK;
}
}
/**
@ -3374,25 +3384,31 @@ gst_pad_push_event (GstPad * pad, GstEvent * event)
GstPad *peerpad;
gboolean result;
gboolean emit_signal;
gboolean signal_ret = TRUE;
g_return_val_if_fail (GST_IS_PAD (pad), FALSE);
g_return_val_if_fail (event != NULL, FALSE);
emit_signal = GST_PAD_DO_EVENT_SIGNALS (pad) > 0;
if (G_UNLIKELY (emit_signal)) {
signal_ret = gst_pad_emit_have_data_signal (pad, GST_MINI_OBJECT (event));
}
if (signal_ret == FALSE) {
GST_DEBUG_OBJECT (pad, "Dropping event after FALSE probe return");
gst_event_unref (event);
return FALSE;
}
GST_LOCK (pad);
peerpad = GST_PAD_PEER (pad);
if (peerpad == NULL)
goto not_linked;
emit_signal = GST_PAD_DO_EVENT_SIGNALS (pad) > 0;
gst_object_ref (peerpad);
GST_UNLOCK (pad);
if (G_UNLIKELY (emit_signal)) {
if (!gst_pad_emit_have_data_signal (pad, GST_MINI_OBJECT (event)))
goto dropping;
}
result = gst_pad_send_event (peerpad, event);
gst_object_unref (peerpad);
@ -3406,13 +3422,6 @@ not_linked:
GST_UNLOCK (pad);
return FALSE;
}
dropping:
{
GST_DEBUG ("Dropping event after FALSE probe return");
gst_object_unref (peerpad);
gst_event_unref (event);
return FALSE;
}
}
/**

View file

@ -227,6 +227,157 @@ GST_START_TEST (test_name_is_valid)
GST_END_TEST;
static gboolean
_probe_handler (GstPad * pad, GstBuffer * buffer, gpointer userdata)
{
gint ret = GPOINTER_TO_INT (userdata);
if (ret == 1)
return TRUE;
return FALSE;
}
GST_START_TEST (test_push_unlinked)
{
GstPad *src;
GstCaps *caps;
GstBuffer *buffer;
gulong id;
src = gst_pad_new ("src", GST_PAD_SRC);
fail_if (src == NULL);
caps = gst_pad_get_allowed_caps (src);
fail_unless (caps == NULL);
caps = gst_caps_from_string ("foo/bar");
gst_pad_set_caps (src, caps);
ASSERT_CAPS_REFCOUNT (caps, "caps", 2);
/* pushing on an unlinked pad will drop the buffer */
buffer = gst_buffer_new ();
gst_buffer_ref (buffer);
fail_unless (gst_pad_push (src, buffer) == GST_FLOW_NOT_LINKED);
ASSERT_MINI_OBJECT_REFCOUNT (buffer, "buffer", 1);
gst_buffer_unref (buffer);
/* adding a probe that returns FALSE will drop the buffer without trying
* to chain */
id = gst_pad_add_buffer_probe (src, (GCallback) _probe_handler,
GINT_TO_POINTER (0));
buffer = gst_buffer_new ();
gst_buffer_ref (buffer);
fail_unless (gst_pad_push (src, buffer) == GST_FLOW_OK);
ASSERT_MINI_OBJECT_REFCOUNT (buffer, "buffer", 1);
gst_buffer_unref (buffer);
gst_pad_remove_buffer_probe (src, id);
/* adding a probe that returns TRUE will still chain the buffer,
* and hence drop because pad is unlinked */
id = gst_pad_add_buffer_probe (src, (GCallback) _probe_handler,
GINT_TO_POINTER (1));
buffer = gst_buffer_new ();
gst_buffer_ref (buffer);
fail_unless (gst_pad_push (src, buffer) == GST_FLOW_NOT_LINKED);
ASSERT_MINI_OBJECT_REFCOUNT (buffer, "buffer", 1);
gst_buffer_unref (buffer);
gst_pad_remove_buffer_probe (src, id);
/* cleanup */
ASSERT_CAPS_REFCOUNT (caps, "caps", 2);
ASSERT_OBJECT_REFCOUNT (src, "src", 1);
gst_object_unref (src);
ASSERT_CAPS_REFCOUNT (caps, "caps", 1);
gst_caps_unref (caps);
}
GST_END_TEST;
GST_START_TEST (test_push_linked)
{
GstPad *src, *sink;
GstPadLinkReturn plr;
GstCaps *caps;
GstBuffer *buffer;
ulong id;
/* setup */
sink = gst_pad_new ("sink", GST_PAD_SINK);
fail_if (sink == NULL);
gst_pad_set_chain_function (sink, gst_check_chain_func);
src = gst_pad_new ("src", GST_PAD_SRC);
fail_if (src == NULL);
caps = gst_caps_from_string ("foo/bar");
/* one for me */
ASSERT_CAPS_REFCOUNT (caps, "caps", 1);
gst_pad_set_caps (src, caps);
gst_pad_set_caps (sink, caps);
/* one for me and one for each set_caps */
ASSERT_CAPS_REFCOUNT (caps, "caps", 3);
plr = gst_pad_link (src, sink);
fail_unless (GST_PAD_LINK_SUCCESSFUL (plr));
ASSERT_CAPS_REFCOUNT (caps, "caps", 3);
/* test */
/* pushing on a linked pad will drop the ref to the buffer */
buffer = gst_buffer_new ();
gst_buffer_ref (buffer);
fail_unless (gst_pad_push (src, buffer) == GST_FLOW_OK);
ASSERT_MINI_OBJECT_REFCOUNT (buffer, "buffer", 2);
gst_buffer_unref (buffer);
fail_unless_equals_int (g_list_length (buffers), 1);
buffer = GST_BUFFER (buffers->data);
ASSERT_MINI_OBJECT_REFCOUNT (buffer, "buffer", 1);
g_list_free (buffers);
buffers = NULL;
/* adding a probe that returns FALSE will drop the buffer without trying
* to chain */
id = gst_pad_add_buffer_probe (src, (GCallback) _probe_handler,
GINT_TO_POINTER (0));
buffer = gst_buffer_new ();
gst_buffer_ref (buffer);
fail_unless (gst_pad_push (src, buffer) == GST_FLOW_OK);
ASSERT_MINI_OBJECT_REFCOUNT (buffer, "buffer", 1);
gst_buffer_unref (buffer);
gst_pad_remove_buffer_probe (src, id);
fail_unless_equals_int (g_list_length (buffers), 0);
/* adding a probe that returns TRUE will still chain the buffer */
id = gst_pad_add_buffer_probe (src, (GCallback) _probe_handler,
GINT_TO_POINTER (1));
buffer = gst_buffer_new ();
gst_buffer_ref (buffer);
fail_unless (gst_pad_push (src, buffer) == GST_FLOW_OK);
gst_pad_remove_buffer_probe (src, id);
ASSERT_MINI_OBJECT_REFCOUNT (buffer, "buffer", 2);
gst_buffer_unref (buffer);
fail_unless_equals_int (g_list_length (buffers), 1);
buffer = GST_BUFFER (buffers->data);
ASSERT_MINI_OBJECT_REFCOUNT (buffer, "buffer", 1);
g_list_free (buffers);
buffers = NULL;
/* teardown */
gst_pad_unlink (src, sink);
ASSERT_CAPS_REFCOUNT (caps, "caps", 3);
gst_object_unref (src);
gst_object_unref (sink);
ASSERT_CAPS_REFCOUNT (caps, "caps", 1);
gst_caps_unref (caps);
}
GST_END_TEST;
Suite *
gst_pad_suite (void)
@ -243,6 +394,8 @@ gst_pad_suite (void)
tcase_add_test (tc_chain, test_get_allowed_caps);
tcase_add_test (tc_chain, test_link_unlink_threaded);
tcase_add_test (tc_chain, test_name_is_valid);
tcase_add_test (tc_chain, test_push_unlinked);
tcase_add_test (tc_chain, test_push_linked);
return s;
}