diff --git a/gst/Makefile.am b/gst/Makefile.am index ce82fe365e..2fe397fda1 100644 --- a/gst/Makefile.am +++ b/gst/Makefile.am @@ -46,7 +46,8 @@ libgst_la_SOURCES = \ gstmeta.c \ gsttee.c \ gstxml.c \ - cothreads.c + cothreads.c \ + gstscheduler.c libgstincludedir = $(includedir)/gst libgstinclude_HEADERS = \ @@ -78,7 +79,8 @@ libgstinclude_HEADERS = \ gsttee.h \ gstxml.h \ gstdebug.h \ - cothreads.h + cothreads.h \ + gstscheduler.h noinst_HEADERS = \ gstarch.h \ diff --git a/gst/elements/gstidentity.c b/gst/elements/gstidentity.c index 5622f31152..17e4f38a3b 100644 --- a/gst/elements/gstidentity.c +++ b/gst/elements/gstidentity.c @@ -134,6 +134,7 @@ gst_identity_loop (GstElement *element) do { buf = gst_pad_pull (identity->sinkpad); + g_print("(%s:%s)i ",GST_DEBUG_PAD_NAME(identity->sinkpad)); gst_pad_push (identity->srcpad, buf); diff --git a/gst/elements/gstqueue.c b/gst/elements/gstqueue.c index 8c2dcdbc9b..7a86bea0a1 100644 --- a/gst/elements/gstqueue.c +++ b/gst/elements/gstqueue.c @@ -49,6 +49,7 @@ enum { ARG_0, ARG_LEVEL, ARG_MAX_LEVEL, + ARG_BLOCK, }; @@ -106,6 +107,8 @@ gst_queue_class_init (GstQueueClass *klass) GTK_ARG_READABLE, ARG_LEVEL); gtk_object_add_arg_type ("GstQueue::max_level", GTK_TYPE_INT, GTK_ARG_READWRITE, ARG_MAX_LEVEL); + gtk_object_add_arg_type ("GstQueue::block", GTK_TYPE_BOOL, + GTK_ARG_READWRITE, ARG_BLOCK); gtkobject_class->set_arg = gst_queue_set_arg; gtkobject_class->get_arg = gst_queue_get_arg; @@ -116,7 +119,8 @@ gst_queue_class_init (GstQueueClass *klass) static void gst_queue_init (GstQueue *queue) { - GST_FLAG_SET (queue, GST_ELEMENT_SCHEDULE_PASSIVELY); + // scheduling on this kind of element is, well, interesting + GST_FLAG_SET (queue, GST_ELEMENT_DECOUPLED); queue->sinkpad = gst_pad_new ("sink", GST_PAD_SINK); gst_pad_set_chain_function (queue->sinkpad, GST_DEBUG_FUNCPTR(gst_queue_chain)); @@ -129,6 +133,7 @@ gst_queue_init (GstQueue *queue) queue->queue = NULL; queue->level_buffers = 0; queue->max_buffers = 20; + queue->block = TRUE; queue->level_bytes = 0; queue->size_buffers = 0; queue->size_bytes = 0; @@ -237,6 +242,12 @@ gst_queue_get (GstPad *pad) DEBUG("queue: %s push %d %ld %p\n", name, queue->level_buffers, pthread_self (), queue->emptycond); DEBUG("queue: %s have queue lock\n", name); + // we bail if there's nothing there + if (!queue->level_buffers && !queue->block) { + GST_UNLOCK(queue); + return NULL; + } + while (!queue->level_buffers) { STATUS("queue: %s U released lock\n"); GST_UNLOCK (queue); @@ -312,6 +323,9 @@ gst_queue_set_arg (GtkObject *object, GtkArg *arg, guint id) case ARG_MAX_LEVEL: queue->max_buffers = GTK_VALUE_INT (*arg); break; + case ARG_BLOCK: + queue->block = GTK_VALUE_BOOL (*arg); + break; default: break; } @@ -334,6 +348,9 @@ gst_queue_get_arg (GtkObject *object, GtkArg *arg, guint id) case ARG_MAX_LEVEL: GTK_VALUE_INT (*arg) = queue->max_buffers; break; + case ARG_BLOCK: + GTK_VALUE_BOOL (*arg) = queue->block; + break; default: arg->type = GTK_TYPE_INVALID; break; diff --git a/gst/elements/gstqueue.h b/gst/elements/gstqueue.h index a06ff3080e..0274d890a7 100644 --- a/gst/elements/gstqueue.h +++ b/gst/elements/gstqueue.h @@ -59,6 +59,7 @@ struct _GstQueue { gint level_buffers; /* number of buffers queued here */ gint max_buffers; /* maximum number of buffers queued here */ + gboolean block; /* if set to FALSE, _get returns NULL if queue empty */ gint level_bytes; /* number of bytes queued here */ gint size_buffers; /* size of queue in buffers */ gint size_bytes; /* size of queue in bytes */ diff --git a/gst/gstbin.c b/gst/gstbin.c index c540a539f7..d658e6fefd 100644 --- a/gst/gstbin.c +++ b/gst/gstbin.c @@ -24,6 +24,8 @@ #include "gstsrc.h" #include "gstconnection.h" +#include "gstscheduler.h" + GstElementDetails gst_bin_details = { "Generic bin", "Bin", @@ -43,6 +45,7 @@ static gboolean gst_bin_change_state_type (GstBin *bin, GtkType type); static void gst_bin_create_plan_func (GstBin *bin); +//static void gst_bin_schedule_func (GstBin *bin); static void gst_bin_iterate_func (GstBin *bin); static xmlNodePtr gst_bin_save_thyself (GstElement *element, xmlNodePtr parent); @@ -109,6 +112,7 @@ gst_bin_class_init (GstBinClass *klass) klass->change_state_type = gst_bin_change_state_type; klass->create_plan = gst_bin_create_plan_func; + klass->schedule = gst_bin_schedule_func; klass->iterate = gst_bin_iterate_func; gstelement_class->change_state = gst_bin_change_state; @@ -515,158 +519,28 @@ gst_bin_create_plan (GstBin *bin) (oclass->create_plan) (bin); } +/** + * gst_bin_schedule: + * @bin: #GstBin to schedule + * + * let the bin figure out how to handle the plugins in it. + */ +void +gst_bin_schedule (GstBin *bin) +{ + GstBinClass *oclass; + + oclass = GST_BIN_CLASS (GTK_OBJECT (bin)->klass); + + if (oclass->schedule) + (oclass->schedule) (bin); +} + typedef struct { gulong offset; gulong size; } region_struct; -static int -gst_bin_loopfunc_wrapper (int argc,char *argv[]) -{ - GstElement *element = GST_ELEMENT (argv); - G_GNUC_UNUSED const gchar *name = gst_element_get_name (element); - - DEBUG_ENTER("(%d,'%s')",argc,name); - - do { - DEBUG("calling loopfunc %s for element %s\n", - GST_DEBUG_FUNCPTR_NAME (element->loopfunc),name); - (element->loopfunc) (element); - DEBUG("element %s ended loop function\n", name); - } while (!GST_ELEMENT_IS_COTHREAD_STOPPING(element)); - GST_FLAG_UNSET(element,GST_ELEMENT_COTHREAD_STOPPING); - - DEBUG_LEAVE("(%d,'%s')",argc,name); - return 0; -} - -static int -gst_bin_chain_wrapper (int argc,char *argv[]) -{ - GstElement *element = GST_ELEMENT (argv); - G_GNUC_UNUSED const gchar *name = gst_element_get_name (element); - GList *pads; - GstPad *pad; - GstBuffer *buf; - - DEBUG_ENTER("(\"%s\")",name); - DEBUG("stepping through pads\n"); - do { - pads = element->pads; - while (pads) { - pad = GST_PAD (pads->data); - pads = g_list_next (pads); - if (pad->direction == GST_PAD_SINK) { - DEBUG("pulling a buffer from %s:%s\n", name, gst_pad_get_name (pad)); - buf = gst_pad_pull (pad); - DEBUG("calling chain function of %s:%s\n", name, gst_pad_get_name (pad)); - (pad->chainfunc) (pad,buf); - DEBUG("calling chain function of %s:%s done\n", name, gst_pad_get_name (pad)); - } - } - } while (!GST_ELEMENT_IS_COTHREAD_STOPPING(element)); - GST_FLAG_UNSET(element,GST_ELEMENT_COTHREAD_STOPPING); - - DEBUG_LEAVE("(%d,'%s')",argc,name); - return 0; -} - -static int -gst_bin_src_wrapper (int argc,char *argv[]) -{ - GstElement *element = GST_ELEMENT (argv); - GList *pads; - GstPad *pad; - GstBuffer *buf; - G_GNUC_UNUSED const gchar *name = gst_element_get_name (element); - - DEBUG_ENTER("(%d,\"%s\")",argc,name); - - do { - pads = element->pads; - while (pads) { - pad = GST_PAD (pads->data); - if (pad->direction == GST_PAD_SRC) { - region_struct *region = cothread_get_data (element->threadstate, "region"); - DEBUG("calling _getfunc for %s:%s\n",GST_DEBUG_PAD_NAME(pad)); - if (region) { - //gst_src_push_region (GST_SRC (element), region->offset, region->size); - if (pad->getregionfunc == NULL) - fprintf(stderr,"error, no getregionfunc in \"%s\"\n", name); - buf = (pad->getregionfunc)(pad, region->offset, region->size); - } else { - if (pad->getfunc == NULL) - fprintf(stderr,"error, no getfunc in \"%s\"\n", name); - buf = (pad->getfunc)(pad); - } - - DEBUG("calling gst_pad_push on pad %s:%s\n",GST_DEBUG_PAD_NAME(pad)); - gst_pad_push (pad, buf); - } - pads = g_list_next(pads); - } - } while (!GST_ELEMENT_IS_COTHREAD_STOPPING(element)); - GST_FLAG_UNSET(element,GST_ELEMENT_COTHREAD_STOPPING); - - DEBUG_LEAVE(""); - return 0; -} - -static void -gst_bin_pushfunc_proxy (GstPad *pad, GstBuffer *buf) -{ - cothread_state *threadstate = GST_ELEMENT(pad->parent)->threadstate; - DEBUG_ENTER("(%s:%s)",GST_DEBUG_PAD_NAME(pad)); - DEBUG("putting buffer %p in peer's pen\n",buf); - pad->peer->bufpen = buf; - DEBUG("switching to %p (@%p)\n",threadstate,&(GST_ELEMENT(pad->parent)->threadstate)); - cothread_switch (threadstate); - DEBUG("done switching\n"); -} - -static GstBuffer* -gst_bin_pullfunc_proxy (GstPad *pad) -{ - GstBuffer *buf; - - cothread_state *threadstate = GST_ELEMENT(pad->parent)->threadstate; - DEBUG_ENTER("(%s:%s)",GST_DEBUG_PAD_NAME(pad)); - if (pad->bufpen == NULL) { - DEBUG("switching to %p (@%p)\n",threadstate,&(GST_ELEMENT(pad->parent)->threadstate)); - cothread_switch (threadstate); - } - DEBUG("done switching\n"); - buf = pad->bufpen; - pad->bufpen = NULL; - return buf; -} - -static GstBuffer * -gst_bin_chainfunc_proxy (GstPad *pad) -{ - GstBuffer *buf; -} - -// FIXME!!! -static void -gst_bin_pullregionfunc_proxy (GstPad *pad, - gulong offset, - gulong size) -{ - region_struct region; - cothread_state *threadstate; - - DEBUG_ENTER("%s:%s,%ld,%ld",GST_DEBUG_PAD_NAME(pad),offset,size); - - region.offset = offset; - region.size = size; - - threadstate = GST_ELEMENT(pad->parent)->threadstate; - cothread_set_data (threadstate, "region", ®ion); - cothread_switch (threadstate); - cothread_set_data (threadstate, "region", NULL); -} - static void gst_bin_create_plan_func (GstBin *bin) @@ -675,12 +549,14 @@ gst_bin_create_plan_func (GstBin *bin) GList *elements; GstElement *element; const gchar *elementname; - GSList *pending_bins = NULL; + GSList *pending = NULL; GstBin *pending_bin; - GList *pads; - GstPad *pad; - GstElement *peer_manager; - cothread_func wrapper_function; +// GList *pads; +// GstPad *pad; +// GstElement *peer_manager; +// cothread_func wrapper_function; +// _GstBinChain *chain; +// GList *chains; DEBUG_SET_STRING("(\"%s\")",gst_element_get_name (GST_ELEMENT (bin))); DEBUG_ENTER_STRING; @@ -737,16 +613,18 @@ gst_bin_create_plan_func (GstBin *bin) // if it's a loop-based element, use cothreads if (element->loopfunc != NULL) { DEBUG("requiring cothreads because \"%s\" is a loop-based element\n",elementname); - bin->need_cothreads = TRUE; + GST_FLAG_SET (element, GST_ELEMENT_USE_COTHREAD); // if it's a 'complex' element, use cothreads } else if (GST_FLAG_IS_SET (element, GST_ELEMENT_COMPLEX)) { DEBUG("requiring cothreads because \"%s\" is complex\n",elementname); - bin->need_cothreads = TRUE; + GST_FLAG_SET (element, GST_ELEMENT_USE_COTHREAD); // if the element has more than one sink pad, use cothreads } else if (element->numsinkpads > 1) { DEBUG("requiring cothreads because \"%s\" has more than one sink pad\n",elementname); - bin->need_cothreads = TRUE; + GST_FLAG_SET (element, GST_ELEMENT_USE_COTHREAD); } + if (GST_FLAG_IS_SET (element, GST_ELEMENT_USE_COTHREAD)) + bin->need_cothreads = TRUE; } } @@ -768,11 +646,11 @@ gst_bin_create_plan_func (GstBin *bin) // find all the managed children // here we pull off the trick of walking an entire arbitrary tree without recursion DEBUG("attempting to find all the elements to manage\n"); - pending_bins = g_slist_prepend (pending_bins, bin); + pending = g_slist_prepend (pending, bin); do { // retrieve the top of the stack and pop it - pending_bin = GST_BIN (pending_bins->data); - pending_bins = g_slist_remove (pending_bins, pending_bin); + pending_bin = GST_BIN (pending->data); + pending = g_slist_remove (pending, pending_bin); // walk the list of elements, find bins, and do stuff DEBUG("checking Bin \"%s\" for managed elements\n", @@ -790,7 +668,7 @@ gst_bin_create_plan_func (GstBin *bin) // if it's a Bin, add it to the list of Bins to check if (GST_IS_BIN (element)) { DEBUG("flattened recurse into \"%s\"\n",elementname); - pending_bins = g_slist_prepend (pending_bins, element); + pending = g_slist_prepend (pending, element); // otherwise add it to the list of elements } else { DEBUG("found element \"%s\" that I manage\n",elementname); @@ -799,129 +677,11 @@ gst_bin_create_plan_func (GstBin *bin) } } } - } while (pending_bins); + } while (pending); DEBUG("have %d elements to manage, implementing plan\n",bin->num_managed_elements); - // If cothreads are needed, we need to not only find elements but - // set up cothread states and various proxy functions. - if (bin->need_cothreads) { - DEBUG("bin is using cothreads\n"); - - // first create thread context - if (bin->threadcontext == NULL) { - DEBUG("initializing cothread context\n"); - bin->threadcontext = cothread_init (); - } - - // walk through all the children - elements = bin->managed_elements; - while (elements) { - element = GST_ELEMENT (elements->data); - elements = g_list_next (elements); - - // start out with a NULL warpper function, we'll set it if we want a cothread - wrapper_function = NULL; - - // have to decide if we need to or can use a cothreads, and if so which wrapper - // first of all, if there's a loopfunc, the decision's already made - if (element->loopfunc != NULL) { - wrapper_function = GST_DEBUG_FUNCPTR(gst_bin_loopfunc_wrapper); - DEBUG("element %s is a loopfunc, must use a cothread\n",gst_element_get_name(element)); - } else { - // otherwise we need to decide if it needs a cothread - // if it's complex, or cothreads are preferred and it's *not* passive, cothread it - if (GST_FLAG_IS_SET (element,GST_ELEMENT_COMPLEX) || - (GST_FLAG_IS_SET (bin,GST_BIN_FLAG_PREFER_COTHREADS) && - !GST_FLAG_IS_SET (element,GST_ELEMENT_SCHEDULE_PASSIVELY))) { - // base it on whether we're going to loop through source or sink pads - if (element->numsinkpads == 0) - wrapper_function = GST_DEBUG_FUNCPTR(gst_bin_src_wrapper); - else - wrapper_function = GST_DEBUG_FUNCPTR(gst_bin_chain_wrapper); - } - } - - // walk through the all the pads for this element, setting proxy functions - // the selection of proxy functions depends on whether we're in a cothread or not - pads = gst_element_get_pad_list (element); - while (pads) { - pad = GST_PAD (pads->data); - pads = g_list_next (pads); - - // check to see if someone else gets to set up the element - peer_manager = GST_ELEMENT((pad)->peer->parent)->manager; - if (peer_manager != GST_ELEMENT(bin)) { - DEBUG("WARNING: pad %s:%s is connected outside of bin\n",GST_DEBUG_PAD_NAME(pad)); - } - - // if the wrapper_function is set, we need to use the proxy functions - if (wrapper_function != NULL) { - // set up proxy functions - if (gst_pad_get_direction (pad) == GST_PAD_SINK) { - DEBUG("setting push proxy for sinkpad %s:%s\n",GST_DEBUG_PAD_NAME(pad)); - pad->pushfunc = GST_DEBUG_FUNCPTR(gst_bin_pushfunc_proxy); - } else if (gst_pad_get_direction (pad) == GST_PAD_SRC) { - DEBUG("setting pull proxy for srcpad %s:%s\n",GST_DEBUG_PAD_NAME(pad)); - pad->pullfunc = GST_DEBUG_FUNCPTR(gst_bin_pullfunc_proxy); - } - } else { - // otherwise we need to set up for 'traditional' chaining - if (gst_pad_get_direction (pad) == GST_PAD_SINK) { - // we can just copy the chain function, since it shares the prototype - DEBUG("copying chain function into push proxy for %s:%s\n", - GST_DEBUG_PAD_NAME(pad)); - pad->pushfunc = pad->chainfunc; - } else if (gst_pad_get_direction (pad) == GST_PAD_SRC) { - // we can just copy the get function, since it shares the prototype - DEBUG("copying get function into pull proxy for %s:%s\n", - GST_DEBUG_PAD_NAME(pad)); - pad->pullfunc = pad->getfunc; - } - } - } - - // if a loopfunc has been specified, create and set up a cothread - if (wrapper_function != NULL) { - if (element->threadstate == NULL) { - element->threadstate = cothread_create (bin->threadcontext); - DEBUG("created cothread %p (@%p) for \"%s\"\n",element->threadstate, - &element->threadstate,gst_element_get_name(element)); - } - cothread_setfunc (element->threadstate, wrapper_function, 0, (char **)element); - DEBUG("set wrapper function for \"%s\" to &%s\n",gst_element_get_name(element), - GST_DEBUG_FUNCPTR_NAME(wrapper_function)); - } - -// // HACK: if the element isn't passive, it's an entry -// if (!GST_FLAG_IS_SET(element,GST_ELEMENT_SCHEDULE_PASSIVELY)) -// bin->entries = g_list_append(bin->entries, element); - } - - // otherwise, cothreads are not needed - } else { - DEBUG("bin is chained, no cothreads needed\n"); - - elements = bin->managed_elements; - while (elements) { - element = GST_ELEMENT (elements->data); - elements = g_list_next (elements); - - pads = gst_element_get_pad_list (element); - while (pads) { - pad = GST_PAD (pads->data); - pads = g_list_next (pads); - - if (gst_pad_get_direction (pad) == GST_PAD_SINK) { - DEBUG("copying chain function into push proxy for %s:%s\n",GST_DEBUG_PAD_NAME(pad)); - pad->pushfunc = pad->chainfunc; - } else { - DEBUG("copying get function into pull proxy for %s:%s\n",GST_DEBUG_PAD_NAME(pad)); - pad->pullfunc = pad->getfunc; - } - } - } - } + gst_bin_schedule(bin); DEBUG_LEAVE("(\"%s\")",gst_element_get_name(GST_ELEMENT(bin))); } @@ -929,6 +689,8 @@ gst_bin_create_plan_func (GstBin *bin) void gst_bin_iterate_func (GstBin *bin) { + GList *chains; + _GstBinChain *chain; GList *entries; GstElement *entry; GList *pads; @@ -942,187 +704,61 @@ gst_bin_iterate_func (GstBin *bin) g_return_if_fail (GST_IS_BIN (bin)); g_return_if_fail (GST_STATE (bin) == GST_STATE_PLAYING); - if (bin->need_cothreads) { - // all we really have to do is switch to the first child - // FIXME this should be lots more intelligent about where to start - DEBUG("starting iteration via cothreads\n"); + // step through all the chains + chains = bin->chains; + while (chains) { + chain = (_GstBinChain *)(chains->data); + chains = g_list_next (chains); - entry = GST_ELEMENT (bin->managed_elements->data); - GST_FLAG_SET (entry, GST_ELEMENT_COTHREAD_STOPPING); - DEBUG("set COTHREAD_STOPPING flag on \"%s\"(@%p)\n", - gst_element_get_name(entry),entry); - cothread_switch (entry->threadstate); + if (chain->use_cothreads) { + // all we really have to do is switch to the first child + // FIXME this should be lots more intelligent about where to start + DEBUG("starting iteration via cothreads\n"); - } else { - DEBUG("starting iteration via chain-functions\n"); + entry = GST_ELEMENT (chain->elements->data); + GST_FLAG_SET (entry, GST_ELEMENT_COTHREAD_STOPPING); + DEBUG("set COTHREAD_STOPPING flag on \"%s\"(@%p)\n", + gst_element_get_name(entry),entry); + cothread_switch (entry->threadstate); - if (bin->num_entries <= 0) { - DEBUG("no entries in bin \"%s\", trying managed elements...\n", - gst_element_get_name(GST_ELEMENT(bin))); - // we will try loop over the elements then... - entries = bin->managed_elements; - } - else { - entries = bin->entries; - } + } else { + DEBUG("starting iteration via chain-functions\n"); - g_assert (entries != NULL); + entries = chain->entries; - while (entries) { - entry = GST_ELEMENT (entries->data); - entries = g_list_next (entries); + g_assert (entries != NULL); - DEBUG("have entry \"%s\"\n",gst_element_get_name(entry)); + while (entries) { + entry = GST_ELEMENT (entries->data); + entries = g_list_next (entries); - if (GST_IS_SRC (entry) || GST_IS_CONNECTION (entry)) { - pads = entry->pads; - while (pads) { - pad = GST_PAD (pads->data); - if (pad->direction == GST_PAD_SRC) { - DEBUG("calling getfunc of %s:%s\n",GST_DEBUG_PAD_NAME(pad)); - if (pad->getfunc == NULL) - fprintf(stderr, "error, no getfunc in \"%s\"\n", gst_element_get_name (entry)); - else - buf = (pad->getfunc)(pad); - gst_pad_push(pad,buf); + DEBUG("have entry \"%s\"\n",gst_element_get_name(entry)); + + if (GST_IS_SRC (entry) || GST_IS_CONNECTION (entry)) { + pads = entry->pads; + while (pads) { + pad = GST_PAD (pads->data); + if (pad->direction == GST_PAD_SRC) { + DEBUG("calling getfunc of %s:%s\n",GST_DEBUG_PAD_NAME(pad)); + if (pad->getfunc == NULL) + fprintf(stderr, "error, no getfunc in \"%s\"\n", gst_element_get_name (entry)); + else + buf = (pad->getfunc)(pad); + gst_pad_push(pad,buf); + } + pads = g_list_next (pads); } - pads = g_list_next (pads); +// } else if (GST_IS_CONNECTION (entry)) { +// gst_connection_push (GST_CONNECTION (entry)); + } else if (GST_IS_BIN (entry)) + gst_bin_iterate (GST_BIN (entry)); + else { + fprintf(stderr, "gstbin: entry \"%s\" cannot be handled\n", gst_element_get_name (entry)); +// g_assert_not_reached (); } -// } else if (GST_IS_CONNECTION (entry)) { -// gst_connection_push (GST_CONNECTION (entry)); - } else if (GST_IS_BIN (entry)) - gst_bin_iterate (GST_BIN (entry)); - else { - fprintf(stderr, "gstbin: entry \"%s\" cannot be handled\n", gst_element_get_name (entry)); -// g_assert_not_reached (); } } } DEBUG_LEAVE("(%s)", gst_element_get_name (GST_ELEMENT (bin))); } - - - -/* - // ***** check for possible connections outside - // get the pad's peer - peer = gst_pad_get_peer (pad); - // FIXME this should be an error condition, if not disabled - if (!peer) break; - // get the parent of the peer of the pad - outside = GST_ELEMENT (gst_pad_get_parent (peer)); - // FIXME this should *really* be an error condition - if (!outside) break; - // if it's a source or connection and it's not ours... - if ((GST_IS_SRC (outside) || GST_IS_CONNECTION (outside)) && - (gst_object_get_parent (GST_OBJECT (outside)) != GST_OBJECT (bin))) { - if (gst_pad_get_direction (pad) == GST_PAD_SINK) { - DEBUG("dealing with outside source element %s\n",gst_element_get_name(outside)); -// DEBUG("PUNT: copying pullfunc ptr from %s:%s to %s:%s (@ %p)\n", -//GST_DEBUG_PAD_NAME(pad->peer),GST_DEBUG_PAD_NAME(pad),&pad->pullfunc); -// pad->pullfunc = pad->peer->pullfunc; -// DEBUG("PUNT: setting pushfunc proxy to fake proxy on %s:%s\n",GST_DEBUG_PAD_NAME(pad->peer)); -// pad->peer->pushfunc = GST_DEBUG_FUNCPTR(gst_bin_pushfunc_fake_proxy); - pad->pullfunc = GST_DEBUG_FUNCPTR(gst_bin_pullfunc_proxy); - } - } else { -*/ - - - - - -/* - } else if (GST_IS_SRC (element)) { - DEBUG("adding '%s' as entry point, because it's a source\n",gst_element_get_name (element)); - bin->entries = g_list_prepend (bin->entries,element); - bin->num_entries++; - cothread_setfunc(element->threadstate,gst_bin_src_wrapper,0,(char **)element); - } - - pads = gst_element_get_pad_list (element); - while (pads) { - pad = GST_PAD(pads->data); - - if (gst_pad_get_direction (pad) == GST_PAD_SINK) { - DEBUG("setting push proxy for sinkpad %s:%s\n",GST_DEBUG_PAD_NAME(pad)); - // set the proxy functions - pad->pushfunc = GST_DEBUG_FUNCPTR(gst_bin_pushfunc_proxy); - DEBUG("pushfunc %p = gst_bin_pushfunc_proxy %p\n",&pad->pushfunc,gst_bin_pushfunc_proxy); - } else if (gst_pad_get_direction (pad) == GST_PAD_SRC) { - DEBUG("setting pull proxies for srcpad %s:%s\n",GST_DEBUG_PAD_NAME(pad)); - // set the proxy functions - pad->pullfunc = GST_DEBUG_FUNCPTR(gst_bin_pullfunc_proxy); - DEBUG("pad->pullfunc(@%p) = gst_bin_pullfunc_proxy(@%p)\n", - &pad->pullfunc,gst_bin_pullfunc_proxy); - pad->pullregionfunc = GST_DEBUG_FUNCPTR(gst_bin_pullregionfunc_proxy); - } - pads = g_list_next (pads); - } - elements = g_list_next (elements); - - // if there are no entries, we have to pick one at random - if (bin->num_entries == 0) - bin->entries = g_list_prepend (bin->entries, GST_ELEMENT(bin->children->data)); - } - } else { - DEBUG("don't need cothreads, looking for entry points\n"); - // we have to find which elements will drive an iteration - elements = bin->children; - while (elements) { - element = GST_ELEMENT (elements->data); - DEBUG("found element \"%s\"\n", gst_element_get_name (element)); - if (GST_IS_BIN (element)) { - gst_bin_create_plan (GST_BIN (element)); - } - if (GST_IS_SRC (element)) { - DEBUG("adding '%s' as entry point, because it's a source\n",gst_element_get_name (element)); - bin->entries = g_list_prepend (bin->entries, element); - bin->num_entries++; - } - - // go through all the pads, set pointers, and check for connections - pads = gst_element_get_pad_list (element); - while (pads) { - pad = GST_PAD (pads->data); - - if (gst_pad_get_direction (pad) == GST_PAD_SINK) { - DEBUG("found SINK pad %s:%s\n", GST_DEBUG_PAD_NAME(pad)); - - // copy the peer's chain function, easy enough - DEBUG("copying peer's chainfunc to %s:%s's pushfunc\n",GST_DEBUG_PAD_NAME(pad)); - pad->pushfunc = GST_DEBUG_FUNCPTR(pad->peer->chainfunc); - - // need to walk through and check for outside connections -//FIXME need to do this for all pads - // get the pad's peer - peer = gst_pad_get_peer (pad); - if (!peer) { - DEBUG("found SINK pad %s has no peer\n", gst_pad_get_name (pad)); - break; - } - // get the parent of the peer of the pad - outside = GST_ELEMENT (gst_pad_get_parent (peer)); - if (!outside) break; - // if it's a connection and it's not ours... - if (GST_IS_CONNECTION (outside) && - (gst_object_get_parent (GST_OBJECT (outside)) != GST_OBJECT (bin))) { - gst_info("gstbin: element \"%s\" is the external source Connection " - "for internal element \"%s\"\n", - gst_element_get_name (GST_ELEMENT (outside)), - gst_element_get_name (GST_ELEMENT (element))); - bin->entries = g_list_prepend (bin->entries, outside); - bin->num_entries++; - } - } - else { - DEBUG("found pad %s\n", gst_pad_get_name (pad)); - } - pads = g_list_next (pads); - - } - elements = g_list_next (elements); - } -*/ - diff --git a/gst/gstbin.h b/gst/gstbin.h index e01a83eab4..e523fe7509 100644 --- a/gst/gstbin.h +++ b/gst/gstbin.h @@ -54,6 +54,7 @@ typedef enum { typedef struct _GstBin GstBin; typedef struct _GstBinClass GstBinClass; +typedef struct __GstBinChain _GstBinChain; struct _GstBin { GstElement element; @@ -66,12 +67,14 @@ struct _GstBin { gboolean need_cothreads; GList *managed_elements; gint num_managed_elements; + + GList *chains; + gint num_chains; GList *entries; gint num_entries; cothread_context *threadcontext; gboolean use_cothreads; - GList *outside_schedules; }; struct _GstBinClass { @@ -87,10 +90,21 @@ struct _GstBinClass { GtkType type); /* create a plan for the execution of the bin */ void (*create_plan) (GstBin *bin); + void (*schedule) (GstBin *bin); /* run a full iteration of operation */ void (*iterate) (GstBin *bin); }; +struct __GstBinChain { + GList *elements; + gint num_elements; + + GList *entries; + + gboolean use_cothreads; + + GstElement *entry; +}; GtkType gst_bin_get_type (void); @@ -109,6 +123,7 @@ GstElement* gst_bin_get_by_name (GstBin *bin, GList* gst_bin_get_list (GstBin *bin); void gst_bin_create_plan (GstBin *bin); +void gst_bin_schedule (GstBin *bin); gboolean gst_bin_set_state_type (GstBin *bin, GstElementState state, GtkType type); diff --git a/gst/gstdebug.h b/gst/gstdebug.h index 462dc77253..b438332057 100644 --- a/gst/gstdebug.h +++ b/gst/gstdebug.h @@ -101,6 +101,7 @@ G_GNUC_UNUSED static GModule *_debug_self_module = NULL; (_debug_string != NULL) ? \ fprintf(stderr,GST_DEBUG_PREFIX("%s: "format , _debug_string , ## args )) : \ fprintf(stderr,GST_DEBUG_PREFIX(": "format , ## args )) +#define DEBUG_NOPREFIX(format,args...) fprintf(stderr,format , ## args ) #define DEBUG_ENTER(format, args...) \ fprintf(stderr,GST_DEBUG_PREFIX(format": entering\n" , ## args )) #define DEBUG_SET_STRING(format, args...) \ @@ -112,6 +113,7 @@ G_GNUC_UNUSED static GModule *_debug_self_module = NULL; #define DEBUG_LEAVE_STRING DEBUG_LEAVE("%s",_debug_string) #else #define DEBUG(format, args...) +#define DEBUG_NOPREFIX(format, args...) #define DEBUG_ENTER(format, args...) #define DEBUG_LEAVE(format, args...) #define DEBUG_SET_STRING(format, args...) diff --git a/gst/gstelement.h b/gst/gstelement.h index dd37c501f5..3ebb0e387e 100644 --- a/gst/gstelement.h +++ b/gst/gstelement.h @@ -85,8 +85,9 @@ static inline char *_gst_print_statename(int state) { typedef enum { // element is complex (for some def.) and generally require a cothread GST_ELEMENT_COMPLEX = GST_OBJECT_FLAG_LAST, - // not to be scheduled directly, let others trigger all events - GST_ELEMENT_SCHEDULE_PASSIVELY, + // input and output pads aren't directly coupled to each other + // examples: queues, multi-output async readers, etc. + GST_ELEMENT_DECOUPLED, // this element should be placed in a thread if at all possible GST_ELEMENT_THREAD_SUGGESTED, // this element is incable of seeking (FIXME: does this apply to filters?) @@ -96,6 +97,8 @@ typedef enum { GST_ELEMENT_NEW_LOOPFUNC, // the cothread holding this element needs to be stopped GST_ELEMENT_COTHREAD_STOPPING, + // the element has to be scheduled as a cothread for any sanity + GST_ELEMENT_USE_COTHREAD, /* use some padding for future expansion */ GST_ELEMENT_FLAG_LAST = GST_OBJECT_FLAG_LAST + 8, diff --git a/gst/gstscheduler.c b/gst/gstscheduler.c new file mode 100644 index 0000000000..109c33f519 --- /dev/null +++ b/gst/gstscheduler.c @@ -0,0 +1,631 @@ +/* Gnome-Streamer + * Copyright (C) <1999> Erik Walthinsen + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + +#define GST_DEBUG_ENABLED + +#include "gstscheduler.h" +#include "gstdebug.h" + + +static int +gst_bin_loopfunc_wrapper (int argc,char *argv[]) +{ + GstElement *element = GST_ELEMENT (argv); + G_GNUC_UNUSED const gchar *name = gst_element_get_name (element); + + DEBUG_ENTER("(%d,'%s')",argc,name); + + do { + DEBUG("calling loopfunc %s for element %s\n", + GST_DEBUG_FUNCPTR_NAME (element->loopfunc),name); + (element->loopfunc) (element); + DEBUG("element %s ended loop function\n", name); + } while (!GST_ELEMENT_IS_COTHREAD_STOPPING(element)); + GST_FLAG_UNSET(element,GST_ELEMENT_COTHREAD_STOPPING); + + DEBUG_LEAVE("(%d,'%s')",argc,name); + return 0; +} + +static int +gst_bin_chain_wrapper (int argc,char *argv[]) +{ + GstElement *element = GST_ELEMENT (argv); + G_GNUC_UNUSED const gchar *name = gst_element_get_name (element); + GList *pads; + GstPad *pad; + GstBuffer *buf; + + DEBUG_ENTER("(\"%s\")",name); + DEBUG("stepping through pads\n"); + do { + pads = element->pads; + while (pads) { + pad = GST_PAD (pads->data); + pads = g_list_next (pads); + if (pad->direction == GST_PAD_SINK) { + DEBUG("pulling a buffer from %s:%s\n", name, gst_pad_get_name (pad)); + buf = gst_pad_pull (pad); + DEBUG("calling chain function of %s:%s\n", name, gst_pad_get_name (pad)); + (pad->chainfunc) (pad,buf); + DEBUG("calling chain function of %s:%s done\n", name, gst_pad_get_name (pad)); + } + } + } while (!GST_ELEMENT_IS_COTHREAD_STOPPING(element)); + GST_FLAG_UNSET(element,GST_ELEMENT_COTHREAD_STOPPING); + + DEBUG_LEAVE("(%d,'%s')",argc,name); + return 0; +} + +static int +gst_bin_src_wrapper (int argc,char *argv[]) +{ + GstElement *element = GST_ELEMENT (argv); + GList *pads; + GstPad *pad; + GstBuffer *buf; + G_GNUC_UNUSED const gchar *name = gst_element_get_name (element); + + DEBUG_ENTER("(%d,\"%s\")",argc,name); + + do { + pads = element->pads; + while (pads) { + pad = GST_PAD (pads->data); + if (pad->direction == GST_PAD_SRC) { +// region_struct *region = cothread_get_data (element->threadstate, "region"); + DEBUG("calling _getfunc for %s:%s\n",GST_DEBUG_PAD_NAME(pad)); +// if (region) { + //gst_src_push_region (GST_SRC (element), region->offset, region->size); +// if (pad->getregionfunc == NULL) +// fprintf(stderr,"error, no getregionfunc in \"%s\"\n", name); +// buf = (pad->getregionfunc)(pad, region->offset, region->size); +// } else { + if (pad->getfunc == NULL) + fprintf(stderr,"error, no getfunc in \"%s\"\n", name); + buf = (pad->getfunc)(pad); +// } + + DEBUG("calling gst_pad_push on pad %s:%s\n",GST_DEBUG_PAD_NAME(pad)); + gst_pad_push (pad, buf); + } + pads = g_list_next(pads); + } + } while (!GST_ELEMENT_IS_COTHREAD_STOPPING(element)); + GST_FLAG_UNSET(element,GST_ELEMENT_COTHREAD_STOPPING); + + DEBUG_LEAVE(""); + return 0; +} + +static void +gst_bin_pushfunc_proxy (GstPad *pad, GstBuffer *buf) +{ + cothread_state *threadstate = GST_ELEMENT(pad->parent)->threadstate; + DEBUG_ENTER("(%s:%s)",GST_DEBUG_PAD_NAME(pad)); + DEBUG("putting buffer %p in peer's pen\n",buf); + pad->peer->bufpen = buf; + DEBUG("switching to %p (@%p)\n",threadstate,&(GST_ELEMENT(pad->parent)->threadstate)); + cothread_switch (threadstate); + DEBUG("done switching\n"); +} + +static GstBuffer* +gst_bin_pullfunc_proxy (GstPad *pad) +{ + GstBuffer *buf; + + cothread_state *threadstate = GST_ELEMENT(pad->parent)->threadstate; + DEBUG_ENTER("(%s:%s)",GST_DEBUG_PAD_NAME(pad)); + if (pad->bufpen == NULL) { + DEBUG("switching to %p (@%p)\n",threadstate,&(GST_ELEMENT(pad->parent)->threadstate)); + cothread_switch (threadstate); + } + DEBUG("done switching\n"); + buf = pad->bufpen; + pad->bufpen = NULL; + return buf; +} + +static GstBuffer * +gst_bin_chainfunc_proxy (GstPad *pad) +{ +// FIXME!! +// GstBuffer *buf; + return NULL; +} + +// FIXME!!! +static void +gst_bin_pullregionfunc_proxy (GstPad *pad, + gulong offset, + gulong size) +{ +// region_struct region; + cothread_state *threadstate; + + DEBUG_ENTER("%s:%s,%ld,%ld",GST_DEBUG_PAD_NAME(pad),offset,size); + +// region.offset = offset; +// region.size = size; + +// threadstate = GST_ELEMENT(pad->parent)->threadstate; +// cothread_set_data (threadstate, "region", ®ion); + cothread_switch (threadstate); +// cothread_set_data (threadstate, "region", NULL); +} + + +static void +gst_schedule_cothreaded_chain (GstBin *bin, _GstBinChain *chain) { + GList *elements; + GstElement *element; + cothread_func wrapper_function; + GList *pads; + GstPad *pad; + + DEBUG("chain is using cothreads\n"); + + // first create thread context + if (bin->threadcontext == NULL) { + DEBUG("initializing cothread context\n"); + bin->threadcontext = cothread_init (); + } + + // walk through all the chain's elements + elements = chain->elements; + while (elements) { + element = GST_ELEMENT (elements->data); + elements = g_list_next (elements); + + // start out without a wrapper function, we select it later + wrapper_function = NULL; + + // if the element has a loopfunc... + if (element->loopfunc != NULL) { + wrapper_function = GST_DEBUG_FUNCPTR(gst_bin_loopfunc_wrapper); + DEBUG("\nelement '%s' is a loop-based\n",gst_element_get_name(element)); + } else { + // otherwise we need to decide what kind of cothread + // if it's not DECOUPLED, we decide based on whether it's a source or not + if (!GST_FLAG_IS_SET (element, GST_ELEMENT_DECOUPLED)) { + // if it doesn't have any sinks, it must be a source (duh) + if (element->numsinkpads == 0) { + wrapper_function = GST_DEBUG_FUNCPTR(gst_bin_src_wrapper); + DEBUG("\nelement '%s' is a source, using _src_wrapper\n",gst_element_get_name(element)); + } else { + wrapper_function = GST_DEBUG_FUNCPTR(gst_bin_chain_wrapper); + DEBUG("\nelement '%s' is a filter, using _chain_wrapper\n",gst_element_get_name(element)); + } + } + } + + // now we have to walk through the pads to set up their state + pads = gst_element_get_pad_list (element); + while (pads) { + pad = GST_PAD (pads->data); + pads = g_list_next (pads); + + // if the element is DECOUPLED or outside the manager, we have to chain + if ((wrapper_function == NULL) || + (GST_ELEMENT(pad->peer->parent)->manager != GST_ELEMENT(bin))) { + // set the chain proxies + if (gst_pad_get_direction (pad) == GST_PAD_SINK) { + DEBUG("copying chain function into push proxy for %s:%s\n",GST_DEBUG_PAD_NAME(pad)); + pad->pushfunc = pad->chainfunc; + } else { + DEBUG("copying get function into pull proxy for %s:%s\n",GST_DEBUG_PAD_NAME(pad)); + pad->pullfunc = pad->getfunc; + pad->pullregionfunc = pad->getregionfunc; + } + + // otherwise we really are a cothread + } else { + if (gst_pad_get_direction (pad) == GST_PAD_SINK) { + DEBUG("setting cothreaded push proxy for sinkpad %s:%s\n",GST_DEBUG_PAD_NAME(pad)); + pad->pushfunc = GST_DEBUG_FUNCPTR(gst_bin_pushfunc_proxy); + } else { + DEBUG("setting cothreaded pull proxy for srcpad %s:%s\n",GST_DEBUG_PAD_NAME(pad)); + pad->pullfunc = GST_DEBUG_FUNCPTR(gst_bin_pullfunc_proxy); + } + } + } + + // need to set up the cothread now + if (wrapper_function != NULL) { + if (element->threadstate == NULL) { + element->threadstate = cothread_create (bin->threadcontext); + DEBUG("created cothread %p for '%s'\n",element->threadstate,gst_element_get_name(element)); + } + cothread_setfunc (element->threadstate, wrapper_function, 0, (char **)element); + DEBUG("set wrapper function for '%s' to &%s\n",gst_element_get_name(element), + GST_DEBUG_FUNCPTR_NAME(wrapper_function)); + } + } +} + +void gst_bin_schedule_func(GstBin *bin) { +// GstElement *manager; + GList *elements; + GstElement *element; +// const gchar *elementname; + GSList *pending = NULL; +// GstBin *pending_bin; + GList *pads; + GstPad *pad; +// GstElement *peer_manager; + GList *chains; + _GstBinChain *chain; + + DEBUG_SET_STRING("(\"%s\")",gst_element_get_name (GST_ELEMENT (bin))); + DEBUG_ENTER_STRING; + + // next we have to find all the separate scheduling chains + DEBUG("\nattempting to find scheduling chains...\n"); + // first make a copy of the managed_elements we can mess with + elements = g_list_copy (bin->managed_elements); + // we have to repeat until the list is empty to get all chains + while (elements) { + element = GST_ELEMENT (elements->data); + + // if this is a DECOUPLED element + if (GST_FLAG_IS_SET (element, GST_ELEMENT_DECOUPLED)) { + // skip this element entirely + DEBUG("skipping '%s' because it's decoupled\n",gst_element_get_name(element)); + elements = g_list_next (elements); + continue; + } + + DEBUG("starting with element '%s'\n",gst_element_get_name(element)); + + // prime the pending list with the first element off the top + pending = g_slist_prepend (NULL, element); + // and remove that one from the main list + elements = g_list_remove (elements, element); + + // create a chain structure + chain = g_new0 (_GstBinChain, 1); + + // for each pending element, walk the pipeline + do { + // retrieve the top of the stack and pop it + element = GST_ELEMENT (pending->data); + pending = g_slist_remove (pending, element); + + // add ourselves to the chain's list of elements + DEBUG("adding '%s' to chain\n",gst_element_get_name(element)); + chain->elements = g_list_prepend (chain->elements, element); + chain->num_elements++; + // set the cothreads flag as appropriate + if (GST_FLAG_IS_SET (element, GST_ELEMENT_USE_COTHREAD)) + chain->use_cothreads = TRUE; + + // if we're managed by the current bin, and we're not decoupled, + // go find all the peers and add them to the list of elements to check + if ((element->manager == GST_ELEMENT(bin)) && + !GST_FLAG_IS_SET (element, GST_ELEMENT_DECOUPLED)) { + // remove ourselves from the outer list of all managed elements + DEBUG("removing '%s' from list of possible elements\n",gst_element_get_name(element)); + elements = g_list_remove (elements, element); + + // now we have to walk the pads to find peers + pads = gst_element_get_pad_list (element); + while (pads) { + pad = GST_PAD (pads->data); + pads = g_list_next (pads); + DEBUG("have pad %s:%s\n",GST_DEBUG_PAD_NAME(pad)); + + // only bother with if the pad's peer's parent is this bin or it's DECOUPLED + // only add it if it's in the list of un-visited elements still + if ((g_list_find (elements, pad->peer->parent) != NULL) || + GST_FLAG_IS_SET (pad->peer->parent, GST_ELEMENT_DECOUPLED)) { + // add the peer element to the pending list + DEBUG("adding '%s' to list of pending elements\n",gst_element_get_name(GST_ELEMENT(pad->peer->parent))); + pending = g_slist_prepend (pending, GST_ELEMENT(pad->peer->parent)); + } else + DEBUG("element '%s' has already been dealt with\n",gst_element_get_name(GST_ELEMENT(pad->peer->parent))); + } + } + } while (pending); + + // add the chain to the bin + DEBUG("have chain with %d elements: ",chain->num_elements); + { GList *elements = chain->elements; + while (elements) { + element = GST_ELEMENT (elements->data); + elements = g_list_next(elements); + DEBUG_NOPREFIX("%s, ",gst_element_get_name(element)); + } + } + DEBUG_NOPREFIX("\n"); + bin->chains = g_list_prepend (bin->chains, chain); + bin->num_chains++; + } + // free up the list in case it's full of DECOUPLED elements + g_list_free (elements); + + DEBUG("\nwe have %d chains to schedule\n",bin->num_chains); + + // now we have to go through all the chains and schedule them + chains = bin->chains; + while (chains) { + chain = (_GstBinChain *)(chains->data); + chains = g_list_next (chains); + + // schedule as appropriate + if (chain->use_cothreads) { + gst_schedule_cothreaded_chain (bin,chain); + } else { + DEBUG("non-cothreaded case not coded yet\n"); + } + } + + DEBUG_LEAVE("(\"%s\")",gst_element_get_name(GST_ELEMENT(bin))); +} + + +/* + // ***** check for possible connections outside + // get the pad's peer + peer = gst_pad_get_peer (pad); + // FIXME this should be an error condition, if not disabled + if (!peer) break; + // get the parent of the peer of the pad + outside = GST_ELEMENT (gst_pad_get_parent (peer)); + // FIXME this should *really* be an error condition + if (!outside) break; + // if it's a source or connection and it's not ours... + if ((GST_IS_SRC (outside) || GST_IS_CONNECTION (outside)) && + (gst_object_get_parent (GST_OBJECT (outside)) != GST_OBJECT (bin))) { + if (gst_pad_get_direction (pad) == GST_PAD_SINK) { + DEBUG("dealing with outside source element %s\n",gst_element_get_name(outside)); +// DEBUG("PUNT: copying pullfunc ptr from %s:%s to %s:%s (@ %p)\n", +//GST_DEBUG_PAD_NAME(pad->peer),GST_DEBUG_PAD_NAME(pad),&pad->pullfunc); +// pad->pullfunc = pad->peer->pullfunc; +// DEBUG("PUNT: setting pushfunc proxy to fake proxy on %s:%s\n",GST_DEBUG_PAD_NAME(pad->peer)); +// pad->peer->pushfunc = GST_DEBUG_FUNCPTR(gst_bin_pushfunc_fake_proxy); + pad->pullfunc = GST_DEBUG_FUNCPTR(gst_bin_pullfunc_proxy); + } + } else { +*/ + + + + + +/* + } else if (GST_IS_SRC (element)) { + DEBUG("adding '%s' as entry point, because it's a source\n",gst_element_get_name (element)); + bin->entries = g_list_prepend (bin->entries,element); + bin->num_entries++; + cothread_setfunc(element->threadstate,gst_bin_src_wrapper,0,(char **)element); + } + + pads = gst_element_get_pad_list (element); + while (pads) { + pad = GST_PAD(pads->data); + + if (gst_pad_get_direction (pad) == GST_PAD_SINK) { + DEBUG("setting push proxy for sinkpad %s:%s\n",GST_DEBUG_PAD_NAME(pad)); + // set the proxy functions + pad->pushfunc = GST_DEBUG_FUNCPTR(gst_bin_pushfunc_proxy); + DEBUG("pushfunc %p = gst_bin_pushfunc_proxy %p\n",&pad->pushfunc,gst_bin_pushfunc_proxy); + } else if (gst_pad_get_direction (pad) == GST_PAD_SRC) { + DEBUG("setting pull proxies for srcpad %s:%s\n",GST_DEBUG_PAD_NAME(pad)); + // set the proxy functions + pad->pullfunc = GST_DEBUG_FUNCPTR(gst_bin_pullfunc_proxy); + DEBUG("pad->pullfunc(@%p) = gst_bin_pullfunc_proxy(@%p)\n", + &pad->pullfunc,gst_bin_pullfunc_proxy); + pad->pullregionfunc = GST_DEBUG_FUNCPTR(gst_bin_pullregionfunc_proxy); + } + pads = g_list_next (pads); + } + elements = g_list_next (elements); + + // if there are no entries, we have to pick one at random + if (bin->num_entries == 0) + bin->entries = g_list_prepend (bin->entries, GST_ELEMENT(bin->children->data)); + } + } else { + DEBUG("don't need cothreads, looking for entry points\n"); + // we have to find which elements will drive an iteration + elements = bin->children; + while (elements) { + element = GST_ELEMENT (elements->data); + DEBUG("found element \"%s\"\n", gst_element_get_name (element)); + if (GST_IS_BIN (element)) { + gst_bin_create_plan (GST_BIN (element)); + } + if (GST_IS_SRC (element)) { + DEBUG("adding '%s' as entry point, because it's a source\n",gst_element_get_name (element)); + bin->entries = g_list_prepend (bin->entries, element); + bin->num_entries++; + } + + // go through all the pads, set pointers, and check for connections + pads = gst_element_get_pad_list (element); + while (pads) { + pad = GST_PAD (pads->data); + + if (gst_pad_get_direction (pad) == GST_PAD_SINK) { + DEBUG("found SINK pad %s:%s\n", GST_DEBUG_PAD_NAME(pad)); + + // copy the peer's chain function, easy enough + DEBUG("copying peer's chainfunc to %s:%s's pushfunc\n",GST_DEBUG_PAD_NAME(pad)); + pad->pushfunc = GST_DEBUG_FUNCPTR(pad->peer->chainfunc); + + // need to walk through and check for outside connections +//FIXME need to do this for all pads + // get the pad's peer + peer = gst_pad_get_peer (pad); + if (!peer) { + DEBUG("found SINK pad %s has no peer\n", gst_pad_get_name (pad)); + break; + } + // get the parent of the peer of the pad + outside = GST_ELEMENT (gst_pad_get_parent (peer)); + if (!outside) break; + // if it's a connection and it's not ours... + if (GST_IS_CONNECTION (outside) && + (gst_object_get_parent (GST_OBJECT (outside)) != GST_OBJECT (bin))) { + gst_info("gstbin: element \"%s\" is the external source Connection " + "for internal element \"%s\"\n", + gst_element_get_name (GST_ELEMENT (outside)), + gst_element_get_name (GST_ELEMENT (element))); + bin->entries = g_list_prepend (bin->entries, outside); + bin->num_entries++; + } + } + else { + DEBUG("found pad %s\n", gst_pad_get_name (pad)); + } + pads = g_list_next (pads); + + } + elements = g_list_next (elements); + } +*/ + + + + +/* + // If cothreads are needed, we need to not only find elements but + // set up cothread states and various proxy functions. + if (bin->need_cothreads) { + DEBUG("bin is using cothreads\n"); + + // first create thread context + if (bin->threadcontext == NULL) { + DEBUG("initializing cothread context\n"); + bin->threadcontext = cothread_init (); + } + + // walk through all the children + elements = bin->managed_elements; + while (elements) { + element = GST_ELEMENT (elements->data); + elements = g_list_next (elements); + + // start out with a NULL warpper function, we'll set it if we want a cothread + wrapper_function = NULL; + + // have to decide if we need to or can use a cothreads, and if so which wrapper + // first of all, if there's a loopfunc, the decision's already made + if (element->loopfunc != NULL) { + wrapper_function = GST_DEBUG_FUNCPTR(gst_bin_loopfunc_wrapper); + DEBUG("element %s is a loopfunc, must use a cothread\n",gst_element_get_name(element)); + } else { + // otherwise we need to decide if it needs a cothread + // if it's complex, or cothreads are preferred and it's *not* decoupled, cothread it + if (GST_FLAG_IS_SET (element,GST_ELEMENT_COMPLEX) || + (GST_FLAG_IS_SET (bin,GST_BIN_FLAG_PREFER_COTHREADS) && + !GST_FLAG_IS_SET (element,GST_ELEMENT_DECOUPLED))) { + // base it on whether we're going to loop through source or sink pads + if (element->numsinkpads == 0) + wrapper_function = GST_DEBUG_FUNCPTR(gst_bin_src_wrapper); + else + wrapper_function = GST_DEBUG_FUNCPTR(gst_bin_chain_wrapper); + } + } + + // walk through the all the pads for this element, setting proxy functions + // the selection of proxy functions depends on whether we're in a cothread or not + pads = gst_element_get_pad_list (element); + while (pads) { + pad = GST_PAD (pads->data); + pads = g_list_next (pads); + + // check to see if someone else gets to set up the element + peer_manager = GST_ELEMENT((pad)->peer->parent)->manager; + if (peer_manager != GST_ELEMENT(bin)) { + DEBUG("WARNING: pad %s:%s is connected outside of bin\n",GST_DEBUG_PAD_NAME(pad)); + } + + // if the wrapper_function is set, we need to use the proxy functions + if (wrapper_function != NULL) { + // set up proxy functions + if (gst_pad_get_direction (pad) == GST_PAD_SINK) { + DEBUG("setting push proxy for sinkpad %s:%s\n",GST_DEBUG_PAD_NAME(pad)); + pad->pushfunc = GST_DEBUG_FUNCPTR(gst_bin_pushfunc_proxy); + } else if (gst_pad_get_direction (pad) == GST_PAD_SRC) { + DEBUG("setting pull proxy for srcpad %s:%s\n",GST_DEBUG_PAD_NAME(pad)); + pad->pullfunc = GST_DEBUG_FUNCPTR(gst_bin_pullfunc_proxy); + } + } else { + // otherwise we need to set up for 'traditional' chaining + if (gst_pad_get_direction (pad) == GST_PAD_SINK) { + // we can just copy the chain function, since it shares the prototype + DEBUG("copying chain function into push proxy for %s:%s\n", + GST_DEBUG_PAD_NAME(pad)); + pad->pushfunc = pad->chainfunc; + } else if (gst_pad_get_direction (pad) == GST_PAD_SRC) { + // we can just copy the get function, since it shares the prototype + DEBUG("copying get function into pull proxy for %s:%s\n", + GST_DEBUG_PAD_NAME(pad)); + pad->pullfunc = pad->getfunc; + } + } + } + + // if a loopfunc has been specified, create and set up a cothread + if (wrapper_function != NULL) { + if (element->threadstate == NULL) { + element->threadstate = cothread_create (bin->threadcontext); + DEBUG("created cothread %p (@%p) for \"%s\"\n",element->threadstate, + &element->threadstate,gst_element_get_name(element)); + } + cothread_setfunc (element->threadstate, wrapper_function, 0, (char **)element); + DEBUG("set wrapper function for \"%s\" to &%s\n",gst_element_get_name(element), + GST_DEBUG_FUNCPTR_NAME(wrapper_function)); + } + +// // HACK: if the element isn't decoupled, it's an entry +// if (!GST_FLAG_IS_SET(element,GST_ELEMENT_DECOUPLED)) +// bin->entries = g_list_append(bin->entries, element); + } + + // otherwise, cothreads are not needed + } else { + DEBUG("bin is chained, no cothreads needed\n"); + + elements = bin->managed_elements; + while (elements) { + element = GST_ELEMENT (elements->data); + elements = g_list_next (elements); + + pads = gst_element_get_pad_list (element); + while (pads) { + pad = GST_PAD (pads->data); + pads = g_list_next (pads); + + if (gst_pad_get_direction (pad) == GST_PAD_SINK) { + DEBUG("copying chain function into push proxy for %s:%s\n",GST_DEBUG_PAD_NAME(pad)); + pad->pushfunc = pad->chainfunc; + } else { + DEBUG("copying get function into pull proxy for %s:%s\n",GST_DEBUG_PAD_NAME(pad)); + pad->pullfunc = pad->getfunc; + } + } + } + } +*/ + + diff --git a/gst/gstscheduler.h b/gst/gstscheduler.h new file mode 100644 index 0000000000..a57348f815 --- /dev/null +++ b/gst/gstscheduler.h @@ -0,0 +1,38 @@ +/* Gnome-Streamer + * Copyright (C) <1999> Erik Walthinsen + * + * This library is free software; you can redistribute it and/or + * modify it under the terms of the GNU Library General Public + * License as published by the Free Software Foundation; either + * version 2 of the License, or (at your option) any later version. + * + * This library is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * Library General Public License for more details. + * + * You should have received a copy of the GNU Library General Public + * License along with this library; if not, write to the + * Free Software Foundation, Inc., 59 Temple Place - Suite 330, + * Boston, MA 02111-1307, USA. + */ + + +#ifndef __GST_SCHEDULER_H__ +#define __GST_SCHEDULER_H__ + +#include + +#ifdef __cplusplus +extern "C" { +#endif /* __cplusplus */ + + +void gst_bin_schedule_func(GstBin *bin); + + +#ifdef __cplusplus +} +#endif /* __cplusplus */ + +#endif /* __GST_SCHEDULER_H__ */ diff --git a/gst/gstthread.c b/gst/gstthread.c index 1647800b00..73561d3fc8 100644 --- a/gst/gstthread.c +++ b/gst/gstthread.c @@ -116,6 +116,8 @@ gst_thread_class_init (GstThreadClass *klass) static void gst_thread_init (GstThread *thread) { + DEBUG("initializing thread '%s'\n",gst_element_get_name(GST_ELEMENT(thread))); + // we're a manager by default GST_FLAG_SET (thread, GST_BIN_FLAG_MANAGER); diff --git a/plugins/elements/gstidentity.c b/plugins/elements/gstidentity.c index 5622f31152..17e4f38a3b 100644 --- a/plugins/elements/gstidentity.c +++ b/plugins/elements/gstidentity.c @@ -134,6 +134,7 @@ gst_identity_loop (GstElement *element) do { buf = gst_pad_pull (identity->sinkpad); + g_print("(%s:%s)i ",GST_DEBUG_PAD_NAME(identity->sinkpad)); gst_pad_push (identity->srcpad, buf); diff --git a/plugins/elements/gstqueue.c b/plugins/elements/gstqueue.c index 8c2dcdbc9b..7a86bea0a1 100644 --- a/plugins/elements/gstqueue.c +++ b/plugins/elements/gstqueue.c @@ -49,6 +49,7 @@ enum { ARG_0, ARG_LEVEL, ARG_MAX_LEVEL, + ARG_BLOCK, }; @@ -106,6 +107,8 @@ gst_queue_class_init (GstQueueClass *klass) GTK_ARG_READABLE, ARG_LEVEL); gtk_object_add_arg_type ("GstQueue::max_level", GTK_TYPE_INT, GTK_ARG_READWRITE, ARG_MAX_LEVEL); + gtk_object_add_arg_type ("GstQueue::block", GTK_TYPE_BOOL, + GTK_ARG_READWRITE, ARG_BLOCK); gtkobject_class->set_arg = gst_queue_set_arg; gtkobject_class->get_arg = gst_queue_get_arg; @@ -116,7 +119,8 @@ gst_queue_class_init (GstQueueClass *klass) static void gst_queue_init (GstQueue *queue) { - GST_FLAG_SET (queue, GST_ELEMENT_SCHEDULE_PASSIVELY); + // scheduling on this kind of element is, well, interesting + GST_FLAG_SET (queue, GST_ELEMENT_DECOUPLED); queue->sinkpad = gst_pad_new ("sink", GST_PAD_SINK); gst_pad_set_chain_function (queue->sinkpad, GST_DEBUG_FUNCPTR(gst_queue_chain)); @@ -129,6 +133,7 @@ gst_queue_init (GstQueue *queue) queue->queue = NULL; queue->level_buffers = 0; queue->max_buffers = 20; + queue->block = TRUE; queue->level_bytes = 0; queue->size_buffers = 0; queue->size_bytes = 0; @@ -237,6 +242,12 @@ gst_queue_get (GstPad *pad) DEBUG("queue: %s push %d %ld %p\n", name, queue->level_buffers, pthread_self (), queue->emptycond); DEBUG("queue: %s have queue lock\n", name); + // we bail if there's nothing there + if (!queue->level_buffers && !queue->block) { + GST_UNLOCK(queue); + return NULL; + } + while (!queue->level_buffers) { STATUS("queue: %s U released lock\n"); GST_UNLOCK (queue); @@ -312,6 +323,9 @@ gst_queue_set_arg (GtkObject *object, GtkArg *arg, guint id) case ARG_MAX_LEVEL: queue->max_buffers = GTK_VALUE_INT (*arg); break; + case ARG_BLOCK: + queue->block = GTK_VALUE_BOOL (*arg); + break; default: break; } @@ -334,6 +348,9 @@ gst_queue_get_arg (GtkObject *object, GtkArg *arg, guint id) case ARG_MAX_LEVEL: GTK_VALUE_INT (*arg) = queue->max_buffers; break; + case ARG_BLOCK: + GTK_VALUE_BOOL (*arg) = queue->block; + break; default: arg->type = GTK_TYPE_INVALID; break; diff --git a/plugins/elements/gstqueue.h b/plugins/elements/gstqueue.h index a06ff3080e..0274d890a7 100644 --- a/plugins/elements/gstqueue.h +++ b/plugins/elements/gstqueue.h @@ -59,6 +59,7 @@ struct _GstQueue { gint level_buffers; /* number of buffers queued here */ gint max_buffers; /* maximum number of buffers queued here */ + gboolean block; /* if set to FALSE, _get returns NULL if queue empty */ gint level_bytes; /* number of bytes queued here */ gint size_buffers; /* size of queue in buffers */ gint size_bytes; /* size of queue in bytes */