Another big set of changes. Connections are now also pullfunc based. gstqueue has been updated, I don't know of any ...

Original commit message from CVS:
Another big set of changes.  Connections are now also pullfunc based.
gstqueue has been updated, I don't know of any other connections offhand.

There are still a few things that need doing, specifically the concept
of a source or connection with connections to multiple thread contexts is
not dealt with.  This may force us to move the threadstate from the
element to the pad, maybe keeping the element's copy for simple cases.
Then the Bin would create a structure to pass to the cothreaded _wrappers
of any such elements, which would detail the pads that are to be dealt with
by this particular cothread context.

That will speed things up to, since we don't have to look through the list
of all pads for every Src or Connection element for every iteration, we can
simply step through the list provided by the plan.  Special case might even
have a single pad pointer sitting there to trump the list, if there's only
one (the common case anyway).

Task 23098 is tracking these changes.  The main task 22588 depends on that
subtask, as well as 22240, which is a consistency check on PAD_DISABLED.
This commit is contained in:
Erik Walthinsen 2000-12-08 10:33:01 +00:00
parent d29ff12b69
commit 990baba8e3
8 changed files with 148 additions and 124 deletions

View file

@ -58,7 +58,7 @@ static void gst_queue_init (GstQueue *queue);
static void gst_queue_set_arg (GtkObject *object, GtkArg *arg, guint id);
static void gst_queue_get_arg (GtkObject *object, GtkArg *arg, guint id);
static void gst_queue_push (GstConnection *connection);
static void gst_queue_pull (GstPad *pad);
static void gst_queue_chain (GstPad *pad, GstBuffer *buf);
static void gst_queue_flush (GstQueue *queue);
@ -107,8 +107,6 @@ gst_queue_class_init (GstQueueClass *klass)
gtk_object_add_arg_type ("GstQueue::max_level", GTK_TYPE_INT,
GTK_ARG_READWRITE, ARG_MAX_LEVEL);
gstconnection_class->push = gst_queue_push;
gtkobject_class->set_arg = gst_queue_set_arg;
gtkobject_class->get_arg = gst_queue_get_arg;
@ -123,6 +121,7 @@ gst_queue_init (GstQueue *queue)
gst_pad_set_chain_function (queue->sinkpad, gst_queue_chain);
queue->srcpad = gst_pad_new ("src", GST_PAD_SRC);
gst_pad_set_pull_function (queue->srcpad, gst_queue_pull);
gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad);
queue->queue = NULL;
@ -219,14 +218,14 @@ gst_queue_chain (GstPad *pad, GstBuffer *buf)
}
static void
gst_queue_push (GstConnection *connection)
gst_queue_pull (GstPad *pad)
{
GstQueue *queue = GST_QUEUE (connection);
GstQueue *queue = GST_QUEUE (gst_pad_get_parent(pad));
GstBuffer *buf = NULL;
GSList *front;
gboolean tosignal = FALSE;
const guchar *name;
name = gst_element_get_name (GST_ELEMENT (queue));
/* have to lock for thread-safety */

View file

@ -479,14 +479,14 @@ gst_bin_iterate (GstBin *bin)
{
GstBinClass *oclass;
DEBUG_ENTER("('%s')",gst_element_get_name(GST_ELEMENT(bin)));
DEBUG_ENTER("(\"%s\")",gst_element_get_name(GST_ELEMENT(bin)));
oclass = GST_BIN_CLASS (GTK_OBJECT (bin)->klass);
if (oclass->iterate)
(oclass->iterate) (bin);
DEBUG_LEAVE("('%s')",gst_element_get_name(GST_ELEMENT(bin)));
DEBUG_LEAVE("(\"%s\")",gst_element_get_name(GST_ELEMENT(bin)));
}
/**
@ -531,29 +531,21 @@ gst_bin_loopfunc_wrapper (int argc,char *argv[])
DEBUG("element %s ended loop function\n", name);
} else {
DEBUG("element %s is chain-based\n", name);
if (GST_IS_CONNECTION (element) && argc == 1) {
while (1) {
DEBUG("calling push function of connection %s\n", name);
gst_connection_push (GST_CONNECTION (element));
DEBUG("calling push function of connection %s done\n", name);
}
} else {
DEBUG("stepping through pads\n");
do {
pads = element->pads;
while (pads) {
pad = GST_PAD (pads->data);
if (pad->direction == GST_PAD_SINK) {
DEBUG("pulling a buffer from %s:%s\n", name, gst_pad_get_name (pad));
buf = gst_pad_pull (pad);
DEBUG("calling chain function of %s:%s\n", name, gst_pad_get_name (pad));
(pad->chainfunc) (pad,buf);
DEBUG("calling chain function of %s:%s done\n", name, gst_pad_get_name (pad));
}
pads = g_list_next (pads);
DEBUG("stepping through pads\n");
do {
pads = element->pads;
while (pads) {
pad = GST_PAD (pads->data);
if (pad->direction == GST_PAD_SINK) {
DEBUG("pulling a buffer from %s:%s\n", name, gst_pad_get_name (pad));
buf = gst_pad_pull (pad);
DEBUG("calling chain function of %s:%s\n", name, gst_pad_get_name (pad));
(pad->chainfunc) (pad,buf);
DEBUG("calling chain function of %s:%s done\n", name, gst_pad_get_name (pad));
}
} while (!GST_ELEMENT_IS_COTHREAD_STOPPING(element));
}
pads = g_list_next (pads);
}
} while (!GST_ELEMENT_IS_COTHREAD_STOPPING(element));
}
GST_FLAG_UNSET(element,GST_ELEMENT_COTHREAD_STOPPING);
@ -562,39 +554,57 @@ gst_bin_loopfunc_wrapper (int argc,char *argv[])
}
static int
gst_bin_pullsrc_wrapper (int argc,char *argv[])
gst_bin_src_wrapper (int argc,char *argv[])
{
GstElement *element = GST_ELEMENT (argv);
GList *pads;
GstPad *pad;
G_GNUC_UNUSED const gchar *name = gst_element_get_name (element);
DEBUG("entering gst_bin_pullsrc_wrapper(%d,\"%s\")\n",argc,name);
DEBUG_ENTER("(%d,\"%s\")",argc,name);
do {
pads = element->pads;
while (pads) {
pad = GST_PAD (pads->data);
if (pad->direction == GST_PAD_SRC) {
region_struct *region = cothread_get_data (element->threadstate, "region");
if (region) {
//gst_src_push_region (GST_SRC (element), region->offset, region->size);
if (pad->pullregionfunc == NULL)
fprintf(stderr,"error, no pullregionfunc in \"%s\"\n", name);
(pad->pullregionfunc)(pad, region->offset, region->size);
}
else {
if (pad->pullfunc == NULL)
fprintf(stderr,"error, no pullfunc in \"%s\"\n", name);
(pad->pullfunc)(pad);
}
if (pad->pullfunc == NULL) DEBUG("error, no pullfunc in \"%s\"\n", name);
(pad->pullfunc)(pad);
}
pads = g_list_next(pads);
}
} while (!GST_ELEMENT_IS_COTHREAD_STOPPING(element));
GST_FLAG_UNSET(element,GST_ELEMENT_COTHREAD_STOPPING);
DEBUG("leaving gst_bin_pullsrc_wrapper(%d,\"%s\")\n",argc,name);
DEBUG_LEAVE("");
return 0;
}
static int
gst_bin_connection_wrapper (int argc,char *argv[])
{
GstElement *element = GST_ELEMENT (argv);
GList *pads;
GstPad *pad;
G_GNUC_UNUSED const gchar *name = gst_element_get_name (element);
DEBUG_ENTER("(%d,\"%s\")",argc,name);
do {
pads = element->pads;
while (pads) {
pad = GST_PAD (pads->data);
if (pad->direction == GST_PAD_SRC) {
DEBUG("pullfunc for %s:%s(%p) at %p is set to %p\n",GST_DEBUG_PAD_NAME(pad),pad,&pad->pullfunc,pad->pullfunc);
if (pad->pullfunc == NULL) fprintf(stderr,"error, no pullfunc in \"%s\"\n", name);
(pad->pullfunc)(pad);
}
pads = g_list_next(pads);
}
} while (!GST_ELEMENT_IS_COTHREAD_STOPPING(element));
GST_FLAG_UNSET(element,GST_ELEMENT_COTHREAD_STOPPING);
DEBUG_LEAVE("");
return 0;
}
@ -632,14 +642,17 @@ gst_bin_pushfunc_proxy (GstPad *pad)
static void
gst_bin_create_plan_func (GstBin *bin)
{
const gchar *binname = gst_element_get_name(GST_ELEMENT(bin));
GList *elements;
GstElement *element;
int sink_pads;
GList *pads;
GstPad *pad, *opad, *peer;
GstPad *pad, *peer;
GstElement *outside;
g_print("gstbin: creating plan for bin \"%s\"\n", gst_element_get_name (GST_ELEMENT (bin)));
DEBUG_SET_STRING("(\"%s\")",gst_element_get_name(GST_ELEMENT(bin)));
DEBUG_ENTER_STRING;
// g_print("gstbin: creating plan for bin \"%s\"\n", gst_element_get_name (GST_ELEMENT (bin)));
// first loop through all children to see if we need cothreads
// we break immediately when we find we need to, why keep searching?
@ -647,25 +660,25 @@ gst_bin_create_plan_func (GstBin *bin)
while (elements) {
element = GST_ELEMENT (elements->data);
g_print("gstbin: found element \"%s\" in bin \"%s\"\n",
gst_element_get_name (element),
gst_element_get_name (GST_ELEMENT (bin)));
DEBUG("found element \"%s\" in bin \"%s\"\n",
gst_element_get_name (element),
gst_element_get_name (GST_ELEMENT (bin)));
// if it's a loop-based element, use cothreads
if (element->loopfunc != NULL) {
g_print("gstbin: loop based element \"%s\" in bin \"%s\"\n",
gst_element_get_name (element),
gst_element_get_name (GST_ELEMENT (bin)));
DEBUG("loop based element \"%s\" in bin \"%s\"\n",
gst_element_get_name (element),
gst_element_get_name (GST_ELEMENT (bin)));
bin->need_cothreads = TRUE;
break;
}
// if it's a complex element, use cothreads
else if (GST_ELEMENT_IS_MULTI_IN (element)) {
g_print("gstbin: complex element \"%s\" in bin \"%s\"\n",
gst_element_get_name (element),
gst_element_get_name (GST_ELEMENT (bin)));
DEBUG("complex element \"%s\" in bin \"%s\"\n",
gst_element_get_name (element),
gst_element_get_name (GST_ELEMENT (bin)));
bin->need_cothreads = TRUE;
break;
}
@ -679,9 +692,9 @@ gst_bin_create_plan_func (GstBin *bin)
pads = g_list_next (pads);
}
if (sink_pads > 1) {
g_print("gstbin: more than 1 sinkpad for element \"%s\" in bin \"%s\"\n",
gst_element_get_name (element),
gst_element_get_name (GST_ELEMENT (bin)));
DEBUG("more than 1 sinkpad for element \"%s\" in bin \"%s\"\n",
gst_element_get_name (element),
gst_element_get_name (GST_ELEMENT (bin)));
bin->need_cothreads = TRUE;
break;
@ -700,12 +713,12 @@ gst_bin_create_plan_func (GstBin *bin)
bin->numentries = 0;
if (bin->need_cothreads) {
g_print("gstbin: need cothreads\n");
DEBUG("NEED COTHREADS\n");
// first create thread context
if (bin->threadcontext == NULL) {
bin->threadcontext = cothread_init ();
g_print("gstbin: initialized cothread context\n");
DEBUG("initialized cothread context\n");
}
// walk through all the children
@ -718,59 +731,68 @@ gst_bin_create_plan_func (GstBin *bin)
element->threadstate = cothread_create (bin->threadcontext);
cothread_setfunc (element->threadstate, gst_bin_loopfunc_wrapper,
0, (char **)element);
DEBUG("created element threadstate %p for \"%s\"\n",element->threadstate,
gst_element_get_name(element));
}
if (GST_IS_BIN (element)) {
gst_bin_create_plan (GST_BIN (element));
}
if (GST_IS_SRC (element)) {
g_print("gstbin: adding '%s' as entry point\n",gst_element_get_name (element));
DEBUG("adding '%s' as entry point\n",gst_element_get_name (element));
bin->entries = g_list_prepend (bin->entries,element);
bin->numentries++;
cothread_setfunc(element->threadstate,gst_bin_pullsrc_wrapper,0,(char **)element);
cothread_setfunc(element->threadstate,gst_bin_src_wrapper,0,(char **)element);
}
pads = gst_element_get_pad_list (element);
while (pads) {
pad = GST_PAD(pads->data);
g_print("gstbin: setting push&pull handlers for %s:%s\n",
gst_element_get_name (element), gst_pad_get_name (pad));
// DEBUG("setting push&pull handlers for %s:%s\n",GST_DEBUG_PAD_NAME(pad));
// an internal connection will push outside this bin.
if (!GST_IS_CONNECTION (element)) {
pad->pushfunc = gst_bin_pushfunc_proxy;
}
if (!pad->pullfunc)
pad->pullfunc = gst_bin_pullfunc_proxy;
if (!pad->pullregionfunc)
pad->pullregionfunc = gst_bin_pullregionfunc_proxy;
if (gst_pad_get_direction (pad) == GST_PAD_SRC) {
DEBUG("checking/setting push proxy for srcpad %s:%s\n",
GST_DEBUG_PAD_NAME(pad));
// set the proxy functions
if (!pad->pushfunc)
pad->pushfunc = gst_bin_pushfunc_proxy;
/* we only worry about sink pads */
if (gst_pad_get_direction (pad) == GST_PAD_SINK) {
/* get the pad's peer */
} else if (gst_pad_get_direction (pad) == GST_PAD_SINK) {
DEBUG("checking/setting pull proxies for sinkpad %s:%s\n",
GST_DEBUG_PAD_NAME(pad));
// set the proxy functions
if (!pad->pullfunc)
pad->pullfunc = gst_bin_pullfunc_proxy;
if (!pad->pullfunc)
pad->pullregionfunc = gst_bin_pullregionfunc_proxy;
// ***** check for possible connections outside
// get the pad's peer
peer = gst_pad_get_peer (pad);
// FIXME this should be an error condition, if not disabled
if (!peer) break;
/* get the parent of the peer of the pad */
// get the parent of the peer of the pad
outside = GST_ELEMENT (gst_pad_get_parent (peer));
// FIXME this should *really* be an error condition
if (!outside) break;
/* if it's a connection and it's not ours... */
if (GST_IS_CONNECTION (outside) &&
(gst_object_get_parent (GST_OBJECT (outside)) != GST_OBJECT (bin))) {
/***** this is irrelevant now *****
GList *connection_pads = gst_element_get_pad_list (outside);
while (connection_pads) {
opad = GST_PAD (connection_pads->data);
if (gst_pad_get_direction (opad) == GST_PAD_SRC) {
g_print("gstbin: setting push&pull handlers for %s:%s SRC connection %p %p\n",
gst_element_get_name (outside),gst_pad_get_name (opad), opad, opad->pullfunc);
GST_DEBUG_PAD_NAME(pad), opad, opad->pullfunc);
opad->pushfunc = gst_bin_pushfunc_proxy;
opad->pullfunc = gst_bin_pullfunc_proxy;
opad->pullregionfunc = gst_bin_pullregionfunc_proxy;
if (outside->threadstate == NULL) {
outside->threadstate = cothread_create (bin->threadcontext);
cothread_setfunc (outside->threadstate, gst_bin_loopfunc_wrapper,
1, (char **)outside);
cothread_setfunc (outside->threadstate, gst_bin_loopfunc_wrapper,
1, (char **)outside);
}
}
connection_pads = g_list_next (connection_pads);
@ -779,8 +801,19 @@ gst_bin_create_plan_func (GstBin *bin)
"for internal element \"%s\"\n",
gst_element_get_name (GST_ELEMENT (outside)),
gst_element_get_name (GST_ELEMENT (element)));
*****/
DEBUG("connection '%s' outside bin is an entry\n",gst_element_get_name(outside));
bin->entries = g_list_prepend (bin->entries,outside);
bin->numentries++;
// FIXME: this won't work in the case where a connection feeds multiple contexts.
// have to have a list of internal structures of connection entries, struct passed to
// the cothread wrapper, struct listing pads that enter that context.
if (outside->threadstate == NULL) {
outside->threadstate = cothread_create (bin->threadcontext);
DEBUG("created element threadstate %p for \"%s\"\n",outside->threadstate,
gst_element_get_name(outside));
}
cothread_setfunc(outside->threadstate,gst_bin_connection_wrapper,0,(char **)outside);
}
}
pads = g_list_next (pads);
@ -808,7 +841,7 @@ gst_bin_create_plan_func (GstBin *bin)
pad = GST_PAD (pads->data);
/* we only worry about sink pads */
if (gst_pad_get_direction (pad) == GST_PAD_SINK) {
g_print("gstbin: found SINK pad %s\n", gst_pad_get_name (pad));
g_print("gstbin '%s': found SINK pad %s:%s\n", binname, GST_DEBUG_PAD_NAME(pad));
/* get the pad's peer */
peer = gst_pad_get_peer (pad);
if (!peer) {
@ -848,7 +881,8 @@ gst_bin_iterate_func (GstBin *bin)
GList *pads;
GstPad *pad;
DEBUG_ENTER("(%s)", gst_element_get_name (GST_ELEMENT (bin)));
DEBUG_SET_STRING("(\"%s\")", gst_element_get_name (GST_ELEMENT (bin)));
DEBUG_ENTER_STRING;
g_return_if_fail (bin != NULL);
g_return_if_fail (GST_IS_BIN (bin));
@ -887,9 +921,9 @@ gst_bin_iterate_func (GstBin *bin)
}
pads = g_list_next (pads);
}
} else if (GST_IS_CONNECTION (entry))
gst_connection_push (GST_CONNECTION (entry));
else if (GST_IS_BIN (entry))
// } else if (GST_IS_CONNECTION (entry)) {
// gst_connection_push (GST_CONNECTION (entry));
} else if (GST_IS_BIN (entry))
gst_bin_iterate (GST_BIN (entry));
else {
fprintf(stderr, "gstbin: entry \"%s\" cannot be handled\n", gst_element_get_name (entry));

View file

@ -91,24 +91,3 @@ gst_connection_new (gchar *name)
return connection;
}
/**
* gst_connection_push:
* @connection: the connection to push
*
* Push a buffer along a connection
*/
void
gst_connection_push (GstConnection *connection)
{
GstConnectionClass *oclass;
g_return_if_fail (connection != NULL);
g_return_if_fail (GST_IS_CONNECTION (connection));
oclass = (GstConnectionClass *)(GTK_OBJECT (connection)->klass);
g_return_if_fail (oclass->push != NULL);
(oclass->push)(connection);
}

View file

@ -50,16 +50,11 @@ struct _GstConnection {
struct _GstConnectionClass {
GstElementClass parent_class;
/* push function */
void (*push) (GstConnection *connection);
};
GtkType gst_connection_get_type (void);
GstElement* gst_connection_new (gchar *name);
void gst_connection_push (GstConnection *connection);
#ifdef __cplusplus
}
#endif /* __cplusplus */

View file

@ -92,13 +92,28 @@ G_GNUC_UNUSED static GModule *_debug_self_module = NULL;
}
#ifdef GST_DEBUG_ENABLED
#define DEBUG(format,args...) fprintf(stderr,GST_DEBUG_PREFIX(": "format , ## args ))
#define DEBUG_ENTER(format, args...) fprintf(stderr,GST_DEBUG_PREFIX(format": entering\n" , ## args ))
#define DEBUG_LEAVE(format, args...) fprintf(stderr,GST_DEBUG_PREFIX(format": leaving\n" , ## args ))
#define DEBUG(format,args...) \
(_debug_string != NULL) ? \
fprintf(stderr,GST_DEBUG_PREFIX("%s: "format , _debug_string , ## args )) : \
fprintf(stderr,GST_DEBUG_PREFIX(": "format , ## args ))
#define DEBUG_ENTER(format, args...) \
fprintf(stderr,GST_DEBUG_PREFIX(format": entering\n" , ## args ))
#define DEBUG_SET_STRING(format, args...) \
gchar *_debug_string = g_strdup_printf(format , ## args )
#define DEBUG_ENTER_STRING DEBUG_ENTER("%s",_debug_string)
#define DEBUG_LEAVE(format, args...) \
if (_debug_string != NULL) g_free(_debug_string),\
fprintf(stderr,GST_DEBUG_PREFIX(format": leaving\n" , ## args ))
#else
#define DEBUG(format, args...)
#define DEBUG_ENTER(format, args...)
#define DEBUG_LEAVE(format, args...)
#endif
/********** some convenience macros for debugging **********/
#define GST_DEBUG_PAD_NAME(pad) \
((pad)->parent != NULL) ? gst_element_get_name(GST_ELEMENT((pad)->parent)) : "''", gst_pad_get_name(pad)
#endif /* __GST_H__ */

View file

@ -111,6 +111,7 @@ gst_element_init (GstElement *element)
element->numpads = 0;
element->pads = NULL;
element->loopfunc = NULL;
element->threadstate = NULL;
}
/**

View file

@ -240,9 +240,11 @@ gst_pad_set_pull_function (GstPad *pad,
g_return_if_fail (pad != NULL);
g_return_if_fail (GST_IS_PAD (pad));
g_print("gstpad: pad setting pull function\n");
// the if and such should optimize out when DEBUG is off
DEBUG("setting pull function for %s:%s\n",GST_DEBUG_PAD_NAME(pad));
pad->pullfunc = pull;
DEBUG("pullfunc for %s:%s(%p) at %p is set to %p\n",GST_DEBUG_PAD_NAME(pad),pad,&pad->pullfunc,pull);
}
/**

View file

@ -58,7 +58,7 @@ static void gst_queue_init (GstQueue *queue);
static void gst_queue_set_arg (GtkObject *object, GtkArg *arg, guint id);
static void gst_queue_get_arg (GtkObject *object, GtkArg *arg, guint id);
static void gst_queue_push (GstConnection *connection);
static void gst_queue_pull (GstPad *pad);
static void gst_queue_chain (GstPad *pad, GstBuffer *buf);
static void gst_queue_flush (GstQueue *queue);
@ -107,8 +107,6 @@ gst_queue_class_init (GstQueueClass *klass)
gtk_object_add_arg_type ("GstQueue::max_level", GTK_TYPE_INT,
GTK_ARG_READWRITE, ARG_MAX_LEVEL);
gstconnection_class->push = gst_queue_push;
gtkobject_class->set_arg = gst_queue_set_arg;
gtkobject_class->get_arg = gst_queue_get_arg;
@ -123,6 +121,7 @@ gst_queue_init (GstQueue *queue)
gst_pad_set_chain_function (queue->sinkpad, gst_queue_chain);
queue->srcpad = gst_pad_new ("src", GST_PAD_SRC);
gst_pad_set_pull_function (queue->srcpad, gst_queue_pull);
gst_element_add_pad (GST_ELEMENT (queue), queue->srcpad);
queue->queue = NULL;
@ -219,14 +218,14 @@ gst_queue_chain (GstPad *pad, GstBuffer *buf)
}
static void
gst_queue_push (GstConnection *connection)
gst_queue_pull (GstPad *pad)
{
GstQueue *queue = GST_QUEUE (connection);
GstQueue *queue = GST_QUEUE (gst_pad_get_parent(pad));
GstBuffer *buf = NULL;
GSList *front;
gboolean tosignal = FALSE;
const guchar *name;
name = gst_element_get_name (GST_ELEMENT (queue));
/* have to lock for thread-safety */