Implemented the hybrid scheduling system for sources and connections outside the current Bin. Is a bit hackish in on...

Original commit message from CVS:
Implemented the hybrid scheduling system for sources and connections
outside the current Bin.  Is a bit hackish in one place, but I'll work out
a way to make that cleaner soon.  queue.c in tests now works nicely in all
cases.  More to come later.
This commit is contained in:
Erik Walthinsen 2000-12-11 00:24:32 +00:00
parent ec1548618e
commit b063cb96f1
6 changed files with 65 additions and 121 deletions

View file

@ -117,8 +117,8 @@ static void
gst_queue_init (GstQueue *queue)
{
queue->sinkpad = gst_pad_new ("sink", GST_PAD_SINK);
gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
gst_pad_set_chain_function (queue->sinkpad, gst_queue_chain);
gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
queue->srcpad = gst_pad_new ("src", GST_PAD_SRC);
gst_pad_set_pull_function (queue->srcpad, gst_queue_pull);

View file

@ -690,72 +690,8 @@ gst_bin_pushfunc_proxy (GstPad *pad)
cothread_switch (pad->threadstate);
}
/* Creates the plan for a src or connection outside the bin. */
static _GstBinOutsideSchedule *
gst_bin_create_plan_outside (GstBin *bin, GstElement *element)
{
GList *pads;
GstPad *pad;
gboolean dedicated = TRUE;
cothread_state *threadstate;
_GstBinOutsideSchedule *sched = NULL;
// walk through all the pads, find out of this is hard or not
pads = gst_element_get_pad_list (element);
while (pads) {
pad = GST_PAD (pads->data);
// if the pad's peer's parent isn't the Bin, it's hard
// FIXME gst_pad_get_parent should return a GstElement
if (gst_pad_get_parent (pad->peer) != GST_OBJECT (bin))
dedicated = FALSE;
pads = g_list_next (pads);
}
// if the element is wholely in the Bin, create a plugin context
if (dedicated == TRUE) {
if (element->threadstate == NULL) {
element->threadstate = cothread_create (bin->threadcontext);
DEBUG("created element threadstate %p for \"%s\"\n",element->threadstate,
gst_element_get_name(element));
}
// set the cothread loopfunc
if (GST_IS_SRC(element))
cothread_setfunc (element->threadstate, gst_bin_src_wrapper,
0, (char **)element);
else if (GST_IS_CONNECTION(element))
cothread_setfunc (element->threadstate, gst_bin_connection_wrapper,
0, (char **)element);
}
// otherwise, we have some work to do
else {
// create a cothread state
threadstate = cothread_create (bin->threadcontext);
// construct an outside schedule struct
sched = g_new0(_GstBinOutsideSchedule,1);
sched->element = element;
sched->bin = bin;
sched->threadstate = threadstate;
// loop through the pads again looking for candidates
pads = gst_element_get_pad_list (element);
while (pads) {
pad = GST_PAD (pads->data);
// if the pad's peer's parent is this bin, it's a candidate
// FIXME gst_pad_get_parent should return a GstElement
if (gst_pad_get_parent (pad->peer) == GST_OBJECT(bin))
sched->padlist = g_slist_prepend(sched->padlist,pad);
pads = g_list_next (pads);
}
// add this schedule to the list of outside schedules
bin->outside_schedules = g_list_prepend(bin->outside_schedules,sched);
// set the cothread loopfunc
cothread_setfunc (sched->threadstate, gst_bin_sched_wrapper,
0, (char **)sched);
}
return sched;
static void
gst_bin_pushfunc_fake_proxy (GstPad *pad) {
}
static void
@ -771,24 +707,23 @@ gst_bin_create_plan_func (GstBin *bin)
DEBUG_SET_STRING("(\"%s\")",gst_element_get_name(GST_ELEMENT(bin)));
DEBUG_ENTER_STRING;
// g_print("gstbin: creating plan for bin \"%s\"\n", gst_element_get_name (GST_ELEMENT (bin)));
// first loop through all children to see if we need cothreads
// we break immediately when we find we need to, why keep searching?
elements = bin->children;
while (elements) {
element = GST_ELEMENT (elements->data);
DEBUG("found element \"%s\" in bin \"%s\"\n",
gst_element_get_name (element),
gst_element_get_name (GST_ELEMENT (bin)));
// if it's a loop-based element, use cothreads
if (element->loopfunc != NULL) {
DEBUG("loop based element \"%s\" in bin \"%s\"\n",
gst_element_get_name (element),
gst_element_get_name (GST_ELEMENT (bin)));
bin->need_cothreads = TRUE;
DEBUG("NEED COTHREADS, it's \"%s\"'s fault\n",gst_element_get_name(element));
break;
@ -816,7 +751,7 @@ gst_bin_create_plan_func (GstBin *bin)
DEBUG("more than 1 sinkpad for element \"%s\" in bin \"%s\"\n",
gst_element_get_name (element),
gst_element_get_name (GST_ELEMENT (bin)));
bin->need_cothreads = TRUE;
DEBUG("NEED COTHREADS, it's \"%s\"'s fault\n",gst_element_get_name(element));
break;
@ -868,52 +803,51 @@ gst_bin_create_plan_func (GstBin *bin)
pads = gst_element_get_pad_list (element);
while (pads) {
pad = GST_PAD(pads->data);
if (gst_pad_get_direction (pad) == GST_PAD_SRC) {
DEBUG("checking/setting push proxy for srcpad %s:%s\n",
GST_DEBUG_PAD_NAME(pad));
// set the proxy functions
if (!pad->pushfunc)
pad->pushfunc = gst_bin_pushfunc_proxy;
} else if (gst_pad_get_direction (pad) == GST_PAD_SINK) {
DEBUG("checking/setting pull proxies for sinkpad %s:%s\n",GST_DEBUG_PAD_NAME(pad));
// set the proxy functions
if (!pad->pullfunc)
pad->pullfunc = gst_bin_pullfunc_proxy;
if (!pad->pullregionfunc)
pad->pullregionfunc = gst_bin_pullregionfunc_proxy;
// ***** check for possible connections outside
// get the pad's peer
peer = gst_pad_get_peer (pad);
// FIXME this should be an error condition, if not disabled
if (!peer) break;
// get the parent of the peer of the pad
outside = GST_ELEMENT (gst_pad_get_parent (peer));
// FIXME this should *really* be an error condition
if (!outside) break;
// if it's a source or connection and it's not ours...
if ((GST_IS_SRC (outside) || GST_IS_CONNECTION (outside)) &&
(gst_object_get_parent (GST_OBJECT (outside)) != GST_OBJECT (bin))) {
if (gst_pad_get_direction (pad) == GST_PAD_SINK) {
DEBUG("PUNT: copying pullfunc ptr from %s:%s to %s:%s (@ %p)\n",
GST_DEBUG_PAD_NAME(pad->peer),GST_DEBUG_PAD_NAME(pad),&pad->pullfunc);
pad->pullfunc = pad->peer->pullfunc;
DEBUG("PUNT: setting pushfunc proxy to fake proxy on %s:%s\n",GST_DEBUG_PAD_NAME(pad->peer));
pad->peer->pushfunc = gst_bin_pushfunc_fake_proxy;
}
} else {
if (gst_pad_get_direction (pad) == GST_PAD_SRC) {
DEBUG("checking/setting push proxy for srcpad %s:%s\n",
GST_DEBUG_PAD_NAME(pad));
// set the proxy functions
if (!pad->pushfunc)
pad->pushfunc = gst_bin_pushfunc_proxy;
// ***** check for possible connections outside
// get the pad's peer
peer = gst_pad_get_peer (pad);
// FIXME this should be an error condition, if not disabled
if (!peer) break;
// get the parent of the peer of the pad
outside = GST_ELEMENT (gst_pad_get_parent (peer));
// FIXME this should *really* be an error condition
if (!outside) break;
/* if it's a source or connection and it's not ours... */
if ((GST_IS_SRC (outside) || GST_IS_CONNECTION (outside)) &&
(gst_object_get_parent (GST_OBJECT (outside)) != GST_OBJECT (bin))) {
DEBUG("element '%s' outside bin is an entry\n",gst_element_get_name(outside));
/*
bin->entries = g_list_prepend (bin->entries,outside);
bin->numentries++;
if (outside->threadstate == NULL) {
outside->threadstate = cothread_create (bin->threadcontext);
DEBUG("created element threadstate %p for \"%s\"\n",outside->threadstate,
gst_element_get_name(outside));
}
cothread_setfunc(outside->threadstate,gst_bin_connection_wrapper,0,(char **)outside);
*/
bin->entries = g_list_prepend (bin->entries, gst_bin_create_plan_outside(bin,outside));
}
}
} else if (gst_pad_get_direction (pad) == GST_PAD_SINK) {
DEBUG("checking/setting pull proxies for sinkpad %s:%s\n",GST_DEBUG_PAD_NAME(pad));
// set the proxy functions
if (!pad->pullfunc)
pad->pullfunc = gst_bin_pullfunc_proxy;
if (!pad->pullregionfunc)
pad->pullregionfunc = gst_bin_pullregionfunc_proxy;
}
}
pads = g_list_next (pads);
}
elements = g_list_next (elements);
}
// if there are no entries, we have to pick one at random
if (bin->numentries == 0)
bin->entries = g_list_prepend (bin->entries, GST_ELEMENT(bin->children->data));
}
} else {
g_print("gstbin: don't need cothreads, looking for entry points\n");
// we have to find which elements will drive an iteration
@ -965,6 +899,8 @@ gst_bin_create_plan_func (GstBin *bin)
elements = g_list_next (elements);
}
}
DEBUG_LEAVE("(\"%s\")",gst_element_get_name(GST_ELEMENT(bin)));
}
void

View file

@ -103,7 +103,8 @@ G_GNUC_UNUSED static GModule *_debug_self_module = NULL;
#define DEBUG_ENTER_STRING DEBUG_ENTER("%s",_debug_string)
#define DEBUG_LEAVE(format, args...) \
if (_debug_string != NULL) g_free(_debug_string),\
fprintf(stderr,GST_DEBUG_PREFIX(format": leaving\n" , ## args ))
fprintf(stderr,GST_DEBUG_PREFIX(format": leaving\n" , ## args ))
#define DEBUG_LEAVE_STRING DEBUG_LEAVE("%s",_debug_string)
#else
#define DEBUG(format, args...)
#define DEBUG_ENTER(format, args...)

View file

@ -144,6 +144,8 @@ gst_element_add_pad (GstElement *element, GstPad *pad)
g_return_if_fail (GST_IS_PAD (pad));
/* set the pad's parent */
DEBUG("setting parent of pad '%s'(%p) to '%s'(%p)\n",
gst_pad_get_name(pad),pad,gst_element_get_name(element),element);
gst_pad_set_parent (pad,GST_OBJECT (element));
/* add it to the list */

View file

@ -318,7 +318,7 @@ gst_pad_push (GstPad *pad,
{
GstPad *peer;
DEBUG_ENTER("(pad:'%s',buffer:%p)",gst_pad_get_name(pad),buffer);
DEBUG_ENTER("(pad:'%s'(%p),buffer:%p)",gst_pad_get_name(pad),pad,buffer);
g_return_if_fail(pad != NULL);
g_return_if_fail(GST_IS_PAD(pad));
@ -339,12 +339,14 @@ gst_pad_push (GstPad *pad,
// first check to see if there's a push handler
if (pad->pushfunc != NULL) {
DEBUG("putting the buffer in the pen and calling pushfunc\n");
// put the buffer in peer's holding pen
peer->bufpen = buffer;
// now inform the handler that the peer pad has something
(pad->pushfunc)(peer);
// otherwise we assume we're chaining directly
} else if (peer->chainfunc != NULL) {
DEBUG("calling chain function\n");
//g_print("-- gst_pad_push(): calling chain handler\n");
(peer->chainfunc)(peer,buffer);
// else we squawk
@ -366,23 +368,26 @@ gst_pad_pull (GstPad *pad)
{
GstBuffer *buf;
DEBUG_ENTER("(%s:%s)",GST_DEBUG_PAD_NAME(pad));
g_return_val_if_fail(pad != NULL, NULL);
g_return_val_if_fail(GST_IS_PAD(pad), NULL);
/* check to see if the peer pad is disabled. return NULL if it is */
/* FIXME: this may be the wrong way to go about it */
if (GST_FLAG_IS_SET(pad->peer,GST_PAD_DISABLED)) {
g_print("gst_pad_pull: pad disabled, returning NULL\n");
DEBUG("pad disabled, returning NULL\n");
return NULL;
}
// if no buffer in pen and there's a pull handler, fire it
if (pad->bufpen == NULL) {
if (pad->pullfunc != NULL) {
DEBUG("calling pullfunc to fill buffer pen\n");
(pad->pullfunc)(pad->peer);
} else {
g_print("-- gst_pad_pull(%s:%s): no buffer in pen, and no handler to get one there!!!\n",
GST_ELEMENT(pad->parent)->name, pad->name);
DEBUG("no buffer in pen, and no handler (# %p) to get one there!!!\n",&pad->pullfunc);
g_return_if_fail(pad->pullfunc != NULL);
}
}
@ -393,8 +398,8 @@ gst_pad_pull (GstPad *pad)
return buf;
// else we have a big problem...
} else {
g_print("-- gst_pad_pull(%s:%s): no buffer in pen, and no handler\n",
GST_ELEMENT(pad->parent)->name, pad->peer->name);
DEBUG("no buffer in pen, and no handler\n");
g_return_if_fail(pad->pullfunc != NULL);
return NULL;
}

View file

@ -117,8 +117,8 @@ static void
gst_queue_init (GstQueue *queue)
{
queue->sinkpad = gst_pad_new ("sink", GST_PAD_SINK);
gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
gst_pad_set_chain_function (queue->sinkpad, gst_queue_chain);
gst_element_add_pad (GST_ELEMENT (queue), queue->sinkpad);
queue->srcpad = gst_pad_new ("src", GST_PAD_SRC);
gst_pad_set_pull_function (queue->srcpad, gst_queue_pull);