appsink: add support for buffer lists

https://bugzilla.gnome.org/show_bug.cgi?id=752363
This commit is contained in:
Patricia Muscalu 2016-07-04 09:32:28 +02:00 committed by Jan Schmidt
parent 08ee940de2
commit f1562053fe
4 changed files with 241 additions and 25 deletions

View file

@ -68,6 +68,7 @@
#include <gst/gst.h>
#include <gst/base/gstbasesink.h>
#include <gst/gstbuffer.h>
#include <gst/gstbufferlist.h>
#include <string.h>
@ -94,6 +95,7 @@ struct _GstAppSinkPrivate
gboolean unlock;
gboolean started;
gboolean is_eos;
gboolean buffer_lists_supported;
GstAppSinkCallbacks callbacks;
gpointer user_data;
@ -124,6 +126,7 @@ enum
#define DEFAULT_PROP_MAX_BUFFERS 0
#define DEFAULT_PROP_DROP FALSE
#define DEFAULT_PROP_WAIT_ON_EOS TRUE
#define DEFAULT_PROP_BUFFER_LIST FALSE
enum
{
@ -134,6 +137,7 @@ enum
PROP_MAX_BUFFERS,
PROP_DROP,
PROP_WAIT_ON_EOS,
PROP_BUFFER_LIST,
PROP_LAST
};
@ -162,8 +166,12 @@ static gboolean gst_app_sink_event (GstBaseSink * sink, GstEvent * event);
static gboolean gst_app_sink_query (GstBaseSink * bsink, GstQuery * query);
static GstFlowReturn gst_app_sink_preroll (GstBaseSink * psink,
GstBuffer * buffer);
static GstFlowReturn gst_app_sink_render_common (GstBaseSink * psink,
GstMiniObject * data, gboolean is_list);
static GstFlowReturn gst_app_sink_render (GstBaseSink * psink,
GstBuffer * buffer);
static GstFlowReturn gst_app_sink_render_list (GstBaseSink * psink,
GstBufferList * list);
static gboolean gst_app_sink_setcaps (GstBaseSink * sink, GstCaps * caps);
static GstCaps *gst_app_sink_getcaps (GstBaseSink * psink, GstCaps * filter);
@ -216,6 +224,10 @@ gst_app_sink_class_init (GstAppSinkClass * klass)
"Drop old buffers when the buffer queue is filled", DEFAULT_PROP_DROP,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
g_object_class_install_property (gobject_class, PROP_BUFFER_LIST,
g_param_spec_boolean ("buffer-list", "Buffer List",
"Use buffer lists", DEFAULT_PROP_BUFFER_LIST,
G_PARAM_READWRITE | G_PARAM_STATIC_STRINGS));
/**
* GstAppSink::wait-on-eos:
*
@ -413,6 +425,7 @@ gst_app_sink_class_init (GstAppSinkClass * klass)
basesink_class->event = gst_app_sink_event;
basesink_class->preroll = gst_app_sink_preroll;
basesink_class->render = gst_app_sink_render;
basesink_class->render_list = gst_app_sink_render_list;
basesink_class->get_caps = gst_app_sink_getcaps;
basesink_class->set_caps = gst_app_sink_setcaps;
basesink_class->query = gst_app_sink_query;
@ -442,6 +455,7 @@ gst_app_sink_init (GstAppSink * appsink)
priv->max_buffers = DEFAULT_PROP_MAX_BUFFERS;
priv->drop = DEFAULT_PROP_DROP;
priv->wait_on_eos = DEFAULT_PROP_WAIT_ON_EOS;
priv->buffer_lists_supported = DEFAULT_PROP_BUFFER_LIST;
}
static void
@ -507,6 +521,10 @@ gst_app_sink_set_property (GObject * object, guint prop_id,
case PROP_DROP:
gst_app_sink_set_drop (appsink, g_value_get_boolean (value));
break;
case PROP_BUFFER_LIST:
gst_app_sink_set_buffer_list_support (appsink,
g_value_get_boolean (value));
break;
case PROP_WAIT_ON_EOS:
gst_app_sink_set_wait_on_eos (appsink, g_value_get_boolean (value));
break;
@ -545,6 +563,10 @@ gst_app_sink_get_property (GObject * object, guint prop_id, GValue * value,
case PROP_DROP:
g_value_set_boolean (value, gst_app_sink_get_drop (appsink));
break;
case PROP_BUFFER_LIST:
g_value_set_boolean (value,
gst_app_sink_get_buffer_list_support (appsink));
break;
case PROP_WAIT_ON_EOS:
g_value_set_boolean (value, gst_app_sink_get_wait_on_eos (appsink));
break;
@ -753,20 +775,17 @@ flushing:
}
}
static GstBuffer *
static GstMiniObject *
dequeue_buffer (GstAppSink * appsink)
{
GstAppSinkPrivate *priv = appsink->priv;
GstBuffer *buffer;
GstMiniObject *obj;
do {
GstMiniObject *obj;
obj = g_queue_pop_head (priv->queue);
if (GST_IS_BUFFER (obj)) {
buffer = GST_BUFFER_CAST (obj);
GST_DEBUG_OBJECT (appsink, "dequeued buffer %p", buffer);
if (GST_IS_BUFFER (obj) || GST_IS_BUFFER_LIST (obj)) {
GST_DEBUG_OBJECT (appsink, "dequeued buffer/list %p", obj);
priv->num_buffers--;
break;
} else if (GST_IS_EVENT (obj)) {
@ -794,11 +813,12 @@ dequeue_buffer (GstAppSink * appsink)
}
} while (TRUE);
return buffer;
return obj;
}
static GstFlowReturn
gst_app_sink_render (GstBaseSink * psink, GstBuffer * buffer)
gst_app_sink_render_common (GstBaseSink * psink, GstMiniObject * data,
gboolean is_list)
{
GstFlowReturn ret;
GstAppSink *appsink = GST_APP_SINK_CAST (psink);
@ -819,17 +839,17 @@ restart:
priv->last_caps);
}
GST_DEBUG_OBJECT (appsink, "pushing render buffer %p on queue (%d)",
buffer, priv->num_buffers);
GST_DEBUG_OBJECT (appsink, "pushing render buffer/list %p on queue (%d)",
data, priv->num_buffers);
while (priv->max_buffers > 0 && priv->num_buffers >= priv->max_buffers) {
if (priv->drop) {
GstBuffer *old;
GstMiniObject *old;
/* we need to drop the oldest buffer and try again */
/* we need to drop the oldest buffer/list and try again */
if ((old = dequeue_buffer (appsink))) {
GST_DEBUG_OBJECT (appsink, "dropping old buffer %p", old);
gst_buffer_unref (old);
GST_DEBUG_OBJECT (appsink, "dropping old buffer/list %p", old);
gst_mini_object_unref (old);
}
} else {
GST_DEBUG_OBJECT (appsink, "waiting for free space, length %d >= %d",
@ -851,8 +871,8 @@ restart:
goto flushing;
}
}
/* we need to ref the buffer when pushing it in the queue */
g_queue_push_tail (priv->queue, gst_buffer_ref (buffer));
/* we need to ref the buffer/list when pushing it in the queue */
g_queue_push_tail (priv->queue, gst_mini_object_ref (data));
priv->num_buffers++;
g_cond_signal (&priv->cond);
emit = priv->emit_signals;
@ -880,6 +900,43 @@ stopping:
}
}
static GstFlowReturn
gst_app_sink_render (GstBaseSink * psink, GstBuffer * buffer)
{
return gst_app_sink_render_common (psink, GST_MINI_OBJECT_CAST (buffer),
FALSE);
}
static GstFlowReturn
gst_app_sink_render_list (GstBaseSink * sink, GstBufferList * list)
{
GstFlowReturn flow;
GstAppSink *appsink;
GstBuffer *buffer;
guint i, len;
appsink = GST_APP_SINK_CAST (sink);
if (appsink->priv->buffer_lists_supported)
return gst_app_sink_render_common (sink, GST_MINI_OBJECT_CAST (list), TRUE);
/* The application doesn't support buffer lists, extract individual buffers
* then and push them one-by-one */
GST_INFO_OBJECT (sink, "chaining each group in list as a merged buffer");
len = gst_buffer_list_length (list);
flow = GST_FLOW_OK;
for (i = 0; i < len; i++) {
buffer = gst_buffer_list_get (list, i);
flow = gst_app_sink_render (sink, buffer);
if (flow != GST_FLOW_OK)
break;
}
return flow;
}
static GstCaps *
gst_app_sink_getcaps (GstBaseSink * psink, GstCaps * filter)
{
@ -1185,6 +1242,57 @@ gst_app_sink_get_drop (GstAppSink * appsink)
return result;
}
/**
* gst_app_sink_set_buffer_list_support:
* @appsink: a #GstAppSink
* @buffer_list: enable or disable buffer list support
*
* Instruct @appsink to enable or disable buffer list support.
*
*/
void
gst_app_sink_set_buffer_list_support (GstAppSink * appsink,
gboolean buffer_list)
{
GstAppSinkPrivate *priv;
g_return_if_fail (GST_IS_APP_SINK (appsink));
priv = appsink->priv;
g_mutex_lock (&priv->mutex);
if (priv->buffer_lists_supported != buffer_list) {
priv->buffer_lists_supported = buffer_list;
}
g_mutex_unlock (&priv->mutex);
}
/**
* gst_app_sink_get_buffer_list_support:
* @appsink: a #GstAppSink
*
* Check if @appsink supports buffer lists.
*
* Returns: %TRUE if @appsink supports buffer lists.
*
*/
gboolean
gst_app_sink_get_buffer_list_support (GstAppSink * appsink)
{
gboolean result;
GstAppSinkPrivate *priv;
g_return_val_if_fail (GST_IS_APP_SINK (appsink), FALSE);
priv = appsink->priv;
g_mutex_lock (&priv->mutex);
result = priv->buffer_lists_supported;
g_mutex_unlock (&priv->mutex);
return result;
}
/**
* gst_app_sink_set_wait_on_eos:
* @appsink: a #GstAppSink
@ -1418,7 +1526,7 @@ gst_app_sink_try_pull_sample (GstAppSink * appsink, GstClockTime timeout)
{
GstAppSinkPrivate *priv;
GstSample *sample = NULL;
GstBuffer *buffer;
GstMiniObject *obj;
gboolean timeout_valid;
gint64 end_time;
@ -1454,10 +1562,18 @@ gst_app_sink_try_pull_sample (GstAppSink * appsink, GstClockTime timeout)
g_cond_wait (&priv->cond, &priv->mutex);
}
}
buffer = dequeue_buffer (appsink);
GST_DEBUG_OBJECT (appsink, "we have a buffer %p", buffer);
sample = gst_sample_new (buffer, priv->last_caps, &priv->last_segment, NULL);
gst_buffer_unref (buffer);
obj = dequeue_buffer (appsink);
if (GST_IS_BUFFER (obj)) {
GST_DEBUG_OBJECT (appsink, "we have a buffer %p", obj);
sample = gst_sample_new (GST_BUFFER_CAST (obj), priv->last_caps,
&priv->last_segment, NULL);
} else {
GST_DEBUG_OBJECT (appsink, "we have a list %p", obj);
sample = gst_sample_new (NULL, priv->last_caps, &priv->last_segment, NULL);
gst_sample_set_buffer_list (sample, GST_BUFFER_LIST_CAST (obj));
}
gst_mini_object_unref (obj);
g_cond_signal (&priv->cond);
g_mutex_unlock (&priv->mutex);

View file

@ -117,6 +117,9 @@ guint gst_app_sink_get_max_buffers (GstAppSink *appsink);
void gst_app_sink_set_drop (GstAppSink *appsink, gboolean drop);
gboolean gst_app_sink_get_drop (GstAppSink *appsink);
void gst_app_sink_set_buffer_list_support (GstAppSink *appsink, gboolean drop);
gboolean gst_app_sink_get_buffer_list_support (GstAppSink *appsink);
void gst_app_sink_set_wait_on_eos (GstAppSink *appsink, gboolean wait);
gboolean gst_app_sink_get_wait_on_eos (GstAppSink *appsink);

View file

@ -224,7 +224,7 @@ create_buffer_list (void)
}
static GstFlowReturn
callback_function_sample (GstAppSink * appsink, gpointer p_counter)
callback_function_sample_fallback (GstAppSink * appsink, gpointer p_counter)
{
GstSample *sample;
GstBuffer *buf;
@ -260,16 +260,50 @@ callback_function_sample (GstAppSink * appsink, gpointer p_counter)
return GST_FLOW_OK;
}
static GstFlowReturn
callback_function_sample (GstAppSink * appsink, gpointer p_counter)
{
GstSample *sample;
GstBufferList *list;
gint *p_int_counter = p_counter;
guint len;
gint i;
sample = gst_app_sink_pull_sample (appsink);
list = gst_sample_get_buffer_list (sample);
fail_unless (GST_IS_BUFFER_LIST (list));
len = gst_buffer_list_length (list);
fail_unless_equals_int (len, 3);
for (i = 0; i < len; i++) {
GstBuffer *buf = gst_buffer_list_get (list, i);
fail_unless_equals_int (gst_buffer_get_size (buf), sizeof (gint));
gst_check_buffer_data (buf, &values[i], sizeof (gint));
}
gst_sample_unref (sample);
*p_int_counter += 1;
return GST_FLOW_OK;
}
GST_START_TEST (test_buffer_list_fallback)
{
GstElement *sink;
GstBufferList *list;
GstAppSinkCallbacks callbacks = { NULL };
gint counter = 0;
gboolean buffer_list_support;
sink = setup_appsink ();
callbacks.new_sample = callback_function_sample;
/* verify that the buffer list support is disabled per default */
g_object_get (sink, "buffer-list", &buffer_list_support, NULL);
fail_unless (buffer_list_support == FALSE);
callbacks.new_sample = callback_function_sample_fallback;
gst_app_sink_set_callbacks (GST_APP_SINK (sink), &callbacks, &counter, NULL);
@ -286,6 +320,35 @@ GST_START_TEST (test_buffer_list_fallback)
GST_END_TEST;
GST_START_TEST (test_buffer_list_support)
{
GstElement *sink;
GstBufferList *list;
GstAppSinkCallbacks callbacks = { NULL };
gint counter = 0;
sink = setup_appsink ();
/* enable buffer list support */
g_object_set (sink, "buffer-list", TRUE, NULL);
callbacks.new_sample = callback_function_sample;
gst_app_sink_set_callbacks (GST_APP_SINK (sink), &callbacks, &counter, NULL);
ASSERT_SET_STATE (sink, GST_STATE_PLAYING, GST_STATE_CHANGE_ASYNC);
list = create_buffer_list ();
fail_unless (gst_pad_push_list (mysrcpad, list) == GST_FLOW_OK);
fail_unless_equals_int (counter, 1);
ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
cleanup_appsink (sink);
}
GST_END_TEST;
GST_START_TEST (test_buffer_list_fallback_signal)
{
GstElement *sink;
@ -294,6 +357,36 @@ GST_START_TEST (test_buffer_list_fallback_signal)
sink = setup_appsink ();
/* C calling convention to the rescue.. */
g_signal_connect (sink, "new-sample",
G_CALLBACK (callback_function_sample_fallback), &counter);
g_object_set (sink, "emit-signals", TRUE, NULL);
ASSERT_SET_STATE (sink, GST_STATE_PLAYING, GST_STATE_CHANGE_ASYNC);
list = create_buffer_list ();
fail_unless (gst_pad_push_list (mysrcpad, list) == GST_FLOW_OK);
fail_unless_equals_int (counter, 3);
ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
cleanup_appsink (sink);
}
GST_END_TEST;
GST_START_TEST (test_buffer_list_signal)
{
GstElement *sink;
GstBufferList *list;
gint counter = 0;
sink = setup_appsink ();
/* enable buffer list support */
g_object_set (sink, "buffer-list", TRUE, NULL);
/* C calling convention to the rescue.. */
g_signal_connect (sink, "new-sample", G_CALLBACK (callback_function_sample),
&counter);
@ -305,7 +398,7 @@ GST_START_TEST (test_buffer_list_fallback_signal)
list = create_buffer_list ();
fail_unless (gst_pad_push_list (mysrcpad, list) == GST_FLOW_OK);
fail_unless_equals_int (counter, 3);
fail_unless_equals_int (counter, 1);
ASSERT_SET_STATE (sink, GST_STATE_NULL, GST_STATE_CHANGE_SUCCESS);
cleanup_appsink (sink);
@ -422,7 +515,9 @@ appsink_suite (void)
tcase_add_test (tc_chain, test_notify0);
tcase_add_test (tc_chain, test_notify1);
tcase_add_test (tc_chain, test_buffer_list_fallback);
tcase_add_test (tc_chain, test_buffer_list_support);
tcase_add_test (tc_chain, test_buffer_list_fallback_signal);
tcase_add_test (tc_chain, test_buffer_list_signal);
tcase_add_test (tc_chain, test_segment);
tcase_add_test (tc_chain, test_pull_with_timeout);

View file

@ -1,4 +1,5 @@
EXPORTS
gst_app_sink_get_buffer_list_support
gst_app_sink_get_caps
gst_app_sink_get_drop
gst_app_sink_get_emit_signals
@ -8,6 +9,7 @@ EXPORTS
gst_app_sink_is_eos
gst_app_sink_pull_preroll
gst_app_sink_pull_sample
gst_app_sink_set_buffer_list_support
gst_app_sink_set_callbacks
gst_app_sink_set_caps
gst_app_sink_set_drop