basesrc: add buffer list support

Add a gst_base_src_submit_buffer_list() function that allows subclasses
to produce a bufferlist containing multiple buffers in the ::create()
function. The buffers in the buffer list will then also be pushed out
in one go as a GstBufferList. This can reduce push overhead
significantly for sources with packetised inputs (such as udpsrc)
in high-throughput scenarios.

The _submit_buffer_list() approach was chosen because it is fairly
straight-forward, backwards-compatible, bindings-friendly (as opposed
to e.g. making the create function return a mini object instead),
and it allows the subclass maximum control: the subclass can decide
dynamically at runtime whether to return a list or a single buffer
(which would be messier if we added a create_list virtual method).

https://bugzilla.gnome.org/show_bug.cgi?id=750241
This commit is contained in:
Tim-Philipp Müller 2017-08-30 13:03:28 +01:00
parent 880c573e8d
commit 18fe36a286
5 changed files with 228 additions and 12 deletions

View file

@ -335,6 +335,7 @@ gst_base_src_get_allocator
gst_base_src_get_buffer_pool gst_base_src_get_buffer_pool
gst_base_src_is_async gst_base_src_is_async
gst_base_src_set_async gst_base_src_set_async
gst_base_src_submit_buffer_list
GST_BASE_SRC_PAD GST_BASE_SRC_PAD
GST_BASE_SRC_IS_STARTED GST_BASE_SRC_IS_STARTED

View file

@ -261,8 +261,14 @@ struct _GstBaseSrcPrivate
GstAllocationParams params; /* OBJECT_LOCK */ GstAllocationParams params; /* OBJECT_LOCK */
GCond async_cond; /* OBJECT_LOCK */ GCond async_cond; /* OBJECT_LOCK */
/* for _submit_buffer_list() */
GstBufferList *pending_bufferlist;
}; };
#define BASE_SRC_HAS_PENDING_BUFFER_LIST(src) \
((src)->priv->pending_bufferlist != NULL)
static GstElementClass *parent_class = NULL; static GstElementClass *parent_class = NULL;
static void gst_base_src_class_init (GstBaseSrcClass * klass); static void gst_base_src_class_init (GstBaseSrcClass * klass);
@ -2556,6 +2562,16 @@ again:
res_buf = in_buf; res_buf = in_buf;
} }
if (res_buf == NULL) {
GstBufferList *pending_list = src->priv->pending_bufferlist;
if (pending_list == NULL || gst_buffer_list_length (pending_list) == 0)
goto null_buffer;
res_buf = gst_buffer_list_get_writable (pending_list, 0);
own_res_buf = FALSE;
}
/* no timestamp set and we are at offset 0, we can timestamp with 0 */ /* no timestamp set and we are at offset 0, we can timestamp with 0 */
if (offset == 0 && src->segment.time == 0 if (offset == 0 && src->segment.time == 0
&& GST_BUFFER_DTS (res_buf) == -1 && !src->is_live) { && GST_BUFFER_DTS (res_buf) == -1 && !src->is_live) {
@ -2671,6 +2687,14 @@ eos:
GST_DEBUG_OBJECT (src, "we are EOS"); GST_DEBUG_OBJECT (src, "we are EOS");
return GST_FLOW_EOS; return GST_FLOW_EOS;
} }
null_buffer:
{
GST_ELEMENT_ERROR (src, STREAM, FAILED,
(_("Internal data flow error.")),
("Subclass %s neither returned a buffer nor submitted a buffer list "
"from its create function", G_OBJECT_TYPE_NAME (src)));
return GST_FLOW_ERROR;
}
} }
static GstFlowReturn static GstFlowReturn
@ -2803,6 +2827,12 @@ gst_base_src_loop (GstPad * pad)
GST_LOG_OBJECT (src, "next_ts %" GST_TIME_FORMAT " size %u", GST_LOG_OBJECT (src, "next_ts %" GST_TIME_FORMAT " size %u",
GST_TIME_ARGS (position), blocksize); GST_TIME_ARGS (position), blocksize);
/* clean up just in case we got interrupted or so last time round */
if (src->priv->pending_bufferlist != NULL) {
gst_buffer_list_unref (src->priv->pending_bufferlist);
src->priv->pending_bufferlist = NULL;
}
ret = gst_base_src_get_range (src, position, blocksize, &buf); ret = gst_base_src_get_range (src, position, blocksize, &buf);
if (G_UNLIKELY (ret != GST_FLOW_OK)) { if (G_UNLIKELY (ret != GST_FLOW_OK)) {
GST_INFO_OBJECT (src, "pausing after gst_base_src_get_range() = %s", GST_INFO_OBJECT (src, "pausing after gst_base_src_get_range() = %s",
@ -2810,9 +2840,11 @@ gst_base_src_loop (GstPad * pad)
GST_LIVE_UNLOCK (src); GST_LIVE_UNLOCK (src);
goto pause; goto pause;
} }
/* this should not happen */
if (G_UNLIKELY (buf == NULL)) /* Note: at this point buf might be a single buf returned which we own or
goto null_buffer; * the first buf of a pending buffer list submitted via submit_buffer_list(),
* in which case the buffer is owned by the pending buffer list and not us. */
g_assert (buf != NULL);
/* push events to close/start our segment before we push the buffer. */ /* push events to close/start our segment before we push the buffer. */
if (G_UNLIKELY (src->priv->segment_pending)) { if (G_UNLIKELY (src->priv->segment_pending)) {
@ -2917,7 +2949,14 @@ gst_base_src_loop (GstPad * pad)
} }
GST_LIVE_UNLOCK (src); GST_LIVE_UNLOCK (src);
/* push buffer or buffer list */
if (src->priv->pending_bufferlist != NULL) {
ret = gst_pad_push_list (pad, src->priv->pending_bufferlist);
src->priv->pending_bufferlist = NULL;
} else {
ret = gst_pad_push (pad, buf); ret = gst_pad_push (pad, buf);
}
if (G_UNLIKELY (ret != GST_FLOW_OK)) { if (G_UNLIKELY (ret != GST_FLOW_OK)) {
if (ret == GST_FLOW_NOT_NEGOTIATED) { if (ret == GST_FLOW_NOT_NEGOTIATED) {
goto not_negotiated; goto not_negotiated;
@ -3018,13 +3057,6 @@ pause:
} }
goto done; goto done;
} }
null_buffer:
{
GST_ELEMENT_ERROR (src, STREAM, FAILED,
(_("Internal data flow error.")), ("element returned NULL buffer"));
GST_LIVE_UNLOCK (src);
goto done;
}
} }
static gboolean static gboolean
@ -3619,6 +3651,11 @@ gst_base_src_stop (GstBaseSrc * basesrc)
if (bclass->stop) if (bclass->stop)
result = bclass->stop (basesrc); result = bclass->stop (basesrc);
if (basesrc->priv->pending_bufferlist != NULL) {
gst_buffer_list_unref (basesrc->priv->pending_bufferlist);
basesrc->priv->pending_bufferlist = NULL;
}
gst_base_src_set_allocation (basesrc, NULL, NULL, NULL); gst_base_src_set_allocation (basesrc, NULL, NULL, NULL);
return result; return result;
@ -3959,3 +3996,40 @@ gst_base_src_get_allocator (GstBaseSrc * src,
*params = src->priv->params; *params = src->priv->params;
GST_OBJECT_UNLOCK (src); GST_OBJECT_UNLOCK (src);
} }
/**
* gst_base_src_submit_buffer_list:
* @src: a #GstBaseSrc
* @buffer_list: (transfer full): a #GstBufferList
*
* Subclasses can call this from their create virtual method implementation
* to submit a buffer list to be pushed out later. This is useful in
* cases where the create function wants to produce multiple buffers to be
* pushed out in one go in form of a #GstBufferList, which can reduce overhead
* drastically, especially for packetised inputs (for data streams where
* the packetisation/chunking is not important it is usually more efficient
* to return larger buffers instead).
*
* Subclasses that use this function from their create function must return
* %GST_FLOW_OK and no buffer from their create virtual method implementation.
* If a buffer is returned after a buffer list has also been submitted via this
* function the behaviour is undefined.
*
* Subclasses must only call this function once per create function call and
* subclasses must only call this function when the source operates in push
* mode.
*
* Since: 1.14
*/
void
gst_base_src_submit_buffer_list (GstBaseSrc * src, GstBufferList * buffer_list)
{
g_return_if_fail (GST_IS_BASE_SRC (src));
g_return_if_fail (GST_IS_BUFFER_LIST (buffer_list));
g_return_if_fail (BASE_SRC_HAS_PENDING_BUFFER_LIST (src) == FALSE);
src->priv->pending_bufferlist = buffer_list;
GST_LOG_OBJECT (src, "%u buffers submitted in buffer list",
gst_buffer_list_length (buffer_list));
}

View file

@ -299,6 +299,9 @@ void gst_base_src_get_allocator (GstBaseSrc *src,
GstAllocator **allocator, GstAllocator **allocator,
GstAllocationParams *params); GstAllocationParams *params);
GST_EXPORT
void gst_base_src_submit_buffer_list (GstBaseSrc * src,
GstBufferList * buffer_list);
#ifdef G_DEFINE_AUTOPTR_CLEANUP_FUNC #ifdef G_DEFINE_AUTOPTR_CLEANUP_FUNC
G_DEFINE_AUTOPTR_CLEANUP_FUNC(GstBaseSrc, gst_object_unref) G_DEFINE_AUTOPTR_CLEANUP_FUNC(GstBaseSrc, gst_object_unref)

View file

@ -2,7 +2,7 @@
* *
* some unit tests for GstBaseSrc * some unit tests for GstBaseSrc
* *
* Copyright (C) 2006 Tim-Philipp Müller <tim centricular net> * Copyright (C) 2006-2017 Tim-Philipp Müller <tim centricular net>
* *
* This library is free software; you can redistribute it and/or * This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public * modify it under the terms of the GNU Library General Public
@ -768,6 +768,142 @@ GST_START_TEST (basesrc_seek_on_last_buffer)
GST_END_TEST; GST_END_TEST;
typedef GstBaseSrc TestSrc;
typedef GstBaseSrcClass TestSrcClass;
static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src",
GST_PAD_SRC,
GST_PAD_ALWAYS,
GST_STATIC_CAPS_ANY);
static GType test_src_get_type (void);
G_DEFINE_TYPE (TestSrc, test_src, GST_TYPE_BASE_SRC);
static void
test_src_init (TestSrc * src)
{
}
static GstFlowReturn
test_src_create (GstBaseSrc * src, guint64 offset, guint size,
GstBuffer ** p_buf)
{
GstBuffer *buf;
static int num = 0;
fail_if (*p_buf != NULL);
buf = gst_buffer_new ();
GST_BUFFER_OFFSET (buf) = num++;
if (num == 1 || g_random_boolean ()) {
GstBufferList *buflist = gst_buffer_list_new ();
gst_buffer_list_add (buflist, buf);
buf = gst_buffer_new ();
GST_BUFFER_OFFSET (buf) = num++;
gst_buffer_list_add (buflist, buf);
gst_base_src_submit_buffer_list (src, buflist);
} else {
*p_buf = buf;
}
return GST_FLOW_OK;
}
static void
test_src_class_init (TestSrcClass * klass)
{
GstBaseSrcClass *gstbasesrc_class = GST_BASE_SRC_CLASS (klass);
gst_element_class_add_static_pad_template (GST_ELEMENT_CLASS (klass),
&src_template);
gstbasesrc_class->create = test_src_create;
}
static GstPad *mysinkpad;
static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink",
GST_PAD_SINK,
GST_PAD_ALWAYS,
GST_STATIC_CAPS_ANY);
#define NUM_BUFFERS 100
static gboolean done;
static guint expect_offset;
static GstFlowReturn
chain_____func (GstPad * pad, GstObject * parent, GstBuffer * buf)
{
GST_LOG (" buffer # %3u", (guint) GST_BUFFER_OFFSET (buf));
fail_unless_equals_int (GST_BUFFER_OFFSET (buf), expect_offset);
++expect_offset;
if (GST_BUFFER_OFFSET (buf) > NUM_BUFFERS) {
g_mutex_lock (&check_mutex);
done = TRUE;
g_cond_signal (&check_cond);
g_mutex_unlock (&check_mutex);
}
gst_buffer_unref (buf);
return GST_FLOW_OK;
}
static GstFlowReturn
chainlist_func (GstPad * pad, GstObject * parent, GstBufferList * list)
{
guint i, len;
len = gst_buffer_list_length (list);
GST_DEBUG ("buffer list with %u buffers", len);
for (i = 0; i < len; ++i) {
GstBuffer *buf = gst_buffer_list_get (list, i);
GST_LOG (" buffer # %3u", (guint) GST_BUFFER_OFFSET (buf));
fail_unless_equals_int (GST_BUFFER_OFFSET (buf), expect_offset);
++expect_offset;
}
gst_buffer_list_unref (list);
return GST_FLOW_OK;
}
GST_START_TEST (basesrc_create_bufferlist)
{
GstElement *src;
src = g_object_new (test_src_get_type (), NULL);
mysinkpad = gst_check_setup_sink_pad (src, &sinktemplate);
gst_pad_set_chain_function (mysinkpad, chain_____func);
gst_pad_set_chain_list_function (mysinkpad, chainlist_func);
gst_pad_set_active (mysinkpad, TRUE);
done = FALSE;
expect_offset = 0;
gst_element_set_state (src, GST_STATE_PLAYING);
g_mutex_lock (&check_mutex);
while (!done)
g_cond_wait (&check_cond, &check_mutex);
g_mutex_unlock (&check_mutex);
gst_element_set_state (src, GST_STATE_NULL);
gst_check_teardown_sink_pad (src);
gst_object_unref (src);
}
GST_END_TEST;
static Suite * static Suite *
gst_basesrc_suite (void) gst_basesrc_suite (void)
{ {
@ -783,6 +919,7 @@ gst_basesrc_suite (void)
tcase_add_test (tc, basesrc_eos_events_pull_live_eos); tcase_add_test (tc, basesrc_eos_events_pull_live_eos);
tcase_add_test (tc, basesrc_seek_events_rate_update); tcase_add_test (tc, basesrc_seek_events_rate_update);
tcase_add_test (tc, basesrc_seek_on_last_buffer); tcase_add_test (tc, basesrc_seek_on_last_buffer);
tcase_add_test (tc, basesrc_create_bufferlist);
return s; return s;
} }

View file

@ -114,6 +114,7 @@ EXPORTS
gst_base_src_set_live gst_base_src_set_live
gst_base_src_start_complete gst_base_src_start_complete
gst_base_src_start_wait gst_base_src_start_wait
gst_base_src_submit_buffer_list
gst_base_src_wait_playing gst_base_src_wait_playing
gst_base_transform_get_allocator gst_base_transform_get_allocator
gst_base_transform_get_buffer_pool gst_base_transform_get_buffer_pool