pad: allow probes to remove the data item whilst returning PROBE_OK

Use case: we want to block the source pad of a leaky queue and
drop the buffer that causes the block. If we return PROBE_DROP
then the buffer gets dropped, but we get called again. If we
return PROBE_OK we can't easily drop the buffer. If we just
replace the item into the GstPadProbeInfo structure with NULL,
GStreamer will push a NULL buffer to the next element when we
unblock the pad probe. This patch ensures it doesn't do that.

https://bugzilla.gnome.org/show_bug.cgi?id=734342
This commit is contained in:
Tim-Philipp Müller 2014-08-06 10:32:39 +01:00
parent 518babf6cb
commit 5deb4f658e
2 changed files with 88 additions and 1 deletions

View file

@ -3270,6 +3270,7 @@ probe_hook_marshal (GHook * hook, ProbeMarshall * data)
GstPadProbeType type, flags; GstPadProbeType type, flags;
GstPadProbeCallback callback; GstPadProbeCallback callback;
GstPadProbeReturn ret; GstPadProbeReturn ret;
gpointer original_data;
/* if we have called this callback, do nothing */ /* if we have called this callback, do nothing */
if (PROBE_COOKIE (hook) == data->cookie) { if (PROBE_COOKIE (hook) == data->cookie) {
@ -3283,6 +3284,7 @@ probe_hook_marshal (GHook * hook, ProbeMarshall * data)
flags = hook->flags >> G_HOOK_FLAG_USER_SHIFT; flags = hook->flags >> G_HOOK_FLAG_USER_SHIFT;
type = info->type; type = info->type;
original_data = info->data;
/* one of the data types for non-idle probes */ /* one of the data types for non-idle probes */
if ((type & GST_PAD_PROBE_TYPE_IDLE) == 0 if ((type & GST_PAD_PROBE_TYPE_IDLE) == 0
@ -3321,6 +3323,12 @@ probe_hook_marshal (GHook * hook, ProbeMarshall * data)
GST_OBJECT_LOCK (pad); GST_OBJECT_LOCK (pad);
if (original_data != NULL && info->data == NULL) {
GST_DEBUG_OBJECT (pad, "data item in pad probe info was dropped");
info->type = GST_PAD_PROBE_TYPE_INVALID;
data->dropped = TRUE;
}
switch (ret) { switch (ret) {
case GST_PAD_PROBE_REMOVE: case GST_PAD_PROBE_REMOVE:
/* remove the probe */ /* remove the probe */
@ -4265,7 +4273,8 @@ probe_stopped:
GST_OBJECT_UNLOCK (pad); GST_OBJECT_UNLOCK (pad);
pad->ABI.abi.last_flowret = pad->ABI.abi.last_flowret =
ret == GST_FLOW_CUSTOM_SUCCESS ? GST_FLOW_OK : ret; ret == GST_FLOW_CUSTOM_SUCCESS ? GST_FLOW_OK : ret;
gst_mini_object_unref (GST_MINI_OBJECT_CAST (data)); if (data != NULL)
gst_mini_object_unref (GST_MINI_OBJECT_CAST (data));
switch (ret) { switch (ret) {
case GST_FLOW_CUSTOM_SUCCESS: case GST_FLOW_CUSTOM_SUCCESS:

View file

@ -1191,6 +1191,83 @@ GST_START_TEST (test_pad_probe_remove)
GST_END_TEST; GST_END_TEST;
typedef struct
{
gulong probe_id;
GstPad *probe_pad;
GThread *thread;
} BlockReplaceProbeHelper;
static gpointer
unblock_probe_thread (gpointer user_data)
{
BlockReplaceProbeHelper *helper = user_data;
GST_INFO_OBJECT (helper->probe_pad, "removing probe to unblock pad");
gst_pad_remove_probe (helper->probe_pad, helper->probe_id);
return NULL;
}
static GstPadProbeReturn
block_and_replace_buffer_probe_cb (GstPad * pad, GstPadProbeInfo * info,
gpointer user_data)
{
BlockReplaceProbeHelper *helper = user_data;
GST_INFO_OBJECT (pad, "about to block pad, replacing buffer");
/* we want to block, but also drop this buffer */
gst_buffer_unref (GST_BUFFER (info->data));
info->data = NULL;
helper->thread =
g_thread_new ("gst-pad-test-thread", unblock_probe_thread, helper);
return GST_PAD_PROBE_OK;
}
GST_START_TEST (test_pad_probe_block_and_drop_buffer)
{
BlockReplaceProbeHelper helper;
GstFlowReturn flow;
GstPad *src, *sink;
src = gst_pad_new ("src", GST_PAD_SRC);
gst_pad_set_active (src, TRUE);
sink = gst_pad_new ("sink", GST_PAD_SINK);
gst_pad_set_chain_function (sink, gst_check_chain_func);
gst_pad_set_active (sink, TRUE);
fail_unless (gst_pad_push_event (src,
gst_event_new_stream_start ("test")) == TRUE);
fail_unless (gst_pad_push_event (src,
gst_event_new_segment (&dummy_segment)) == TRUE);
fail_unless_equals_int (gst_pad_link (src, sink), GST_PAD_LINK_OK);
helper.probe_id = gst_pad_add_probe (src,
GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_BUFFER,
block_and_replace_buffer_probe_cb, &helper, NULL);
helper.probe_pad = src;
/* push a buffer so the events are propagated downstream */
flow = gst_pad_push (src, gst_buffer_new ());
g_thread_join (helper.thread);
fail_unless_equals_int (flow, GST_FLOW_OK);
/* no buffer should have made it through to the sink pad, and especially
* not a NULL pointer buffer */
fail_if (buffers && buffers->data == NULL);
fail_unless (buffers == NULL);
gst_object_unref (src);
gst_object_unref (sink);
}
GST_END_TEST;
static GstPadProbeReturn static GstPadProbeReturn
probe_block_a (GstPad * pad, GstPadProbeInfo * info, gpointer user_data) probe_block_a (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
{ {
@ -2085,6 +2162,7 @@ gst_pad_suite (void)
tcase_add_test (tc_chain, test_pad_blocking_with_probe_type_blocking); tcase_add_test (tc_chain, test_pad_blocking_with_probe_type_blocking);
tcase_add_test (tc_chain, test_pad_probe_remove); tcase_add_test (tc_chain, test_pad_probe_remove);
tcase_add_test (tc_chain, test_pad_probe_block_add_remove); tcase_add_test (tc_chain, test_pad_probe_block_add_remove);
tcase_add_test (tc_chain, test_pad_probe_block_and_drop_buffer);
tcase_add_test (tc_chain, test_pad_probe_flush_events); tcase_add_test (tc_chain, test_pad_probe_flush_events);
tcase_add_test (tc_chain, test_queue_src_caps_notify_linked); tcase_add_test (tc_chain, test_queue_src_caps_notify_linked);
tcase_add_test (tc_chain, test_queue_src_caps_notify_not_linked); tcase_add_test (tc_chain, test_queue_src_caps_notify_not_linked);