plugins/elements/gsttee.c: Be a lot smarter when deciding what srcpad to use for proxying the buffer_alloc. Also hand...

Original commit message from CVS:
* plugins/elements/gsttee.c: (gst_tee_base_init),
(gst_tee_request_new_pad), (gst_tee_release_pad),
(gst_tee_find_buffer_alloc), (gst_tee_buffer_alloc),
(gst_tee_do_push), (clear_pads), (gst_tee_handle_buffer),
(gst_tee_chain):
Be a lot smarter when deciding what srcpad to use for proxying
the buffer_alloc. Also handle pad added/removed when doing so.
Fixes #357959.
Keep track of what pads we already pushed on in case we have pads
added/removed while pushing. Fixes #374639
* tests/check/Makefile.am:
* tests/check/elements/tee.c: (handoff), (GST_START_TEST),
(tee_suite):
Added unit test for pad resync.
This commit is contained in:
Wim Taymans 2007-07-03 16:26:29 +00:00
parent f26d1795b9
commit 4cc7b818fd
4 changed files with 364 additions and 58 deletions

View file

@ -1,3 +1,21 @@
2007-07-03 Wim Taymans <wim.taymans@gmail.com>
* plugins/elements/gsttee.c: (gst_tee_base_init),
(gst_tee_request_new_pad), (gst_tee_release_pad),
(gst_tee_find_buffer_alloc), (gst_tee_buffer_alloc),
(gst_tee_do_push), (clear_pads), (gst_tee_handle_buffer),
(gst_tee_chain):
Be a lot smarter when deciding what srcpad to use for proxying
the buffer_alloc. Also handle pad added/removed when doing so.
Fixes #357959.
Keep track of what pads we already pushed on in case we have pads
added/removed while pushing. Fixes #374639
* tests/check/Makefile.am:
* tests/check/elements/tee.c: (handoff), (GST_START_TEST),
(tee_suite):
Added unit test for pad resync.
2007-07-01 Thomas Vander Stichele <thomas at apestaart dot org>
* po/nl.po:

View file

@ -1,7 +1,7 @@
/* GStreamer
* Copyright (C) 1999,2000 Erik Walthinsen <omega@cse.ogi.edu>
* 2000,2001,2002,2003,2004,2005 Wim Taymans <wim@fluendo.com>
*
* 2007 Wim Taymans <wim.taymans@gmail.com>
*
* gsttee.c: Tee element, one in N out
*
@ -91,6 +91,15 @@ GstStaticPadTemplate tee_src_template = GST_STATIC_PAD_TEMPLATE ("src%d",
GST_BOILERPLATE_FULL (GstTee, gst_tee, GstElement, GST_TYPE_ELEMENT, _do_init);
/* structure and quark to keep track of which pads have been pushed */
static GQuark push_data;
typedef struct
{
gboolean pushed;
GstFlowReturn result;
} PushData;
static GstPad *gst_tee_request_new_pad (GstElement * element,
GstPadTemplate * temp, const gchar * unused);
static void gst_tee_release_pad (GstElement * element, GstPad * pad);
@ -125,6 +134,8 @@ gst_tee_base_init (gpointer g_class)
gst_static_pad_template_get (&sinktemplate));
gst_element_class_add_pad_template (gstelement_class,
gst_static_pad_template_get (&tee_src_template));
push_data = g_quark_from_static_string ("tee-push-data");
}
static void
@ -211,19 +222,27 @@ gst_tee_request_new_pad (GstElement * element, GstPadTemplate * templ,
GstTee *tee;
GstActivateMode mode;
gboolean res;
PushData *data;
tee = GST_TEE (element);
GST_DEBUG_OBJECT (tee, "requesting pad");
GST_OBJECT_LOCK (tee);
name = g_strdup_printf ("src%d", tee->pad_counter++);
srcpad = gst_pad_new_from_template (templ, name);
g_free (name);
if (tee->allocpad == NULL)
tee->allocpad = srcpad;
mode = tee->sink_mode;
/* install the data, we automatically free it when the pad is disposed because
* of _release_pad or when the element goes away. */
data = g_new0 (PushData, 1);
data->pushed = FALSE;
data->result = GST_FLOW_NOT_LINKED;
g_object_set_qdata_full (G_OBJECT (srcpad), push_data, data, g_free);
GST_OBJECT_UNLOCK (tee);
switch (mode) {
@ -275,6 +294,8 @@ gst_tee_release_pad (GstElement * element, GstPad * pad)
tee = GST_TEE (element);
GST_DEBUG_OBJECT (tee, "releasing pad");
GST_OBJECT_LOCK (tee);
if (tee->allocpad == pad)
tee->allocpad = NULL;
@ -348,6 +369,61 @@ gst_tee_get_property (GObject * object, guint prop_id, GValue * value,
GST_OBJECT_UNLOCK (tee);
}
/* we have no previous source pad we can use to proxy the pad alloc. Loop over
* the source pads, try to alloc a buffer on each one of them. Keep a reference
* to the first pad that succeeds, we will be using it to alloc more buffers
* later. */
static GstFlowReturn
gst_tee_find_buffer_alloc (GstTee * tee, guint64 offset, guint size,
GstCaps * caps, GstBuffer ** buf)
{
GstFlowReturn res;
GList *pads;
guint32 cookie;
res = GST_FLOW_NOT_LINKED;
retry:
pads = GST_ELEMENT_CAST (tee)->srcpads;
cookie = GST_ELEMENT_CAST (tee)->pads_cookie;
while (pads) {
GstPad *pad;
pad = GST_PAD_CAST (pads->data);
gst_object_ref (pad);
GST_DEBUG_OBJECT (tee, "try alloc on pad %s:%s", GST_DEBUG_PAD_NAME (pad));
GST_OBJECT_UNLOCK (tee);
res = gst_pad_alloc_buffer (pad, offset, size, caps, buf);
GST_DEBUG_OBJECT (tee, "got return value %d", res);
gst_object_unref (pad);
GST_OBJECT_LOCK (tee);
if (GST_ELEMENT_CAST (tee)->pads_cookie != cookie) {
GST_DEBUG_OBJECT (tee, "pad list changed, restart");
/* pad list changed, restart. If the pad alloc function returned OK we
* need to unref the buffer */
if (res == GST_FLOW_OK)
gst_buffer_unref (*buf);
goto retry;
}
if (res == GST_FLOW_OK) {
GST_DEBUG_OBJECT (tee, "we have a buffer on pad %s:%s",
GST_DEBUG_PAD_NAME (pad));
/* we have a buffer, keep the pad for later and exit the loop. */
tee->allocpad = pad;
break;
}
/* no valid buffer, try another pad */
pads = g_list_next (pads);
}
return res;
}
static GstFlowReturn
gst_tee_buffer_alloc (GstPad * pad, guint64 offset, guint size,
GstCaps * caps, GstBuffer ** buf)
@ -358,100 +434,162 @@ gst_tee_buffer_alloc (GstPad * pad, guint64 offset, guint size,
tee = GST_TEE (GST_PAD_PARENT (pad));
GST_OBJECT_LOCK (tee);
if ((allocpad = tee->allocpad))
gst_object_ref (allocpad);
GST_OBJECT_UNLOCK (tee);
res = GST_FLOW_NOT_LINKED;
GST_OBJECT_LOCK (tee);
if ((allocpad = tee->allocpad)) {
/* if we had a previous pad we used for allocating a buffer, continue using
* it. */
GST_DEBUG_OBJECT (tee, "using pad %s:%s for alloc",
GST_DEBUG_PAD_NAME (allocpad));
gst_object_ref (allocpad);
GST_OBJECT_UNLOCK (tee);
if (allocpad) {
res = gst_pad_alloc_buffer (allocpad, offset, size, caps, buf);
gst_object_unref (allocpad);
} else {
res = GST_FLOW_OK;
*buf = NULL;
GST_OBJECT_LOCK (tee);
}
/* either we failed to alloc on the the previous pad or we did not have a
* previous pad. */
if (res == GST_FLOW_NOT_LINKED) {
/* find a new pad to alloc a buffer on */
GST_DEBUG_OBJECT (tee, "finding pad for alloc");
res = gst_tee_find_buffer_alloc (tee, offset, size, caps, buf);
}
GST_OBJECT_UNLOCK (tee);
return res;
}
typedef struct
{
GstTee *tee;
GstBuffer *buffer;
} PushData;
static gboolean
gst_tee_do_push (GstPad * pad, GValue * ret, PushData * data)
static GstFlowReturn
gst_tee_do_push (GstTee * tee, GstPad * pad, GstBuffer * buffer)
{
GstFlowReturn res;
GstTee *tee = data->tee;
if (G_UNLIKELY (!data->tee->silent)) {
GstBuffer *buf = data->buffer;
if (G_UNLIKELY (!tee->silent)) {
GST_OBJECT_LOCK (tee);
g_free (tee->last_message);
tee->last_message =
g_strdup_printf ("chain ******* (%s:%s)t (%d bytes, %"
G_GUINT64_FORMAT ") %p", GST_DEBUG_PAD_NAME (pad),
GST_BUFFER_SIZE (buf), GST_BUFFER_TIMESTAMP (buf), buf);
GST_BUFFER_SIZE (buffer), GST_BUFFER_TIMESTAMP (buffer), buffer);
GST_OBJECT_UNLOCK (tee);
g_object_notify (G_OBJECT (tee), "last_message");
}
/* Push */
if (pad == data->tee->pull_pad) {
if (pad == tee->pull_pad) {
/* don't push on the pad we're pulling from */
res = GST_FLOW_OK;
} else {
res = gst_pad_push (pad, gst_buffer_ref (data->buffer));
GST_LOG_OBJECT (tee, "Pushing buffer %p to %" GST_PTR_FORMAT
" yielded result=%d", data->buffer, pad, res);
res = gst_pad_push (pad, gst_buffer_ref (buffer));
}
return res;
}
/* If it's fatal or OK, or if ret is currently
* not-linked, we overwrite the previous value */
if (GST_FLOW_IS_FATAL (res) || (res == GST_FLOW_OK) ||
(g_value_get_enum (ret) == GST_FLOW_NOT_LINKED)) {
GST_LOG_OBJECT (tee, "Replacing ret val %d with %d",
g_value_get_enum (ret), res);
g_value_set_enum (ret, res);
}
static void
clear_pads (GstPad * pad, GstTee * tee)
{
PushData *data;
gst_object_unref (pad);
data = g_object_get_qdata (G_OBJECT (pad), push_data);
/* Stop iterating if flow return is fatal */
return (!GST_FLOW_IS_FATAL (res));
/* the data must be there or we have a screwed up internal state */
g_assert (data != NULL);
data->pushed = FALSE;
data->result = GST_FLOW_NOT_LINKED;
}
static GstFlowReturn
gst_tee_handle_buffer (GstTee * tee, GstBuffer * buffer)
{
GstIterator *iter;
PushData data;
GValue ret = { 0, };
GstIteratorResult res;
GList *pads;
guint32 cookie;
GstFlowReturn ret, cret;
tee->offset += GST_BUFFER_SIZE (buffer);
g_value_init (&ret, GST_TYPE_FLOW_RETURN);
g_value_set_enum (&ret, GST_FLOW_NOT_LINKED);
iter = gst_element_iterate_src_pads (GST_ELEMENT (tee));
data.tee = tee;
data.buffer = buffer;
GST_OBJECT_LOCK (tee);
/* mark all pads as 'not pushed on yet' */
g_list_foreach (GST_ELEMENT_CAST (tee)->srcpads, (GFunc) clear_pads, tee);
GST_LOG_OBJECT (tee, "Starting to push buffer %p", buffer);
/* FIXME: Not sure how tee would handle RESEND buffer from some of the
* pads but not from others. */
res = gst_iterator_fold (iter, (GstIteratorFoldFunction) gst_tee_do_push,
&ret, &data);
gst_iterator_free (iter);
restart:
cret = GST_FLOW_NOT_LINKED;
pads = GST_ELEMENT_CAST (tee)->srcpads;
cookie = GST_ELEMENT_CAST (tee)->pads_cookie;
GST_LOG_OBJECT (tee, "Pushing buffer %p yielded result=%d", buffer,
g_value_get_enum (&ret));
while (pads) {
GstPad *pad;
PushData *data;
pad = GST_PAD_CAST (pads->data);
/* get the private data, something is really wrong with the internal state
* when it is not there */
data = g_object_get_qdata (G_OBJECT (pad), push_data);
g_assert (data != NULL);
if (!data->pushed) {
/* not yet pushed, release lock and start pushing */
gst_object_ref (pad);
GST_OBJECT_UNLOCK (tee);
GST_LOG_OBJECT (tee, "Starting to push buffer %p", buffer);
ret = gst_tee_do_push (tee, pad, buffer);
GST_LOG_OBJECT (tee, "Pushing buffer %p yielded result %s", buffer,
gst_flow_get_name (ret));
GST_OBJECT_LOCK (tee);
/* keep track of which pad we pushed and the result value. We need to do
* this before we release the refcount on the pad, the PushData is
* destroyed when the last ref of the pad goes away. */
data->pushed = TRUE;
data->result = ret;
gst_object_unref (pad);
} else {
/* already pushed, use previous return value */
ret = data->result;
GST_LOG_OBJECT (tee, "pad already pushed with %s",
gst_flow_get_name (ret));
}
/* stop pushing more buffers when we have a fatal error */
if (GST_FLOW_IS_FATAL (ret))
goto error;
/* keep all other return values, overwriting the previous one */
GST_LOG_OBJECT (tee, "Replacing ret val %d with %d", cret, ret);
if (cret == GST_FLOW_NOT_LINKED)
cret = ret;
if (GST_ELEMENT_CAST (tee)->pads_cookie != cookie) {
GST_LOG_OBJECT (tee, "pad list changed");
/* the list of pads changed, restart iteration. Pads that we already
* pushed on and are still in the new list, will not be pushed on
* again. */
goto restart;
}
pads = g_list_next (pads);
}
GST_OBJECT_UNLOCK (tee);
gst_buffer_unref (buffer);
/* no need to unset gvalue */
return g_value_get_enum (&ret);
return cret;
/* ERRORS */
error:
{
GST_DEBUG_OBJECT (tee, "received error %s", gst_flow_get_name (ret));
gst_buffer_unref (buffer);
GST_OBJECT_UNLOCK (tee);
return ret;
}
}
static GstFlowReturn
@ -462,8 +600,12 @@ gst_tee_chain (GstPad * pad, GstBuffer * buffer)
tee = GST_TEE (gst_pad_get_parent (pad));
GST_DEBUG_OBJECT (tee, "received buffer %p", buffer);
res = gst_tee_handle_buffer (tee, buffer);
GST_DEBUG_OBJECT (tee, "handled buffer %s", gst_flow_get_name (res));
gst_object_unref (tee);
return res;

View file

@ -62,6 +62,7 @@ REGISTRY_CHECKS = \
elements/filesrc \
elements/identity \
elements/multiqueue \
elements/tee \
libs/basesrc \
libs/controller \
libs/typefindhelper \

145
tests/check/elements/tee.c Normal file
View file

@ -0,0 +1,145 @@
/* GStreamer
*
* unit test for tee
*
* Copyright (C) <2007> Wim Taymans <wim dot taymans at gmail dot com>
*
* This library is free software; you can redistribute it and/or
* modify it under the terms of the GNU Library General Public
* License as published by the Free Software Foundation; either
* version 2 of the License, or (at your option) any later version.
*
* This library is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
* Library General Public License for more details.
*
* You should have received a copy of the GNU Library General Public
* License along with this library; if not, write to the
* Free Software Foundation, Inc., 59 Temple Place - Suite 330,
* Boston, MA 02111-1307, USA.
*/
#include <unistd.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <fcntl.h>
#include <gst/check/gstcheck.h>
static gint count1;
static gint count2;
static void
handoff (GstElement * fakesink, GstBuffer * buf, GstPad * pad, guint * count)
{
*count = *count + 1;
}
/* construct fakesrc num-buffers=3 ! tee name=t ! queue ! fakesink t. ! queue !
* fakesink. Each fakesink should exactly receive 3 buffers.
*/
GST_START_TEST (test_num_buffers)
{
GstElement *pipeline;
GstElement *f1, *f2;
gchar *desc;
GstBus *bus;
GstMessage *msg;
desc = "fakesrc num-buffers=3 ! tee name=t ! queue ! fakesink name=f1 "
"t. ! queue ! fakesink name=f2";
pipeline = gst_parse_launch (desc, NULL);
fail_if (pipeline == NULL);
f1 = gst_bin_get_by_name (GST_BIN (pipeline), "f1");
fail_if (f1 == NULL);
f2 = gst_bin_get_by_name (GST_BIN (pipeline), "f2");
fail_if (f2 == NULL);
count1 = 0;
count2 = 0;
g_object_set (G_OBJECT (f1), "signal-handoffs", TRUE, NULL);
g_signal_connect (G_OBJECT (f1), "handoff", (GCallback) handoff, &count1);
g_object_set (G_OBJECT (f2), "signal-handoffs", TRUE, NULL);
g_signal_connect (G_OBJECT (f2), "handoff", (GCallback) handoff, &count2);
bus = gst_element_get_bus (pipeline);
fail_if (bus == NULL);
gst_element_set_state (pipeline, GST_STATE_PLAYING);
msg = gst_bus_poll (bus, GST_MESSAGE_EOS | GST_MESSAGE_ERROR, -1);
fail_if (GST_MESSAGE_TYPE (msg) != GST_MESSAGE_EOS);
gst_message_unref (msg);
fail_if (count1 != 3);
fail_if (count2 != 3);
gst_element_set_state (pipeline, GST_STATE_NULL);
gst_object_unref (f1);
gst_object_unref (f2);
gst_object_unref (bus);
gst_object_unref (pipeline);
}
GST_END_TEST;
/* we use fakesrc ! tee ! fakesink and then randomly request/release and link
* some pads from tee. This should happily run without any errors. */
GST_START_TEST (test_stress)
{
GstElement *pipeline;
GstElement *tee;
gchar *desc;
GstBus *bus;
GstMessage *msg;
gint i;
desc = "fakesrc num-buffers=100000 ! tee name=t ! queue ! fakesink";
pipeline = gst_parse_launch (desc, NULL);
fail_if (pipeline == NULL);
tee = gst_bin_get_by_name (GST_BIN (pipeline), "t");
fail_if (tee == NULL);
/* bring the pipeline to PLAYING, then start switching */
bus = gst_element_get_bus (pipeline);
fail_if (bus == NULL);
gst_element_set_state (pipeline, GST_STATE_PLAYING);
for (i = 0; i < 50000; i++) {
GstPad *pad;
pad = gst_element_get_request_pad (tee, "src%d");
gst_element_release_request_pad (tee, pad);
gst_object_unref (pad);
}
/* now wait for completion or error */
msg = gst_bus_poll (bus, GST_MESSAGE_EOS | GST_MESSAGE_ERROR, -1);
fail_if (GST_MESSAGE_TYPE (msg) != GST_MESSAGE_EOS);
gst_message_unref (msg);
gst_element_set_state (pipeline, GST_STATE_NULL);
gst_object_unref (tee);
gst_object_unref (bus);
gst_object_unref (pipeline);
}
GST_END_TEST;
Suite *
tee_suite (void)
{
Suite *s = suite_create ("tee");
TCase *tc_chain = tcase_create ("general");
suite_add_tcase (s, tc_chain);
tcase_add_test (tc_chain, test_num_buffers);
tcase_add_test (tc_chain, test_stress);
return s;
}
GST_CHECK_MAIN (tee);