diff --git a/docs/libs/gstreamer-libs-sections.txt b/docs/libs/gstreamer-libs-sections.txt index 003210c978..957f3a866f 100644 --- a/docs/libs/gstreamer-libs-sections.txt +++ b/docs/libs/gstreamer-libs-sections.txt @@ -335,6 +335,7 @@ gst_base_src_get_allocator gst_base_src_get_buffer_pool gst_base_src_is_async gst_base_src_set_async +gst_base_src_submit_buffer_list GST_BASE_SRC_PAD GST_BASE_SRC_IS_STARTED diff --git a/libs/gst/base/gstbasesrc.c b/libs/gst/base/gstbasesrc.c index 30411b0ba1..da73c8c05a 100644 --- a/libs/gst/base/gstbasesrc.c +++ b/libs/gst/base/gstbasesrc.c @@ -261,8 +261,14 @@ struct _GstBaseSrcPrivate GstAllocationParams params; /* OBJECT_LOCK */ GCond async_cond; /* OBJECT_LOCK */ + + /* for _submit_buffer_list() */ + GstBufferList *pending_bufferlist; }; +#define BASE_SRC_HAS_PENDING_BUFFER_LIST(src) \ + ((src)->priv->pending_bufferlist != NULL) + static GstElementClass *parent_class = NULL; static void gst_base_src_class_init (GstBaseSrcClass * klass); @@ -2556,6 +2562,16 @@ again: res_buf = in_buf; } + if (res_buf == NULL) { + GstBufferList *pending_list = src->priv->pending_bufferlist; + + if (pending_list == NULL || gst_buffer_list_length (pending_list) == 0) + goto null_buffer; + + res_buf = gst_buffer_list_get_writable (pending_list, 0); + own_res_buf = FALSE; + } + /* no timestamp set and we are at offset 0, we can timestamp with 0 */ if (offset == 0 && src->segment.time == 0 && GST_BUFFER_DTS (res_buf) == -1 && !src->is_live) { @@ -2671,6 +2687,14 @@ eos: GST_DEBUG_OBJECT (src, "we are EOS"); return GST_FLOW_EOS; } +null_buffer: + { + GST_ELEMENT_ERROR (src, STREAM, FAILED, + (_("Internal data flow error.")), + ("Subclass %s neither returned a buffer nor submitted a buffer list " + "from its create function", G_OBJECT_TYPE_NAME (src))); + return GST_FLOW_ERROR; + } } static GstFlowReturn @@ -2803,6 +2827,12 @@ gst_base_src_loop (GstPad * pad) GST_LOG_OBJECT (src, "next_ts %" GST_TIME_FORMAT " size %u", GST_TIME_ARGS (position), blocksize); + /* clean up just in case we got interrupted or so last time round */ + if (src->priv->pending_bufferlist != NULL) { + gst_buffer_list_unref (src->priv->pending_bufferlist); + src->priv->pending_bufferlist = NULL; + } + ret = gst_base_src_get_range (src, position, blocksize, &buf); if (G_UNLIKELY (ret != GST_FLOW_OK)) { GST_INFO_OBJECT (src, "pausing after gst_base_src_get_range() = %s", @@ -2810,9 +2840,11 @@ gst_base_src_loop (GstPad * pad) GST_LIVE_UNLOCK (src); goto pause; } - /* this should not happen */ - if (G_UNLIKELY (buf == NULL)) - goto null_buffer; + + /* Note: at this point buf might be a single buf returned which we own or + * the first buf of a pending buffer list submitted via submit_buffer_list(), + * in which case the buffer is owned by the pending buffer list and not us. */ + g_assert (buf != NULL); /* push events to close/start our segment before we push the buffer. */ if (G_UNLIKELY (src->priv->segment_pending)) { @@ -2917,7 +2949,14 @@ gst_base_src_loop (GstPad * pad) } GST_LIVE_UNLOCK (src); - ret = gst_pad_push (pad, buf); + /* push buffer or buffer list */ + if (src->priv->pending_bufferlist != NULL) { + ret = gst_pad_push_list (pad, src->priv->pending_bufferlist); + src->priv->pending_bufferlist = NULL; + } else { + ret = gst_pad_push (pad, buf); + } + if (G_UNLIKELY (ret != GST_FLOW_OK)) { if (ret == GST_FLOW_NOT_NEGOTIATED) { goto not_negotiated; @@ -3018,13 +3057,6 @@ pause: } goto done; } -null_buffer: - { - GST_ELEMENT_ERROR (src, STREAM, FAILED, - (_("Internal data flow error.")), ("element returned NULL buffer")); - GST_LIVE_UNLOCK (src); - goto done; - } } static gboolean @@ -3619,6 +3651,11 @@ gst_base_src_stop (GstBaseSrc * basesrc) if (bclass->stop) result = bclass->stop (basesrc); + if (basesrc->priv->pending_bufferlist != NULL) { + gst_buffer_list_unref (basesrc->priv->pending_bufferlist); + basesrc->priv->pending_bufferlist = NULL; + } + gst_base_src_set_allocation (basesrc, NULL, NULL, NULL); return result; @@ -3959,3 +3996,40 @@ gst_base_src_get_allocator (GstBaseSrc * src, *params = src->priv->params; GST_OBJECT_UNLOCK (src); } + +/** + * gst_base_src_submit_buffer_list: + * @src: a #GstBaseSrc + * @buffer_list: (transfer full): a #GstBufferList + * + * Subclasses can call this from their create virtual method implementation + * to submit a buffer list to be pushed out later. This is useful in + * cases where the create function wants to produce multiple buffers to be + * pushed out in one go in form of a #GstBufferList, which can reduce overhead + * drastically, especially for packetised inputs (for data streams where + * the packetisation/chunking is not important it is usually more efficient + * to return larger buffers instead). + * + * Subclasses that use this function from their create function must return + * %GST_FLOW_OK and no buffer from their create virtual method implementation. + * If a buffer is returned after a buffer list has also been submitted via this + * function the behaviour is undefined. + * + * Subclasses must only call this function once per create function call and + * subclasses must only call this function when the source operates in push + * mode. + * + * Since: 1.14 + */ +void +gst_base_src_submit_buffer_list (GstBaseSrc * src, GstBufferList * buffer_list) +{ + g_return_if_fail (GST_IS_BASE_SRC (src)); + g_return_if_fail (GST_IS_BUFFER_LIST (buffer_list)); + g_return_if_fail (BASE_SRC_HAS_PENDING_BUFFER_LIST (src) == FALSE); + + src->priv->pending_bufferlist = buffer_list; + + GST_LOG_OBJECT (src, "%u buffers submitted in buffer list", + gst_buffer_list_length (buffer_list)); +} diff --git a/libs/gst/base/gstbasesrc.h b/libs/gst/base/gstbasesrc.h index 757a471380..f23799ac53 100644 --- a/libs/gst/base/gstbasesrc.h +++ b/libs/gst/base/gstbasesrc.h @@ -299,6 +299,9 @@ void gst_base_src_get_allocator (GstBaseSrc *src, GstAllocator **allocator, GstAllocationParams *params); +GST_EXPORT +void gst_base_src_submit_buffer_list (GstBaseSrc * src, + GstBufferList * buffer_list); #ifdef G_DEFINE_AUTOPTR_CLEANUP_FUNC G_DEFINE_AUTOPTR_CLEANUP_FUNC(GstBaseSrc, gst_object_unref) diff --git a/tests/check/libs/basesrc.c b/tests/check/libs/basesrc.c index 82c6589ec9..ee360ce28b 100644 --- a/tests/check/libs/basesrc.c +++ b/tests/check/libs/basesrc.c @@ -2,7 +2,7 @@ * * some unit tests for GstBaseSrc * - * Copyright (C) 2006 Tim-Philipp Müller + * Copyright (C) 2006-2017 Tim-Philipp Müller * * This library is free software; you can redistribute it and/or * modify it under the terms of the GNU Library General Public @@ -768,6 +768,142 @@ GST_START_TEST (basesrc_seek_on_last_buffer) GST_END_TEST; +typedef GstBaseSrc TestSrc; +typedef GstBaseSrcClass TestSrcClass; + +static GstStaticPadTemplate src_template = GST_STATIC_PAD_TEMPLATE ("src", + GST_PAD_SRC, + GST_PAD_ALWAYS, + GST_STATIC_CAPS_ANY); + +static GType test_src_get_type (void); + +G_DEFINE_TYPE (TestSrc, test_src, GST_TYPE_BASE_SRC); + +static void +test_src_init (TestSrc * src) +{ +} + +static GstFlowReturn +test_src_create (GstBaseSrc * src, guint64 offset, guint size, + GstBuffer ** p_buf) +{ + GstBuffer *buf; + static int num = 0; + + fail_if (*p_buf != NULL); + + buf = gst_buffer_new (); + GST_BUFFER_OFFSET (buf) = num++; + + if (num == 1 || g_random_boolean ()) { + GstBufferList *buflist = gst_buffer_list_new (); + + gst_buffer_list_add (buflist, buf); + + buf = gst_buffer_new (); + GST_BUFFER_OFFSET (buf) = num++; + gst_buffer_list_add (buflist, buf); + gst_base_src_submit_buffer_list (src, buflist); + } else { + *p_buf = buf; + } + + return GST_FLOW_OK; +} + +static void +test_src_class_init (TestSrcClass * klass) +{ + GstBaseSrcClass *gstbasesrc_class = GST_BASE_SRC_CLASS (klass); + + gst_element_class_add_static_pad_template (GST_ELEMENT_CLASS (klass), + &src_template); + + gstbasesrc_class->create = test_src_create; +} + +static GstPad *mysinkpad; + +static GstStaticPadTemplate sinktemplate = GST_STATIC_PAD_TEMPLATE ("sink", + GST_PAD_SINK, + GST_PAD_ALWAYS, + GST_STATIC_CAPS_ANY); + +#define NUM_BUFFERS 100 +static gboolean done; +static guint expect_offset; + +static GstFlowReturn +chain_____func (GstPad * pad, GstObject * parent, GstBuffer * buf) +{ + GST_LOG (" buffer # %3u", (guint) GST_BUFFER_OFFSET (buf)); + + fail_unless_equals_int (GST_BUFFER_OFFSET (buf), expect_offset); + ++expect_offset; + + if (GST_BUFFER_OFFSET (buf) > NUM_BUFFERS) { + g_mutex_lock (&check_mutex); + done = TRUE; + g_cond_signal (&check_cond); + g_mutex_unlock (&check_mutex); + } + gst_buffer_unref (buf); + + return GST_FLOW_OK; +} + +static GstFlowReturn +chainlist_func (GstPad * pad, GstObject * parent, GstBufferList * list) +{ + guint i, len; + + len = gst_buffer_list_length (list); + + GST_DEBUG ("buffer list with %u buffers", len); + for (i = 0; i < len; ++i) { + GstBuffer *buf = gst_buffer_list_get (list, i); + GST_LOG (" buffer # %3u", (guint) GST_BUFFER_OFFSET (buf)); + + fail_unless_equals_int (GST_BUFFER_OFFSET (buf), expect_offset); + ++expect_offset; + } + + gst_buffer_list_unref (list); + return GST_FLOW_OK; +} + +GST_START_TEST (basesrc_create_bufferlist) +{ + GstElement *src; + + src = g_object_new (test_src_get_type (), NULL); + + mysinkpad = gst_check_setup_sink_pad (src, &sinktemplate); + gst_pad_set_chain_function (mysinkpad, chain_____func); + gst_pad_set_chain_list_function (mysinkpad, chainlist_func); + gst_pad_set_active (mysinkpad, TRUE); + + done = FALSE; + expect_offset = 0; + + gst_element_set_state (src, GST_STATE_PLAYING); + + g_mutex_lock (&check_mutex); + while (!done) + g_cond_wait (&check_cond, &check_mutex); + g_mutex_unlock (&check_mutex); + + gst_element_set_state (src, GST_STATE_NULL); + + gst_check_teardown_sink_pad (src); + + gst_object_unref (src); +} + +GST_END_TEST; + static Suite * gst_basesrc_suite (void) { @@ -783,6 +919,7 @@ gst_basesrc_suite (void) tcase_add_test (tc, basesrc_eos_events_pull_live_eos); tcase_add_test (tc, basesrc_seek_events_rate_update); tcase_add_test (tc, basesrc_seek_on_last_buffer); + tcase_add_test (tc, basesrc_create_bufferlist); return s; } diff --git a/win32/common/libgstbase.def b/win32/common/libgstbase.def index a65c611b85..ff2b844491 100644 --- a/win32/common/libgstbase.def +++ b/win32/common/libgstbase.def @@ -114,6 +114,7 @@ EXPORTS gst_base_src_set_live gst_base_src_start_complete gst_base_src_start_wait + gst_base_src_submit_buffer_list gst_base_src_wait_playing gst_base_transform_get_allocator gst_base_transform_get_buffer_pool