pad: change the semantics of get/pull_range a little

Make it so that one can specify a buffer for get/pull_range where the downstream
element should write into. When passing NULL, upstream should allocate a buffer,
like in 0.10.
We also need to change the probes a little because before the pull probe, there
could already be a buffer passed. This then allows us to use the same PROBE
macro for before and after pulling.
While we're at the probes, make the query probe more powerful by handling the
GST_PAD_PROBE_DROP return value. Returning _DROP from a query probe will now
return TRUE upstream and will not forward the probe to the peer or handler.
Also handle _DROP for get/pull_range properly by not dispatching to the
peer/handler or by generating EOS when the probe returns DROP and no buffer.
Make filesrc handle the non-NULL buffer passed in the get_range function and
skip the allocation in that case, writing directly into the downstream provided
buffer.
Update tests because now we need to make sure to not pass a random value in the
buffer pointer to get/pull_range
This commit is contained in:
Wim Taymans 2012-03-16 21:37:10 +01:00
parent b54b886fc5
commit db1318ab4a
6 changed files with 111 additions and 46 deletions

View file

@ -2937,24 +2937,6 @@ no_match:
}
}
#define PROBE_PRE_PULL(pad,mask,data,offs,size,label,probed,defaultval) \
G_STMT_START { \
if (G_UNLIKELY (pad->num_probes)) { \
/* we start with passing NULL as the data item */ \
GstPadProbeInfo info = { mask, 0, NULL, offs, size }; \
ret = do_probe_callbacks (pad, &info, defaultval); \
/* store the possibly updated data item */ \
data = GST_PAD_PROBE_INFO_DATA (&info); \
/* if something went wrong, exit */ \
if (G_UNLIKELY (ret != defaultval && ret != GST_FLOW_OK)) \
goto label; \
/* otherwise check if the probe retured data */ \
if (G_UNLIKELY (data != NULL)) \
goto probed; \
} \
} G_STMT_END
/* a probe that does not take or return any data */
#define PROBE_NO_DATA(pad,mask,label,defaultval) \
G_STMT_START { \
@ -2967,21 +2949,24 @@ no_match:
} \
} G_STMT_END
#define PROBE_FULL(pad,mask,data,offs,size,label,defaultval) \
#define PROBE_FULL(pad,mask,data,offs,size,label) \
G_STMT_START { \
if (G_UNLIKELY (pad->num_probes)) { \
/* pass the data item */ \
GstPadProbeInfo info = { mask, 0, data, offs, size }; \
ret = do_probe_callbacks (pad, &info, defaultval); \
ret = do_probe_callbacks (pad, &info, GST_FLOW_OK); \
/* store the possibly updated data item */ \
data = GST_PAD_PROBE_INFO_DATA (&info); \
if (G_UNLIKELY (ret != defaultval && ret != GST_FLOW_OK)) \
/* if something went wrong, exit */ \
if (G_UNLIKELY (ret != GST_FLOW_OK)) \
goto label; \
} \
} G_STMT_END
#define PROBE_PUSH(pad,mask,data,label) \
PROBE_FULL(pad, mask, data, -1, -1, label, GST_FLOW_OK);
#define PROBE_PULL(pad,mask,data,offs,size,label) \
PROBE_FULL(pad, mask, data, offs, size, label, GST_FLOW_OK);
#define PROBE_PUSH(pad,mask,data,label) \
PROBE_FULL(pad, mask, data, -1, -1, label);
#define PROBE_PULL(pad,mask,data,offs,size,label) \
PROBE_FULL(pad, mask, data, offs, size, label);
static GstFlowReturn
do_probe_callbacks (GstPad * pad, GstPadProbeInfo * info,
@ -3310,7 +3295,15 @@ probe_stopped:
GST_OBJECT_UNLOCK (pad);
if (G_UNLIKELY (serialized))
GST_PAD_STREAM_UNLOCK (pad);
return FALSE;
/* if a probe dropped, we don't sent it further but assume that the probe
* answered the query and return TRUE */
if (ret == GST_FLOW_CUSTOM_SUCCESS)
res = TRUE;
else
res = FALSE;
return res;
}
}
@ -3421,7 +3414,15 @@ probe_stopped:
{
GST_DEBUG_OBJECT (pad, "probe stopped: %s", gst_flow_get_name (ret));
GST_OBJECT_UNLOCK (pad);
return FALSE;
/* if a probe dropped, we don't sent it further but assume that the probe
* answered the query and return TRUE */
if (ret == GST_FLOW_CUSTOM_SUCCESS)
res = TRUE;
else
res = FALSE;
return res;
}
}
@ -3830,10 +3831,11 @@ gst_pad_get_range_unchecked (GstPad * pad, guint64 offset, guint size,
if (G_UNLIKELY ((ret = check_sticky (pad))) != GST_FLOW_OK)
goto events_error;
/* when one of the probes returns a buffer, probed_data will be called and we
* skip calling the getrange function */
PROBE_PRE_PULL (pad, GST_PAD_PROBE_TYPE_PULL | GST_PAD_PROBE_TYPE_BLOCK,
*buffer, offset, size, probe_stopped, probed_data, GST_FLOW_OK);
/* when one of the probes returns DROPPED, probe_stopped will be called
* and we skip calling the getrange function, *buffer should then contain a
* valid result buffer */
PROBE_PULL (pad, GST_PAD_PROBE_TYPE_PULL | GST_PAD_PROBE_TYPE_BLOCK,
*buffer, offset, size, probe_stopped);
ACQUIRE_PARENT (pad, parent, no_parent);
GST_OBJECT_UNLOCK (pad);
@ -3907,14 +3909,31 @@ probe_stopped:
{
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
"probe returned %s", gst_flow_get_name (ret));
if (ret == GST_FLOW_CUSTOM_SUCCESS) {
if (*buffer) {
/* the probe filled the buffer and asks us to not call the getrange
* anymore, we continue with the post probes then. */
GST_DEBUG_OBJECT (pad, "handled buffer");
ret = GST_FLOW_OK;
goto probed_data;
} else {
/* no buffer, we are EOS */
GST_DEBUG_OBJECT (pad, "no buffer, return EOS");
ret = GST_FLOW_EOS;
}
}
GST_OBJECT_UNLOCK (pad);
GST_PAD_STREAM_UNLOCK (pad);
return ret;
}
probe_stopped_unref:
{
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
"probe returned %s", gst_flow_get_name (ret));
/* if we drop here, it signals EOS */
if (ret == GST_FLOW_CUSTOM_SUCCESS)
ret = GST_FLOW_EOS;
GST_OBJECT_UNLOCK (pad);
GST_PAD_STREAM_UNLOCK (pad);
gst_buffer_unref (*buffer);
@ -3948,6 +3967,9 @@ get_range_failed:
* installed (see gst_pad_set_getrange_function()) this function returns
* #GST_FLOW_NOT_SUPPORTED.
*
* @buffer must point to a variable holding NULL or a variable that
* points to a #GstBuffer that will be filled with the result data.
*
* This is a lowlevel function. Usualy gst_pad_pull_range() is used.
*
* Returns: a #GstFlowReturn from the pad.
@ -3961,6 +3983,8 @@ gst_pad_get_range (GstPad * pad, guint64 offset, guint size,
g_return_val_if_fail (GST_IS_PAD (pad), GST_FLOW_ERROR);
g_return_val_if_fail (GST_PAD_IS_SRC (pad), GST_FLOW_ERROR);
g_return_val_if_fail (buffer != NULL, GST_FLOW_ERROR);
g_return_val_if_fail (*buffer == NULL
|| GST_IS_BUFFER (*buffer), GST_FLOW_ERROR);
return gst_pad_get_range_unchecked (pad, offset, size, buffer);
}
@ -3983,6 +4007,9 @@ gst_pad_get_range (GstPad * pad, guint64 offset, guint size,
* See gst_pad_get_range() for a list of return values and for the
* semantics of the arguments of this function.
*
* @buffer must point to a variable holding NULL or a variable that
* points to a #GstBuffer that will be filled with the result data.
*
* Returns: a #GstFlowReturn from the peer pad.
* When this function returns #GST_FLOW_OK, @buffer will contain a valid
* #GstBuffer that should be freed with gst_buffer_unref() after usage.
@ -4001,6 +4028,8 @@ gst_pad_pull_range (GstPad * pad, guint64 offset, guint size,
g_return_val_if_fail (GST_IS_PAD (pad), GST_FLOW_ERROR);
g_return_val_if_fail (GST_PAD_IS_SINK (pad), GST_FLOW_ERROR);
g_return_val_if_fail (buffer != NULL, GST_FLOW_ERROR);
g_return_val_if_fail (*buffer == NULL
|| GST_IS_BUFFER (*buffer), GST_FLOW_ERROR);
GST_OBJECT_LOCK (pad);
if (G_UNLIKELY (GST_PAD_IS_FLUSHING (pad)))
@ -4009,10 +4038,11 @@ gst_pad_pull_range (GstPad * pad, guint64 offset, guint size,
if (G_UNLIKELY (GST_PAD_MODE (pad) != GST_PAD_MODE_PULL))
goto wrong_mode;
/* when one of the probes returns a buffer, probed_data will be called and we
* skip calling the peer getrange function */
PROBE_PRE_PULL (pad, GST_PAD_PROBE_TYPE_PULL | GST_PAD_PROBE_TYPE_BLOCK,
*buffer, offset, size, pre_probe_stopped, probed_data, GST_FLOW_OK);
/* when one of the probes returns DROPPED, probe_stopped will be
* called and we skip calling the peer getrange function. *buffer should then
* contain a valid buffer */
PROBE_PULL (pad, GST_PAD_PROBE_TYPE_PULL | GST_PAD_PROBE_TYPE_BLOCK,
*buffer, offset, size, probe_stopped);
if (G_UNLIKELY ((peer = GST_PAD_PEER (pad)) == NULL))
goto not_linked;
@ -4030,7 +4060,7 @@ gst_pad_pull_range (GstPad * pad, guint64 offset, guint size,
if (pad->priv->using == 0) {
/* pad is not active anymore, trigger idle callbacks */
PROBE_NO_DATA (pad, GST_PAD_PROBE_TYPE_PULL | GST_PAD_PROBE_TYPE_IDLE,
post_probe_stopped, ret);
probe_stopped_unref, ret);
}
if (G_UNLIKELY (ret != GST_FLOW_OK))
@ -4038,7 +4068,7 @@ gst_pad_pull_range (GstPad * pad, guint64 offset, guint size,
probed_data:
PROBE_PULL (pad, GST_PAD_PROBE_TYPE_PULL | GST_PAD_PROBE_TYPE_BUFFER,
*buffer, offset, size, post_probe_stopped);
*buffer, offset, size, probe_stopped_unref);
GST_OBJECT_UNLOCK (pad);
@ -4059,10 +4089,23 @@ wrong_mode:
GST_OBJECT_UNLOCK (pad);
return GST_FLOW_ERROR;
}
pre_probe_stopped:
probe_stopped:
{
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad, "pre probe returned %s",
gst_flow_get_name (ret));
if (ret == GST_FLOW_CUSTOM_SUCCESS) {
if (*buffer) {
/* the probe filled the buffer and asks us to not forward to the peer
* anymore, we continue with the post probes then */
GST_DEBUG_OBJECT (pad, "handled buffer");
ret = GST_FLOW_OK;
goto probed_data;
} else {
/* no buffer, we are EOS then */
GST_DEBUG_OBJECT (pad, "no buffer, return EOS");
ret = GST_FLOW_EOS;
}
}
GST_OBJECT_UNLOCK (pad);
return ret;
}
@ -4082,12 +4125,16 @@ pull_range_failed:
pad, "pullrange failed, flow: %s", gst_flow_get_name (ret));
return ret;
}
post_probe_stopped:
probe_stopped_unref:
{
GST_CAT_LOG_OBJECT (GST_CAT_SCHEDULING, pad,
"post probe returned %s", gst_flow_get_name (ret));
GST_OBJECT_UNLOCK (pad);
if (ret == GST_FLOW_OK)
/* if we drop here, it signals EOS */
if (ret == GST_FLOW_CUSTOM_SUCCESS) {
ret = GST_FLOW_EOS;
gst_buffer_unref (*buffer);
} else if (ret == GST_FLOW_OK)
gst_buffer_unref (*buffer);
*buffer = NULL;
return ret;

View file

@ -495,7 +495,10 @@ typedef enum
/**
* GstPadProbeReturn:
* @GST_PAD_PROBE_OK: normal probe return value
* @GST_PAD_PROBE_DROP: drop data in data probes
* @GST_PAD_PROBE_DROP: drop data in data probes. For push mode this means that
* the data item is not sent downstream. For pull mode, it means that the
* data item is not passed upstream. In both cases, this result code
* returns #GST_FLOW_OK or %TRUE to the caller.
* @GST_PAD_PROBE_REMOVE: remove probe
* @GST_PAD_PROBE_PASS: pass the data item in the block probe and block on
* the next item

View file

@ -1387,9 +1387,13 @@ gst_base_src_default_create (GstBaseSrc * src, guint64 offset,
if (G_UNLIKELY (!bclass->fill))
goto no_function;
ret = bclass->alloc (src, offset, size, buffer);
if (G_UNLIKELY (ret != GST_FLOW_OK))
goto alloc_failed;
if (*buffer == NULL) {
/* downstream did not provide us with a buffer to fill, allocate one
* ourselves */
ret = bclass->alloc (src, offset, size, buffer);
if (G_UNLIKELY (ret != GST_FLOW_OK))
goto alloc_failed;
}
if (G_LIKELY (size > 0)) {
/* only call fill when there is a size */

View file

@ -2048,7 +2048,7 @@ gst_base_transform_getrange (GstPad * pad, GstObject * parent, guint64 offset,
GstBaseTransform *trans;
GstBaseTransformClass *klass;
GstFlowReturn ret;
GstBuffer *inbuf;
GstBuffer *inbuf = NULL;
trans = GST_BASE_TRANSFORM (parent);

View file

@ -1064,7 +1064,7 @@ gst_type_find_element_loop (GstPad * pad)
0, probability, found_caps);
typefind->mode = MODE_NORMAL;
} else if (typefind->mode == MODE_NORMAL) {
GstBuffer *outbuf;
GstBuffer *outbuf = NULL;
if (typefind->need_segment) {
typefind->need_segment = FALSE;

View file

@ -206,11 +206,13 @@ GST_START_TEST (test_pull)
gst_query_unref (seeking_query);
/* do some pulls */
buffer1 = NULL;
ret = gst_pad_get_range (pad, 0, 100, &buffer1);
fail_unless (ret == GST_FLOW_OK);
fail_unless (buffer1 != NULL);
fail_unless (gst_buffer_get_size (buffer1) == 100);
buffer2 = NULL;
ret = gst_pad_get_range (pad, 0, 50, &buffer2);
fail_unless (ret == GST_FLOW_OK);
fail_unless (buffer2 != NULL);
@ -225,6 +227,7 @@ GST_START_TEST (test_pull)
gst_buffer_unref (buffer2);
/* read next 50 bytes */
buffer2 = NULL;
ret = gst_pad_get_range (pad, 50, 50, &buffer2);
fail_unless (ret == GST_FLOW_OK);
fail_unless (buffer2 != NULL);
@ -240,6 +243,7 @@ GST_START_TEST (test_pull)
gst_buffer_unref (buffer2);
/* read 10 bytes at end-10 should give exactly 10 bytes */
buffer1 = NULL;
ret = gst_pad_get_range (pad, stop - 10, 10, &buffer1);
fail_unless (ret == GST_FLOW_OK);
fail_unless (buffer1 != NULL);
@ -247,6 +251,7 @@ GST_START_TEST (test_pull)
gst_buffer_unref (buffer1);
/* read 20 bytes at end-10 should give exactly 10 bytes */
buffer1 = NULL;
ret = gst_pad_get_range (pad, stop - 10, 20, &buffer1);
fail_unless (ret == GST_FLOW_OK);
fail_unless (buffer1 != NULL);
@ -254,6 +259,7 @@ GST_START_TEST (test_pull)
gst_buffer_unref (buffer1);
/* read 0 bytes at end-1 should return 0 bytes */
buffer1 = NULL;
ret = gst_pad_get_range (pad, stop - 1, 0, &buffer1);
fail_unless (ret == GST_FLOW_OK);
fail_unless (buffer1 != NULL);
@ -261,6 +267,7 @@ GST_START_TEST (test_pull)
gst_buffer_unref (buffer1);
/* read 10 bytes at end-1 should return 1 byte */
buffer1 = NULL;
ret = gst_pad_get_range (pad, stop - 1, 10, &buffer1);
fail_unless (ret == GST_FLOW_OK);
fail_unless (buffer1 != NULL);
@ -268,18 +275,22 @@ GST_START_TEST (test_pull)
gst_buffer_unref (buffer1);
/* read 0 bytes at end should EOS */
buffer1 = NULL;
ret = gst_pad_get_range (pad, stop, 0, &buffer1);
fail_unless (ret == GST_FLOW_EOS);
/* read 10 bytes before end should EOS */
buffer1 = NULL;
ret = gst_pad_get_range (pad, stop, 10, &buffer1);
fail_unless (ret == GST_FLOW_EOS);
/* read 0 bytes after end should EOS */
buffer1 = NULL;
ret = gst_pad_get_range (pad, stop + 10, 0, &buffer1);
fail_unless (ret == GST_FLOW_EOS);
/* read 10 bytes after end should EOS too */
buffer1 = NULL;
ret = gst_pad_get_range (pad, stop + 10, 10, &buffer1);
fail_unless (ret == GST_FLOW_EOS);