basetransform: Split input buffer processing from output generation

Allow for sub-classes which want to collate incoming buffers or
split them into multiple output buffers by separating the input
buffer submission from output buffer generation and allowing
for looping of one of the phases depending on pull or push mode
operation.

https://bugzilla.gnome.org/show_bug.cgi?id=750033
This commit is contained in:
Jan Schmidt 2015-05-23 01:08:29 +10:00
parent ade4a83e63
commit a198803bd6
6 changed files with 375 additions and 89 deletions

View file

@ -26,7 +26,14 @@
* @short_description: Base class for simple transform filters
* @see_also: #GstBaseSrc, #GstBaseSink
*
* This base class is for filter elements that process data.
* This base class is for filter elements that process data. Elements
* that are suitable for implementation using #GstBaseTransform are ones
* where the size and caps of the output is known entirely from the input
* caps and buffer sizes. These include elements that directly transform
* one buffer into another, modify the contents of a buffer in-place, as
* well as elements that collate multiple input buffers into one output buffer,
* or that expand one input buffer into multiple output buffers. See below
* for more concrete use cases.
*
* It provides for:
* <itemizedlist>
@ -275,6 +282,10 @@ static GstElementClass *parent_class = NULL;
static void gst_base_transform_class_init (GstBaseTransformClass * klass);
static void gst_base_transform_init (GstBaseTransform * trans,
GstBaseTransformClass * klass);
static GstFlowReturn default_submit_input_buffer (GstBaseTransform * trans,
gboolean is_discont, GstBuffer * input);
static GstFlowReturn default_generate_output (GstBaseTransform * trans,
GstBuffer ** outbuf);
/* we can't use G_DEFINE_ABSTRACT_TYPE because we need the klass in the _init
* method to get to the padtemplates */
@ -418,6 +429,8 @@ gst_base_transform_class_init (GstBaseTransformClass * klass)
klass->prepare_output_buffer =
GST_DEBUG_FUNCPTR (default_prepare_output_buffer);
klass->copy_metadata = GST_DEBUG_FUNCPTR (default_copy_metadata);
klass->submit_input_buffer = GST_DEBUG_FUNCPTR (default_submit_input_buffer);
klass->generate_output = GST_DEBUG_FUNCPTR (default_generate_output);
}
static void
@ -1977,24 +1990,17 @@ gst_base_transform_src_eventfunc (GstBaseTransform * trans, GstEvent * event)
return ret;
}
/* perform a transform on @inbuf and put the result in @outbuf.
*
* This function is common to the push and pull-based operations.
*
* This function takes ownership of @inbuf */
/* Takes the input buffer */
static GstFlowReturn
gst_base_transform_handle_buffer (GstBaseTransform * trans, GstBuffer * inbuf,
GstBuffer ** outbuf)
default_submit_input_buffer (GstBaseTransform * trans, gboolean is_discont,
GstBuffer * inbuf)
{
GstBaseTransformClass *bclass;
GstBaseTransformClass *bclass = GST_BASE_TRANSFORM_GET_CLASS (trans);
GstBaseTransformPrivate *priv = trans->priv;
GstFlowReturn ret = GST_FLOW_OK;
gboolean want_in_place;
gboolean reconfigure;
GstClockTime running_time;
GstClockTime timestamp;
gboolean reconfigure;
bclass = GST_BASE_TRANSFORM_GET_CLASS (trans);
reconfigure = gst_pad_check_reconfigure (trans->srcpad);
@ -2034,12 +2040,6 @@ no_reconfigure:
if (!priv->negotiated && !priv->passthrough && (bclass->set_caps != NULL))
goto not_negotiated;
/* Set discont flag so we can mark the outgoing buffer */
if (GST_BUFFER_IS_DISCONT (inbuf)) {
GST_DEBUG_OBJECT (trans, "got DISCONT buffer %p", inbuf);
priv->discont = TRUE;
}
/* can only do QoS if the segment is in TIME */
if (trans->segment.format != GST_FORMAT_TIME)
goto no_qos;
@ -2093,11 +2093,49 @@ no_reconfigure:
/* mark discont for next buffer */
priv->discont = TRUE;
ret = GST_BASE_TRANSFORM_FLOW_DROPPED;
goto skip;
}
}
no_qos:
/* Stash input buffer where the default generate_output
* function can find it */
if (trans->queued_buf)
gst_buffer_unref (trans->queued_buf);
trans->queued_buf = inbuf;
return ret;
skip:
gst_buffer_unref (inbuf);
return ret;
not_negotiated:
{
gst_buffer_unref (inbuf);
GST_ELEMENT_WARNING (trans, STREAM, FORMAT,
("not negotiated"), ("not negotiated"));
return GST_FLOW_NOT_NEGOTIATED;
}
}
static GstFlowReturn
default_generate_output (GstBaseTransform * trans, GstBuffer ** outbuf)
{
GstBaseTransformClass *bclass = GST_BASE_TRANSFORM_GET_CLASS (trans);
GstBaseTransformPrivate *priv = trans->priv;
GstFlowReturn ret = GST_FLOW_OK;
GstBuffer *inbuf;
gboolean want_in_place;
/* Retrieve stashed input buffer, if the default submit_input_buffer
* was run. Takes ownership back from there */
inbuf = trans->queued_buf;
trans->queued_buf = NULL;
/* This default processing method needs one input buffer to feed to
* the transform functions, we can't do anything without it */
if (inbuf == NULL)
return GST_FLOW_OK;
/* first try to allocate an output buffer based on the currently negotiated
* format. outbuf will contain a buffer suitable for doing the configured
@ -2141,7 +2179,6 @@ no_qos:
}
}
skip:
/* only unref input buffer if we allocated a new outbuf buffer. If we reused
* the input buffer, no refcount is changed to keep the input buffer writable
* when needed. */
@ -2151,14 +2188,6 @@ skip:
return ret;
/* ERRORS */
not_negotiated:
{
gst_buffer_unref (inbuf);
*outbuf = NULL;
GST_ELEMENT_WARNING (trans, STREAM, FORMAT,
("not negotiated"), ("not negotiated"));
return GST_FLOW_NOT_NEGOTIATED;
}
no_prepare:
{
gst_buffer_unref (inbuf);
@ -2174,6 +2203,8 @@ no_buffer:
gst_flow_get_name (ret));
return ret;
}
return GST_FLOW_OK;
}
/* FIXME, getrange is broken, need to pull range from the other
@ -2183,23 +2214,65 @@ static GstFlowReturn
gst_base_transform_getrange (GstPad * pad, GstObject * parent, guint64 offset,
guint length, GstBuffer ** buffer)
{
GstBaseTransform *trans;
GstBaseTransformClass *klass;
GstBaseTransformClass *klass = GST_BASE_TRANSFORM_GET_CLASS (parent);
GstBaseTransform *trans = GST_BASE_TRANSFORM (parent);
GstBaseTransformPrivate *priv = trans->priv;
GstFlowReturn ret;
GstBuffer *inbuf = NULL;
GstBuffer *outbuf = NULL;
trans = GST_BASE_TRANSFORM (parent);
/* Try and generate a buffer, if the sub-class wants more data,
* pull some and repeat until a buffer (or error) is produced */
do {
ret = klass->generate_output (trans, &outbuf);
/* Consume the DROPPED return value and go get more data */
if (ret == GST_BASE_TRANSFORM_FLOW_DROPPED)
ret = GST_FLOW_OK;
if (ret != GST_FLOW_OK || outbuf != NULL)
break;
/* No buffer generated, try and pull data */
ret = gst_pad_pull_range (trans->sinkpad, offset, length, &inbuf);
if (G_UNLIKELY (ret != GST_FLOW_OK))
goto pull_error;
klass = GST_BASE_TRANSFORM_GET_CLASS (trans);
if (klass->before_transform)
klass->before_transform (trans, inbuf);
ret = gst_base_transform_handle_buffer (trans, inbuf, buffer);
/* Set discont flag so we can mark the next outgoing buffer */
if (GST_BUFFER_IS_DISCONT (inbuf)) {
GST_DEBUG_OBJECT (trans, "got DISCONT buffer %p", inbuf);
priv->discont = TRUE;
}
/* FIXME: Input offsets and lengths need to be translated, as per
* the FIXME above. For now, just advance somewhat */
offset += gst_buffer_get_size (inbuf);
ret = klass->submit_input_buffer (trans, priv->discont, inbuf);
if (ret != GST_FLOW_OK) {
if (ret == GST_BASE_TRANSFORM_FLOW_DROPPED)
ret = GST_FLOW_OK;
goto done;
}
} while (ret == GST_FLOW_OK && outbuf == NULL);
*buffer = outbuf;
if (outbuf) {
/* apply DISCONT flag if the buffer is not yet marked as such */
if (priv->discont) {
GST_DEBUG_OBJECT (trans, "we have a pending DISCONT");
if (!GST_BUFFER_IS_DISCONT (outbuf)) {
GST_DEBUG_OBJECT (trans, "marking DISCONT on output buffer");
outbuf = gst_buffer_make_writable (outbuf);
GST_BUFFER_FLAG_SET (outbuf, GST_BUFFER_FLAG_DISCONT);
}
priv->discont = FALSE;
}
priv->processed++;
}
done:
return ret;
@ -2212,20 +2285,21 @@ pull_error:
}
}
/* The flow of the chain function is the reverse of the
* getrange() function - we have data, feed it to the sub-class
* and then iterate, pushing buffers it generates until it either
* wants more data or returns an error */
static GstFlowReturn
gst_base_transform_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
{
GstBaseTransform *trans;
GstBaseTransformClass *klass;
GstBaseTransformPrivate *priv;
GstBaseTransform *trans = GST_BASE_TRANSFORM (parent);
GstBaseTransformClass *klass = GST_BASE_TRANSFORM_GET_CLASS (trans);
GstBaseTransformPrivate *priv = trans->priv;
GstFlowReturn ret;
GstClockTime position = GST_CLOCK_TIME_NONE;
GstClockTime timestamp, duration;
GstBuffer *outbuf = NULL;
trans = GST_BASE_TRANSFORM (parent);
priv = trans->priv;
timestamp = GST_BUFFER_TIMESTAMP (buffer);
duration = GST_BUFFER_DURATION (buffer);
@ -2237,12 +2311,24 @@ gst_base_transform_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
position = timestamp;
}
klass = GST_BASE_TRANSFORM_GET_CLASS (trans);
if (klass->before_transform)
klass->before_transform (trans, buffer);
/* protect transform method and concurrent buffer alloc */
ret = gst_base_transform_handle_buffer (trans, buffer, &outbuf);
/* Set discont flag so we can mark the outgoing buffer */
if (GST_BUFFER_IS_DISCONT (buffer)) {
GST_DEBUG_OBJECT (trans, "got DISCONT buffer %p", buffer);
priv->discont = TRUE;
}
/* Takes ownership of input buffer */
ret = klass->submit_input_buffer (trans, priv->discont, buffer);
if (ret != GST_FLOW_OK)
goto done;
do {
outbuf = NULL;
ret = klass->generate_output (trans, &outbuf);
/* outbuf can be NULL, this means a dropped buffer, if we have a buffer but
* GST_BASE_TRANSFORM_FLOW_DROPPED we will not push either. */
@ -2284,7 +2370,9 @@ gst_base_transform_chain (GstPad * pad, GstObject * parent, GstBuffer * buffer)
gst_buffer_unref (outbuf);
}
}
} while (ret == GST_FLOW_OK && outbuf != NULL);
done:
/* convert internal flow to OK and mark discont for the next buffer. */
if (ret == GST_BASE_TRANSFORM_FLOW_DROPPED) {
GST_DEBUG_OBJECT (trans, "dropped a buffer, marking DISCONT");
@ -2387,6 +2475,9 @@ gst_base_transform_activate (GstBaseTransform * trans, gboolean active)
gst_caps_replace (&priv->cache_caps1, NULL);
gst_caps_replace (&priv->cache_caps2, NULL);
/* Make sure any left over buffer is freed */
gst_buffer_replace (&trans->queued_buf, NULL);
if (priv->pad_mode != GST_PAD_MODE_NONE && bclass->stop)
result &= bclass->stop (trans);

View file

@ -90,11 +90,14 @@ struct _GstBaseTransform {
/* MT-protected (with STREAM_LOCK) */
gboolean have_segment;
GstSegment segment;
/* Default submit_input_buffer places the buffer here,
* for consumption by the generate_output method: */
GstBuffer *queued_buf;
/*< private >*/
GstBaseTransformPrivate *priv;
gpointer _gst_reserved[GST_PADDING_LARGE];
gpointer _gst_reserved[GST_PADDING_LARGE-1];
};
/**
@ -190,6 +193,23 @@ struct _GstBaseTransform {
* @transform_ip: Required if the element operates in-place.
* Transform the incoming buffer in-place.
*
* @submit_input_buffer: Function which accepts a new input buffer and pre-processes it.
* The default implementation performs caps (re)negotiation, then
* QoS if needed, and places the input buffer into the @queued_buf
* member variable. If the buffer is dropped due to QoS, it returns
* GST_BASE_TRANSFORM_FLOW_DROPPED. If this input buffer is not
* contiguous with any previous input buffer, then @is_discont
* is set to #TRUE.
* @generate_output: Called after each new input buffer is submitted repeatedly
* until it either generates an error or fails to generate an output
* buffer. The default implementation takes the contents of the
* @queued_buf variable, generates an output buffer if needed
* by calling the class @prepare_output_buffer, and then
* calls either @transform or @transform_ip. Elements that don't
* do 1-to-1 transformations on input to output buffers can either
* return GST_BASE_TRANSFORM_FLOW_DROPPED or simply not generate
* an output buffer until they are ready to do so.
*
* Subclasses can override any of the available virtual methods or not, as
* needed. At minimum either @transform or @transform_ip need to be overridden.
* If the element can overwrite the input data with the results (data is of the
@ -258,8 +278,11 @@ struct _GstBaseTransformClass {
GstBuffer *outbuf);
GstFlowReturn (*transform_ip) (GstBaseTransform *trans, GstBuffer *buf);
GstFlowReturn (*submit_input_buffer) (GstBaseTransform *trans, gboolean is_discont, GstBuffer *input);
GstFlowReturn (*generate_output) (GstBaseTransform *trans, GstBuffer **outbuf);
/*< private >*/
gpointer _gst_reserved[GST_PADDING_LARGE];
gpointer _gst_reserved[GST_PADDING_LARGE - 2];
};
GType gst_base_transform_get_type (void);

View file

@ -162,6 +162,7 @@ check_PROGRAMS = \
libs/gstnettimeprovider \
libs/gsttestclock \
libs/transform1 \
libs/transform2 \
tools/gstinspect
# failing tests

View file

@ -20,6 +20,7 @@ gsttestclock
libsabi
sparsefile
transform1
transform2
typefindhelper
queuearray
*.check.xml

View file

@ -69,6 +69,10 @@ static gboolean (*klass_transform_size) (GstBaseTransform * trans,
GstPadDirection direction, GstCaps * caps, gsize size, GstCaps * othercaps,
gsize * othersize) = NULL;
static gboolean klass_passthrough_on_same_caps = FALSE;
GstFlowReturn (*klass_submit_input_buffer) (GstBaseTransform * trans,
gboolean is_discont, GstBuffer * input) = NULL;
GstFlowReturn (*klass_generate_output) (GstBaseTransform * trans,
GstBuffer ** outbuf) = NULL;
static GstStaticPadTemplate *sink_template = &gst_test_trans_sink_template;
static GstStaticPadTemplate *src_template = &gst_test_trans_src_template;
@ -101,6 +105,10 @@ gst_test_trans_class_init (GstTestTransClass * klass)
trans_class->transform_size = klass_transform_size;
if (klass_set_caps != NULL)
trans_class->set_caps = klass_set_caps;
if (klass_submit_input_buffer != NULL)
trans_class->submit_input_buffer = klass_submit_input_buffer;
if (klass_generate_output)
trans_class->generate_output = klass_generate_output;
}
static void

View file

@ -0,0 +1,162 @@
/* GStreamer
*
* Unit tests for basetransform collation/separation
*
* Copyright (C) 2008 Wim Taymans <wim.taymans@gmail.com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include <gst/gst.h>
#include <gst/check/gstcheck.h>
#include <gst/base/gstbasetransform.h>
#include "test_transform.c"
GstBuffer *buf1, *buf2;
/* Output buffers are twice the size as input */
static gboolean
transform_size_collate (GstBaseTransform * trans, GstPadDirection direction,
GstCaps * caps, gsize size, GstCaps * othercaps, gsize * othersize)
{
if (direction == GST_PAD_SINK) {
*othersize = size * 2;
} else {
*othersize = size / 2;
}
return TRUE;
}
static GstFlowReturn
collate_submit_input_buffer (GstBaseTransform * trans,
gboolean is_discont, GstBuffer * input)
{
GstFlowReturn ret =
GST_BASE_TRANSFORM_CLASS
(gst_test_trans_parent_class)->submit_input_buffer (trans, is_discont,
input);
if (ret != GST_FLOW_OK)
return ret;
fail_unless (buf1 == NULL || buf2 == NULL);
if (buf1 == NULL) {
buf1 = trans->queued_buf;
trans->queued_buf = NULL;
} else if (buf2 == NULL) {
buf2 = trans->queued_buf;
trans->queued_buf = NULL;
}
return ret;
}
static GstFlowReturn
collate_generate_output (GstBaseTransform * trans, GstBuffer ** outbuf)
{
/* Not ready to generate output unless we've collected 2 buffers */
if (buf1 == NULL || buf2 == NULL)
return GST_BASE_TRANSFORM_FLOW_DROPPED;
fail_unless (buf1 != NULL && buf2 != NULL);
*outbuf = gst_buffer_new_and_alloc (40);
gst_buffer_unref (buf1);
gst_buffer_unref (buf2);
buf1 = NULL;
buf2 = NULL;
return GST_FLOW_OK;
}
/* Take 2 input buffers, generate 1 output
* buffer with twice the size
*/
GST_START_TEST (basetransform_chain_collate)
{
TestTransData *trans;
GstBuffer *buffer;
GstFlowReturn res;
GstCaps *incaps, *outcaps;
src_template = &gst_test_trans_src_template;
klass_passthrough_on_same_caps = FALSE;
klass_transform_size = transform_size_collate;
klass_submit_input_buffer = collate_submit_input_buffer;
klass_generate_output = collate_generate_output;
trans = gst_test_trans_new ();
incaps = gst_caps_new_empty_simple ("foo/x-bar");
outcaps = gst_caps_new_empty_simple ("foo/x-bar");
gst_test_trans_push_segment (trans);
gst_pad_push_event (trans->srcpad, gst_event_new_flush_start ());
gst_pad_push_event (trans->srcpad, gst_event_new_flush_stop (TRUE));
GST_DEBUG_OBJECT (trans, "buffer with caps %" GST_PTR_FORMAT, incaps);
gst_test_trans_setcaps (trans, incaps);
gst_test_trans_push_segment (trans);
buffer = gst_buffer_new_and_alloc (20);
res = gst_test_trans_push (trans, buffer);
fail_unless (res == GST_FLOW_OK);
/* We do not expect an output buffer after only pushing one input */
buffer = gst_test_trans_pop (trans);
fail_unless (buffer == NULL);
buffer = gst_buffer_new_and_alloc (20);
res = gst_test_trans_push (trans, buffer);
fail_unless (res == GST_FLOW_OK);
buffer = gst_test_trans_pop (trans);
fail_unless (buffer != NULL);
fail_unless (gst_buffer_get_size (buffer) == 40);
/* output buffer has refcount 1 */
fail_unless (GST_MINI_OBJECT_REFCOUNT_VALUE (buffer) == 1);
gst_buffer_unref (buffer);
gst_caps_unref (incaps);
gst_caps_unref (outcaps);
gst_test_trans_free (trans);
}
GST_END_TEST;
static Suite *
gst_basetransform_collate_suite (void)
{
Suite *s = suite_create ("GstBaseTransformCollate");
TCase *tc = tcase_create ("general");
suite_add_tcase (s, tc);
tcase_add_test (tc, basetransform_chain_collate);
return s;
}
GST_CHECK_MAIN (gst_basetransform_collate);