pad: further improve probes and pad blocking

Keep track of installed number of probes to shortcut emission.
Allow NULL callbacks, this is useful for blocking probes.
Improve probe selection based on the mask, an empty mask for the data or the
scheduling flags equals that all probes match.
Add some more debug info.
Don't check the flushing flag in the probe callback handler, this needs to be
done before calling the handler.
Fix blocking probes.
Fix unit tests
This commit is contained in:
Wim Taymans 2011-06-01 19:27:55 +02:00
parent 8abc457a3b
commit 3f51563319
10 changed files with 262 additions and 207 deletions

View file

@ -1118,7 +1118,6 @@ gst_pad_add_probe (GstPad * pad, GstProbeType mask,
g_return_val_if_fail (GST_IS_PAD (pad), 0);
g_return_val_if_fail (mask != 0, 0);
g_return_val_if_fail (callback != NULL, 0);
GST_OBJECT_LOCK (pad);
/* make a new probe */
@ -1127,6 +1126,13 @@ gst_pad_add_probe (GstPad * pad, GstProbeType mask,
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "adding probe for mask 0x%08x",
mask);
/* when no contraints are given for the types, assume all types are
* acceptable */
if ((mask & GST_PROBE_TYPE_DATA) == 0)
mask |= GST_PROBE_TYPE_DATA;
if ((mask & GST_PROBE_TYPE_SCHEDULING) == 0)
mask |= GST_PROBE_TYPE_SCHEDULING;
/* store our flags and other fields */
hook->flags |= (mask << G_HOOK_FLAG_USER_SHIFT);
hook->func = callback;
@ -1139,33 +1145,41 @@ gst_pad_add_probe (GstPad * pad, GstProbeType mask,
/* add the probe */
g_hook_prepend (&pad->probes, hook);
pad->num_probes++;
/* get the id of the hook, we return this and it can be used to remove the
* probe later */
res = hook->hook_id;
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "got probe id %lu", res);
if (mask & (GST_PROBE_TYPE_IDLE | GST_PROBE_TYPE_BLOCK)) {
if (mask & GST_PROBE_TYPE_BLOCKING) {
/* we have a block probe */
pad->num_blocked++;
GST_OBJECT_FLAG_SET (pad, GST_PAD_BLOCKED);
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "added blocking probe, "
"now %d blocking probes", pad->num_blocked);
}
if (pad->priv->using > 0) {
/* the pad is in use, we can't signal the idle callback yet. Since we set the
* flag above, the last thread to leave the push will do the callback. New
* threads going into the push will block. */
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "pad is in use");
GST_OBJECT_UNLOCK (pad);
} else {
/* the pad is idle now, we can signal the idle callback now */
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "pad is idle");
GST_OBJECT_UNLOCK (pad);
/* call the callback if we need to be called for idle callbacks */
if ((mask & GST_PROBE_TYPE_IDLE) && (callback != NULL)) {
if (pad->priv->using > 0) {
/* the pad is in use, we can't signal the idle callback yet. Since we set the
* flag above, the last thread to leave the push will do the callback. New
* threads going into the push will block. */
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "pad is in use");
GST_OBJECT_UNLOCK (pad);
} else {
/* the pad is idle now, we can signal the idle callback now */
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "pad is idle");
GST_OBJECT_UNLOCK (pad);
/* call the callback if we need to be called for idle callbacks */
if (mask & GST_PROBE_TYPE_IDLE)
callback (pad, GST_PROBE_TYPE_IDLE, NULL, user_data);
}
} else {
GST_OBJECT_UNLOCK (pad);
}
return TRUE;
return res;
}
static void
@ -1175,15 +1189,19 @@ cleanup_hook (GstPad * pad, GHook * hook)
type = (hook->flags) >> G_HOOK_FLAG_USER_SHIFT;
if (type & (GST_PROBE_TYPE_IDLE | GST_PROBE_TYPE_BLOCK)) {
if (type & GST_PROBE_TYPE_BLOCKING) {
/* unblock when we remove the last blocking probe */
pad->num_blocked--;
GST_DEBUG_OBJECT (pad, "remove blocking probe, now %d left",
pad->num_blocked);
if (pad->num_blocked == 0) {
GST_DEBUG_OBJECT (pad, "last blocking probe removed, unblocking");
GST_OBJECT_FLAG_UNSET (pad, GST_PAD_BLOCKED);
GST_PAD_BLOCK_BROADCAST (pad);
}
}
g_hook_destroy_link (&pad->probes, hook);
pad->num_probes--;
}
/**
@ -1217,8 +1235,8 @@ gst_pad_remove_probe (GstPad * pad, gulong id)
not_found:
{
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "no hook with id %d", id);
GST_OBJECT_UNLOCK (pad);
g_warning ("%s: pad `%p' has no probe with id `%lu'", G_STRLOC, pad, id);
return;
}
}
@ -3486,6 +3504,8 @@ probe_hook_marshal (GHook * hook, ProbeMarshall * data)
{
GstPad *pad = data->pad;
GstProbeType flags;
GstPadProbeCallback callback;
GstProbeReturn ret;
/* if we have called this callback, do nothing */
if (PROBE_COOKIE (hook) == data->cookie)
@ -3495,57 +3515,68 @@ probe_hook_marshal (GHook * hook, ProbeMarshall * data)
flags = hook->flags >> G_HOOK_FLAG_USER_SHIFT;
/* check if type matches */
if ((flags & GST_PROBE_TYPE_DATA & data->mask) == 0)
return;
if ((flags & 0xc0 & data->mask) == 0)
return;
if ((flags & 0x6) != (data->mask & 0x6))
return;
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
"have hook %lu with flags 0x%08x", hook->hook_id, flags);
"hook %lu with flags 0x%08x matches", hook->hook_id, flags);
if ((flags & data->mask) == flags) {
GstPadProbeCallback callback;
callback = (GstPadProbeCallback) hook->func;
if (callback == NULL)
return;
callback = (GstPadProbeCallback) hook->func;
if (callback) {
GstProbeReturn ret;
GST_OBJECT_UNLOCK (pad);
GST_OBJECT_UNLOCK (pad);
ret = callback (pad, data->mask, data->type_data, hook->data);
ret = callback (pad, data->mask, data->type_data, hook->data);
GST_OBJECT_LOCK (pad);
GST_OBJECT_LOCK (pad);
switch (ret) {
case GST_PROBE_REMOVE:
/* remove the probe */
GST_DEBUG_OBJECT (pad, "asked to remove hook");
cleanup_hook (pad, hook);
break;
case GST_PROBE_DROP:
/* need to drop the data, make sure other probes don't get called
* anymore */
GST_DEBUG_OBJECT (pad, "asked to drop item");
data->mask = GST_PROBE_TYPE_INVALID;
data->ret = GST_PROBE_DROP;
break;
case GST_PROBE_PASS:
/* inform the pad block to let things pass */
GST_DEBUG_OBJECT (pad, "asked to pass item");
data->pass = TRUE;
break;
default:
GST_DEBUG_OBJECT (pad, "probe returned %d", ret);
break;
}
}
switch (ret) {
case GST_PROBE_REMOVE:
/* remove the probe */
GST_DEBUG_OBJECT (pad, "asked to remove hook");
cleanup_hook (pad, hook);
break;
case GST_PROBE_DROP:
/* need to drop the data, make sure other probes don't get called
* anymore */
GST_DEBUG_OBJECT (pad, "asked to drop item");
data->mask = GST_PROBE_TYPE_INVALID;
data->ret = GST_PROBE_DROP;
break;
case GST_PROBE_PASS:
/* inform the pad block to let things pass */
GST_DEBUG_OBJECT (pad, "asked to pass item");
data->pass = TRUE;
break;
default:
GST_DEBUG_OBJECT (pad, "probe returned %d", ret);
break;
}
}
#define PROBE(pad,mask,data,label) \
G_STMT_START { \
if (G_UNLIKELY (pad->num_probes)) { \
ret = do_probe_callbacks (pad, mask, data); \
if (G_UNLIKELY (ret != GST_FLOW_OK)) \
goto label; \
} \
} G_STMT_END
static GstFlowReturn
do_probe_callbacks (GstPad * pad, GstProbeType mask, gpointer type_data)
{
ProbeMarshall data;
guint cookie;
if (G_UNLIKELY (GST_PAD_IS_FLUSHING (pad)))
goto flushing;
data.pad = pad;
data.mask = mask;
data.type_data = type_data;
@ -3556,19 +3587,16 @@ do_probe_callbacks (GstPad * pad, GstProbeType mask, gpointer type_data)
again:
cookie = pad->priv->probe_cookie;
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
"doing callbacks for probe 0x%08x", data.mask);
g_hook_list_marshal (&pad->probes, FALSE,
(GHookMarshaller) probe_hook_marshal, &data);
/* if the list changed, call the new callbacks (they will not have their
* cookie set to data.cookie */
if (cookie != pad->priv->probe_cookie)
if (cookie != pad->priv->probe_cookie) {
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
"probe list changed, restarting");
goto again;
/* we might have released the lock */
if (G_UNLIKELY (GST_PAD_IS_FLUSHING (pad)))
goto flushing;
}
if (data.ret == GST_PROBE_DROP)
goto dropped;
@ -3576,8 +3604,15 @@ again:
if (data.pass)
goto passed;
if (mask & (GST_PROBE_TYPE_BLOCK)) {
while (GST_PAD_IS_BLOCKING (pad)) {
if (mask & GST_PROBE_TYPE_BLOCK) {
while (GST_PAD_IS_BLOCKED (pad)) {
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
"we are blocked %d times", pad->num_blocked);
/* we might have released the lock */
if (G_UNLIKELY (GST_PAD_IS_FLUSHING (pad)))
goto flushing;
/* now we block the streaming thread. It can be unlocked when we
* deactivate the pad (which will also set the FLUSHING flag) or
* when the pad is unblocked. A flushing event will also unblock
@ -3587,6 +3622,7 @@ again:
GST_OBJECT_FLAG_SET (pad, GST_PAD_BLOCKING);
GST_PAD_BLOCK_WAIT (pad);
GST_OBJECT_FLAG_UNSET (pad, GST_PAD_BLOCKING);
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "We got unblocked");
if (G_UNLIKELY (GST_PAD_IS_FLUSHING (pad)))
goto flushing;
@ -3782,11 +3818,7 @@ gst_pad_chain_data_unchecked (GstPad * pad, gboolean is_buffer, void *data)
goto events_error;
}
ret =
do_probe_callbacks (pad, GST_PROBE_TYPE_PUSH | GST_PROBE_TYPE_BUFFER,
data);
if (ret != GST_FLOW_OK)
goto probe_stopped;
PROBE (pad, GST_PROBE_TYPE_PUSH | GST_PROBE_TYPE_BUFFER, data, probe_stopped);
GST_OBJECT_UNLOCK (pad);
@ -3976,18 +4008,6 @@ gst_pad_chain_list (GstPad * pad, GstBufferList * list)
return gst_pad_chain_data_unchecked (pad, FALSE, list);
}
static void
pad_post_push (GstPad * pad)
{
GST_OBJECT_LOCK (pad);
pad->priv->using--;
if (pad->priv->using == 0) {
/* pad is not active anymore, trigger idle callbacks */
do_probe_callbacks (pad, GST_PROBE_TYPE_IDLE, NULL);
}
GST_OBJECT_UNLOCK (pad);
}
static GstFlowReturn
gst_pad_push_data (GstPad * pad, gboolean is_buffer, void *data)
{
@ -4003,14 +4023,10 @@ gst_pad_push_data (GstPad * pad, gboolean is_buffer, void *data)
goto flushing;
/* do block probes */
ret = do_probe_callbacks (pad, type | GST_PROBE_TYPE_BLOCK, data);
if (ret != GST_FLOW_OK)
goto probe_stopped;
PROBE (pad, type | GST_PROBE_TYPE_BLOCK, data, probe_stopped);
/* do post-blocking probes */
ret = do_probe_callbacks (pad, type, data);
if (ret != GST_FLOW_OK)
goto probe_stopped;
PROBE (pad, type, data, probe_stopped);
if (G_UNLIKELY ((peer = GST_PAD_PEER (pad)) == NULL))
goto not_linked;
@ -4024,7 +4040,13 @@ gst_pad_push_data (GstPad * pad, gboolean is_buffer, void *data)
gst_object_unref (peer);
pad_post_push (pad);
GST_OBJECT_LOCK (pad);
pad->priv->using--;
if (pad->priv->using == 0) {
/* pad is not active anymore, trigger idle callbacks */
PROBE (pad, GST_PROBE_TYPE_PUSH | GST_PROBE_TYPE_IDLE, NULL, probe_stopped);
}
GST_OBJECT_UNLOCK (pad);
return ret;
@ -4176,12 +4198,9 @@ gst_pad_get_range_unchecked (GstPad * pad, guint64 offset, guint size,
/* can only fire the signal if we have a valid buffer */
GST_OBJECT_LOCK (pad);
ret =
do_probe_callbacks (pad, GST_PROBE_TYPE_PULL | GST_PROBE_TYPE_BUFFER,
*buffer);
if (G_UNLIKELY (ret != GST_FLOW_OK))
goto probe_stopped;
GST_OBJECT_LOCK (pad);
PROBE (pad, GST_PROBE_TYPE_PULL | GST_PROBE_TYPE_BUFFER, *buffer,
probe_stopped);
GST_OBJECT_UNLOCK (pad);
GST_PAD_STREAM_UNLOCK (pad);
@ -4302,11 +4321,8 @@ gst_pad_pull_range (GstPad * pad, guint64 offset, guint size,
GST_OBJECT_LOCK (pad);
ret =
do_probe_callbacks (pad, GST_PROBE_TYPE_PULL | GST_PROBE_TYPE_BLOCK,
NULL);
if (G_UNLIKELY (ret != GST_FLOW_OK))
goto pre_probe_stopped;
PROBE (pad, GST_PROBE_TYPE_PULL | GST_PROBE_TYPE_BLOCK, NULL,
pre_probe_stopped);
if (G_UNLIKELY ((peer = GST_PAD_PEER (pad)) == NULL))
goto not_linked;
@ -4322,11 +4338,8 @@ gst_pad_pull_range (GstPad * pad, guint64 offset, guint size,
goto pull_range_failed;
GST_OBJECT_LOCK (pad);
ret =
do_probe_callbacks (pad, GST_PROBE_TYPE_PULL | GST_PROBE_TYPE_BUFFER,
buffer);
if (G_UNLIKELY (ret != GST_FLOW_OK))
goto post_probe_stopped;
PROBE (pad, GST_PROBE_TYPE_PULL | GST_PROBE_TYPE_BUFFER, buffer,
post_probe_stopped);
needs_events = GST_PAD_NEEDS_EVENTS (pad);
if (G_UNLIKELY (needs_events)) {
@ -4405,7 +4418,7 @@ gst_pad_push_event (GstPad * pad, GstEvent * event)
{
GstFlowReturn ret;
GstPad *peerpad;
gboolean result, need_probes, do_probes = TRUE, do_event_actions = TRUE;
gboolean result, do_event_actions = TRUE;
gboolean stored = FALSE;
g_return_val_if_fail (GST_IS_PAD (pad), FALSE);
@ -4418,7 +4431,6 @@ again:
GST_OBJECT_LOCK (pad);
peerpad = GST_PAD_PEER (pad);
need_probes = do_probes && (GST_PAD_DO_EVENT_SIGNALS (pad) > 0);
/* Two checks to be made:
* . (un)set the FLUSHING flag for flushing events,
@ -4487,7 +4499,7 @@ again:
offset = pad->offset;
/* check if we need to adjust the segment */
if (offset != 0 && (need_probes || peerpad != NULL)) {
if (offset != 0 && (peerpad != NULL)) {
GstSegment segment;
/* copy segment values */
@ -4511,21 +4523,15 @@ again:
if (G_UNLIKELY (GST_PAD_IS_FLUSHING (pad)))
goto flushed;
ret = do_probe_callbacks (pad, GST_PROBE_TYPE_PUSH | GST_PROBE_TYPE_EVENT
| GST_PROBE_TYPE_BLOCK, event);
if (ret != GST_FLOW_OK)
goto probe_stopped;
PROBE (pad, GST_PROBE_TYPE_PUSH | GST_PROBE_TYPE_EVENT
| GST_PROBE_TYPE_BLOCK, event, probe_stopped);
break;
}
}
/* send probes after modifying the events above */
ret =
do_probe_callbacks (pad, GST_PROBE_TYPE_PUSH | GST_PROBE_TYPE_EVENT,
event);
if (ret != GST_FLOW_OK)
goto probe_stopped;
PROBE (pad, GST_PROBE_TYPE_PUSH | GST_PROBE_TYPE_EVENT, event, probe_stopped);
/* now check the peer pad */
if (peerpad == NULL)
@ -4546,7 +4552,13 @@ again:
gst_object_unref (peerpad);
pad_post_push (pad);
GST_OBJECT_LOCK (pad);
pad->priv->using--;
if (pad->priv->using == 0) {
/* pad is not active anymore, trigger idle callbacks */
PROBE (pad, GST_PROBE_TYPE_PUSH | GST_PROBE_TYPE_IDLE, NULL, probe_stopped);
}
GST_OBJECT_UNLOCK (pad);
return result | stored;
@ -4715,9 +4727,8 @@ gst_pad_send_event (GstPad * pad, GstEvent * event)
if (G_UNLIKELY (GST_PAD_IS_FLUSHING (pad)))
goto flushing;
ret = do_probe_callbacks (pad, GST_PROBE_TYPE_EVENT, event);
if (ret != GST_FLOW_OK)
goto probe_stopped;
PROBE (pad, GST_PROBE_TYPE_PUSH | GST_PROBE_TYPE_EVENT, event,
probe_stopped);
break;
}

View file

@ -486,16 +486,22 @@ typedef gboolean (*GstPadDispatcherFunction) (GstPad *pad, gpointer data);
typedef enum
{
GST_PROBE_TYPE_INVALID = (1 << 0),
GST_PROBE_TYPE_IDLE = (1 << 1),
GST_PROBE_TYPE_BLOCK = (1 << 2),
GST_PROBE_TYPE_BUFFER = (1 << 3),
GST_PROBE_TYPE_BUFFER_LIST = (1 << 4),
GST_PROBE_TYPE_EVENT = (1 << 5),
GST_PROBE_TYPE_PUSH = (1 << 6),
GST_PROBE_TYPE_PULL = (1 << 7),
} GstProbeType;
#define GST_PROBE_TYPE_DATA (GST_PROBE_TYPE_BUFFER | GST_PROBE_TYPE_EVENT | GST_PROBE_TYPE_BUFFER_LIST)
#define GST_PROBE_TYPE_BLOCKING (GST_PROBE_TYPE_IDLE | GST_PROBE_TYPE_BLOCK)
#define GST_PROBE_TYPE_DATA (GST_PROBE_TYPE_BUFFER | GST_PROBE_TYPE_EVENT | \
GST_PROBE_TYPE_BUFFER_LIST)
#define GST_PROBE_TYPE_SCHEDULING (GST_PROBE_TYPE_PUSH | GST_PROBE_TYPE_PULL)
/**
* GstProbeResult:
@ -509,8 +515,8 @@ typedef enum
*/
typedef enum
{
GST_PROBE_OK,
GST_PROBE_DROP,
GST_PROBE_OK,
GST_PROBE_REMOVE,
GST_PROBE_PASS,
} GstProbeReturn;
@ -685,6 +691,7 @@ struct _GstPad {
* of handlers attached. */
gint do_buffer_signals;
gint do_event_signals;
gint num_probes;
gint num_blocked;
/*< private >*/

View file

@ -34,8 +34,9 @@ static GstStaticPadTemplate srctemplate = GST_STATIC_PAD_TEMPLATE ("src",
GST_STATIC_CAPS_ANY);
/* Data probe cb to drop everything but count buffers and events */
static gboolean
probe_cb (GstPad * pad, GstMiniObject * obj, gpointer user_data)
static GstProbeReturn
probe_cb (GstPad * pad, GstProbeType type, GstMiniObject * obj,
gpointer user_data)
{
gint count = 0;
const gchar *count_type = NULL;
@ -56,7 +57,7 @@ probe_cb (GstPad * pad, GstMiniObject * obj, gpointer user_data)
g_object_set_data (G_OBJECT (pad), count_type, GINT_TO_POINTER (count));
/* drop everything */
return FALSE;
return GST_PROBE_DROP;
}
/* Create and link output pad: selector:src%d ! output_pad */
@ -75,7 +76,8 @@ setup_output_pad (GstElement * element, GstStaticPadTemplate * tmpl)
/* add probe */
probe_id =
gst_pad_add_data_probe (output_pad, G_CALLBACK (probe_cb), NULL, NULL);
gst_pad_add_probe (output_pad, GST_PROBE_TYPE_DATA,
(GstPadProbeCallback) probe_cb, NULL, NULL);
g_object_set_data (G_OBJECT (output_pad), "probe_id",
GINT_TO_POINTER (probe_id));
@ -111,7 +113,7 @@ cleanup_pad (GstPad * pad, GstElement * element)
/* remove probe if necessary */
probe_id = GPOINTER_TO_INT (g_object_get_data (G_OBJECT (pad), "probe_id"));
if (probe_id)
gst_pad_remove_data_probe (pad, probe_id);
gst_pad_remove_probe (pad, probe_id);
/* unlink */
selpad = gst_pad_get_peer (pad);
@ -315,7 +317,8 @@ run_input_selector_buffer_count (gint num_input_pads,
}
/* add probe */
probe_id =
gst_pad_add_data_probe (output_pad, G_CALLBACK (probe_cb), NULL, NULL);
gst_pad_add_probe (output_pad, GST_PROBE_TYPE_DATA,
(GstPadProbeCallback) probe_cb, NULL, NULL);
g_object_set_data (G_OBJECT (output_pad), "probe_id",
GINT_TO_POINTER (probe_id));
@ -329,7 +332,7 @@ run_input_selector_buffer_count (gint num_input_pads,
GST_STATE_NULL) == GST_STATE_CHANGE_SUCCESS, "could not set to null");
/* clean up */
gst_pad_remove_data_probe (output_pad, probe_id);
gst_pad_remove_probe (output_pad, probe_id);
gst_pad_set_active (output_pad, FALSE);
gst_check_teardown_sink_pad (sel);
GST_DEBUG ("setting selector pad to NULL");

View file

@ -763,13 +763,16 @@ GST_END_TEST;
static GMutex *blocked_lock;
static GCond *blocked_cond;
static void
pad_blocked_cb (GstPad * pad, GstBlockType type, gpointer user_data)
static GstProbeReturn
pad_blocked_cb (GstPad * pad, GstProbeType type, gpointer type_data,
gpointer user_data)
{
g_mutex_lock (blocked_lock);
GST_DEBUG ("srcpad blocked: %d, sending signal", type);
g_cond_signal (blocked_cond);
g_mutex_unlock (blocked_lock);
return GST_PROBE_OK;
}
GST_START_TEST (test_add_live2)
@ -778,6 +781,7 @@ GST_START_TEST (test_add_live2)
GstStateChangeReturn ret;
GstState current, pending;
GstPad *srcpad, *sinkpad;
gulong id;
blocked_lock = g_mutex_new ();
blocked_cond = g_cond_new ();
@ -799,7 +803,8 @@ GST_START_TEST (test_add_live2)
GST_DEBUG ("blocking srcpad");
/* block source pad */
srcpad = gst_element_get_static_pad (src, "src");
gst_pad_block (srcpad, GST_BLOCK_TYPE_DATA, pad_blocked_cb, NULL, NULL);
id = gst_pad_add_probe (srcpad, GST_PROBE_TYPE_BLOCK, pad_blocked_cb, NULL,
NULL);
/* set source to PAUSED without adding it to the pipeline */
ret = gst_element_set_state (src, GST_STATE_PAUSED);
@ -827,7 +832,7 @@ GST_START_TEST (test_add_live2)
GST_DEBUG ("unblocking srcpad");
/* and unblock */
gst_pad_unblock (srcpad);
gst_pad_remove_probe (srcpad, id);
GST_DEBUG ("getting state");

View file

@ -265,9 +265,10 @@ static GstEvent *got_event_before_q, *got_event_after_q;
static GTimeVal got_event_time;
static GstProbeReturn
event_probe (GstPad * pad, GstProbeType type, GstMiniObject * data,
event_probe (GstPad * pad, GstProbeType type, gpointer type_data,
gpointer user_data)
{
GstMiniObject *data = type_data;
gboolean before_q = (gboolean) GPOINTER_TO_INT (user_data);
GST_DEBUG ("event probe called %p", data);
@ -401,6 +402,7 @@ static void test_event
signal_data_wait (&data);
/* We send on the peer pad, since the pad is blocked */
GST_DEBUG ("sending event %p", event);
fail_unless ((peer = gst_pad_get_peer (pad)) != NULL);
gst_pad_send_event (peer, event);
gst_object_unref (peer);
@ -479,11 +481,11 @@ GST_START_TEST (send_custom_events)
/* add pad-probes to faksrc.src and fakesink.sink */
fail_if ((srcpad = gst_element_get_static_pad (fakesrc, "src")) == NULL);
gst_pad_add_probe (srcpad, GST_PROBE_TYPE_EVENT,
(GstPadProbeCallback) event_probe, GINT_TO_POINTER (TRUE), NULL);
event_probe, GINT_TO_POINTER (TRUE), NULL);
fail_if ((sinkpad = gst_element_get_static_pad (fakesink, "sink")) == NULL);
gst_pad_add_probe (sinkpad, GST_PROBE_TYPE_EVENT,
(GstPadProbeCallback) event_probe, GINT_TO_POINTER (FALSE), NULL);
event_probe, GINT_TO_POINTER (FALSE), NULL);
/* Upstream events */
test_event (pipeline, GST_EVENT_CUSTOM_UPSTREAM, sinkpad, TRUE, srcpad);

View file

@ -517,8 +517,8 @@ GST_START_TEST (test_ghost_pads_block)
block_data.cond = g_cond_new ();
g_mutex_lock (block_data.mutex);
gst_pad_add_probe (srcghost, GST_PROBE_TYPE_DATA, block_callback, &block_data,
NULL);
gst_pad_add_probe (srcghost, GST_PROBE_TYPE_BLOCK, block_callback,
&block_data, NULL);
gst_element_set_state (GST_ELEMENT (pipeline), GST_STATE_PLAYING);
/* and wait now */
g_cond_wait (block_data.cond, block_data.mutex);
@ -559,8 +559,8 @@ GST_START_TEST (test_ghost_pads_probes)
block_data.cond = g_cond_new ();
g_mutex_lock (block_data.mutex);
gst_pad_add_probe (srcghost, GST_PROBE_TYPE_DATA, block_callback, &block_data,
NULL);
gst_pad_add_probe (srcghost, GST_PROBE_TYPE_BLOCK, block_callback,
&block_data, NULL);
gst_element_set_state (GST_ELEMENT (pipeline), GST_STATE_PLAYING);
/* and wait now */
g_cond_wait (block_data.cond, block_data.mutex);

View file

@ -907,6 +907,8 @@ block_async_second_no_flush (GstPad * pad, GstProbeType type,
{
gboolean *bool_user_data = (gboolean *) user_data;
GST_DEBUG ("second probe called");
fail_unless (type & GST_PROBE_TYPE_BLOCK);
fail_unless (bool_user_data[0] == TRUE);
@ -915,6 +917,7 @@ block_async_second_no_flush (GstPad * pad, GstProbeType type,
bool_user_data[1] = TRUE;
GST_DEBUG ("removing second probe with id %lu", id);
gst_pad_remove_probe (pad, id);
return GST_PROBE_OK;
@ -929,6 +932,8 @@ block_async_first_no_flush (GstPad * pad, GstProbeType type, gpointer type_data,
fail_unless (type & GST_PROBE_TYPE_BLOCK);
GST_DEBUG ("first probe called");
if (++n_calls > 1)
/* we expect this callback to be called only once */
g_warn_if_reached ();
@ -939,12 +944,15 @@ block_async_first_no_flush (GstPad * pad, GstProbeType type, gpointer type_data,
fail_unless (bool_user_data[1] == FALSE);
fail_unless (bool_user_data[2] == FALSE);
GST_DEBUG ("removing first probe with id %lu", id);
gst_pad_remove_probe (pad, id);
GST_DEBUG ("adding second probe");
/* replace block_async_first with block_async_second so next time the pad is
* blocked the latter should be called */
id = gst_pad_add_probe (pad, GST_PROBE_TYPE_BLOCK,
block_async_second_no_flush, user_data, NULL);
GST_DEBUG ("added probe with id %lu", id);
return GST_PROBE_OK;
}
@ -958,10 +966,13 @@ GST_START_TEST (test_block_async_replace_callback_no_flush)
fail_unless (pad != NULL);
gst_pad_set_active (pad, TRUE);
GST_DEBUG ("adding probe");
id = gst_pad_add_probe (pad, GST_PROBE_TYPE_BLOCK,
block_async_first_no_flush, bool_user_data, NULL);
GST_DEBUG ("added probe with id %lu", id);
fail_if (id == 0);
GST_DEBUG ("pushing buffer");
gst_pad_push (pad, gst_buffer_new ());
fail_unless (bool_user_data[0] == TRUE);
fail_unless (bool_user_data[1] == TRUE);

View file

@ -32,42 +32,48 @@ static int n_data_probes = 0;
static int n_buffer_probes = 0;
static int n_event_probes = 0;
static gboolean
probe_do_nothing (GstPad * pad, GstMiniObject * obj, gpointer data)
static GstProbeReturn
probe_do_nothing (GstPad * pad, GstProbeType type, gpointer type_data,
gpointer data)
{
GstMiniObject *obj = type_data;
GST_DEBUG_OBJECT (pad, "is buffer:%d", GST_IS_BUFFER (obj));
return TRUE;
return GST_PROBE_OK;
}
static gboolean
data_probe (GstPad * pad, GstMiniObject * obj, gpointer data)
static GstProbeReturn
data_probe (GstPad * pad, GstProbeType type, gpointer type_data, gpointer data)
{
GstMiniObject *obj = type_data;
n_data_probes++;
GST_DEBUG_OBJECT (pad, "data probe %d", n_data_probes);
g_assert (GST_IS_BUFFER (obj) || GST_IS_EVENT (obj));
g_assert (data == SPECIAL_POINTER (0));
return TRUE;
return GST_PROBE_OK;
}
static gboolean
buffer_probe (GstPad * pad, GstBuffer * obj, gpointer data)
static GstProbeReturn
buffer_probe (GstPad * pad, GstProbeType type, gpointer type_data,
gpointer data)
{
GstBuffer *obj = type_data;
n_buffer_probes++;
GST_DEBUG_OBJECT (pad, "buffer probe %d", n_buffer_probes);
g_assert (GST_IS_BUFFER (obj));
g_assert (data == SPECIAL_POINTER (1));
return TRUE;
return GST_PROBE_OK;
}
static gboolean
event_probe (GstPad * pad, GstEvent * obj, gpointer data)
static GstProbeReturn
event_probe (GstPad * pad, GstProbeType type, gpointer type_data, gpointer data)
{
GstEvent *obj = type_data;
n_event_probes++;
GST_DEBUG_OBJECT (pad, "event probe %d [%s]",
n_event_probes, GST_EVENT_TYPE_NAME (obj));
g_assert (GST_IS_EVENT (obj));
g_assert (data == SPECIAL_POINTER (2));
return TRUE;
return GST_PROBE_OK;
}
GST_START_TEST (test_buffer_probe_n_times)
@ -89,20 +95,20 @@ GST_START_TEST (test_buffer_probe_n_times)
pad = gst_element_get_static_pad (fakesink, "sink");
/* add the probes we need for the test */
gst_pad_add_data_probe (pad, G_CALLBACK (data_probe), SPECIAL_POINTER (0),
NULL);
gst_pad_add_buffer_probe (pad, G_CALLBACK (buffer_probe), SPECIAL_POINTER (1),
NULL);
gst_pad_add_event_probe (pad, G_CALLBACK (event_probe), SPECIAL_POINTER (2),
gst_pad_add_probe (pad, GST_PROBE_TYPE_DATA, data_probe, SPECIAL_POINTER (0),
NULL);
gst_pad_add_probe (pad, GST_PROBE_TYPE_BUFFER, buffer_probe,
SPECIAL_POINTER (1), NULL);
gst_pad_add_probe (pad, GST_PROBE_TYPE_EVENT, event_probe,
SPECIAL_POINTER (2), NULL);
/* add some string probes just to test that the data is free'd
* properly as it should be */
gst_pad_add_data_probe (pad, G_CALLBACK (probe_do_nothing),
gst_pad_add_probe (pad, GST_PROBE_TYPE_DATA, probe_do_nothing,
g_strdup ("data probe string"), (GDestroyNotify) g_free);
gst_pad_add_buffer_probe (pad, G_CALLBACK (probe_do_nothing),
gst_pad_add_probe (pad, GST_PROBE_TYPE_BUFFER, probe_do_nothing,
g_strdup ("buffer probe string"), (GDestroyNotify) g_free);
gst_pad_add_event_probe (pad, G_CALLBACK (probe_do_nothing),
gst_pad_add_probe (pad, GST_PROBE_TYPE_EVENT, probe_do_nothing,
g_strdup ("event probe string"), (GDestroyNotify) g_free);
gst_object_unref (pad);
@ -131,37 +137,39 @@ static int n_data_probes_once = 0;
static int n_buffer_probes_once = 0;
static int n_event_probes_once = 0;
static gboolean
data_probe_once (GstPad * pad, GstMiniObject * obj, guint * data)
static GstProbeReturn
data_probe_once (GstPad * pad, GstProbeType type, GstMiniObject * obj,
guint * data)
{
n_data_probes_once++;
g_assert (GST_IS_BUFFER (obj) || GST_IS_EVENT (obj));
gst_pad_remove_data_probe (pad, *data);
gst_pad_remove_probe (pad, *data);
return TRUE;
return GST_PROBE_OK;
}
static gboolean
buffer_probe_once (GstPad * pad, GstBuffer * obj, guint * data)
static GstProbeReturn
buffer_probe_once (GstPad * pad, GstProbeType type, GstBuffer * obj,
guint * data)
{
n_buffer_probes_once++;
g_assert (GST_IS_BUFFER (obj));
gst_pad_remove_buffer_probe (pad, *data);
gst_pad_remove_probe (pad, *data);
return TRUE;
return GST_PROBE_OK;
}
static gboolean
event_probe_once (GstPad * pad, GstEvent * obj, guint * data)
static GstProbeReturn
event_probe_once (GstPad * pad, GstProbeType type, GstEvent * obj, guint * data)
{
n_event_probes_once++;
g_assert (GST_IS_EVENT (obj));
gst_pad_remove_event_probe (pad, *data);
gst_pad_remove_probe (pad, *data);
return TRUE;
return GST_PROBE_OK;
}
GST_START_TEST (test_buffer_probe_once)
@ -182,12 +190,15 @@ GST_START_TEST (test_buffer_probe_once)
gst_element_link (fakesrc, fakesink);
pad = gst_element_get_static_pad (fakesink, "sink");
id1 = gst_pad_add_data_probe (pad, G_CALLBACK (data_probe_once), &id1, NULL);
id1 =
gst_pad_add_probe (pad, GST_PROBE_TYPE_DATA,
(GstPadProbeCallback) data_probe_once, &id1, NULL);
id2 =
gst_pad_add_buffer_probe (pad, G_CALLBACK (buffer_probe_once), &id2,
NULL);
gst_pad_add_probe (pad, GST_PROBE_TYPE_BUFFER,
(GstPadProbeCallback) buffer_probe_once, &id2, NULL);
id3 =
gst_pad_add_event_probe (pad, G_CALLBACK (event_probe_once), &id3, NULL);
gst_pad_add_probe (pad, GST_PROBE_TYPE_EVENT,
(GstPadProbeCallback) event_probe_once, &id3, NULL);
gst_object_unref (pad);
gst_element_set_state (pipeline, GST_STATE_PLAYING);

View file

@ -27,8 +27,9 @@
#include <gst/check/gstcheck.h>
#include <gst/base/gstbasesrc.h>
static gboolean
eos_event_counter (GstObject * pad, GstEvent * event, guint * p_num_eos)
static GstProbeReturn
eos_event_counter (GstObject * pad, GstProbeType type, GstEvent * event,
guint * p_num_eos)
{
fail_unless (event != NULL);
fail_unless (GST_IS_EVENT (event));
@ -36,7 +37,7 @@ eos_event_counter (GstObject * pad, GstEvent * event, guint * p_num_eos)
if (GST_EVENT_TYPE (event) == GST_EVENT_EOS)
*p_num_eos += 1;
return TRUE;
return GST_PROBE_OK;
}
/* basesrc_eos_events_push_live_op:
@ -76,8 +77,8 @@ GST_START_TEST (basesrc_eos_events_push_live_op)
srcpad = gst_element_get_static_pad (src, "src");
fail_unless (srcpad != NULL);
probe = gst_pad_add_event_probe (srcpad,
G_CALLBACK (eos_event_counter), &num_eos, NULL);
probe = gst_pad_add_probe (srcpad, GST_PROBE_TYPE_EVENT,
(GstPadProbeCallback) eos_event_counter, &num_eos, NULL);
bus = gst_element_get_bus (pipe);
@ -106,7 +107,7 @@ GST_START_TEST (basesrc_eos_events_push_live_op)
/* make sure source hasn't sent a second one when going PAUSED => READY */
fail_unless (num_eos == 1);
gst_pad_remove_event_probe (srcpad, probe);
gst_pad_remove_probe (srcpad, probe);
gst_object_unref (srcpad);
gst_message_unref (msg);
gst_object_unref (bus);
@ -154,8 +155,8 @@ GST_START_TEST (basesrc_eos_events_push)
srcpad = gst_element_get_static_pad (src, "src");
fail_unless (srcpad != NULL);
probe = gst_pad_add_event_probe (srcpad,
G_CALLBACK (eos_event_counter), &num_eos, NULL);
probe = gst_pad_add_probe (srcpad, GST_PROBE_TYPE_EVENT,
(GstPadProbeCallback) eos_event_counter, &num_eos, NULL);
bus = gst_element_get_bus (pipe);
@ -177,7 +178,7 @@ GST_START_TEST (basesrc_eos_events_push)
/* make sure source hasn't sent a second one when going PAUSED => READY */
fail_unless (num_eos == 1);
gst_pad_remove_event_probe (srcpad, probe);
gst_pad_remove_probe (srcpad, probe);
gst_object_unref (srcpad);
gst_message_unref (msg);
gst_object_unref (bus);
@ -221,8 +222,8 @@ GST_START_TEST (basesrc_eos_events_pull_live_op)
srcpad = gst_element_get_static_pad (src, "src");
fail_unless (srcpad != NULL);
probe = gst_pad_add_event_probe (srcpad,
G_CALLBACK (eos_event_counter), &num_eos, NULL);
probe = gst_pad_add_probe (srcpad, GST_PROBE_TYPE_EVENT,
(GstPadProbeCallback) eos_event_counter, &num_eos, NULL);
gst_element_set_state (pipe, GST_STATE_PLAYING);
state_ret = gst_element_get_state (pipe, NULL, NULL, -1);
@ -247,7 +248,7 @@ GST_START_TEST (basesrc_eos_events_pull_live_op)
/* make sure source hasn't sent an EOS when going PAUSED => READY either */
fail_unless (num_eos == 0);
gst_pad_remove_event_probe (srcpad, probe);
gst_pad_remove_probe (srcpad, probe);
gst_object_unref (srcpad);
gst_object_unref (pipe);
}
@ -293,8 +294,8 @@ GST_START_TEST (basesrc_eos_events_pull)
srcpad = gst_element_get_static_pad (src, "src");
fail_unless (srcpad != NULL);
probe = gst_pad_add_event_probe (srcpad,
G_CALLBACK (eos_event_counter), &num_eos, NULL);
probe = gst_pad_add_probe (srcpad, GST_PROBE_TYPE_EVENT,
(GstPadProbeCallback) eos_event_counter, &num_eos, NULL);
bus = gst_element_get_bus (pipe);
@ -316,7 +317,7 @@ GST_START_TEST (basesrc_eos_events_pull)
/* make sure source hasn't sent an EOS when going PAUSED => READY either */
fail_unless (num_eos == 0);
gst_pad_remove_event_probe (srcpad, probe);
gst_pad_remove_probe (srcpad, probe);
gst_object_unref (srcpad);
gst_message_unref (msg);
gst_object_unref (bus);
@ -363,8 +364,8 @@ GST_START_TEST (basesrc_eos_events_push_live_eos)
srcpad = gst_element_get_static_pad (src, "src");
fail_unless (srcpad != NULL);
probe = gst_pad_add_event_probe (srcpad,
G_CALLBACK (eos_event_counter), &num_eos, NULL);
probe = gst_pad_add_probe (srcpad, GST_PROBE_TYPE_EVENT,
(GstPadProbeCallback) eos_event_counter, &num_eos, NULL);
bus = gst_element_get_bus (pipe);
@ -394,7 +395,7 @@ GST_START_TEST (basesrc_eos_events_push_live_eos)
/* make sure source hasn't sent a second one when going PAUSED => READY */
fail_unless (num_eos == 1);
gst_pad_remove_event_probe (srcpad, probe);
gst_pad_remove_probe (srcpad, probe);
gst_object_unref (srcpad);
gst_message_unref (msg);
gst_object_unref (bus);
@ -440,8 +441,8 @@ GST_START_TEST (basesrc_eos_events_pull_live_eos)
srcpad = gst_element_get_static_pad (src, "src");
fail_unless (srcpad != NULL);
probe = gst_pad_add_event_probe (srcpad,
G_CALLBACK (eos_event_counter), &num_eos, NULL);
probe = gst_pad_add_probe (srcpad, GST_PROBE_TYPE_EVENT,
(GstPadProbeCallback) eos_event_counter, &num_eos, NULL);
bus = gst_element_get_bus (pipe);
@ -471,7 +472,7 @@ GST_START_TEST (basesrc_eos_events_pull_live_eos)
/* make sure source hasn't sent a second one when going PAUSED => READY */
fail_unless (num_eos == 0);
gst_pad_remove_event_probe (srcpad, probe);
gst_pad_remove_probe (srcpad, probe);
gst_object_unref (srcpad);
gst_message_unref (msg);
gst_object_unref (bus);
@ -481,8 +482,9 @@ GST_START_TEST (basesrc_eos_events_pull_live_eos)
GST_END_TEST;
static gboolean
segment_event_catcher (GstObject * pad, GstEvent * event, gpointer * user_data)
static GstProbeReturn
segment_event_catcher (GstObject * pad, GstProbeType type, GstEvent * event,
gpointer * user_data)
{
GstEvent **last_event = (GstEvent **) user_data;
fail_unless (event != NULL);
@ -495,7 +497,7 @@ segment_event_catcher (GstObject * pad, GstEvent * event, gpointer * user_data)
*last_event = gst_event_copy (event);
}
return TRUE;
return GST_PROBE_OK;
}
/* basesrc_seek_events_rate_update:
@ -533,8 +535,8 @@ GST_START_TEST (basesrc_seek_events_rate_update)
probe_pad = gst_element_get_static_pad (sink, "sink");
fail_unless (probe_pad != NULL);
probe = gst_pad_add_event_probe (probe_pad,
G_CALLBACK (segment_event_catcher), &seg_event, NULL);
probe = gst_pad_add_probe (probe_pad, GST_PROBE_TYPE_EVENT,
(GstPadProbeCallback) segment_event_catcher, &seg_event, NULL);
/* prepare the seek */
rate_seek = gst_event_new_seek (0.5, GST_FORMAT_TIME, GST_SEEK_FLAG_NONE,
@ -577,7 +579,7 @@ GST_START_TEST (basesrc_seek_events_rate_update)
gst_event_parse_segment (seg_event, &segment);
fail_unless (segment->rate == 0.5);
gst_pad_remove_event_probe (probe_pad, probe);
gst_pad_remove_probe (probe_pad, probe);
gst_object_unref (probe_pad);
gst_message_unref (msg);
gst_event_unref (seg_event);

View file

@ -23,8 +23,9 @@
#include <gst/gst.h>
static gboolean
modify_caps (GstObject * pad, GstEvent * event, gpointer data)
static GstProbeReturn
modify_caps (GstObject * pad, GstProbeType type, GstEvent * event,
gpointer data)
{
GstElement *filter = GST_ELEMENT (data);
GstCaps *caps;
@ -33,14 +34,14 @@ modify_caps (GstObject * pad, GstEvent * event, gpointer data)
fail_unless (GST_IS_EVENT (event));
if (GST_EVENT_TYPE (event) != GST_EVENT_EOS)
return TRUE;
return GST_PROBE_OK;
/* trigger caps negotiation error */
caps = gst_caps_new_simple ("video/x-raw-rgb", NULL);
g_object_set (filter, "caps", caps, NULL);
gst_caps_unref (caps);
return TRUE;
return GST_PROBE_OK;
}
GST_START_TEST (test_queue)
@ -69,7 +70,9 @@ GST_START_TEST (test_queue)
fail_unless (queue != NULL);
pad = gst_element_get_static_pad (queue, "sink");
fail_unless (pad != NULL);
probe = gst_pad_add_event_probe (pad, G_CALLBACK (modify_caps), filter, NULL);
probe =
gst_pad_add_probe (pad, GST_PROBE_TYPE_EVENT,
(GstPadProbeCallback) modify_caps, filter, NULL);
bus = gst_element_get_bus (pipeline);
@ -85,7 +88,7 @@ GST_START_TEST (test_queue)
fail_unless_equals_int (gst_element_set_state (pipeline, GST_STATE_NULL),
GST_STATE_CHANGE_SUCCESS);
gst_pad_remove_event_probe (pad, probe);
gst_pad_remove_probe (pad, probe);
gst_object_unref (queue);
gst_object_unref (pad);
gst_object_unref (filter);