From d00af076607211e718ce1db2336b8d0ae1bb2435 Mon Sep 17 00:00:00 2001 From: Wim Taymans Date: Sun, 24 Sep 2000 14:29:49 +0000 Subject: [PATCH] The cothreads were not initialized in its thread context resulting in severe stack corruption. This was very hard to ... Original commit message from CVS: The cothreads were not initialized in its thread context resulting in severe stack corruption. This was very hard to track down. We should be able now to modify some plugins to a loop based setup so that we can get rid of the mp3parse and mp1videoparse elements. Modified the GList to a GSList in the queue. --- gst/cothreads.c | 4 +--- gst/elements/gstqueue.c | 45 ++++++++++++++----------------------- gst/elements/gstqueue.h | 3 +-- gst/gstbin.c | 29 +++++++++++++++++++----- gst/gstpad.c | 4 ++-- gst/gstthread.c | 12 ++++++++++ plugins/elements/gstqueue.c | 45 ++++++++++++++----------------------- plugins/elements/gstqueue.h | 3 +-- 8 files changed, 74 insertions(+), 71 deletions(-) diff --git a/gst/cothreads.c b/gst/cothreads.c index 9ca1242d83..dd14b9d7b9 100644 --- a/gst/cothreads.c +++ b/gst/cothreads.c @@ -19,8 +19,7 @@ cothread_state *cothread_create(cothread_context *ctx) { cothread_state *s; DEBUG("cothread: pthread_self() %ld\n",pthread_self()); - //if (pthread_self() == 0) { - if (0) { + if (pthread_self() == 0) { s = (cothread_state *)malloc(sizeof(int) * COTHREAD_STACKSIZE); DEBUG("cothread: new stack at %p\n",s); } else { @@ -160,7 +159,6 @@ void cothread_switch(cothread_state *thread) { SETUP_STACK(thread->sp); SET_SP(thread->sp); // start it - //JUMP(cothread_stub); cothread_stub(); DEBUG("cothread: exit thread \n"); ctx->current = 0; diff --git a/gst/elements/gstqueue.c b/gst/elements/gstqueue.c index b9af36de44..1c01308747 100644 --- a/gst/elements/gstqueue.c +++ b/gst/elements/gstqueue.c @@ -106,13 +106,12 @@ static void gst_queue_class_init(GstQueueClass *klass) { static void gst_queue_init(GstQueue *queue) { queue->sinkpad = gst_pad_new("sink",GST_PAD_SINK); gst_element_add_pad(GST_ELEMENT(queue),queue->sinkpad); - gst_pad_set_chain_function(queue->sinkpad,gst_queue_chain); + queue->srcpad = gst_pad_new("src",GST_PAD_SRC); gst_element_add_pad(GST_ELEMENT(queue),queue->srcpad); queue->queue = NULL; - queue->tail = NULL; queue->level_buffers = 0; queue->max_buffers = 10; queue->level_bytes = 0; @@ -151,15 +150,15 @@ void gst_queue_chain(GstPad *pad,GstBuffer *buf) { name = gst_element_get_name(GST_ELEMENT(queue)); /* we have to lock the queue since we span threads */ - - DEBUG("queue: %s adding buffer %p %d\n", name, buf, pthread_self()); + DEBUG("queue: try have queue lock\n"); + DEBUG("queue: %s adding buffer %p %ld\n", name, buf, pthread_self()); GST_LOCK(queue); DEBUG("queue: have queue lock\n"); if (GST_BUFFER_FLAG_IS_SET(buf, GST_BUFFER_FLUSH)) { - g_list_foreach(queue->queue, gst_queue_cleanup_buffers, name); - g_list_free(queue->queue); + g_slist_foreach(queue->queue, gst_queue_cleanup_buffers, name); + g_slist_free(queue->queue); queue->queue = NULL; queue->level_buffers = 0; } @@ -167,30 +166,20 @@ void gst_queue_chain(GstPad *pad,GstBuffer *buf) { DEBUG("queue: %s: chain %d %p\n", name, queue->level_buffers, buf); - g_mutex_lock(queue->fulllock); while (queue->level_buffers >= queue->max_buffers) { DEBUG("queue: %s waiting %d\n", name, queue->level_buffers); STATUS("%s: O\n"); GST_UNLOCK(queue); + g_mutex_lock(queue->fulllock); g_cond_wait(queue->fullcond,queue->fulllock); + g_mutex_unlock(queue->fulllock); GST_LOCK(queue); STATUS("%s: O+\n"); DEBUG("queue: %s waiting done %d\n", name, queue->level_buffers); } - g_mutex_unlock(queue->fulllock); - - /* put the buffer on the head of the list */ - /* if the queue is NULL, start a new list and make this the tail */ - if (!queue->queue) { - queue->queue = g_list_prepend(queue->queue,buf); -// queue->tail = queue->queue; - /* otherwise append to the end of the list */ - } else { -// queue->tail = g_list_append(queue->tail,buf); -// queue->tail = g_list_next(queue->tail); - queue->queue = g_list_append(queue->queue,buf); - } + /* put the buffer on the tail of the list */ + queue->queue = g_slist_append(queue->queue,buf); STATUS("%s: +\n"); /* if we were empty, but aren't any more, signal a condition */ @@ -202,27 +191,27 @@ void gst_queue_chain(GstPad *pad,GstBuffer *buf) { GST_UNLOCK(queue); if (tosignal) { - g_mutex_lock(queue->emptylock); STATUS("%s: >\n"); + g_mutex_lock(queue->emptylock); g_cond_signal(queue->emptycond); - STATUS("%s: >>\n"); g_mutex_unlock(queue->emptylock); + STATUS("%s: >>\n"); } } void gst_queue_push(GstConnection *connection) { GstQueue *queue = GST_QUEUE(connection); GstBuffer *buf = NULL; - GList *front; + GSList *front; gboolean tosignal = FALSE; guchar *name; name = gst_element_get_name(GST_ELEMENT(queue)); - DEBUG("queue: %s push %d %d %p\n", name, queue->level_buffers, pthread_self(), queue->emptycond); /* have to lock for thread-safety */ DEBUG("queue: try have queue lock\n"); GST_LOCK(queue); + DEBUG("queue: %s push %d %ld %p\n", name, queue->level_buffers, pthread_self(), queue->emptycond); DEBUG("queue: have queue lock\n"); while (!queue->level_buffers) { @@ -237,8 +226,8 @@ void gst_queue_push(GstConnection *connection) { front = queue->queue; buf = (GstBuffer *)(front->data); - queue->queue = g_list_remove_link(queue->queue,front); - g_list_free(front); + queue->queue = g_slist_remove_link(queue->queue,front); + g_slist_free(front); queue->level_buffers--; STATUS("%s: -\n"); tosignal = queue->level_buffers < queue->max_buffers; @@ -252,9 +241,9 @@ void gst_queue_push(GstConnection *connection) { g_mutex_unlock(queue->fulllock); } - //DEBUG("queue: %s pushing %d %p %p %p\n", name, queue->level_buffers, buf); + DEBUG("queue: %s pushing %d %p \n", name, queue->level_buffers, buf); gst_pad_push(queue->srcpad,buf); - //DEBUG("queue: %s pushing %d done %p %p\n", name, queue->level_buffers); + DEBUG("queue: %s pushing %d done \n", name, queue->level_buffers); /* unlock now */ } diff --git a/gst/elements/gstqueue.h b/gst/elements/gstqueue.h index bb9c47bf9e..8e8a608330 100644 --- a/gst/elements/gstqueue.h +++ b/gst/elements/gstqueue.h @@ -55,8 +55,7 @@ struct _GstQueue { GstPad *srcpad; /* the queue of buffers we're keeping our grubby hands on */ - GList *queue; - GList *tail; /* have to keep track of this myself */ + GSList *queue; gint level_buffers; /* number of buffers queued here */ gint max_buffers; /* maximum number of buffers queued here */ diff --git a/gst/gstbin.c b/gst/gstbin.c index d10051c350..9c038868d4 100644 --- a/gst/gstbin.c +++ b/gst/gstbin.c @@ -299,7 +299,7 @@ gboolean gst_bin_set_state_type(GstBin *bin, if (oclass->change_state_type) (oclass->change_state_type)(bin,state,type); - return TRUE; + return TRUE; } void gst_bin_real_destroy(GtkObject *object) { @@ -424,7 +424,7 @@ static int gst_bin_loopfunc_wrapper(int argc,char *argv[]) { GList *pads; GstPad *pad; GstBuffer *buf; - gchar *name = gst_element_get_name(element); + G_GNUC_UNUSED gchar *name = gst_element_get_name(element); DEBUG("** gst_bin_loopfunc_wrapper(%d,\"%s\")\n", argc,gst_element_get_name(element)); @@ -502,11 +502,13 @@ static void gst_bin_create_plan_func(GstBin *bin) { if (element->loopfunc != NULL) { g_print("gstbin: loop based element \"%s\" in bin \"%s\"\n", gst_element_get_name(element), gst_element_get_name(GST_ELEMENT(bin))); bin->need_cothreads = TRUE; + break; } // if it's a complex element, use cothreads else if (GST_ELEMENT_IS_MULTI_IN(element)) { g_print("gstbin: complex element \"%s\" in bin \"%s\"\n", gst_element_get_name(element), gst_element_get_name(GST_ELEMENT(bin))); bin->need_cothreads = TRUE; + break; } // if it has more than one input pad, use cothreads sink_pads = 0; @@ -553,6 +555,11 @@ static void gst_bin_create_plan_func(GstBin *bin) { cothread_setfunc(element->threadstate,gst_bin_loopfunc_wrapper, 0,(char **)element); } + if (GST_IS_SRC(element)) { + g_print("gstbin: adding '%s' as entry point\n",gst_element_get_name(element)); + bin->entries = g_list_prepend(bin->entries,element); + bin->numentries++; + } pads = gst_element_get_pad_list(element); while (pads) { @@ -581,10 +588,12 @@ static void gst_bin_create_plan_func(GstBin *bin) { while (connection_pads) { opad = GST_PAD(connection_pads->data); if (gst_pad_get_direction(opad) == GST_PAD_SRC) { - g_print("gstbin: setting push&pull handlers for %s:%s SRC connection\n", - gst_element_get_name(outside),gst_pad_get_name(opad)); + g_print("gstbin: setting push&pull handlers for %s:%s SRC connection %p %p\n", + gst_element_get_name(outside),gst_pad_get_name(opad), opad, opad->pullfunc); + opad->pushfunc = gst_bin_pushfunc_wrapper; opad->pullfunc = gst_bin_pullfunc_wrapper; + if (outside->threadstate == NULL) { outside->threadstate = cothread_create(bin->threadcontext); cothread_setfunc(outside->threadstate,gst_bin_loopfunc_wrapper, @@ -623,9 +632,13 @@ static void gst_bin_create_plan_func(GstBin *bin) { pad = GST_PAD(pads->data); /* we only worry about sink pads */ if (gst_pad_get_direction(pad) == GST_PAD_SINK) { + g_print("gstbin: found SINK pad %s\n", gst_pad_get_name(pad)); /* get the pad's peer */ peer = gst_pad_get_peer(pad); - if (!peer) break; + if (!peer) { + g_print("gstbin: found SINK pad %s has no peer\n", gst_pad_get_name(pad)); + break; + } /* get the parent of the peer of the pad */ outside = GST_ELEMENT(gst_pad_get_parent(peer)); if (!outside) break; @@ -640,6 +653,9 @@ static void gst_bin_create_plan_func(GstBin *bin) { bin->numentries++; } } + else { + g_print("gstbin: found pad %s\n", gst_pad_get_name(pad)); + } pads = g_list_next(pads); } } @@ -668,7 +684,8 @@ void gst_bin_iterate_func(GstBin *bin) { cothread_switch(GST_ELEMENT(bin->children->data)->threadstate); } else { if (bin->numentries <= 0) { - printf("gstbin: no elements in bin \"%s\"\n", gst_element_get_name(GST_ELEMENT(bin))); + printf("gstbin: no entries in bin \"%s\" trying children...\n", gst_element_get_name(GST_ELEMENT(bin))); + // we will try loop over the elements then... entries = bin->children; } else { diff --git a/gst/gstpad.c b/gst/gstpad.c index ad3f5dd3f0..92661c85c5 100644 --- a/gst/gstpad.c +++ b/gst/gstpad.c @@ -201,7 +201,7 @@ void gst_pad_push(GstPad *pad,GstBuffer *buffer) { (pad->chainfunc)(pad->peer,buffer); // else we squawk } else { - //g_print("-- gst_pad_push(): houston, we have a problem, no way of talking to peer\n"); + g_print("-- gst_pad_push(): houston, we have a problem, no way of talking to peer\n"); } } @@ -340,7 +340,7 @@ void gst_pad_connect(GstPad *srcpad,GstPad *sinkpad) { /* now copy the chain pointer from sink to src */ srcpad->chainfunc = sinkpad->chainfunc; /* and the pull function */ - srcpad->pullfunc = sinkpad->pullfunc; + //srcpad->pullfunc = sinkpad->pullfunc; /* set the connected flag */ /* FIXME: set connected flag */ diff --git a/gst/gstthread.c b/gst/gstthread.c index b84533df3e..5ad60ea6af 100644 --- a/gst/gstthread.c +++ b/gst/gstthread.c @@ -55,6 +55,7 @@ static xmlNodePtr gst_thread_save_thyself(GstElement *element,xmlNodePtr parent) static void gst_thread_prepare(GstThread *thread); static void gst_thread_signal_thread(GstThread *thread); +static void gst_thread_create_plan_dummy(GstBin *bin); static GstBin *parent_class = NULL; @@ -85,10 +86,12 @@ gst_thread_class_init(GstThreadClass *klass) { GtkObjectClass *gtkobject_class; GstObjectClass *gstobject_class; GstElementClass *gstelement_class; + GstBinClass *gstbin_class; gtkobject_class = (GtkObjectClass*)klass; gstobject_class = (GstObjectClass*)klass; gstelement_class = (GstElementClass*)klass; + gstbin_class = (GstBinClass*)klass; parent_class = gtk_type_class(gst_bin_get_type()); @@ -98,6 +101,8 @@ gst_thread_class_init(GstThreadClass *klass) { gstelement_class->change_state = gst_thread_change_state; gstelement_class->save_thyself = gst_thread_save_thyself; + gstbin_class->create_plan = gst_thread_create_plan_dummy; + gtkobject_class->set_arg = gst_thread_set_arg; gtkobject_class->get_arg = gst_thread_get_arg; } @@ -109,6 +114,10 @@ static void gst_thread_init(GstThread *thread) { thread->cond = g_cond_new(); } +static void gst_thread_create_plan_dummy(GstBin *bin) { + gst_info("gstthread: create plan delayed until thread starts\n"); +} + static void gst_thread_set_arg(GtkObject *object,GtkArg *arg,guint id) { /* it's not null if we got it, but it might not be ours */ g_return_if_fail(GST_IS_THREAD(object)); @@ -247,6 +256,9 @@ void *gst_thread_main_loop(void *arg) { gst_info("gstthread: thread \"%s\" is running with PID %d\n", gst_element_get_name(GST_ELEMENT(thread)), getpid()); + if (GST_BIN_CLASS(parent_class)->create_plan) + GST_BIN_CLASS(parent_class)->create_plan(GST_BIN(thread)); + while(!GST_FLAG_IS_SET(thread,GST_THREAD_STATE_REAPING)) { if (GST_FLAG_IS_SET(thread,GST_THREAD_STATE_SPINNING)) gst_bin_iterate(GST_BIN(thread)); diff --git a/plugins/elements/gstqueue.c b/plugins/elements/gstqueue.c index b9af36de44..1c01308747 100644 --- a/plugins/elements/gstqueue.c +++ b/plugins/elements/gstqueue.c @@ -106,13 +106,12 @@ static void gst_queue_class_init(GstQueueClass *klass) { static void gst_queue_init(GstQueue *queue) { queue->sinkpad = gst_pad_new("sink",GST_PAD_SINK); gst_element_add_pad(GST_ELEMENT(queue),queue->sinkpad); - gst_pad_set_chain_function(queue->sinkpad,gst_queue_chain); + queue->srcpad = gst_pad_new("src",GST_PAD_SRC); gst_element_add_pad(GST_ELEMENT(queue),queue->srcpad); queue->queue = NULL; - queue->tail = NULL; queue->level_buffers = 0; queue->max_buffers = 10; queue->level_bytes = 0; @@ -151,15 +150,15 @@ void gst_queue_chain(GstPad *pad,GstBuffer *buf) { name = gst_element_get_name(GST_ELEMENT(queue)); /* we have to lock the queue since we span threads */ - - DEBUG("queue: %s adding buffer %p %d\n", name, buf, pthread_self()); + DEBUG("queue: try have queue lock\n"); + DEBUG("queue: %s adding buffer %p %ld\n", name, buf, pthread_self()); GST_LOCK(queue); DEBUG("queue: have queue lock\n"); if (GST_BUFFER_FLAG_IS_SET(buf, GST_BUFFER_FLUSH)) { - g_list_foreach(queue->queue, gst_queue_cleanup_buffers, name); - g_list_free(queue->queue); + g_slist_foreach(queue->queue, gst_queue_cleanup_buffers, name); + g_slist_free(queue->queue); queue->queue = NULL; queue->level_buffers = 0; } @@ -167,30 +166,20 @@ void gst_queue_chain(GstPad *pad,GstBuffer *buf) { DEBUG("queue: %s: chain %d %p\n", name, queue->level_buffers, buf); - g_mutex_lock(queue->fulllock); while (queue->level_buffers >= queue->max_buffers) { DEBUG("queue: %s waiting %d\n", name, queue->level_buffers); STATUS("%s: O\n"); GST_UNLOCK(queue); + g_mutex_lock(queue->fulllock); g_cond_wait(queue->fullcond,queue->fulllock); + g_mutex_unlock(queue->fulllock); GST_LOCK(queue); STATUS("%s: O+\n"); DEBUG("queue: %s waiting done %d\n", name, queue->level_buffers); } - g_mutex_unlock(queue->fulllock); - - /* put the buffer on the head of the list */ - /* if the queue is NULL, start a new list and make this the tail */ - if (!queue->queue) { - queue->queue = g_list_prepend(queue->queue,buf); -// queue->tail = queue->queue; - /* otherwise append to the end of the list */ - } else { -// queue->tail = g_list_append(queue->tail,buf); -// queue->tail = g_list_next(queue->tail); - queue->queue = g_list_append(queue->queue,buf); - } + /* put the buffer on the tail of the list */ + queue->queue = g_slist_append(queue->queue,buf); STATUS("%s: +\n"); /* if we were empty, but aren't any more, signal a condition */ @@ -202,27 +191,27 @@ void gst_queue_chain(GstPad *pad,GstBuffer *buf) { GST_UNLOCK(queue); if (tosignal) { - g_mutex_lock(queue->emptylock); STATUS("%s: >\n"); + g_mutex_lock(queue->emptylock); g_cond_signal(queue->emptycond); - STATUS("%s: >>\n"); g_mutex_unlock(queue->emptylock); + STATUS("%s: >>\n"); } } void gst_queue_push(GstConnection *connection) { GstQueue *queue = GST_QUEUE(connection); GstBuffer *buf = NULL; - GList *front; + GSList *front; gboolean tosignal = FALSE; guchar *name; name = gst_element_get_name(GST_ELEMENT(queue)); - DEBUG("queue: %s push %d %d %p\n", name, queue->level_buffers, pthread_self(), queue->emptycond); /* have to lock for thread-safety */ DEBUG("queue: try have queue lock\n"); GST_LOCK(queue); + DEBUG("queue: %s push %d %ld %p\n", name, queue->level_buffers, pthread_self(), queue->emptycond); DEBUG("queue: have queue lock\n"); while (!queue->level_buffers) { @@ -237,8 +226,8 @@ void gst_queue_push(GstConnection *connection) { front = queue->queue; buf = (GstBuffer *)(front->data); - queue->queue = g_list_remove_link(queue->queue,front); - g_list_free(front); + queue->queue = g_slist_remove_link(queue->queue,front); + g_slist_free(front); queue->level_buffers--; STATUS("%s: -\n"); tosignal = queue->level_buffers < queue->max_buffers; @@ -252,9 +241,9 @@ void gst_queue_push(GstConnection *connection) { g_mutex_unlock(queue->fulllock); } - //DEBUG("queue: %s pushing %d %p %p %p\n", name, queue->level_buffers, buf); + DEBUG("queue: %s pushing %d %p \n", name, queue->level_buffers, buf); gst_pad_push(queue->srcpad,buf); - //DEBUG("queue: %s pushing %d done %p %p\n", name, queue->level_buffers); + DEBUG("queue: %s pushing %d done \n", name, queue->level_buffers); /* unlock now */ } diff --git a/plugins/elements/gstqueue.h b/plugins/elements/gstqueue.h index bb9c47bf9e..8e8a608330 100644 --- a/plugins/elements/gstqueue.h +++ b/plugins/elements/gstqueue.h @@ -55,8 +55,7 @@ struct _GstQueue { GstPad *srcpad; /* the queue of buffers we're keeping our grubby hands on */ - GList *queue; - GList *tail; /* have to keep track of this myself */ + GSList *queue; gint level_buffers; /* number of buffers queued here */ gint max_buffers; /* maximum number of buffers queued here */