flowcombiner: add GstFlowCombiner

Adds a utility struct that is capable of storing and aggregating flow returns
associated with pads.

This way all demuxers will have a standard function to use and have the
same expected results.

Includes tests.

https://bugzilla.gnome.org/show_bug.cgi?id=709224
This commit is contained in:
Thiago Santos 2014-05-26 12:31:33 -03:00
parent c6f92562b6
commit 9e8bd15c12
7 changed files with 436 additions and 0 deletions

View file

@ -13,6 +13,7 @@ libgstbase_@GST_API_VERSION@_la_SOURCES = \
gstbytewriter.c \
gstcollectpads.c \
gstdataqueue.c \
gstflowcombiner.c \
gstpushsrc.c \
gstqueuearray.c \
gsttypefindhelper.c
@ -36,6 +37,7 @@ libgstbase_@GST_API_VERSION@include_HEADERS = \
gstbytewriter.h \
gstcollectpads.h \
gstdataqueue.h \
gstflowcombiner.h \
gstpushsrc.h \
gstqueuearray.h \
gsttypefindhelper.h

View file

@ -0,0 +1,215 @@
/* GStreamer
*
* Copyright (C) 2014 Samsung Electronics. All rights reserved.
* Author: Thiago Santos <ts.santos@sisa.samsung.com>
*
* gstflowcombiner.c: utility to combine multiple flow returns into a single one
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
/**
* SECTION:gstflowcombiner
* @short_description: Utility to combine multiple flow returns into one
*
* Utility struct to help handling #GstFlowReturn combination. Useful for
* #GstElement<!-- -->s that have multiple source pads and need to combine
* the different #GstFlowReturn for those pads.
*
* #GstFlowCombiner works by using the last #GstFlowReturn for all #GstPad
* it has in its list and computes the combined return value and provides
* it to the caller.
*
* To add a new pad to the #GstFlowCombiner use gst_flow_combiner_add_pad().
* The new #GstPad is stored with a default value of %GST_FLOW_OK.
*
* In case you want a #GstPad to be removed, use gst_flow_combiner_remove_pad().
*
* Please be aware that this struct isn't thread safe as its designed to be
* used by demuxers, those usually will have a single thread operating it.
*
* None of these functions will take refs on the passed #GstPad<!-- -->s, it
* is the caller's responsibility to make sure that the #GstPad exists as long
* as this struct exists.
*
* Aside from reducing the user's code size, the main advantage of using this
* helper struct is to follow the standard rules for #GstFlowReturn combination.
* These rules are:
*
* * %GST_FLOW_EOS: only if all returns are EOS too
* * %GST_FLOW_NOT_LINKED: only if all returns are NOT_LINKED too
* * %GST_FLOW_ERROR or below: if at least one returns an error return
* * %GST_FLOW_NOT_NEGOTIATED: if at least one returns a not-negotiated return
* * %GST_FLOW_FLUSHING: if at least one returns flushing
* * %GST_FLOW_OK: otherwise
*
* %GST_FLOW_ERROR or below, GST_FLOW_NOT_NEGOTIATED and GST_FLOW_FLUSHING are
* returned immediatelly from the gst_flow_combiner_update_flow() function.
*
* Since: 1.4
*/
#include <gst/gst.h>
#include "gstflowcombiner.h"
struct _GstFlowCombiner
{
GQueue pads;
GstFlowReturn last_ret;
};
/**
* gst_flow_combiner_new:
*
* Creates a new #GstFlowCombiner, use gst_flow_combiner_free() to free it.
*
* Returns: A new #GstFlowCombiner
* Since: 1.4
*/
GstFlowCombiner *
gst_flow_combiner_new (void)
{
GstFlowCombiner *combiner = g_slice_new (GstFlowCombiner);
g_queue_init (&combiner->pads);
combiner->last_ret = GST_FLOW_OK;
return combiner;
}
/**
* gst_flow_combiner_free:
* @combiner: the #GstFlowCombiner to free
*
* Frees a #GstFlowCombiner struct and all its internal data.
*
* Since: 1.4
*/
void
gst_flow_combiner_free (GstFlowCombiner * combiner)
{
g_return_if_fail (combiner != NULL);
g_queue_clear (&combiner->pads);
g_slice_free (GstFlowCombiner, combiner);
}
static GstFlowReturn
gst_flow_combiner_get_flow (GstFlowCombiner * combiner)
{
GstFlowReturn cret = GST_FLOW_OK;
gboolean all_eos = TRUE;
gboolean all_notlinked = TRUE;
GList *iter;
GST_DEBUG ("Combining flow returns");
for (iter = combiner->pads.head; iter; iter = iter->next) {
GstFlowReturn fret = GST_PAD_LAST_FLOW_RETURN (iter->data);
if (fret <= GST_FLOW_NOT_NEGOTIATED || fret == GST_FLOW_FLUSHING) {
GST_DEBUG ("Error flow return found, returning");
cret = fret;
goto done;
}
if (fret != GST_FLOW_NOT_LINKED) {
all_notlinked = FALSE;
if (fret != GST_FLOW_EOS)
all_eos = FALSE;
}
}
if (all_notlinked)
cret = GST_FLOW_NOT_LINKED;
else if (all_eos)
cret = GST_FLOW_EOS;
done:
GST_DEBUG ("Combined flow return: %s (%d)", gst_flow_get_name (cret), cret);
return cret;
}
/**
* gst_flow_combiner_update_flow:
* @combiner: the #GstFlowCombiner
* @fret: the latest #GstFlowReturn received for a pad in this #GstFlowCombiner
*
* Computes the combined flow return for the pads in it.
*
* The #GstFlowReturn paramter should be the last flow return update for a pad
* in this #GstFlowCombiner. It will use this value to be able to shortcut some
* combinations and avoid looking over all pads again. e.g. The last combined
* return is the same as the latest obtained #GstFlowReturn.
*
* Returns: The combined #GstFlowReturn
* Since: 1.4
*/
GstFlowReturn
gst_flow_combiner_update_flow (GstFlowCombiner * combiner, GstFlowReturn fret)
{
GstFlowReturn ret;
g_return_val_if_fail (combiner != NULL, GST_FLOW_ERROR);
if (combiner->last_ret == fret) {
return fret;
}
if (fret <= GST_FLOW_NOT_NEGOTIATED || fret == GST_FLOW_FLUSHING) {
ret = fret;
} else {
ret = gst_flow_combiner_get_flow (combiner);
}
combiner->last_ret = ret;
return ret;
}
/**
* gst_flow_combiner_add_pad:
* @combiner: the #GstFlowCombiner
* @pad: (transfer-none): the #GstPad that is being added
*
* Adds a new #GstPad to the #GstFlowCombiner.
*
* Since: 1.4
*/
void
gst_flow_combiner_add_pad (GstFlowCombiner * combiner, GstPad * pad)
{
g_return_if_fail (combiner != NULL);
g_return_if_fail (pad != NULL);
g_queue_push_head (&combiner->pads, pad);
}
/**
* gst_flow_combiner_remove_pad:
* @combiner: the #GstFlowCombiner
* @pad: (transfer-none): the #GstPad to remove
*
* Removes a #GstPad from the #GstFlowCombiner.
*
* Since: 1.4
*/
void
gst_flow_combiner_remove_pad (GstFlowCombiner * combiner, GstPad * pad)
{
g_return_if_fail (combiner != NULL);
g_return_if_fail (pad != NULL);
g_queue_remove (&combiner->pads, pad);
}

View file

@ -0,0 +1,45 @@
/* GStreamer
* Copyright
*
* Copyright (C) 2014 Samsung Electronics. All rights reserved.
* Author: Thiago Santos <ts.santos@sisa.samsung.com>
*
* gstflowcombiner.h: utility to combine multiple flow returns into a single one
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#ifndef __GST_FLOW_COMBINER_H__
#define __GST_FLOW_COMBINER_H__
#include <glib.h>
#include <gst/gst.h>
G_BEGIN_DECLS
typedef struct _GstFlowCombiner GstFlowCombiner;
GstFlowCombiner * gst_flow_combiner_new (void);
void gst_flow_combiner_free (GstFlowCombiner * combiner);
GstFlowReturn gst_flow_combiner_update_flow (GstFlowCombiner * combiner,
GstFlowReturn fret);
void gst_flow_combiner_add_pad (GstFlowCombiner * combiner, GstPad * pad);
void gst_flow_combiner_remove_pad (GstFlowCombiner * combiner, GstPad * pad);
G_END_DECLS
#endif /* __GST_FLOW_COMBINER_H__ */

View file

@ -143,6 +143,7 @@ check_PROGRAMS = \
libs/bitreader-noinline \
libs/bytereader-noinline \
libs/bytewriter-noinline \
libs/flowcombiner \
libs/sparsefile \
libs/collectpads \
libs/gstnetclientclock \

View file

@ -12,6 +12,7 @@ bytewriter-noinline
gdp
collectpads
controller
flowcombiner
gstlibscpp
gstnetclientclock
gstnettimeprovider

View file

@ -0,0 +1,167 @@
/* GStreamer
*
* Copyright (C) 2014 Samsung Electronics. All rights reserved.
* Author: Thiago Santos <ts.santos@sisa.samsung.com>
*
* flowcombiner.c: Unit test for GstFlowCombiner
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 51 Franklin St, Fifth Floor,
* Boston, MA 02110-1301, USA.
*/
#ifdef HAVE_CONFIG_H
#include "config.h"
#endif
#include <gst/check/gstcheck.h>
#include <gst/base/gstflowcombiner.h>
static GstFlowReturn sink_flowret = GST_FLOW_OK;
#define CHECK_COMBINED_FLOWS(f1, f2, f3, expected) \
G_STMT_START { \
combiner = gst_flow_combiner_new (); \
gst_flow_combiner_add_pad (combiner, pad1); \
gst_flow_combiner_add_pad (combiner, pad2); \
gst_flow_combiner_add_pad (combiner, pad3); \
sink_flowret = f1; \
gst_pad_push (pad1, gst_buffer_new ()); \
gst_flow_combiner_update_flow (combiner, f1); \
sink_flowret = f2; \
gst_pad_push (pad2, gst_buffer_new ()); \
gst_flow_combiner_update_flow (combiner, f2); \
sink_flowret = f3; \
gst_pad_push (pad3, gst_buffer_new ()); \
ret = gst_flow_combiner_update_flow (combiner, f3); \
gst_flow_combiner_free (combiner); \
fail_unless_equals_int (ret, expected); \
} G_STMT_END
static GstFlowReturn
_sink_chain (GstPad * pad, GstObject * parent, GstBuffer * buf)
{
gst_buffer_unref (buf);
return sink_flowret;
}
GST_START_TEST (test_combined_flows)
{
GstFlowReturn ret;
GstFlowCombiner *combiner;
GstPad *pad1, *pad2, *pad3;
GstPad *peer1, *peer2, *peer3;
GstSegment segment;
pad1 = gst_pad_new ("src1", GST_PAD_SRC);
pad2 = gst_pad_new ("src2", GST_PAD_SRC);
pad3 = gst_pad_new ("src3", GST_PAD_SRC);
peer1 = gst_pad_new ("sink1", GST_PAD_SINK);
peer2 = gst_pad_new ("sink2", GST_PAD_SINK);
peer3 = gst_pad_new ("sink3", GST_PAD_SINK);
gst_pad_set_chain_function (peer1, _sink_chain);
gst_pad_set_chain_function (peer2, _sink_chain);
gst_pad_set_chain_function (peer3, _sink_chain);
gst_pad_link (pad1, peer1);
gst_pad_link (pad2, peer2);
gst_pad_link (pad3, peer3);
gst_pad_set_active (peer1, TRUE);
gst_pad_set_active (peer2, TRUE);
gst_pad_set_active (peer3, TRUE);
gst_pad_set_active (pad1, TRUE);
gst_pad_set_active (pad2, TRUE);
gst_pad_set_active (pad3, TRUE);
gst_segment_init (&segment, GST_FORMAT_BYTES);
gst_pad_push_event (pad1, gst_event_new_stream_start ("p1"));
gst_pad_push_event (pad2, gst_event_new_stream_start ("p2"));
gst_pad_push_event (pad3, gst_event_new_stream_start ("p3"));
gst_pad_push_event (pad1, gst_event_new_segment (&segment));
gst_pad_push_event (pad2, gst_event_new_segment (&segment));
gst_pad_push_event (pad3, gst_event_new_segment (&segment));
/* ok */
CHECK_COMBINED_FLOWS (GST_FLOW_OK, GST_FLOW_OK, GST_FLOW_OK, GST_FLOW_OK);
/* not linked */
CHECK_COMBINED_FLOWS (GST_FLOW_OK, GST_FLOW_NOT_LINKED, GST_FLOW_OK,
GST_FLOW_OK);
CHECK_COMBINED_FLOWS (GST_FLOW_OK, GST_FLOW_EOS, GST_FLOW_OK, GST_FLOW_OK);
CHECK_COMBINED_FLOWS (GST_FLOW_OK, GST_FLOW_NOT_LINKED, GST_FLOW_NOT_LINKED,
GST_FLOW_OK);
CHECK_COMBINED_FLOWS (GST_FLOW_NOT_LINKED, GST_FLOW_NOT_LINKED,
GST_FLOW_NOT_LINKED, GST_FLOW_NOT_LINKED);
/* errors */
CHECK_COMBINED_FLOWS (GST_FLOW_OK, GST_FLOW_ERROR, GST_FLOW_OK,
GST_FLOW_ERROR);
CHECK_COMBINED_FLOWS (GST_FLOW_OK, GST_FLOW_CUSTOM_ERROR, GST_FLOW_OK,
GST_FLOW_CUSTOM_ERROR);
CHECK_COMBINED_FLOWS (GST_FLOW_OK, GST_FLOW_NOT_NEGOTIATED, GST_FLOW_OK,
GST_FLOW_NOT_NEGOTIATED);
CHECK_COMBINED_FLOWS (GST_FLOW_OK, GST_FLOW_OK, GST_FLOW_NOT_NEGOTIATED,
GST_FLOW_NOT_NEGOTIATED);
CHECK_COMBINED_FLOWS (GST_FLOW_NOT_LINKED, GST_FLOW_ERROR, GST_FLOW_OK,
GST_FLOW_ERROR);
CHECK_COMBINED_FLOWS (GST_FLOW_OK, GST_FLOW_OK, GST_FLOW_ERROR,
GST_FLOW_ERROR);
CHECK_COMBINED_FLOWS (GST_FLOW_OK, GST_FLOW_OK, GST_FLOW_CUSTOM_ERROR,
GST_FLOW_CUSTOM_ERROR);
/* flushing */
CHECK_COMBINED_FLOWS (GST_FLOW_OK, GST_FLOW_OK, GST_FLOW_FLUSHING,
GST_FLOW_FLUSHING);
CHECK_COMBINED_FLOWS (GST_FLOW_OK, GST_FLOW_FLUSHING, GST_FLOW_OK,
GST_FLOW_FLUSHING);
CHECK_COMBINED_FLOWS (GST_FLOW_FLUSHING, GST_FLOW_FLUSHING, GST_FLOW_FLUSHING,
GST_FLOW_FLUSHING);
/* eos */
CHECK_COMBINED_FLOWS (GST_FLOW_OK, GST_FLOW_NOT_LINKED, GST_FLOW_EOS,
GST_FLOW_OK);
CHECK_COMBINED_FLOWS (GST_FLOW_EOS, GST_FLOW_OK, GST_FLOW_EOS, GST_FLOW_OK);
CHECK_COMBINED_FLOWS (GST_FLOW_EOS, GST_FLOW_EOS, GST_FLOW_EOS, GST_FLOW_EOS);
/* eos + not-linked */
CHECK_COMBINED_FLOWS (GST_FLOW_NOT_LINKED, GST_FLOW_EOS, GST_FLOW_EOS,
GST_FLOW_EOS);
CHECK_COMBINED_FLOWS (GST_FLOW_NOT_LINKED, GST_FLOW_NOT_LINKED, GST_FLOW_EOS,
GST_FLOW_EOS);
gst_object_unref (pad1);
gst_object_unref (pad2);
gst_object_unref (pad3);
gst_object_unref (peer1);
gst_object_unref (peer2);
gst_object_unref (peer3);
}
GST_END_TEST;
static Suite *
flow_combiner_suite (void)
{
Suite *s = suite_create ("GstFlowCombiner");
TCase *tc_chain = tcase_create ("general");
suite_add_tcase (s, tc_chain);
tcase_add_test (tc_chain, test_combined_flows);
return s;
}
GST_CHECK_MAIN (flow_combiner);

View file

@ -262,6 +262,11 @@ EXPORTS
gst_data_queue_push
gst_data_queue_push_force
gst_data_queue_set_flushing
gst_flow_combiner_add_pad
gst_flow_combiner_free
gst_flow_combiner_new
gst_flow_combiner_remove_pad
gst_flow_combiner_update_flow
gst_push_src_get_type
gst_queue_array_drop_element
gst_queue_array_find