appsrc: add support for pushing buffer lists

And samples that carry buffer lists.

https://bugzilla.gnome.org/show_bug.cgi?id=752363
This commit is contained in:
Tim-Philipp Müller 2017-08-31 11:12:12 +01:00
parent 0ef44cefc9
commit b60cc0274c
4 changed files with 264 additions and 23 deletions

View file

@ -72,6 +72,7 @@ gst_app_src_set_emit_signals
GstAppSrcCallbacks
gst_app_src_set_callbacks
gst_app_src_push_buffer
gst_app_src_push_buffer_list
gst_app_src_push_sample
gst_app_src_end_of_stream
<SUBSECTION Standard>

View file

@ -149,6 +149,7 @@ enum
SIGNAL_PUSH_BUFFER,
SIGNAL_END_OF_STREAM,
SIGNAL_PUSH_SAMPLE,
SIGNAL_PUSH_BUFFER_LIST,
LAST_SIGNAL
};
@ -224,6 +225,8 @@ static gboolean gst_app_src_event (GstBaseSrc * src, GstEvent * event);
static GstFlowReturn gst_app_src_push_buffer_action (GstAppSrc * appsrc,
GstBuffer * buffer);
static GstFlowReturn gst_app_src_push_buffer_list_action (GstAppSrc * appsrc,
GstBufferList * buffer_list);
static GstFlowReturn gst_app_src_push_sample_action (GstAppSrc * appsrc,
GstSample * sample);
@ -465,6 +468,27 @@ gst_app_src_class_init (GstAppSrcClass * klass)
push_buffer), NULL, NULL, NULL,
GST_TYPE_FLOW_RETURN, 1, GST_TYPE_BUFFER);
/**
* GstAppSrc::push-buffer-list:
* @appsrc: the appsrc
* @buffer_list: a buffer list to push
*
* Adds a buffer list to the queue of buffers and buffer lists that the
* appsrc element will push to its source pad. This function does not take
* ownership of the buffer list so the buffer list needs to be unreffed
* after calling this function.
*
* When the block property is TRUE, this function can block until free space
* becomes available in the queue.
*
* Since: 1.14
*/
gst_app_src_signals[SIGNAL_PUSH_BUFFER_LIST] =
g_signal_new ("push-buffer-list", G_TYPE_FROM_CLASS (klass),
G_SIGNAL_RUN_LAST | G_SIGNAL_ACTION, G_STRUCT_OFFSET (GstAppSrcClass,
push_buffer_list), NULL, NULL, NULL,
GST_TYPE_FLOW_RETURN, 1, GST_TYPE_BUFFER_LIST);
/**
* GstAppSrc::push-sample:
* @appsrc: the appsrc
@ -527,6 +551,7 @@ gst_app_src_class_init (GstAppSrcClass * klass)
basesrc_class->event = gst_app_src_event;
klass->push_buffer = gst_app_src_push_buffer_action;
klass->push_buffer_list = gst_app_src_push_buffer_list_action;
klass->push_sample = gst_app_src_push_sample_action;
klass->end_of_stream = gst_app_src_end_of_stream;
@ -1152,7 +1177,7 @@ gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size,
guint buf_size;
GstMiniObject *obj = g_queue_pop_head (priv->queue);
if (!GST_IS_BUFFER (obj)) {
if (GST_IS_CAPS (obj)) {
GstCaps *next_caps = GST_CAPS (obj);
gboolean caps_changed = TRUE;
@ -1181,10 +1206,25 @@ gst_app_src_create (GstBaseSrc * bsrc, guint64 offset, guint size,
continue;
}
*buf = GST_BUFFER (obj);
buf_size = gst_buffer_get_size (*buf);
if (GST_IS_BUFFER (obj)) {
*buf = GST_BUFFER (obj);
buf_size = gst_buffer_get_size (*buf);
GST_LOG_OBJECT (appsrc, "have buffer %p of size %u", *buf, buf_size);
} else {
GstBufferList *buffer_list;
GST_DEBUG_OBJECT (appsrc, "we have buffer %p of size %u", *buf, buf_size);
g_assert (GST_IS_BUFFER_LIST (obj));
buffer_list = GST_BUFFER_LIST (obj);
buf_size = gst_buffer_list_calculate_size (buffer_list);
GST_LOG_OBJECT (appsrc, "have buffer list %p of size %u, %u buffers",
buffer_list, buf_size, gst_buffer_list_length (buffer_list));
gst_base_src_submit_buffer_list (bsrc, buffer_list);
*buf = NULL;
}
priv->queued_bytes -= buf_size;
@ -1681,17 +1721,28 @@ gst_app_src_get_emit_signals (GstAppSrc * appsrc)
}
static GstFlowReturn
gst_app_src_push_buffer_full (GstAppSrc * appsrc, GstBuffer * buffer,
gboolean steal_ref)
gst_app_src_push_internal (GstAppSrc * appsrc, GstBuffer * buffer,
GstBufferList * buflist, gboolean steal_ref)
{
gboolean first = TRUE;
GstAppSrcPrivate *priv;
g_return_val_if_fail (GST_IS_APP_SRC (appsrc), GST_FLOW_ERROR);
g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
priv = appsrc->priv;
if (buffer != NULL)
g_return_val_if_fail (GST_IS_BUFFER (buffer), GST_FLOW_ERROR);
else
g_return_val_if_fail (GST_IS_BUFFER_LIST (buflist), GST_FLOW_ERROR);
if (buflist != NULL) {
if (gst_buffer_list_length (buflist) == 0)
return GST_FLOW_OK;
buffer = gst_buffer_list_get (buflist, 0);
}
if (GST_BUFFER_DTS (buffer) == GST_CLOCK_TIME_NONE &&
GST_BUFFER_PTS (buffer) == GST_CLOCK_TIME_NONE &&
gst_base_src_get_do_timestamp (GST_BASE_SRC_CAST (appsrc))) {
@ -1710,14 +1761,25 @@ gst_app_src_push_buffer_full (GstAppSrc * appsrc, GstBuffer * buffer,
now = 0;
gst_object_unref (clock);
if (!steal_ref)
buffer = gst_buffer_copy (buffer);
else
buffer = gst_buffer_make_writable (buffer);
if (buflist == NULL) {
if (!steal_ref) {
buffer = gst_buffer_copy (buffer);
steal_ref = TRUE;
} else {
buffer = gst_buffer_make_writable (buffer);
}
} else {
if (!steal_ref) {
buflist = gst_buffer_list_copy (buflist);
steal_ref = TRUE;
} else {
buflist = gst_buffer_list_make_writable (buflist);
}
buffer = gst_buffer_list_get_writable (buflist, 0);
}
GST_BUFFER_PTS (buffer) = now;
GST_BUFFER_DTS (buffer) = now;
steal_ref = TRUE;
} else {
GST_WARNING_OBJECT (appsrc,
"do-timestamp=TRUE but buffers are provided before "
@ -1774,11 +1836,19 @@ gst_app_src_push_buffer_full (GstAppSrc * appsrc, GstBuffer * buffer,
break;
}
GST_DEBUG_OBJECT (appsrc, "queueing buffer %p", buffer);
if (!steal_ref)
gst_buffer_ref (buffer);
g_queue_push_tail (priv->queue, buffer);
priv->queued_bytes += gst_buffer_get_size (buffer);
if (buflist != NULL) {
GST_DEBUG_OBJECT (appsrc, "queueing buffer list %p", buflist);
if (!steal_ref)
gst_buffer_list_ref (buflist);
g_queue_push_tail (priv->queue, buflist);
priv->queued_bytes += gst_buffer_list_calculate_size (buflist);
} else {
GST_DEBUG_OBJECT (appsrc, "queueing buffer %p", buffer);
if (!steal_ref)
gst_buffer_ref (buffer);
g_queue_push_tail (priv->queue, buffer);
priv->queued_bytes += gst_buffer_get_size (buffer);
}
g_cond_broadcast (&priv->cond);
g_mutex_unlock (&priv->mutex);
@ -1803,9 +1873,17 @@ eos:
}
}
static GstFlowReturn
gst_app_src_push_buffer_full (GstAppSrc * appsrc, GstBuffer * buffer,
gboolean steal_ref)
{
return gst_app_src_push_internal (appsrc, buffer, NULL, steal_ref);
}
static GstFlowReturn
gst_app_src_push_sample_internal (GstAppSrc * appsrc, GstSample * sample)
{
GstBufferList *buffer_list;
GstBuffer *buffer;
GstCaps *caps;
@ -1819,12 +1897,15 @@ gst_app_src_push_sample_internal (GstAppSrc * appsrc, GstSample * sample)
}
buffer = gst_sample_get_buffer (sample);
if (buffer == NULL) {
GST_WARNING_OBJECT (appsrc, "received sample without buffer");
return GST_FLOW_OK;
}
if (buffer != NULL)
return gst_app_src_push_buffer_full (appsrc, buffer, FALSE);
return gst_app_src_push_buffer_full (appsrc, buffer, FALSE);
buffer_list = gst_sample_get_buffer_list (sample);
if (buffer_list != NULL)
return gst_app_src_push_internal (appsrc, NULL, buffer_list, FALSE);
GST_WARNING_OBJECT (appsrc, "received sample without buffer or buffer list");
return GST_FLOW_OK;
}
/**
@ -1848,6 +1929,30 @@ gst_app_src_push_buffer (GstAppSrc * appsrc, GstBuffer * buffer)
return gst_app_src_push_buffer_full (appsrc, buffer, TRUE);
}
/**
* gst_app_src_push_buffer_list:
* @appsrc: a #GstAppSrc
* @buffer_list: (transfer full): a #GstBufferList to push
*
* Adds a buffer list to the queue of buffers and buffer lists that the
* appsrc element will push to its source pad. This function takes ownership
* of @buffer_list.
*
* When the block property is TRUE, this function can block until free
* space becomes available in the queue.
*
* Returns: #GST_FLOW_OK when the buffer list was successfuly queued.
* #GST_FLOW_FLUSHING when @appsrc is not PAUSED or PLAYING.
* #GST_FLOW_EOS when EOS occured.
*
* Since: 1.14
*/
GstFlowReturn
gst_app_src_push_buffer_list (GstAppSrc * appsrc, GstBufferList * buffer_list)
{
return gst_app_src_push_internal (appsrc, NULL, buffer_list, TRUE);
}
/**
* gst_app_src_push_sample:
* @appsrc: a #GstAppSrc
@ -1883,6 +1988,15 @@ gst_app_src_push_buffer_action (GstAppSrc * appsrc, GstBuffer * buffer)
return gst_app_src_push_buffer_full (appsrc, buffer, FALSE);
}
/* push a buffer list without stealing the ref of the buffer list. This is
* used for the action signal. */
static GstFlowReturn
gst_app_src_push_buffer_list_action (GstAppSrc * appsrc,
GstBufferList * buffer_list)
{
return gst_app_src_push_internal (appsrc, NULL, buffer_list, FALSE);
}
/* push a sample without stealing the ref. This is used for the
* action signal. */
static GstFlowReturn

View file

@ -111,9 +111,10 @@ struct _GstAppSrcClass
GstFlowReturn (*push_buffer) (GstAppSrc *appsrc, GstBuffer *buffer);
GstFlowReturn (*end_of_stream) (GstAppSrc *appsrc);
GstFlowReturn (*push_sample) (GstAppSrc *appsrc, GstSample *sample);
GstFlowReturn (*push_buffer_list) (GstAppSrc *appsrc, GstBufferList *buffer_list);
/*< private >*/
gpointer _gst_reserved[GST_PADDING-1];
gpointer _gst_reserved[GST_PADDING-2];
};
GST_EXPORT
@ -167,6 +168,9 @@ gboolean gst_app_src_get_emit_signals (GstAppSrc *appsrc);
GST_EXPORT
GstFlowReturn gst_app_src_push_buffer (GstAppSrc *appsrc, GstBuffer *buffer);
GST_EXPORT
GstFlowReturn gst_app_src_push_buffer_list (GstAppSrc * appsrc, GstBufferList * buffer_list);
GST_EXPORT
GstFlowReturn gst_app_src_end_of_stream (GstAppSrc *appsrc);

View file

@ -485,6 +485,127 @@ GST_START_TEST (test_appsrc_blocked_on_caps)
GST_END_TEST;
static guint expect_offset;
static gboolean chainlist_called;
static gboolean done;
static gboolean
event_func (GstPad * pad, GstObject * parent, GstEvent * event)
{
GST_LOG ("event %" GST_PTR_FORMAT, event);
if (GST_EVENT_TYPE (event) == GST_EVENT_EOS) {
g_mutex_lock (&check_mutex);
done = TRUE;
g_cond_signal (&check_cond);
g_mutex_unlock (&check_mutex);
}
gst_event_unref (event);
return TRUE;
}
static GstFlowReturn
chain_____func (GstPad * pad, GstObject * parent, GstBuffer * buf)
{
GST_LOG (" buffer # %3u", (guint) GST_BUFFER_OFFSET (buf));
fail_unless_equals_int (GST_BUFFER_OFFSET (buf), expect_offset);
++expect_offset;
gst_buffer_unref (buf);
return GST_FLOW_OK;
}
static GstFlowReturn
chainlist_func (GstPad * pad, GstObject * parent, GstBufferList * list)
{
guint i, len;
len = gst_buffer_list_length (list);
GST_DEBUG ("buffer list with %u buffers", len);
for (i = 0; i < len; ++i) {
GstBuffer *buf = gst_buffer_list_get (list, i);
GST_LOG (" buffer # %3u", (guint) GST_BUFFER_OFFSET (buf));
fail_unless_equals_int (GST_BUFFER_OFFSET (buf), expect_offset);
++expect_offset;
}
chainlist_called = TRUE;
gst_buffer_list_unref (list);
return GST_FLOW_OK;
}
GST_START_TEST (test_appsrc_push_buffer_list)
{
GstElement *src;
guint i;
src = gst_element_factory_make ("appsrc", "appsrc");
mysinkpad = gst_check_setup_sink_pad (src, &sinktemplate);
gst_pad_set_chain_function (mysinkpad, chain_____func);
gst_pad_set_chain_list_function (mysinkpad, chainlist_func);
gst_pad_set_event_function (mysinkpad, event_func);
gst_pad_set_active (mysinkpad, TRUE);
expect_offset = 0;
chainlist_called = FALSE;
done = FALSE;
gst_element_set_state (src, GST_STATE_PLAYING);
#define NUM_BUFFERS 100
for (i = 0; i < NUM_BUFFERS; ++i) {
GstFlowReturn flow;
GstBuffer *buf;
buf = gst_buffer_new ();
GST_BUFFER_OFFSET (buf) = i;
if (i == 0 || g_random_boolean ()) {
GstBufferList *buflist = gst_buffer_list_new ();
gst_buffer_list_add (buflist, buf);
buf = gst_buffer_new ();
GST_BUFFER_OFFSET (buf) = ++i;
gst_buffer_list_add (buflist, buf);
if (g_random_boolean ()) {
flow = gst_app_src_push_buffer_list (GST_APP_SRC (src), buflist);
} else {
g_signal_emit_by_name (src, "push-buffer-list", buflist, &flow);
gst_buffer_list_unref (buflist);
}
} else {
flow = gst_app_src_push_buffer (GST_APP_SRC (src), buf);
}
fail_unless_equals_int (flow, GST_FLOW_OK);
}
gst_app_src_end_of_stream (GST_APP_SRC (src));
g_mutex_lock (&check_mutex);
while (!done)
g_cond_wait (&check_cond, &check_mutex);
g_mutex_unlock (&check_mutex);
gst_element_set_state (src, GST_STATE_NULL);
/* make sure the buffer list was pushed out as list! */
fail_unless (chainlist_called);
/* can be NUM_BUFFERS or NUM_BUFFERS + 1 depending on whether last item
* was buffer list or not */
fail_unless (expect_offset >= NUM_BUFFERS);
gst_check_teardown_sink_pad (src);
gst_object_unref (src);
}
GST_END_TEST;
static Suite *
appsrc_suite (void)
{
@ -495,6 +616,7 @@ appsrc_suite (void)
tcase_add_test (tc_chain, test_appsrc_set_caps_twice);
tcase_add_test (tc_chain, test_appsrc_caps_in_push_modes);
tcase_add_test (tc_chain, test_appsrc_blocked_on_caps);
tcase_add_test (tc_chain, test_appsrc_push_buffer_list);
if (RUNNING_ON_VALGRIND)
tcase_add_loop_test (tc_chain, test_appsrc_block_deadlock, 0, 5);