diff --git a/gst/cothreads.c b/gst/cothreads.c index 9ca1242d83..dd14b9d7b9 100644 --- a/gst/cothreads.c +++ b/gst/cothreads.c @@ -19,8 +19,7 @@ cothread_state *cothread_create(cothread_context *ctx) { cothread_state *s; DEBUG("cothread: pthread_self() %ld\n",pthread_self()); - //if (pthread_self() == 0) { - if (0) { + if (pthread_self() == 0) { s = (cothread_state *)malloc(sizeof(int) * COTHREAD_STACKSIZE); DEBUG("cothread: new stack at %p\n",s); } else { @@ -160,7 +159,6 @@ void cothread_switch(cothread_state *thread) { SETUP_STACK(thread->sp); SET_SP(thread->sp); // start it - //JUMP(cothread_stub); cothread_stub(); DEBUG("cothread: exit thread \n"); ctx->current = 0; diff --git a/gst/elements/gstqueue.c b/gst/elements/gstqueue.c index b9af36de44..1c01308747 100644 --- a/gst/elements/gstqueue.c +++ b/gst/elements/gstqueue.c @@ -106,13 +106,12 @@ static void gst_queue_class_init(GstQueueClass *klass) { static void gst_queue_init(GstQueue *queue) { queue->sinkpad = gst_pad_new("sink",GST_PAD_SINK); gst_element_add_pad(GST_ELEMENT(queue),queue->sinkpad); - gst_pad_set_chain_function(queue->sinkpad,gst_queue_chain); + queue->srcpad = gst_pad_new("src",GST_PAD_SRC); gst_element_add_pad(GST_ELEMENT(queue),queue->srcpad); queue->queue = NULL; - queue->tail = NULL; queue->level_buffers = 0; queue->max_buffers = 10; queue->level_bytes = 0; @@ -151,15 +150,15 @@ void gst_queue_chain(GstPad *pad,GstBuffer *buf) { name = gst_element_get_name(GST_ELEMENT(queue)); /* we have to lock the queue since we span threads */ - - DEBUG("queue: %s adding buffer %p %d\n", name, buf, pthread_self()); + DEBUG("queue: try have queue lock\n"); + DEBUG("queue: %s adding buffer %p %ld\n", name, buf, pthread_self()); GST_LOCK(queue); DEBUG("queue: have queue lock\n"); if (GST_BUFFER_FLAG_IS_SET(buf, GST_BUFFER_FLUSH)) { - g_list_foreach(queue->queue, gst_queue_cleanup_buffers, name); - g_list_free(queue->queue); + g_slist_foreach(queue->queue, gst_queue_cleanup_buffers, name); + g_slist_free(queue->queue); queue->queue = NULL; queue->level_buffers = 0; } @@ -167,30 +166,20 @@ void gst_queue_chain(GstPad *pad,GstBuffer *buf) { DEBUG("queue: %s: chain %d %p\n", name, queue->level_buffers, buf); - g_mutex_lock(queue->fulllock); while (queue->level_buffers >= queue->max_buffers) { DEBUG("queue: %s waiting %d\n", name, queue->level_buffers); STATUS("%s: O\n"); GST_UNLOCK(queue); + g_mutex_lock(queue->fulllock); g_cond_wait(queue->fullcond,queue->fulllock); + g_mutex_unlock(queue->fulllock); GST_LOCK(queue); STATUS("%s: O+\n"); DEBUG("queue: %s waiting done %d\n", name, queue->level_buffers); } - g_mutex_unlock(queue->fulllock); - - /* put the buffer on the head of the list */ - /* if the queue is NULL, start a new list and make this the tail */ - if (!queue->queue) { - queue->queue = g_list_prepend(queue->queue,buf); -// queue->tail = queue->queue; - /* otherwise append to the end of the list */ - } else { -// queue->tail = g_list_append(queue->tail,buf); -// queue->tail = g_list_next(queue->tail); - queue->queue = g_list_append(queue->queue,buf); - } + /* put the buffer on the tail of the list */ + queue->queue = g_slist_append(queue->queue,buf); STATUS("%s: +\n"); /* if we were empty, but aren't any more, signal a condition */ @@ -202,27 +191,27 @@ void gst_queue_chain(GstPad *pad,GstBuffer *buf) { GST_UNLOCK(queue); if (tosignal) { - g_mutex_lock(queue->emptylock); STATUS("%s: >\n"); + g_mutex_lock(queue->emptylock); g_cond_signal(queue->emptycond); - STATUS("%s: >>\n"); g_mutex_unlock(queue->emptylock); + STATUS("%s: >>\n"); } } void gst_queue_push(GstConnection *connection) { GstQueue *queue = GST_QUEUE(connection); GstBuffer *buf = NULL; - GList *front; + GSList *front; gboolean tosignal = FALSE; guchar *name; name = gst_element_get_name(GST_ELEMENT(queue)); - DEBUG("queue: %s push %d %d %p\n", name, queue->level_buffers, pthread_self(), queue->emptycond); /* have to lock for thread-safety */ DEBUG("queue: try have queue lock\n"); GST_LOCK(queue); + DEBUG("queue: %s push %d %ld %p\n", name, queue->level_buffers, pthread_self(), queue->emptycond); DEBUG("queue: have queue lock\n"); while (!queue->level_buffers) { @@ -237,8 +226,8 @@ void gst_queue_push(GstConnection *connection) { front = queue->queue; buf = (GstBuffer *)(front->data); - queue->queue = g_list_remove_link(queue->queue,front); - g_list_free(front); + queue->queue = g_slist_remove_link(queue->queue,front); + g_slist_free(front); queue->level_buffers--; STATUS("%s: -\n"); tosignal = queue->level_buffers < queue->max_buffers; @@ -252,9 +241,9 @@ void gst_queue_push(GstConnection *connection) { g_mutex_unlock(queue->fulllock); } - //DEBUG("queue: %s pushing %d %p %p %p\n", name, queue->level_buffers, buf); + DEBUG("queue: %s pushing %d %p \n", name, queue->level_buffers, buf); gst_pad_push(queue->srcpad,buf); - //DEBUG("queue: %s pushing %d done %p %p\n", name, queue->level_buffers); + DEBUG("queue: %s pushing %d done \n", name, queue->level_buffers); /* unlock now */ } diff --git a/gst/elements/gstqueue.h b/gst/elements/gstqueue.h index bb9c47bf9e..8e8a608330 100644 --- a/gst/elements/gstqueue.h +++ b/gst/elements/gstqueue.h @@ -55,8 +55,7 @@ struct _GstQueue { GstPad *srcpad; /* the queue of buffers we're keeping our grubby hands on */ - GList *queue; - GList *tail; /* have to keep track of this myself */ + GSList *queue; gint level_buffers; /* number of buffers queued here */ gint max_buffers; /* maximum number of buffers queued here */ diff --git a/gst/gstbin.c b/gst/gstbin.c index d10051c350..9c038868d4 100644 --- a/gst/gstbin.c +++ b/gst/gstbin.c @@ -299,7 +299,7 @@ gboolean gst_bin_set_state_type(GstBin *bin, if (oclass->change_state_type) (oclass->change_state_type)(bin,state,type); - return TRUE; + return TRUE; } void gst_bin_real_destroy(GtkObject *object) { @@ -424,7 +424,7 @@ static int gst_bin_loopfunc_wrapper(int argc,char *argv[]) { GList *pads; GstPad *pad; GstBuffer *buf; - gchar *name = gst_element_get_name(element); + G_GNUC_UNUSED gchar *name = gst_element_get_name(element); DEBUG("** gst_bin_loopfunc_wrapper(%d,\"%s\")\n", argc,gst_element_get_name(element)); @@ -502,11 +502,13 @@ static void gst_bin_create_plan_func(GstBin *bin) { if (element->loopfunc != NULL) { g_print("gstbin: loop based element \"%s\" in bin \"%s\"\n", gst_element_get_name(element), gst_element_get_name(GST_ELEMENT(bin))); bin->need_cothreads = TRUE; + break; } // if it's a complex element, use cothreads else if (GST_ELEMENT_IS_MULTI_IN(element)) { g_print("gstbin: complex element \"%s\" in bin \"%s\"\n", gst_element_get_name(element), gst_element_get_name(GST_ELEMENT(bin))); bin->need_cothreads = TRUE; + break; } // if it has more than one input pad, use cothreads sink_pads = 0; @@ -553,6 +555,11 @@ static void gst_bin_create_plan_func(GstBin *bin) { cothread_setfunc(element->threadstate,gst_bin_loopfunc_wrapper, 0,(char **)element); } + if (GST_IS_SRC(element)) { + g_print("gstbin: adding '%s' as entry point\n",gst_element_get_name(element)); + bin->entries = g_list_prepend(bin->entries,element); + bin->numentries++; + } pads = gst_element_get_pad_list(element); while (pads) { @@ -581,10 +588,12 @@ static void gst_bin_create_plan_func(GstBin *bin) { while (connection_pads) { opad = GST_PAD(connection_pads->data); if (gst_pad_get_direction(opad) == GST_PAD_SRC) { - g_print("gstbin: setting push&pull handlers for %s:%s SRC connection\n", - gst_element_get_name(outside),gst_pad_get_name(opad)); + g_print("gstbin: setting push&pull handlers for %s:%s SRC connection %p %p\n", + gst_element_get_name(outside),gst_pad_get_name(opad), opad, opad->pullfunc); + opad->pushfunc = gst_bin_pushfunc_wrapper; opad->pullfunc = gst_bin_pullfunc_wrapper; + if (outside->threadstate == NULL) { outside->threadstate = cothread_create(bin->threadcontext); cothread_setfunc(outside->threadstate,gst_bin_loopfunc_wrapper, @@ -623,9 +632,13 @@ static void gst_bin_create_plan_func(GstBin *bin) { pad = GST_PAD(pads->data); /* we only worry about sink pads */ if (gst_pad_get_direction(pad) == GST_PAD_SINK) { + g_print("gstbin: found SINK pad %s\n", gst_pad_get_name(pad)); /* get the pad's peer */ peer = gst_pad_get_peer(pad); - if (!peer) break; + if (!peer) { + g_print("gstbin: found SINK pad %s has no peer\n", gst_pad_get_name(pad)); + break; + } /* get the parent of the peer of the pad */ outside = GST_ELEMENT(gst_pad_get_parent(peer)); if (!outside) break; @@ -640,6 +653,9 @@ static void gst_bin_create_plan_func(GstBin *bin) { bin->numentries++; } } + else { + g_print("gstbin: found pad %s\n", gst_pad_get_name(pad)); + } pads = g_list_next(pads); } } @@ -668,7 +684,8 @@ void gst_bin_iterate_func(GstBin *bin) { cothread_switch(GST_ELEMENT(bin->children->data)->threadstate); } else { if (bin->numentries <= 0) { - printf("gstbin: no elements in bin \"%s\"\n", gst_element_get_name(GST_ELEMENT(bin))); + printf("gstbin: no entries in bin \"%s\" trying children...\n", gst_element_get_name(GST_ELEMENT(bin))); + // we will try loop over the elements then... entries = bin->children; } else { diff --git a/gst/gstpad.c b/gst/gstpad.c index ad3f5dd3f0..92661c85c5 100644 --- a/gst/gstpad.c +++ b/gst/gstpad.c @@ -201,7 +201,7 @@ void gst_pad_push(GstPad *pad,GstBuffer *buffer) { (pad->chainfunc)(pad->peer,buffer); // else we squawk } else { - //g_print("-- gst_pad_push(): houston, we have a problem, no way of talking to peer\n"); + g_print("-- gst_pad_push(): houston, we have a problem, no way of talking to peer\n"); } } @@ -340,7 +340,7 @@ void gst_pad_connect(GstPad *srcpad,GstPad *sinkpad) { /* now copy the chain pointer from sink to src */ srcpad->chainfunc = sinkpad->chainfunc; /* and the pull function */ - srcpad->pullfunc = sinkpad->pullfunc; + //srcpad->pullfunc = sinkpad->pullfunc; /* set the connected flag */ /* FIXME: set connected flag */ diff --git a/gst/gstthread.c b/gst/gstthread.c index b84533df3e..5ad60ea6af 100644 --- a/gst/gstthread.c +++ b/gst/gstthread.c @@ -55,6 +55,7 @@ static xmlNodePtr gst_thread_save_thyself(GstElement *element,xmlNodePtr parent) static void gst_thread_prepare(GstThread *thread); static void gst_thread_signal_thread(GstThread *thread); +static void gst_thread_create_plan_dummy(GstBin *bin); static GstBin *parent_class = NULL; @@ -85,10 +86,12 @@ gst_thread_class_init(GstThreadClass *klass) { GtkObjectClass *gtkobject_class; GstObjectClass *gstobject_class; GstElementClass *gstelement_class; + GstBinClass *gstbin_class; gtkobject_class = (GtkObjectClass*)klass; gstobject_class = (GstObjectClass*)klass; gstelement_class = (GstElementClass*)klass; + gstbin_class = (GstBinClass*)klass; parent_class = gtk_type_class(gst_bin_get_type()); @@ -98,6 +101,8 @@ gst_thread_class_init(GstThreadClass *klass) { gstelement_class->change_state = gst_thread_change_state; gstelement_class->save_thyself = gst_thread_save_thyself; + gstbin_class->create_plan = gst_thread_create_plan_dummy; + gtkobject_class->set_arg = gst_thread_set_arg; gtkobject_class->get_arg = gst_thread_get_arg; } @@ -109,6 +114,10 @@ static void gst_thread_init(GstThread *thread) { thread->cond = g_cond_new(); } +static void gst_thread_create_plan_dummy(GstBin *bin) { + gst_info("gstthread: create plan delayed until thread starts\n"); +} + static void gst_thread_set_arg(GtkObject *object,GtkArg *arg,guint id) { /* it's not null if we got it, but it might not be ours */ g_return_if_fail(GST_IS_THREAD(object)); @@ -247,6 +256,9 @@ void *gst_thread_main_loop(void *arg) { gst_info("gstthread: thread \"%s\" is running with PID %d\n", gst_element_get_name(GST_ELEMENT(thread)), getpid()); + if (GST_BIN_CLASS(parent_class)->create_plan) + GST_BIN_CLASS(parent_class)->create_plan(GST_BIN(thread)); + while(!GST_FLAG_IS_SET(thread,GST_THREAD_STATE_REAPING)) { if (GST_FLAG_IS_SET(thread,GST_THREAD_STATE_SPINNING)) gst_bin_iterate(GST_BIN(thread)); diff --git a/plugins/elements/gstqueue.c b/plugins/elements/gstqueue.c index b9af36de44..1c01308747 100644 --- a/plugins/elements/gstqueue.c +++ b/plugins/elements/gstqueue.c @@ -106,13 +106,12 @@ static void gst_queue_class_init(GstQueueClass *klass) { static void gst_queue_init(GstQueue *queue) { queue->sinkpad = gst_pad_new("sink",GST_PAD_SINK); gst_element_add_pad(GST_ELEMENT(queue),queue->sinkpad); - gst_pad_set_chain_function(queue->sinkpad,gst_queue_chain); + queue->srcpad = gst_pad_new("src",GST_PAD_SRC); gst_element_add_pad(GST_ELEMENT(queue),queue->srcpad); queue->queue = NULL; - queue->tail = NULL; queue->level_buffers = 0; queue->max_buffers = 10; queue->level_bytes = 0; @@ -151,15 +150,15 @@ void gst_queue_chain(GstPad *pad,GstBuffer *buf) { name = gst_element_get_name(GST_ELEMENT(queue)); /* we have to lock the queue since we span threads */ - - DEBUG("queue: %s adding buffer %p %d\n", name, buf, pthread_self()); + DEBUG("queue: try have queue lock\n"); + DEBUG("queue: %s adding buffer %p %ld\n", name, buf, pthread_self()); GST_LOCK(queue); DEBUG("queue: have queue lock\n"); if (GST_BUFFER_FLAG_IS_SET(buf, GST_BUFFER_FLUSH)) { - g_list_foreach(queue->queue, gst_queue_cleanup_buffers, name); - g_list_free(queue->queue); + g_slist_foreach(queue->queue, gst_queue_cleanup_buffers, name); + g_slist_free(queue->queue); queue->queue = NULL; queue->level_buffers = 0; } @@ -167,30 +166,20 @@ void gst_queue_chain(GstPad *pad,GstBuffer *buf) { DEBUG("queue: %s: chain %d %p\n", name, queue->level_buffers, buf); - g_mutex_lock(queue->fulllock); while (queue->level_buffers >= queue->max_buffers) { DEBUG("queue: %s waiting %d\n", name, queue->level_buffers); STATUS("%s: O\n"); GST_UNLOCK(queue); + g_mutex_lock(queue->fulllock); g_cond_wait(queue->fullcond,queue->fulllock); + g_mutex_unlock(queue->fulllock); GST_LOCK(queue); STATUS("%s: O+\n"); DEBUG("queue: %s waiting done %d\n", name, queue->level_buffers); } - g_mutex_unlock(queue->fulllock); - - /* put the buffer on the head of the list */ - /* if the queue is NULL, start a new list and make this the tail */ - if (!queue->queue) { - queue->queue = g_list_prepend(queue->queue,buf); -// queue->tail = queue->queue; - /* otherwise append to the end of the list */ - } else { -// queue->tail = g_list_append(queue->tail,buf); -// queue->tail = g_list_next(queue->tail); - queue->queue = g_list_append(queue->queue,buf); - } + /* put the buffer on the tail of the list */ + queue->queue = g_slist_append(queue->queue,buf); STATUS("%s: +\n"); /* if we were empty, but aren't any more, signal a condition */ @@ -202,27 +191,27 @@ void gst_queue_chain(GstPad *pad,GstBuffer *buf) { GST_UNLOCK(queue); if (tosignal) { - g_mutex_lock(queue->emptylock); STATUS("%s: >\n"); + g_mutex_lock(queue->emptylock); g_cond_signal(queue->emptycond); - STATUS("%s: >>\n"); g_mutex_unlock(queue->emptylock); + STATUS("%s: >>\n"); } } void gst_queue_push(GstConnection *connection) { GstQueue *queue = GST_QUEUE(connection); GstBuffer *buf = NULL; - GList *front; + GSList *front; gboolean tosignal = FALSE; guchar *name; name = gst_element_get_name(GST_ELEMENT(queue)); - DEBUG("queue: %s push %d %d %p\n", name, queue->level_buffers, pthread_self(), queue->emptycond); /* have to lock for thread-safety */ DEBUG("queue: try have queue lock\n"); GST_LOCK(queue); + DEBUG("queue: %s push %d %ld %p\n", name, queue->level_buffers, pthread_self(), queue->emptycond); DEBUG("queue: have queue lock\n"); while (!queue->level_buffers) { @@ -237,8 +226,8 @@ void gst_queue_push(GstConnection *connection) { front = queue->queue; buf = (GstBuffer *)(front->data); - queue->queue = g_list_remove_link(queue->queue,front); - g_list_free(front); + queue->queue = g_slist_remove_link(queue->queue,front); + g_slist_free(front); queue->level_buffers--; STATUS("%s: -\n"); tosignal = queue->level_buffers < queue->max_buffers; @@ -252,9 +241,9 @@ void gst_queue_push(GstConnection *connection) { g_mutex_unlock(queue->fulllock); } - //DEBUG("queue: %s pushing %d %p %p %p\n", name, queue->level_buffers, buf); + DEBUG("queue: %s pushing %d %p \n", name, queue->level_buffers, buf); gst_pad_push(queue->srcpad,buf); - //DEBUG("queue: %s pushing %d done %p %p\n", name, queue->level_buffers); + DEBUG("queue: %s pushing %d done \n", name, queue->level_buffers); /* unlock now */ } diff --git a/plugins/elements/gstqueue.h b/plugins/elements/gstqueue.h index bb9c47bf9e..8e8a608330 100644 --- a/plugins/elements/gstqueue.h +++ b/plugins/elements/gstqueue.h @@ -55,8 +55,7 @@ struct _GstQueue { GstPad *srcpad; /* the queue of buffers we're keeping our grubby hands on */ - GList *queue; - GList *tail; /* have to keep track of this myself */ + GSList *queue; gint level_buffers; /* number of buffers queued here */ gint max_buffers; /* maximum number of buffers queued here */