pad: implement pad block with probes

This commit is contained in:
Wim Taymans 2011-05-31 19:16:09 +02:00
parent a5bbf7f369
commit 8abc457a3b
10 changed files with 518 additions and 703 deletions

File diff suppressed because it is too large Load diff

View file

@ -471,32 +471,62 @@ typedef void (*GstPadFixateCapsFunction) (GstPad *pad, GstCaps *caps);
typedef gboolean (*GstPadDispatcherFunction) (GstPad *pad, gpointer data);
/**
* GstBlockType:
* @GST_BLOCK_TYPE_IDLE: The pad is idle
* @GST_BLOCK_TYPE_DATA: Data is queued on the pad
* @GST_BLOCK_TYPE_PUSH: Blocked on a push operation
* @GST_BLOCK_TYPE_PULL: Blocked on a pull operation
* GstProbeType:
* @GST_PROBE_TYPE_INVALID: invalid probe type
* @GST_PROBE_TYPE_IDLE: probe idle pads and block
* @GST_PROBE_TYPE_BLOCK: probe and block pads
* @GST_PROBE_TYPE_BUFFER: probe buffers
* @GST_PROBE_TYPE_BUFFER_LIST: probe buffer lists
* @GST_PROBE_TYPE_EVENT: probe events
* @GST_PROBE_TYPE_PUSH: probe push
* @GST_PROBE_TYPE_PULL: probe pull
*
* The different blocking types that can occur.
* The different probing types that can occur.
*/
typedef enum
{
GST_BLOCK_TYPE_IDLE = (1 << 0),
GST_BLOCK_TYPE_DATA = (1 << 1),
GST_BLOCK_TYPE_PUSH = (1 << 2),
GST_BLOCK_TYPE_PULL = (1 << 3),
} GstBlockType;
GST_PROBE_TYPE_INVALID = (1 << 0),
GST_PROBE_TYPE_IDLE = (1 << 1),
GST_PROBE_TYPE_BLOCK = (1 << 2),
GST_PROBE_TYPE_BUFFER = (1 << 3),
GST_PROBE_TYPE_BUFFER_LIST = (1 << 4),
GST_PROBE_TYPE_EVENT = (1 << 5),
GST_PROBE_TYPE_PUSH = (1 << 6),
GST_PROBE_TYPE_PULL = (1 << 7),
} GstProbeType;
#define GST_PROBE_TYPE_DATA (GST_PROBE_TYPE_BUFFER | GST_PROBE_TYPE_EVENT | GST_PROBE_TYPE_BUFFER_LIST)
/**
* GstPadBlockCallback:
* GstProbeResult:
* @GST_PROBE_RESULT_OK: normal probe return value
* @GST_PROBE_RESULT_DROP: drop data in data probes
* @GST_PROBE_RESULT_REMOVE: remove probe
* @GST_PROBE_RESULT_PASS: pass the data item in the block probe and block on
* the next item
*
* Different return values for the GstPadProbeCallback.
*/
typedef enum
{
GST_PROBE_OK,
GST_PROBE_DROP,
GST_PROBE_REMOVE,
GST_PROBE_PASS,
} GstProbeReturn;
/**
* GstPadProbeCallback
* @pad: the #GstPad that is blocked
* @type: the current blocking type
* @type: the current probe type
* @type_data: type specific data
* @user_data: the gpointer to optional user data.
*
* Callback used by gst_pad_block(). Gets called to notify about the current
* Callback used by gst_pad_add_probe(). Gets called to notify about the current
* blocking type.
*/
typedef void (*GstPadBlockCallback) (GstPad *pad, GstBlockType type, gpointer user_data);
typedef GstProbeReturn (*GstPadProbeCallback) (GstPad *pad, GstProbeType type,
gpointer type_data, gpointer user_data);
/**
* GstPadStickyEventsForeachFunction:
@ -617,11 +647,7 @@ struct _GstPad {
/*< public >*/ /* with LOCK */
/* block cond, mutex is from the object */
GCond *block_cond;
GstBlockType block_type;
GstPadBlockCallback block_callback;
gpointer block_data;
GDestroyNotify block_destroy_data;
gboolean block_callback_called;
GHookList probes;
/* the pad capabilities */
GstPadGetCapsFunction getcapsfunc;
@ -659,6 +685,7 @@ struct _GstPad {
* of handlers attached. */
gint do_buffer_signals;
gint do_event_signals;
gint num_blocked;
/*< private >*/
GstPadPrivate *priv;
@ -822,12 +849,12 @@ gboolean gst_pad_is_active (GstPad *pad);
gboolean gst_pad_activate_pull (GstPad *pad, gboolean active);
gboolean gst_pad_activate_push (GstPad *pad, gboolean active);
gboolean gst_pad_block (GstPad *pad,
GstBlockType type,
GstPadBlockCallback callback,
gulong gst_pad_add_probe (GstPad *pad,
GstProbeType mask,
GstPadProbeCallback callback,
gpointer user_data,
GDestroyNotify destroy_data);
void gst_pad_unblock (GstPad *pad);
void gst_pad_remove_probe (GstPad *pad, gulong id);
gboolean gst_pad_is_blocked (GstPad *pad);
gboolean gst_pad_is_blocking (GstPad *pad);

View file

@ -3127,218 +3127,6 @@ gst_pad_query_peer_convert (GstPad * pad, GstFormat src_format, gint64 src_val,
return ret;
}
/**
* gst_pad_add_data_probe:
* @pad: pad to add the data probe handler to
* @handler: function to call when data is passed over pad
* @data: (closure): data to pass along with the handler
* @notify: (allow-none): function to call when the probe is disconnected,
* or NULL
*
* Adds a "data probe" to a pad. This function will be called whenever data
* passes through a pad. In this case data means both events and buffers. The
* probe will be called with the data as an argument, meaning @handler should
* have the same callback signature as the #GstPad::have-data signal.
* Note that the data will have a reference count greater than 1, so it will
* be immutable -- you must not change it.
*
* For source pads, the probe will be called after the blocking function, if any
* (see gst_pad_set_blocked_async()), but before looking up the peer to chain
* to. For sink pads, the probe function will be called before configuring the
* sink with new caps, if any, and before calling the pad's chain function.
*
* Your data probe should return TRUE to let the data continue to flow, or FALSE
* to drop it. Dropping data is rarely useful, but occasionally comes in handy
* with events.
*
* Although probes are implemented internally by connecting @handler to the
* have-data signal on the pad, if you want to remove a probe it is insufficient
* to only call g_signal_handler_disconnect on the returned handler id. To
* remove a probe, use the appropriate function, such as
* gst_pad_remove_data_probe().
*
* The @notify function is called when the probe is disconnected and usually
* used to free @data.
*
* Returns: The handler id.
*
* Since: 0.10.20
*/
gulong
gst_pad_add_data_probe (GstPad * pad, GCallback handler,
gpointer data, GDestroyNotify notify)
{
gulong sigid;
g_return_val_if_fail (GST_IS_PAD (pad), 0);
g_return_val_if_fail (handler != NULL, 0);
GST_OBJECT_LOCK (pad);
/* we only expose a GDestroyNotify in our API because that's less confusing */
sigid = g_signal_connect_data (pad, "have-data", handler, data,
(GClosureNotify) notify, 0);
GST_PAD_DO_EVENT_SIGNALS (pad)++;
GST_PAD_DO_BUFFER_SIGNALS (pad)++;
GST_CAT_DEBUG_OBJECT (GST_CAT_PADS, pad,
"adding data probe, now %d data, %d event probes",
GST_PAD_DO_BUFFER_SIGNALS (pad), GST_PAD_DO_EVENT_SIGNALS (pad));
GST_OBJECT_UNLOCK (pad);
return sigid;
}
/**
* gst_pad_add_event_probe:
* @pad: pad to add the event probe handler to
* @handler: function to call when events are passed over pad
* @data: (closure): data to pass along with the handler, or NULL
* @notify: (allow-none): function to call when probe is disconnected, or NULL
*
* Adds a probe that will be called for all events passing through a pad. See
* gst_pad_add_data_probe() for more information.
*
* The @notify function is called when the probe is disconnected and usually
* used to free @data.
*
* Returns: The handler id
*
* Since: 0.10.20
*/
gulong
gst_pad_add_event_probe (GstPad * pad, GCallback handler,
gpointer data, GDestroyNotify notify)
{
gulong sigid;
g_return_val_if_fail (GST_IS_PAD (pad), 0);
g_return_val_if_fail (handler != NULL, 0);
GST_OBJECT_LOCK (pad);
/* we only expose a GDestroyNotify in our API because that's less confusing */
sigid = g_signal_connect_data (pad, "have-data::event", handler, data,
(GClosureNotify) notify, 0);
GST_PAD_DO_EVENT_SIGNALS (pad)++;
GST_CAT_DEBUG_OBJECT (GST_CAT_PADS, pad, "adding event probe, now %d probes",
GST_PAD_DO_EVENT_SIGNALS (pad));
GST_OBJECT_UNLOCK (pad);
return sigid;
}
/**
* gst_pad_add_buffer_probe:
* @pad: pad to add the buffer probe handler to
* @handler: function to call when buffer are passed over pad
* @data: (closure): data to pass along with the handler
* @notify: (allow-none): function to call when the probe is disconnected,
* or NULL
*
* Adds a probe that will be called for all buffers passing through a pad. See
* gst_pad_add_data_probe() for more information.
*
* The @notify function is called when the probe is disconnected and usually
* used to free @data.
*
* Returns: The handler id
*
* Since: 0.10.20
*/
gulong
gst_pad_add_buffer_probe (GstPad * pad, GCallback handler,
gpointer data, GDestroyNotify notify)
{
gulong sigid;
g_return_val_if_fail (GST_IS_PAD (pad), 0);
g_return_val_if_fail (handler != NULL, 0);
GST_OBJECT_LOCK (pad);
/* we only expose a GDestroyNotify in our API because that's less confusing */
sigid = g_signal_connect_data (pad, "have-data::buffer", handler, data,
(GClosureNotify) notify, 0);
GST_PAD_DO_BUFFER_SIGNALS (pad)++;
GST_CAT_DEBUG_OBJECT (GST_CAT_PADS, pad, "adding buffer probe, now %d probes",
GST_PAD_DO_BUFFER_SIGNALS (pad));
GST_OBJECT_UNLOCK (pad);
return sigid;
}
/**
* gst_pad_remove_data_probe:
* @pad: pad to remove the data probe handler from
* @handler_id: handler id returned from gst_pad_add_data_probe
*
* Removes a data probe from @pad.
*/
void
gst_pad_remove_data_probe (GstPad * pad, guint handler_id)
{
g_return_if_fail (GST_IS_PAD (pad));
g_return_if_fail (handler_id > 0);
GST_OBJECT_LOCK (pad);
g_signal_handler_disconnect (pad, handler_id);
GST_PAD_DO_BUFFER_SIGNALS (pad)--;
GST_PAD_DO_EVENT_SIGNALS (pad)--;
GST_CAT_DEBUG_OBJECT (GST_CAT_PADS, pad,
"removed data probe, now %d event, %d buffer probes",
GST_PAD_DO_EVENT_SIGNALS (pad), GST_PAD_DO_BUFFER_SIGNALS (pad));
GST_OBJECT_UNLOCK (pad);
}
/**
* gst_pad_remove_event_probe:
* @pad: pad to remove the event probe handler from
* @handler_id: handler id returned from gst_pad_add_event_probe
*
* Removes an event probe from @pad.
*/
void
gst_pad_remove_event_probe (GstPad * pad, guint handler_id)
{
g_return_if_fail (GST_IS_PAD (pad));
g_return_if_fail (handler_id > 0);
GST_OBJECT_LOCK (pad);
g_signal_handler_disconnect (pad, handler_id);
GST_PAD_DO_EVENT_SIGNALS (pad)--;
GST_CAT_DEBUG_OBJECT (GST_CAT_PADS, pad,
"removed event probe, now %d event probes",
GST_PAD_DO_EVENT_SIGNALS (pad));
GST_OBJECT_UNLOCK (pad);
}
/**
* gst_pad_remove_buffer_probe:
* @pad: pad to remove the buffer probe handler from
* @handler_id: handler id returned from gst_pad_add_buffer_probe
*
* Removes a buffer probe from @pad.
*/
void
gst_pad_remove_buffer_probe (GstPad * pad, guint handler_id)
{
g_return_if_fail (GST_IS_PAD (pad));
g_return_if_fail (handler_id > 0);
GST_OBJECT_LOCK (pad);
g_signal_handler_disconnect (pad, handler_id);
GST_PAD_DO_BUFFER_SIGNALS (pad)--;
GST_CAT_DEBUG_OBJECT (GST_CAT_PADS, pad,
"removed buffer probe, now %d buffer probes",
GST_PAD_DO_BUFFER_SIGNALS (pad));
GST_OBJECT_UNLOCK (pad);
}
/**
* gst_element_found_tags_for_pad:
* @element: element for which to post taglist to bus.

View file

@ -936,28 +936,6 @@ GstPad * gst_bin_find_unlinked_pad (GstBin *bin, GstPadDire
GstBuffer * gst_buffer_merge (GstBuffer * buf1, GstBuffer * buf2);
GstBuffer * gst_buffer_join (GstBuffer * buf1, GstBuffer * buf2);
/* probes */
gulong gst_pad_add_data_probe (GstPad * pad,
GCallback handler,
gpointer data,
GDestroyNotify notify);
void gst_pad_remove_data_probe (GstPad * pad, guint handler_id);
gulong gst_pad_add_event_probe (GstPad * pad,
GCallback handler,
gpointer data,
GDestroyNotify notify);
void gst_pad_remove_event_probe (GstPad * pad, guint handler_id);
gulong gst_pad_add_buffer_probe (GstPad * pad,
GCallback handler,
gpointer data,
GDestroyNotify notify);
void gst_pad_remove_buffer_probe (GstPad * pad, guint handler_id);
/* tag emission utility functions */
void gst_element_found_tags_for_pad (GstElement * element,
GstPad * pad,

View file

@ -38,7 +38,8 @@ static gulong id;
/* called for every buffer. Waits until the global "buf" variable is unset,
* then sets it to the buffer received, and signals. */
static gboolean
buffer_probe (GstPad * pad, GstBuffer * buffer, gpointer unused)
buffer_probe (GstPad * pad, GstProbeType type, GstBuffer * buffer,
gpointer unused)
{
g_mutex_lock (lock);
@ -81,7 +82,8 @@ gst_buffer_straw_start_pipeline (GstElement * bin, GstPad * pad)
{
GstStateChangeReturn ret;
id = gst_pad_add_buffer_probe (pad, G_CALLBACK (buffer_probe), NULL, NULL);
id = gst_pad_add_probe (pad, GST_PROBE_TYPE_BUFFER,
(GstPadProbeCallback) buffer_probe, NULL, NULL);
cond = g_cond_new ();
lock = g_mutex_new ();
@ -149,7 +151,7 @@ gst_buffer_straw_stop_pipeline (GstElement * bin, GstPad * pad)
if (buf)
gst_buffer_unref (buf);
buf = NULL;
gst_pad_remove_buffer_probe (pad, (guint) id);
gst_pad_remove_probe (pad, (guint) id);
id = 0;
g_cond_signal (cond);
g_mutex_unlock (lock);

View file

@ -42,7 +42,7 @@ struct _GstStreamConsistency
};
static gboolean
source_pad_data_cb (GstPad * pad, GstMiniObject * data,
source_pad_data_cb (GstPad * pad, GstProbeType type, GstMiniObject * data,
GstStreamConsistency * consist)
{
if (GST_IS_BUFFER (data)) {
@ -118,8 +118,8 @@ gst_consistency_checker_new (GstPad * pad)
consist = g_new0 (GstStreamConsistency, 1);
consist->pad = g_object_ref (pad);
consist->probeid =
gst_pad_add_data_probe (pad, (GCallback) source_pad_data_cb, consist,
NULL);
gst_pad_add_probe (pad, GST_PROBE_TYPE_DATA,
(GstPadProbeCallback) source_pad_data_cb, consist, NULL);
return consist;
}
@ -154,7 +154,7 @@ void
gst_consistency_checker_free (GstStreamConsistency * consist)
{
/* Remove the data probe */
gst_pad_remove_data_probe (consist->pad, consist->probeid);
gst_pad_remove_probe (consist->pad, consist->probeid);
g_object_unref (consist->pad);
g_free (consist);
}

View file

@ -264,14 +264,15 @@ static GTimeVal sent_event_time;
static GstEvent *got_event_before_q, *got_event_after_q;
static GTimeVal got_event_time;
static gboolean
event_probe (GstPad * pad, GstMiniObject ** data, gpointer user_data)
static GstProbeReturn
event_probe (GstPad * pad, GstProbeType type, GstMiniObject * data,
gpointer user_data)
{
gboolean before_q = (gboolean) GPOINTER_TO_INT (user_data);
fail_unless (GST_IS_EVENT (data));
GST_DEBUG ("event probe called %p", data);
GST_DEBUG ("event probe called");
fail_unless (GST_IS_EVENT (data));
if (before_q) {
switch (GST_EVENT_TYPE (GST_EVENT (data))) {
@ -304,7 +305,7 @@ event_probe (GstPad * pad, GstMiniObject ** data, gpointer user_data)
}
}
return TRUE;
return GST_PROBE_OK;
}
@ -318,6 +319,7 @@ typedef struct
static void
signal_data_init (SignalData * data)
{
GST_DEBUG ("init %p", data);
data->lock = g_mutex_new ();
data->cond = g_cond_new ();
data->signaled = FALSE;
@ -326,6 +328,7 @@ signal_data_init (SignalData * data)
static void
signal_data_cleanup (SignalData * data)
{
GST_DEBUG ("free %p", data);
g_mutex_free (data->lock);
g_cond_free (data->cond);
}
@ -336,6 +339,7 @@ signal_data_signal (SignalData * data)
g_mutex_lock (data->lock);
data->signaled = TRUE;
g_cond_broadcast (data->cond);
GST_DEBUG ("signaling %p", data);
g_mutex_unlock (data->lock);
}
@ -343,17 +347,24 @@ static void
signal_data_wait (SignalData * data)
{
g_mutex_lock (data->lock);
GST_DEBUG ("signal wait %p", data);
while (!data->signaled)
g_cond_wait (data->cond, data->lock);
GST_DEBUG ("signal wait done %p", data);
g_mutex_unlock (data->lock);
}
static void
signal_blocked (GstPad * pad, GstBlockType type, gpointer user_data)
static GstProbeReturn
signal_blocked (GstPad * pad, GstProbeType type, gpointer type_data,
gpointer user_data)
{
SignalData *data = (SignalData *) user_data;
GST_DEBUG ("signal called %p", data);
signal_data_signal (data);
GST_DEBUG ("signal done %p", data);
return GST_PROBE_OK;
}
static void test_event
@ -364,6 +375,7 @@ static void test_event
GstPad *peer;
gint i;
SignalData data;
gulong id;
got_event_before_q = got_event_after_q = NULL;
@ -382,8 +394,9 @@ static void test_event
signal_data_init (&data);
/* We block the pad so the stream lock is released and we can send the event */
fail_unless (gst_pad_block (fake_srcpad, GST_BLOCK_TYPE_DATA,
signal_blocked, &data, NULL) == TRUE);
id = gst_pad_add_probe (fake_srcpad, GST_PROBE_TYPE_BLOCK,
signal_blocked, &data, NULL);
fail_unless (id != 0);
signal_data_wait (&data);
@ -392,8 +405,7 @@ static void test_event
gst_pad_send_event (peer, event);
gst_object_unref (peer);
gst_pad_unblock (fake_srcpad);
signal_data_cleanup (&data);
gst_pad_remove_probe (fake_srcpad, id);
if (expect_before_q) {
/* Wait up to 5 seconds for the event to appear */
@ -428,6 +440,8 @@ static void test_event
gst_event_unref (got_event_after_q);
got_event_before_q = got_event_after_q = NULL;
signal_data_cleanup (&data);
}
static gint64
@ -464,12 +478,12 @@ GST_START_TEST (send_custom_events)
/* add pad-probes to faksrc.src and fakesink.sink */
fail_if ((srcpad = gst_element_get_static_pad (fakesrc, "src")) == NULL);
gst_pad_add_event_probe (srcpad, (GCallback) event_probe,
GINT_TO_POINTER (TRUE), NULL);
gst_pad_add_probe (srcpad, GST_PROBE_TYPE_EVENT,
(GstPadProbeCallback) event_probe, GINT_TO_POINTER (TRUE), NULL);
fail_if ((sinkpad = gst_element_get_static_pad (fakesink, "sink")) == NULL);
gst_pad_add_event_probe (sinkpad, (GCallback) event_probe,
GINT_TO_POINTER (FALSE), NULL);
gst_pad_add_probe (sinkpad, GST_PROBE_TYPE_EVENT,
(GstPadProbeCallback) event_probe, GINT_TO_POINTER (FALSE), NULL);
/* Upstream events */
test_event (pipeline, GST_EVENT_CUSTOM_UPSTREAM, sinkpad, TRUE, srcpad);

View file

@ -478,8 +478,9 @@ typedef struct
GCond *cond;
} BlockData;
static void
block_callback (GstPad * pad, GstBlockType type, gpointer user_data)
static GstProbeReturn
block_callback (GstPad * pad, GstProbeType type, gpointer type_data,
gpointer user_data)
{
BlockData *block_data = (BlockData *) user_data;
@ -487,6 +488,8 @@ block_callback (GstPad * pad, GstBlockType type, gpointer user_data)
GST_DEBUG ("blocked\n");
g_cond_signal (block_data->cond);
g_mutex_unlock (block_data->mutex);
return GST_PROBE_OK;
}
GST_START_TEST (test_ghost_pads_block)
@ -514,7 +517,7 @@ GST_START_TEST (test_ghost_pads_block)
block_data.cond = g_cond_new ();
g_mutex_lock (block_data.mutex);
gst_pad_block (srcghost, GST_BLOCK_TYPE_DATA, block_callback, &block_data,
gst_pad_add_probe (srcghost, GST_PROBE_TYPE_DATA, block_callback, &block_data,
NULL);
gst_element_set_state (GST_ELEMENT (pipeline), GST_STATE_PLAYING);
/* and wait now */
@ -556,7 +559,7 @@ GST_START_TEST (test_ghost_pads_probes)
block_data.cond = g_cond_new ();
g_mutex_lock (block_data.mutex);
gst_pad_block (srcghost, GST_BLOCK_TYPE_DATA, block_callback, &block_data,
gst_pad_add_probe (srcghost, GST_PROBE_TYPE_DATA, block_callback, &block_data,
NULL);
gst_element_set_state (GST_ELEMENT (pipeline), GST_STATE_PLAYING);
/* and wait now */

View file

@ -249,14 +249,16 @@ GST_START_TEST (test_name_is_valid)
GST_END_TEST;
static gboolean
_probe_handler (GstPad * pad, GstBuffer * buffer, gpointer userdata)
static GstProbeReturn
_probe_handler (GstPad * pad, GstProbeType type, GstBuffer * buffer,
gpointer userdata)
{
gint ret = GPOINTER_TO_INT (userdata);
if (ret == 1)
return TRUE;
return FALSE;
return GST_PROBE_OK;
return GST_PROBE_DROP;
}
GST_START_TEST (test_push_unlinked)
@ -276,6 +278,15 @@ GST_START_TEST (test_push_unlinked)
gst_pad_set_caps (src, caps);
ASSERT_CAPS_REFCOUNT (caps, "caps", 2);
/* pushing on an inactive pad will return wrong state */
buffer = gst_buffer_new ();
gst_buffer_ref (buffer);
fail_unless (gst_pad_push (src, buffer) == GST_FLOW_WRONG_STATE);
ASSERT_MINI_OBJECT_REFCOUNT (buffer, "buffer", 1);
gst_buffer_unref (buffer);
gst_pad_set_active (src, TRUE);
/* pushing on an unlinked pad will drop the buffer */
buffer = gst_buffer_new ();
gst_buffer_ref (buffer);
@ -285,25 +296,25 @@ GST_START_TEST (test_push_unlinked)
/* adding a probe that returns FALSE will drop the buffer without trying
* to chain */
id = gst_pad_add_buffer_probe (src, (GCallback) _probe_handler,
GINT_TO_POINTER (0), NULL);
id = gst_pad_add_probe (src, GST_PROBE_TYPE_BUFFER,
(GstPadProbeCallback) _probe_handler, GINT_TO_POINTER (0), NULL);
buffer = gst_buffer_new ();
gst_buffer_ref (buffer);
fail_unless (gst_pad_push (src, buffer) == GST_FLOW_OK);
ASSERT_MINI_OBJECT_REFCOUNT (buffer, "buffer", 1);
gst_buffer_unref (buffer);
gst_pad_remove_buffer_probe (src, id);
gst_pad_remove_probe (src, id);
/* adding a probe that returns TRUE will still chain the buffer,
* and hence drop because pad is unlinked */
id = gst_pad_add_buffer_probe (src, (GCallback) _probe_handler,
GINT_TO_POINTER (1), NULL);
id = gst_pad_add_probe (src, GST_PROBE_TYPE_BUFFER,
(GstPadProbeCallback) _probe_handler, GINT_TO_POINTER (1), NULL);
buffer = gst_buffer_new ();
gst_buffer_ref (buffer);
fail_unless (gst_pad_push (src, buffer) == GST_FLOW_NOT_LINKED);
ASSERT_MINI_OBJECT_REFCOUNT (buffer, "buffer", 1);
gst_buffer_unref (buffer);
gst_pad_remove_buffer_probe (src, id);
gst_pad_remove_probe (src, id);
/* cleanup */
@ -376,23 +387,23 @@ GST_START_TEST (test_push_linked)
/* adding a probe that returns FALSE will drop the buffer without trying
* to chain */
id = gst_pad_add_buffer_probe (src, (GCallback) _probe_handler,
GINT_TO_POINTER (0), NULL);
id = gst_pad_add_probe (src, GST_PROBE_TYPE_BUFFER,
(GstPadProbeCallback) _probe_handler, GINT_TO_POINTER (0), NULL);
buffer = gst_buffer_new ();
gst_buffer_ref (buffer);
fail_unless (gst_pad_push (src, buffer) == GST_FLOW_OK);
ASSERT_MINI_OBJECT_REFCOUNT (buffer, "buffer", 1);
gst_buffer_unref (buffer);
gst_pad_remove_buffer_probe (src, id);
gst_pad_remove_probe (src, id);
fail_unless_equals_int (g_list_length (buffers), 0);
/* adding a probe that returns TRUE will still chain the buffer */
id = gst_pad_add_buffer_probe (src, (GCallback) _probe_handler,
GINT_TO_POINTER (1), NULL);
id = gst_pad_add_probe (src, GST_PROBE_TYPE_BUFFER,
(GstPadProbeCallback) _probe_handler, GINT_TO_POINTER (1), NULL);
buffer = gst_buffer_new ();
gst_buffer_ref (buffer);
fail_unless (gst_pad_push (src, buffer) == GST_FLOW_OK);
gst_pad_remove_buffer_probe (src, id);
gst_pad_remove_probe (src, id);
ASSERT_MINI_OBJECT_REFCOUNT (buffer, "buffer", 2);
gst_buffer_unref (buffer);
@ -681,12 +692,15 @@ GST_START_TEST (test_sink_unref_unlink)
GST_END_TEST;
static void
block_async_cb (GstPad * pad, GstBlockType type, gpointer user_data)
static gulong id;
static GstProbeReturn
block_async_cb (GstPad * pad, GstProbeType type, gpointer type_data,
gpointer user_data)
{
gboolean *bool_user_data = (gboolean *) user_data;
fail_unless ((type & GST_BLOCK_TYPE_DATA) != 0);
fail_unless ((type & GST_PROBE_TYPE_BLOCK) != 0);
/* here we should have blocked == 0 unblocked == 0 */
fail_unless (bool_user_data[0] == FALSE);
@ -694,8 +708,10 @@ block_async_cb (GstPad * pad, GstBlockType type, gpointer user_data)
bool_user_data[0] = TRUE;
gst_pad_unblock (pad);
gst_pad_remove_probe (pad, id);
bool_user_data[1] = TRUE;
return GST_PROBE_OK;
}
GST_START_TEST (test_block_async)
@ -709,7 +725,8 @@ GST_START_TEST (test_block_async)
fail_unless (pad != NULL);
gst_pad_set_active (pad, TRUE);
gst_pad_block (pad, GST_BLOCK_TYPE_DATA, block_async_cb, &data, NULL);
id = gst_pad_add_probe (pad, GST_PROBE_TYPE_BLOCK, block_async_cb, &data,
NULL);
fail_unless (data[0] == FALSE);
fail_unless (data[1] == FALSE);
@ -786,13 +803,16 @@ block_async_full_destroy (gpointer user_data)
*state = 2;
}
static void
block_async_full_cb (GstPad * pad, GstBlockType type, gpointer user_data)
static GstProbeReturn
block_async_full_cb (GstPad * pad, GstProbeType type, gpointer type_data,
gpointer user_data)
{
*(gint *) user_data = (gint) TRUE;
gst_pad_push_event (pad, gst_event_new_flush_start ());
GST_DEBUG ("setting state to 1");
return GST_PROBE_OK;
}
GST_START_TEST (test_block_async_full_destroy)
@ -800,12 +820,13 @@ GST_START_TEST (test_block_async_full_destroy)
GstPad *pad;
/* 0 = unblocked, 1 = blocked, 2 = destroyed */
gint state = 0;
gulong id;
pad = gst_pad_new ("src", GST_PAD_SRC);
fail_unless (pad != NULL);
gst_pad_set_active (pad, TRUE);
gst_pad_block (pad, GST_BLOCK_TYPE_DATA, block_async_full_cb,
id = gst_pad_add_probe (pad, GST_PROBE_TYPE_BLOCK, block_async_full_cb,
&state, block_async_full_destroy);
fail_unless (state == 0);
@ -815,25 +836,8 @@ GST_START_TEST (test_block_async_full_destroy)
fail_unless (state == 1);
gst_pad_push_event (pad, gst_event_new_flush_stop ());
/* pad was already blocked so nothing happens */
gst_pad_block (pad, GST_BLOCK_TYPE_DATA, block_async_full_cb,
&state, block_async_full_destroy);
fail_unless (state == 1);
/* unblock callback is called */
gst_pad_unblock (pad);
fail_unless (state == 2);
/* block with the same data, nothing is called */
state = 1;
gst_pad_block (pad, GST_BLOCK_TYPE_DATA, block_async_full_cb,
&state, block_async_full_destroy);
fail_unless (state == 1);
/* now change user_data (to NULL in this case) so destroy_notify should be
* called */
state = 1;
gst_pad_unblock (pad);
gst_pad_remove_probe (pad, id);
fail_unless (state == 2);
gst_object_unref (pad);
@ -846,12 +850,13 @@ GST_START_TEST (test_block_async_full_destroy_dispose)
GstPad *pad;
/* 0 = unblocked, 1 = blocked, 2 = destroyed */
gint state = 0;
gulong id;
pad = gst_pad_new ("src", GST_PAD_SRC);
fail_unless (pad != NULL);
gst_pad_set_active (pad, TRUE);
gst_pad_block (pad, GST_BLOCK_TYPE_DATA, block_async_full_cb,
id = gst_pad_add_probe (pad, GST_PROBE_TYPE_BLOCK, block_async_full_cb,
&state, block_async_full_destroy);
gst_pad_push (pad, gst_buffer_new ());
@ -860,7 +865,7 @@ GST_START_TEST (test_block_async_full_destroy_dispose)
fail_unless_equals_int (state, 1);
gst_pad_push_event (pad, gst_event_new_flush_stop ());
/* gst_pad_dispose calls the destroy_notify function if necessary */
/* gst_BLOCK calls the destroy_notify function if necessary */
gst_object_unref (pad);
fail_unless_equals_int (state, 2);
@ -896,13 +901,13 @@ unblock_async_not_called (GstPad * pad, gboolean blocked, gpointer user_data)
}
#endif
static void
block_async_second_no_flush (GstPad * pad, GstBlockType type,
gpointer user_data)
static GstProbeReturn
block_async_second_no_flush (GstPad * pad, GstProbeType type,
gpointer type_data, gpointer user_data)
{
gboolean *bool_user_data = (gboolean *) user_data;
fail_unless (type & GST_BLOCK_TYPE_DATA);
fail_unless (type & GST_PROBE_TYPE_BLOCK);
fail_unless (bool_user_data[0] == TRUE);
fail_unless (bool_user_data[1] == FALSE);
@ -910,16 +915,19 @@ block_async_second_no_flush (GstPad * pad, GstBlockType type,
bool_user_data[1] = TRUE;
gst_pad_unblock (pad);
gst_pad_remove_probe (pad, id);
return GST_PROBE_OK;
}
static void
block_async_first_no_flush (GstPad * pad, GstBlockType type, gpointer user_data)
static GstProbeReturn
block_async_first_no_flush (GstPad * pad, GstProbeType type, gpointer type_data,
gpointer user_data)
{
static int n_calls = 0;
gboolean *bool_user_data = (gboolean *) user_data;
fail_unless (type & GST_BLOCK_TYPE_DATA);
fail_unless (type & GST_PROBE_TYPE_BLOCK);
if (++n_calls > 1)
/* we expect this callback to be called only once */
@ -931,12 +939,14 @@ block_async_first_no_flush (GstPad * pad, GstBlockType type, gpointer user_data)
fail_unless (bool_user_data[1] == FALSE);
fail_unless (bool_user_data[2] == FALSE);
gst_pad_unblock (pad);
gst_pad_remove_probe (pad, id);
/* replace block_async_first with block_async_second so next time the pad is
* blocked the latter should be called */
fail_unless (gst_pad_block (pad, GST_BLOCK_TYPE_DATA,
block_async_second_no_flush, user_data, NULL));
id = gst_pad_add_probe (pad, GST_PROBE_TYPE_BLOCK,
block_async_second_no_flush, user_data, NULL);
return GST_PROBE_OK;
}
GST_START_TEST (test_block_async_replace_callback_no_flush)
@ -948,8 +958,9 @@ GST_START_TEST (test_block_async_replace_callback_no_flush)
fail_unless (pad != NULL);
gst_pad_set_active (pad, TRUE);
fail_unless (gst_pad_block (pad, GST_BLOCK_TYPE_DATA,
block_async_first_no_flush, bool_user_data, NULL));
id = gst_pad_add_probe (pad, GST_PROBE_TYPE_BLOCK,
block_async_first_no_flush, bool_user_data, NULL);
fail_if (id == 0);
gst_pad_push (pad, gst_buffer_new ());
fail_unless (bool_user_data[0] == TRUE);

View file

@ -238,7 +238,7 @@ static GMutex *probe_lock;
static GCond *probe_cond;
static gboolean
sink_pad_probe (GstPad * pad, GstBuffer * buffer,
sink_pad_probe (GstPad * pad, GstProbeType type, GstBuffer * buffer,
GstClockTime * first_timestamp)
{
fail_if (GST_BUFFER_TIMESTAMP (buffer) == GST_CLOCK_TIME_NONE,
@ -274,7 +274,8 @@ GST_START_TEST (test_base_time)
gst_element_link (fakesrc, fakesink);
sink = gst_element_get_static_pad (fakesink, "sink");
gst_pad_add_buffer_probe (sink, G_CALLBACK (sink_pad_probe), &observed, NULL);
gst_pad_add_probe (sink, GST_PROBE_TYPE_BUFFER,
(GstPadProbeCallback) sink_pad_probe, &observed, NULL);
fail_unless (gst_element_set_state (pipeline, GST_STATE_PAUSED)
== GST_STATE_CHANGE_NO_PREROLL, "expected no-preroll from live pipeline");