basesrc: Allocator buffers from negotiated allocator

Allocate buffers from the negotiated allocator or bufferpool.
Handle the state of the bufferpool when flushing.
Add fill method to pushsrc.
This commit is contained in:
Wim Taymans 2011-06-13 12:07:03 +02:00
parent d837268a77
commit 844e8aefaa
3 changed files with 112 additions and 16 deletions

View file

@ -241,6 +241,8 @@ struct _GstBaseSrcPrivate
GstBufferPool *pool; GstBufferPool *pool;
const GstMemoryAllocator *allocator; const GstMemoryAllocator *allocator;
guint prefix;
guint alignment;
}; };
static GstElementClass *parent_class = NULL; static GstElementClass *parent_class = NULL;
@ -293,6 +295,8 @@ static const GstQueryType *gst_base_src_get_query_types (GstElement * element);
static gboolean gst_base_src_query (GstPad * pad, GstQuery * query); static gboolean gst_base_src_query (GstPad * pad, GstQuery * query);
static gboolean gst_base_src_activate_pool (GstBaseSrc * basesrc,
gboolean active);
static gboolean gst_base_src_default_negotiate (GstBaseSrc * basesrc); static gboolean gst_base_src_default_negotiate (GstBaseSrc * basesrc);
static gboolean gst_base_src_default_do_seek (GstBaseSrc * src, static gboolean gst_base_src_default_do_seek (GstBaseSrc * src,
GstSegment * segment); GstSegment * segment);
@ -1248,33 +1252,55 @@ gst_base_src_prepare_seek_segment (GstBaseSrc * src, GstEvent * event,
return result; return result;
} }
static GstFlowReturn
gst_base_src_alloc_buffer (GstBaseSrc * src, guint64 offset,
guint size, GstBuffer ** buffer)
{
GstFlowReturn ret;
GstBaseSrcPrivate *priv = src->priv;
if (priv->pool) {
ret = gst_buffer_pool_acquire_buffer (priv->pool, buffer, NULL);
} else {
*buffer = gst_buffer_new_allocate (priv->allocator, size, priv->alignment);
if (G_UNLIKELY (*buffer == NULL))
goto alloc_failed;
ret = GST_FLOW_OK;
}
return ret;
/* ERRORS */
alloc_failed:
{
GST_ERROR_OBJECT (src, "Failed to allocate %u bytes", size);
return GST_FLOW_ERROR;
}
}
static GstFlowReturn static GstFlowReturn
gst_base_src_default_create (GstBaseSrc * src, guint64 offset, gst_base_src_default_create (GstBaseSrc * src, guint64 offset,
guint size, GstBuffer ** buffer) guint size, GstBuffer ** buffer)
{ {
GstBaseSrcClass *bclass; GstBaseSrcClass *bclass;
GstFlowReturn ret; GstFlowReturn ret;
GstBuffer *buf;
bclass = GST_BASE_SRC_GET_CLASS (src); bclass = GST_BASE_SRC_GET_CLASS (src);
if (G_UNLIKELY (!bclass->fill)) if (G_UNLIKELY (!bclass->fill))
goto no_function; goto no_function;
buf = gst_buffer_new_allocate (NULL, size, 0); ret = gst_base_src_alloc_buffer (src, offset, size, buffer);
if (G_UNLIKELY (buf == NULL)) if (G_UNLIKELY (ret != GST_FLOW_OK))
goto alloc_failed; goto alloc_failed;
if (G_LIKELY (size > 0)) { if (G_LIKELY (size > 0)) {
/* only call fill when there is a size */ /* only call fill when there is a size */
ret = bclass->fill (src, offset, size, buf); ret = bclass->fill (src, offset, size, *buffer);
if (G_UNLIKELY (ret != GST_FLOW_OK)) if (G_UNLIKELY (ret != GST_FLOW_OK))
goto not_ok; goto not_ok;
} }
*buffer = buf;
return GST_FLOW_OK; return GST_FLOW_OK;
/* ERRORS */ /* ERRORS */
@ -1286,13 +1312,13 @@ no_function:
alloc_failed: alloc_failed:
{ {
GST_ERROR_OBJECT (src, "Failed to allocate %u bytes", size); GST_ERROR_OBJECT (src, "Failed to allocate %u bytes", size);
return GST_FLOW_ERROR; return ret;
} }
not_ok: not_ok:
{ {
GST_DEBUG_OBJECT (src, "fill returned %d (%s)", ret, GST_DEBUG_OBJECT (src, "fill returned %d (%s)", ret,
gst_flow_get_name (ret)); gst_flow_get_name (ret));
gst_buffer_unref (buf); gst_buffer_unref (*buffer);
return ret; return ret;
} }
} }
@ -1590,10 +1616,12 @@ gst_base_src_send_event (GstElement * element, GstEvent * event)
g_atomic_int_set (&src->priv->pending_eos, TRUE); g_atomic_int_set (&src->priv->pending_eos, TRUE);
GST_DEBUG_OBJECT (src, "EOS marked, calling unlock"); GST_DEBUG_OBJECT (src, "EOS marked, calling unlock");
/* unlock the _create function so that we can check the pending_eos flag /* unlock the _create function so that we can check the pending_eos flag
* and we can do EOS. This will eventually release the LIVE_LOCK again so * and we can do EOS. This will eventually release the LIVE_LOCK again so
* that we can grab it and stop the unlock again. We don't take the stream * that we can grab it and stop the unlock again. We don't take the stream
* lock so that this operation is guaranteed to never block. */ * lock so that this operation is guaranteed to never block. */
gst_base_src_activate_pool (src, FALSE);
if (bclass->unlock) if (bclass->unlock)
bclass->unlock (src); bclass->unlock (src);
@ -1605,6 +1633,7 @@ gst_base_src_send_event (GstElement * element, GstEvent * event)
* lock is enough because that protects the create function. */ * lock is enough because that protects the create function. */
if (bclass->unlock_stop) if (bclass->unlock_stop)
bclass->unlock_stop (src); bclass->unlock_stop (src);
gst_base_src_activate_pool (src, TRUE);
GST_LIVE_UNLOCK (src); GST_LIVE_UNLOCK (src);
result = TRUE; result = TRUE;
@ -2571,13 +2600,46 @@ null_buffer:
static void static void
gst_base_src_set_allocation (GstBaseSrc * basesrc, GstBufferPool * pool, gst_base_src_set_allocation (GstBaseSrc * basesrc, GstBufferPool * pool,
const GstMemoryAllocator * allocator) const GstMemoryAllocator * allocator, guint prefix, guint alignment)
{ {
if (basesrc->priv->pool) GstBufferPool *oldpool;
gst_object_unref (basesrc->priv->pool); GstBaseSrcPrivate *priv = basesrc->priv;
basesrc->priv->pool = pool;
basesrc->priv->allocator = allocator; GST_OBJECT_LOCK (basesrc);
if ((oldpool = priv->pool)) {
gst_object_unref (oldpool);
}
priv->pool = pool;
priv->allocator = allocator;
priv->prefix = prefix;
priv->alignment = alignment;
GST_OBJECT_UNLOCK (basesrc);
if (oldpool) {
gst_buffer_pool_set_active (oldpool, FALSE);
gst_object_unref (oldpool);
}
}
static gboolean
gst_base_src_activate_pool (GstBaseSrc * basesrc, gboolean active)
{
GstBaseSrcPrivate *priv = basesrc->priv;
GstBufferPool *pool;
gboolean res = TRUE;
GST_OBJECT_LOCK (basesrc);
if ((pool = priv->pool))
pool = gst_object_ref (pool);
GST_OBJECT_UNLOCK (basesrc);
if (pool) {
res = gst_buffer_pool_set_active (pool, active);
gst_object_unref (pool);
}
return res;
} }
static gboolean static gboolean
@ -2623,9 +2685,11 @@ gst_base_src_prepare_allocation (GstBaseSrc * basesrc, GstCaps * caps)
gst_buffer_pool_config_set (config, caps, size, min, max, prefix, gst_buffer_pool_config_set (config, caps, size, min, max, prefix,
alignment); alignment);
gst_buffer_pool_set_config (pool, config); gst_buffer_pool_set_config (pool, config);
gst_buffer_pool_set_active (pool, TRUE);
} }
gst_base_src_set_allocation (basesrc, pool, allocator); gst_base_src_set_allocation (basesrc, pool, allocator, prefix, alignment);
return result; return result;
} }
@ -2856,6 +2920,8 @@ gst_base_src_stop (GstBaseSrc * basesrc)
if (bclass->stop) if (bclass->stop)
result = bclass->stop (basesrc); result = bclass->stop (basesrc);
gst_base_src_set_allocation (basesrc, NULL, NULL, 0, 0);
if (result) if (result)
GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_STARTED); GST_OBJECT_FLAG_UNSET (basesrc, GST_BASE_SRC_STARTED);
@ -2873,6 +2939,7 @@ gst_base_src_set_flushing (GstBaseSrc * basesrc,
bclass = GST_BASE_SRC_GET_CLASS (basesrc); bclass = GST_BASE_SRC_GET_CLASS (basesrc);
if (flushing && unlock) { if (flushing && unlock) {
gst_base_src_activate_pool (basesrc, FALSE);
/* unlock any subclasses, we need to do this before grabbing the /* unlock any subclasses, we need to do this before grabbing the
* LIVE_LOCK since we hold this lock before going into ::create. We pass an * LIVE_LOCK since we hold this lock before going into ::create. We pass an
* unlock to the params because of backwards compat (see seek handler)*/ * unlock to the params because of backwards compat (see seek handler)*/
@ -2904,6 +2971,8 @@ gst_base_src_set_flushing (GstBaseSrc * basesrc,
/* signal the live source that it can start playing */ /* signal the live source that it can start playing */
basesrc->live_running = live_play; basesrc->live_running = live_play;
gst_base_src_activate_pool (basesrc, TRUE);
/* When unlocking drop all delayed events */ /* When unlocking drop all delayed events */
if (unlock) { if (unlock) {
GST_OBJECT_LOCK (basesrc); GST_OBJECT_LOCK (basesrc);
@ -2935,6 +3004,7 @@ gst_base_src_set_playing (GstBaseSrc * basesrc, gboolean live_play)
/* unlock subclasses locked in ::create, we only do this when we stop playing. */ /* unlock subclasses locked in ::create, we only do this when we stop playing. */
if (!live_play) { if (!live_play) {
GST_DEBUG_OBJECT (basesrc, "unlock"); GST_DEBUG_OBJECT (basesrc, "unlock");
gst_base_src_activate_pool (basesrc, FALSE);
if (bclass->unlock) if (bclass->unlock)
bclass->unlock (basesrc); bclass->unlock (basesrc);
} }
@ -2958,6 +3028,7 @@ gst_base_src_set_playing (GstBaseSrc * basesrc, gboolean live_play)
/* clear our unlock request when going to PLAYING */ /* clear our unlock request when going to PLAYING */
GST_DEBUG_OBJECT (basesrc, "unlock stop"); GST_DEBUG_OBJECT (basesrc, "unlock stop");
gst_base_src_activate_pool (basesrc, TRUE);
if (bclass->unlock_stop) if (bclass->unlock_stop)
bclass->unlock_stop (basesrc); bclass->unlock_stop (basesrc);

View file

@ -72,6 +72,8 @@ G_DEFINE_TYPE_WITH_CODE (GstPushSrc, gst_push_src, GST_TYPE_BASE_SRC, _do_init);
static gboolean gst_push_src_query (GstBaseSrc * src, GstQuery * query); static gboolean gst_push_src_query (GstBaseSrc * src, GstQuery * query);
static GstFlowReturn gst_push_src_create (GstBaseSrc * bsrc, guint64 offset, static GstFlowReturn gst_push_src_create (GstBaseSrc * bsrc, guint64 offset,
guint length, GstBuffer ** ret); guint length, GstBuffer ** ret);
static GstFlowReturn gst_push_src_fill (GstBaseSrc * bsrc, guint64 offset,
guint length, GstBuffer * ret);
static void static void
gst_push_src_class_init (GstPushSrcClass * klass) gst_push_src_class_init (GstPushSrcClass * klass)
@ -79,6 +81,7 @@ gst_push_src_class_init (GstPushSrcClass * klass)
GstBaseSrcClass *gstbasesrc_class = (GstBaseSrcClass *) klass; GstBaseSrcClass *gstbasesrc_class = (GstBaseSrcClass *) klass;
gstbasesrc_class->create = GST_DEBUG_FUNCPTR (gst_push_src_create); gstbasesrc_class->create = GST_DEBUG_FUNCPTR (gst_push_src_create);
gstbasesrc_class->fill = GST_DEBUG_FUNCPTR (gst_push_src_fill);
gstbasesrc_class->query = GST_DEBUG_FUNCPTR (gst_push_src_query); gstbasesrc_class->query = GST_DEBUG_FUNCPTR (gst_push_src_query);
} }
@ -123,7 +126,26 @@ gst_push_src_create (GstBaseSrc * bsrc, guint64 offset, guint length,
if (pclass->create) if (pclass->create)
fret = pclass->create (src, ret); fret = pclass->create (src, ret);
else else
fret = GST_FLOW_ERROR; fret =
GST_BASE_SRC_CLASS (parent_class)->create (bsrc, offset, length, ret);
return fret;
}
static GstFlowReturn
gst_push_src_fill (GstBaseSrc * bsrc, guint64 offset, guint length,
GstBuffer * ret)
{
GstFlowReturn fret;
GstPushSrc *src;
GstPushSrcClass *pclass;
src = GST_PUSH_SRC (bsrc);
pclass = GST_PUSH_SRC_GET_CLASS (src);
if (pclass->fill)
fret = pclass->fill (src, ret);
else
fret = GST_BASE_SRC_CLASS (parent_class)->fill (bsrc, offset, length, ret);
return fret; return fret;
} }

View file

@ -57,6 +57,9 @@ struct _GstPushSrcClass {
/* ask the subclass to create a buffer */ /* ask the subclass to create a buffer */
GstFlowReturn (*create) (GstPushSrc *src, GstBuffer **buf); GstFlowReturn (*create) (GstPushSrc *src, GstBuffer **buf);
/* ask the subclass to fill a buffer */
GstFlowReturn (*fill) (GstPushSrc *src, GstBuffer *buf);
/*< private >*/ /*< private >*/
gpointer _gst_reserved[GST_PADDING]; gpointer _gst_reserved[GST_PADDING];
}; };