mirror of
https://gitlab.freedesktop.org/gstreamer/gstreamer.git
synced 2024-11-22 17:51:16 +00:00
The cothreads were not initialized in its thread context resulting in severe stack corruption. This was very hard to ...
Original commit message from CVS: The cothreads were not initialized in its thread context resulting in severe stack corruption. This was very hard to track down. We should be able now to modify some plugins to a loop based setup so that we can get rid of the mp3parse and mp1videoparse elements. Modified the GList to a GSList in the queue.
This commit is contained in:
parent
e5ab7f33ac
commit
d00af07660
8 changed files with 74 additions and 71 deletions
|
@ -19,8 +19,7 @@ cothread_state *cothread_create(cothread_context *ctx) {
|
||||||
cothread_state *s;
|
cothread_state *s;
|
||||||
|
|
||||||
DEBUG("cothread: pthread_self() %ld\n",pthread_self());
|
DEBUG("cothread: pthread_self() %ld\n",pthread_self());
|
||||||
//if (pthread_self() == 0) {
|
if (pthread_self() == 0) {
|
||||||
if (0) {
|
|
||||||
s = (cothread_state *)malloc(sizeof(int) * COTHREAD_STACKSIZE);
|
s = (cothread_state *)malloc(sizeof(int) * COTHREAD_STACKSIZE);
|
||||||
DEBUG("cothread: new stack at %p\n",s);
|
DEBUG("cothread: new stack at %p\n",s);
|
||||||
} else {
|
} else {
|
||||||
|
@ -160,7 +159,6 @@ void cothread_switch(cothread_state *thread) {
|
||||||
SETUP_STACK(thread->sp);
|
SETUP_STACK(thread->sp);
|
||||||
SET_SP(thread->sp);
|
SET_SP(thread->sp);
|
||||||
// start it
|
// start it
|
||||||
//JUMP(cothread_stub);
|
|
||||||
cothread_stub();
|
cothread_stub();
|
||||||
DEBUG("cothread: exit thread \n");
|
DEBUG("cothread: exit thread \n");
|
||||||
ctx->current = 0;
|
ctx->current = 0;
|
||||||
|
|
|
@ -106,13 +106,12 @@ static void gst_queue_class_init(GstQueueClass *klass) {
|
||||||
static void gst_queue_init(GstQueue *queue) {
|
static void gst_queue_init(GstQueue *queue) {
|
||||||
queue->sinkpad = gst_pad_new("sink",GST_PAD_SINK);
|
queue->sinkpad = gst_pad_new("sink",GST_PAD_SINK);
|
||||||
gst_element_add_pad(GST_ELEMENT(queue),queue->sinkpad);
|
gst_element_add_pad(GST_ELEMENT(queue),queue->sinkpad);
|
||||||
|
|
||||||
gst_pad_set_chain_function(queue->sinkpad,gst_queue_chain);
|
gst_pad_set_chain_function(queue->sinkpad,gst_queue_chain);
|
||||||
|
|
||||||
queue->srcpad = gst_pad_new("src",GST_PAD_SRC);
|
queue->srcpad = gst_pad_new("src",GST_PAD_SRC);
|
||||||
gst_element_add_pad(GST_ELEMENT(queue),queue->srcpad);
|
gst_element_add_pad(GST_ELEMENT(queue),queue->srcpad);
|
||||||
|
|
||||||
queue->queue = NULL;
|
queue->queue = NULL;
|
||||||
queue->tail = NULL;
|
|
||||||
queue->level_buffers = 0;
|
queue->level_buffers = 0;
|
||||||
queue->max_buffers = 10;
|
queue->max_buffers = 10;
|
||||||
queue->level_bytes = 0;
|
queue->level_bytes = 0;
|
||||||
|
@ -152,14 +151,14 @@ void gst_queue_chain(GstPad *pad,GstBuffer *buf) {
|
||||||
|
|
||||||
/* we have to lock the queue since we span threads */
|
/* we have to lock the queue since we span threads */
|
||||||
|
|
||||||
DEBUG("queue: %s adding buffer %p %d\n", name, buf, pthread_self());
|
DEBUG("queue: try have queue lock\n");
|
||||||
|
DEBUG("queue: %s adding buffer %p %ld\n", name, buf, pthread_self());
|
||||||
GST_LOCK(queue);
|
GST_LOCK(queue);
|
||||||
DEBUG("queue: have queue lock\n");
|
DEBUG("queue: have queue lock\n");
|
||||||
|
|
||||||
if (GST_BUFFER_FLAG_IS_SET(buf, GST_BUFFER_FLUSH)) {
|
if (GST_BUFFER_FLAG_IS_SET(buf, GST_BUFFER_FLUSH)) {
|
||||||
g_list_foreach(queue->queue, gst_queue_cleanup_buffers, name);
|
g_slist_foreach(queue->queue, gst_queue_cleanup_buffers, name);
|
||||||
g_list_free(queue->queue);
|
g_slist_free(queue->queue);
|
||||||
queue->queue = NULL;
|
queue->queue = NULL;
|
||||||
queue->level_buffers = 0;
|
queue->level_buffers = 0;
|
||||||
}
|
}
|
||||||
|
@ -167,30 +166,20 @@ void gst_queue_chain(GstPad *pad,GstBuffer *buf) {
|
||||||
|
|
||||||
DEBUG("queue: %s: chain %d %p\n", name, queue->level_buffers, buf);
|
DEBUG("queue: %s: chain %d %p\n", name, queue->level_buffers, buf);
|
||||||
|
|
||||||
g_mutex_lock(queue->fulllock);
|
|
||||||
while (queue->level_buffers >= queue->max_buffers) {
|
while (queue->level_buffers >= queue->max_buffers) {
|
||||||
DEBUG("queue: %s waiting %d\n", name, queue->level_buffers);
|
DEBUG("queue: %s waiting %d\n", name, queue->level_buffers);
|
||||||
STATUS("%s: O\n");
|
STATUS("%s: O\n");
|
||||||
GST_UNLOCK(queue);
|
GST_UNLOCK(queue);
|
||||||
|
g_mutex_lock(queue->fulllock);
|
||||||
g_cond_wait(queue->fullcond,queue->fulllock);
|
g_cond_wait(queue->fullcond,queue->fulllock);
|
||||||
|
g_mutex_unlock(queue->fulllock);
|
||||||
GST_LOCK(queue);
|
GST_LOCK(queue);
|
||||||
STATUS("%s: O+\n");
|
STATUS("%s: O+\n");
|
||||||
DEBUG("queue: %s waiting done %d\n", name, queue->level_buffers);
|
DEBUG("queue: %s waiting done %d\n", name, queue->level_buffers);
|
||||||
}
|
}
|
||||||
g_mutex_unlock(queue->fulllock);
|
|
||||||
|
|
||||||
|
/* put the buffer on the tail of the list */
|
||||||
/* put the buffer on the head of the list */
|
queue->queue = g_slist_append(queue->queue,buf);
|
||||||
/* if the queue is NULL, start a new list and make this the tail */
|
|
||||||
if (!queue->queue) {
|
|
||||||
queue->queue = g_list_prepend(queue->queue,buf);
|
|
||||||
// queue->tail = queue->queue;
|
|
||||||
/* otherwise append to the end of the list */
|
|
||||||
} else {
|
|
||||||
// queue->tail = g_list_append(queue->tail,buf);
|
|
||||||
// queue->tail = g_list_next(queue->tail);
|
|
||||||
queue->queue = g_list_append(queue->queue,buf);
|
|
||||||
}
|
|
||||||
STATUS("%s: +\n");
|
STATUS("%s: +\n");
|
||||||
|
|
||||||
/* if we were empty, but aren't any more, signal a condition */
|
/* if we were empty, but aren't any more, signal a condition */
|
||||||
|
@ -202,27 +191,27 @@ void gst_queue_chain(GstPad *pad,GstBuffer *buf) {
|
||||||
GST_UNLOCK(queue);
|
GST_UNLOCK(queue);
|
||||||
|
|
||||||
if (tosignal) {
|
if (tosignal) {
|
||||||
g_mutex_lock(queue->emptylock);
|
|
||||||
STATUS("%s: >\n");
|
STATUS("%s: >\n");
|
||||||
|
g_mutex_lock(queue->emptylock);
|
||||||
g_cond_signal(queue->emptycond);
|
g_cond_signal(queue->emptycond);
|
||||||
STATUS("%s: >>\n");
|
|
||||||
g_mutex_unlock(queue->emptylock);
|
g_mutex_unlock(queue->emptylock);
|
||||||
|
STATUS("%s: >>\n");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void gst_queue_push(GstConnection *connection) {
|
void gst_queue_push(GstConnection *connection) {
|
||||||
GstQueue *queue = GST_QUEUE(connection);
|
GstQueue *queue = GST_QUEUE(connection);
|
||||||
GstBuffer *buf = NULL;
|
GstBuffer *buf = NULL;
|
||||||
GList *front;
|
GSList *front;
|
||||||
gboolean tosignal = FALSE;
|
gboolean tosignal = FALSE;
|
||||||
guchar *name;
|
guchar *name;
|
||||||
|
|
||||||
name = gst_element_get_name(GST_ELEMENT(queue));
|
name = gst_element_get_name(GST_ELEMENT(queue));
|
||||||
|
|
||||||
DEBUG("queue: %s push %d %d %p\n", name, queue->level_buffers, pthread_self(), queue->emptycond);
|
|
||||||
/* have to lock for thread-safety */
|
/* have to lock for thread-safety */
|
||||||
DEBUG("queue: try have queue lock\n");
|
DEBUG("queue: try have queue lock\n");
|
||||||
GST_LOCK(queue);
|
GST_LOCK(queue);
|
||||||
|
DEBUG("queue: %s push %d %ld %p\n", name, queue->level_buffers, pthread_self(), queue->emptycond);
|
||||||
DEBUG("queue: have queue lock\n");
|
DEBUG("queue: have queue lock\n");
|
||||||
|
|
||||||
while (!queue->level_buffers) {
|
while (!queue->level_buffers) {
|
||||||
|
@ -237,8 +226,8 @@ void gst_queue_push(GstConnection *connection) {
|
||||||
|
|
||||||
front = queue->queue;
|
front = queue->queue;
|
||||||
buf = (GstBuffer *)(front->data);
|
buf = (GstBuffer *)(front->data);
|
||||||
queue->queue = g_list_remove_link(queue->queue,front);
|
queue->queue = g_slist_remove_link(queue->queue,front);
|
||||||
g_list_free(front);
|
g_slist_free(front);
|
||||||
queue->level_buffers--;
|
queue->level_buffers--;
|
||||||
STATUS("%s: -\n");
|
STATUS("%s: -\n");
|
||||||
tosignal = queue->level_buffers < queue->max_buffers;
|
tosignal = queue->level_buffers < queue->max_buffers;
|
||||||
|
@ -252,9 +241,9 @@ void gst_queue_push(GstConnection *connection) {
|
||||||
g_mutex_unlock(queue->fulllock);
|
g_mutex_unlock(queue->fulllock);
|
||||||
}
|
}
|
||||||
|
|
||||||
//DEBUG("queue: %s pushing %d %p %p %p\n", name, queue->level_buffers, buf);
|
DEBUG("queue: %s pushing %d %p \n", name, queue->level_buffers, buf);
|
||||||
gst_pad_push(queue->srcpad,buf);
|
gst_pad_push(queue->srcpad,buf);
|
||||||
//DEBUG("queue: %s pushing %d done %p %p\n", name, queue->level_buffers);
|
DEBUG("queue: %s pushing %d done \n", name, queue->level_buffers);
|
||||||
|
|
||||||
/* unlock now */
|
/* unlock now */
|
||||||
}
|
}
|
||||||
|
|
|
@ -55,8 +55,7 @@ struct _GstQueue {
|
||||||
GstPad *srcpad;
|
GstPad *srcpad;
|
||||||
|
|
||||||
/* the queue of buffers we're keeping our grubby hands on */
|
/* the queue of buffers we're keeping our grubby hands on */
|
||||||
GList *queue;
|
GSList *queue;
|
||||||
GList *tail; /* have to keep track of this myself */
|
|
||||||
|
|
||||||
gint level_buffers; /* number of buffers queued here */
|
gint level_buffers; /* number of buffers queued here */
|
||||||
gint max_buffers; /* maximum number of buffers queued here */
|
gint max_buffers; /* maximum number of buffers queued here */
|
||||||
|
|
27
gst/gstbin.c
27
gst/gstbin.c
|
@ -424,7 +424,7 @@ static int gst_bin_loopfunc_wrapper(int argc,char *argv[]) {
|
||||||
GList *pads;
|
GList *pads;
|
||||||
GstPad *pad;
|
GstPad *pad;
|
||||||
GstBuffer *buf;
|
GstBuffer *buf;
|
||||||
gchar *name = gst_element_get_name(element);
|
G_GNUC_UNUSED gchar *name = gst_element_get_name(element);
|
||||||
|
|
||||||
DEBUG("** gst_bin_loopfunc_wrapper(%d,\"%s\")\n",
|
DEBUG("** gst_bin_loopfunc_wrapper(%d,\"%s\")\n",
|
||||||
argc,gst_element_get_name(element));
|
argc,gst_element_get_name(element));
|
||||||
|
@ -502,11 +502,13 @@ static void gst_bin_create_plan_func(GstBin *bin) {
|
||||||
if (element->loopfunc != NULL) {
|
if (element->loopfunc != NULL) {
|
||||||
g_print("gstbin: loop based element \"%s\" in bin \"%s\"\n", gst_element_get_name(element), gst_element_get_name(GST_ELEMENT(bin)));
|
g_print("gstbin: loop based element \"%s\" in bin \"%s\"\n", gst_element_get_name(element), gst_element_get_name(GST_ELEMENT(bin)));
|
||||||
bin->need_cothreads = TRUE;
|
bin->need_cothreads = TRUE;
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
// if it's a complex element, use cothreads
|
// if it's a complex element, use cothreads
|
||||||
else if (GST_ELEMENT_IS_MULTI_IN(element)) {
|
else if (GST_ELEMENT_IS_MULTI_IN(element)) {
|
||||||
g_print("gstbin: complex element \"%s\" in bin \"%s\"\n", gst_element_get_name(element), gst_element_get_name(GST_ELEMENT(bin)));
|
g_print("gstbin: complex element \"%s\" in bin \"%s\"\n", gst_element_get_name(element), gst_element_get_name(GST_ELEMENT(bin)));
|
||||||
bin->need_cothreads = TRUE;
|
bin->need_cothreads = TRUE;
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
// if it has more than one input pad, use cothreads
|
// if it has more than one input pad, use cothreads
|
||||||
sink_pads = 0;
|
sink_pads = 0;
|
||||||
|
@ -553,6 +555,11 @@ static void gst_bin_create_plan_func(GstBin *bin) {
|
||||||
cothread_setfunc(element->threadstate,gst_bin_loopfunc_wrapper,
|
cothread_setfunc(element->threadstate,gst_bin_loopfunc_wrapper,
|
||||||
0,(char **)element);
|
0,(char **)element);
|
||||||
}
|
}
|
||||||
|
if (GST_IS_SRC(element)) {
|
||||||
|
g_print("gstbin: adding '%s' as entry point\n",gst_element_get_name(element));
|
||||||
|
bin->entries = g_list_prepend(bin->entries,element);
|
||||||
|
bin->numentries++;
|
||||||
|
}
|
||||||
|
|
||||||
pads = gst_element_get_pad_list(element);
|
pads = gst_element_get_pad_list(element);
|
||||||
while (pads) {
|
while (pads) {
|
||||||
|
@ -581,10 +588,12 @@ static void gst_bin_create_plan_func(GstBin *bin) {
|
||||||
while (connection_pads) {
|
while (connection_pads) {
|
||||||
opad = GST_PAD(connection_pads->data);
|
opad = GST_PAD(connection_pads->data);
|
||||||
if (gst_pad_get_direction(opad) == GST_PAD_SRC) {
|
if (gst_pad_get_direction(opad) == GST_PAD_SRC) {
|
||||||
g_print("gstbin: setting push&pull handlers for %s:%s SRC connection\n",
|
g_print("gstbin: setting push&pull handlers for %s:%s SRC connection %p %p\n",
|
||||||
gst_element_get_name(outside),gst_pad_get_name(opad));
|
gst_element_get_name(outside),gst_pad_get_name(opad), opad, opad->pullfunc);
|
||||||
|
|
||||||
opad->pushfunc = gst_bin_pushfunc_wrapper;
|
opad->pushfunc = gst_bin_pushfunc_wrapper;
|
||||||
opad->pullfunc = gst_bin_pullfunc_wrapper;
|
opad->pullfunc = gst_bin_pullfunc_wrapper;
|
||||||
|
|
||||||
if (outside->threadstate == NULL) {
|
if (outside->threadstate == NULL) {
|
||||||
outside->threadstate = cothread_create(bin->threadcontext);
|
outside->threadstate = cothread_create(bin->threadcontext);
|
||||||
cothread_setfunc(outside->threadstate,gst_bin_loopfunc_wrapper,
|
cothread_setfunc(outside->threadstate,gst_bin_loopfunc_wrapper,
|
||||||
|
@ -623,9 +632,13 @@ static void gst_bin_create_plan_func(GstBin *bin) {
|
||||||
pad = GST_PAD(pads->data);
|
pad = GST_PAD(pads->data);
|
||||||
/* we only worry about sink pads */
|
/* we only worry about sink pads */
|
||||||
if (gst_pad_get_direction(pad) == GST_PAD_SINK) {
|
if (gst_pad_get_direction(pad) == GST_PAD_SINK) {
|
||||||
|
g_print("gstbin: found SINK pad %s\n", gst_pad_get_name(pad));
|
||||||
/* get the pad's peer */
|
/* get the pad's peer */
|
||||||
peer = gst_pad_get_peer(pad);
|
peer = gst_pad_get_peer(pad);
|
||||||
if (!peer) break;
|
if (!peer) {
|
||||||
|
g_print("gstbin: found SINK pad %s has no peer\n", gst_pad_get_name(pad));
|
||||||
|
break;
|
||||||
|
}
|
||||||
/* get the parent of the peer of the pad */
|
/* get the parent of the peer of the pad */
|
||||||
outside = GST_ELEMENT(gst_pad_get_parent(peer));
|
outside = GST_ELEMENT(gst_pad_get_parent(peer));
|
||||||
if (!outside) break;
|
if (!outside) break;
|
||||||
|
@ -640,6 +653,9 @@ static void gst_bin_create_plan_func(GstBin *bin) {
|
||||||
bin->numentries++;
|
bin->numentries++;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
else {
|
||||||
|
g_print("gstbin: found pad %s\n", gst_pad_get_name(pad));
|
||||||
|
}
|
||||||
pads = g_list_next(pads);
|
pads = g_list_next(pads);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -668,7 +684,8 @@ void gst_bin_iterate_func(GstBin *bin) {
|
||||||
cothread_switch(GST_ELEMENT(bin->children->data)->threadstate);
|
cothread_switch(GST_ELEMENT(bin->children->data)->threadstate);
|
||||||
} else {
|
} else {
|
||||||
if (bin->numentries <= 0) {
|
if (bin->numentries <= 0) {
|
||||||
printf("gstbin: no elements in bin \"%s\"\n", gst_element_get_name(GST_ELEMENT(bin)));
|
printf("gstbin: no entries in bin \"%s\" trying children...\n", gst_element_get_name(GST_ELEMENT(bin)));
|
||||||
|
// we will try loop over the elements then...
|
||||||
entries = bin->children;
|
entries = bin->children;
|
||||||
}
|
}
|
||||||
else {
|
else {
|
||||||
|
|
|
@ -201,7 +201,7 @@ void gst_pad_push(GstPad *pad,GstBuffer *buffer) {
|
||||||
(pad->chainfunc)(pad->peer,buffer);
|
(pad->chainfunc)(pad->peer,buffer);
|
||||||
// else we squawk
|
// else we squawk
|
||||||
} else {
|
} else {
|
||||||
//g_print("-- gst_pad_push(): houston, we have a problem, no way of talking to peer\n");
|
g_print("-- gst_pad_push(): houston, we have a problem, no way of talking to peer\n");
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -340,7 +340,7 @@ void gst_pad_connect(GstPad *srcpad,GstPad *sinkpad) {
|
||||||
/* now copy the chain pointer from sink to src */
|
/* now copy the chain pointer from sink to src */
|
||||||
srcpad->chainfunc = sinkpad->chainfunc;
|
srcpad->chainfunc = sinkpad->chainfunc;
|
||||||
/* and the pull function */
|
/* and the pull function */
|
||||||
srcpad->pullfunc = sinkpad->pullfunc;
|
//srcpad->pullfunc = sinkpad->pullfunc;
|
||||||
|
|
||||||
/* set the connected flag */
|
/* set the connected flag */
|
||||||
/* FIXME: set connected flag */
|
/* FIXME: set connected flag */
|
||||||
|
|
|
@ -55,6 +55,7 @@ static xmlNodePtr gst_thread_save_thyself(GstElement *element,xmlNodePtr parent)
|
||||||
|
|
||||||
static void gst_thread_prepare(GstThread *thread);
|
static void gst_thread_prepare(GstThread *thread);
|
||||||
static void gst_thread_signal_thread(GstThread *thread);
|
static void gst_thread_signal_thread(GstThread *thread);
|
||||||
|
static void gst_thread_create_plan_dummy(GstBin *bin);
|
||||||
|
|
||||||
|
|
||||||
static GstBin *parent_class = NULL;
|
static GstBin *parent_class = NULL;
|
||||||
|
@ -85,10 +86,12 @@ gst_thread_class_init(GstThreadClass *klass) {
|
||||||
GtkObjectClass *gtkobject_class;
|
GtkObjectClass *gtkobject_class;
|
||||||
GstObjectClass *gstobject_class;
|
GstObjectClass *gstobject_class;
|
||||||
GstElementClass *gstelement_class;
|
GstElementClass *gstelement_class;
|
||||||
|
GstBinClass *gstbin_class;
|
||||||
|
|
||||||
gtkobject_class = (GtkObjectClass*)klass;
|
gtkobject_class = (GtkObjectClass*)klass;
|
||||||
gstobject_class = (GstObjectClass*)klass;
|
gstobject_class = (GstObjectClass*)klass;
|
||||||
gstelement_class = (GstElementClass*)klass;
|
gstelement_class = (GstElementClass*)klass;
|
||||||
|
gstbin_class = (GstBinClass*)klass;
|
||||||
|
|
||||||
parent_class = gtk_type_class(gst_bin_get_type());
|
parent_class = gtk_type_class(gst_bin_get_type());
|
||||||
|
|
||||||
|
@ -98,6 +101,8 @@ gst_thread_class_init(GstThreadClass *klass) {
|
||||||
gstelement_class->change_state = gst_thread_change_state;
|
gstelement_class->change_state = gst_thread_change_state;
|
||||||
gstelement_class->save_thyself = gst_thread_save_thyself;
|
gstelement_class->save_thyself = gst_thread_save_thyself;
|
||||||
|
|
||||||
|
gstbin_class->create_plan = gst_thread_create_plan_dummy;
|
||||||
|
|
||||||
gtkobject_class->set_arg = gst_thread_set_arg;
|
gtkobject_class->set_arg = gst_thread_set_arg;
|
||||||
gtkobject_class->get_arg = gst_thread_get_arg;
|
gtkobject_class->get_arg = gst_thread_get_arg;
|
||||||
}
|
}
|
||||||
|
@ -109,6 +114,10 @@ static void gst_thread_init(GstThread *thread) {
|
||||||
thread->cond = g_cond_new();
|
thread->cond = g_cond_new();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void gst_thread_create_plan_dummy(GstBin *bin) {
|
||||||
|
gst_info("gstthread: create plan delayed until thread starts\n");
|
||||||
|
}
|
||||||
|
|
||||||
static void gst_thread_set_arg(GtkObject *object,GtkArg *arg,guint id) {
|
static void gst_thread_set_arg(GtkObject *object,GtkArg *arg,guint id) {
|
||||||
/* it's not null if we got it, but it might not be ours */
|
/* it's not null if we got it, but it might not be ours */
|
||||||
g_return_if_fail(GST_IS_THREAD(object));
|
g_return_if_fail(GST_IS_THREAD(object));
|
||||||
|
@ -247,6 +256,9 @@ void *gst_thread_main_loop(void *arg) {
|
||||||
gst_info("gstthread: thread \"%s\" is running with PID %d\n",
|
gst_info("gstthread: thread \"%s\" is running with PID %d\n",
|
||||||
gst_element_get_name(GST_ELEMENT(thread)), getpid());
|
gst_element_get_name(GST_ELEMENT(thread)), getpid());
|
||||||
|
|
||||||
|
if (GST_BIN_CLASS(parent_class)->create_plan)
|
||||||
|
GST_BIN_CLASS(parent_class)->create_plan(GST_BIN(thread));
|
||||||
|
|
||||||
while(!GST_FLAG_IS_SET(thread,GST_THREAD_STATE_REAPING)) {
|
while(!GST_FLAG_IS_SET(thread,GST_THREAD_STATE_REAPING)) {
|
||||||
if (GST_FLAG_IS_SET(thread,GST_THREAD_STATE_SPINNING))
|
if (GST_FLAG_IS_SET(thread,GST_THREAD_STATE_SPINNING))
|
||||||
gst_bin_iterate(GST_BIN(thread));
|
gst_bin_iterate(GST_BIN(thread));
|
||||||
|
|
|
@ -106,13 +106,12 @@ static void gst_queue_class_init(GstQueueClass *klass) {
|
||||||
static void gst_queue_init(GstQueue *queue) {
|
static void gst_queue_init(GstQueue *queue) {
|
||||||
queue->sinkpad = gst_pad_new("sink",GST_PAD_SINK);
|
queue->sinkpad = gst_pad_new("sink",GST_PAD_SINK);
|
||||||
gst_element_add_pad(GST_ELEMENT(queue),queue->sinkpad);
|
gst_element_add_pad(GST_ELEMENT(queue),queue->sinkpad);
|
||||||
|
|
||||||
gst_pad_set_chain_function(queue->sinkpad,gst_queue_chain);
|
gst_pad_set_chain_function(queue->sinkpad,gst_queue_chain);
|
||||||
|
|
||||||
queue->srcpad = gst_pad_new("src",GST_PAD_SRC);
|
queue->srcpad = gst_pad_new("src",GST_PAD_SRC);
|
||||||
gst_element_add_pad(GST_ELEMENT(queue),queue->srcpad);
|
gst_element_add_pad(GST_ELEMENT(queue),queue->srcpad);
|
||||||
|
|
||||||
queue->queue = NULL;
|
queue->queue = NULL;
|
||||||
queue->tail = NULL;
|
|
||||||
queue->level_buffers = 0;
|
queue->level_buffers = 0;
|
||||||
queue->max_buffers = 10;
|
queue->max_buffers = 10;
|
||||||
queue->level_bytes = 0;
|
queue->level_bytes = 0;
|
||||||
|
@ -152,14 +151,14 @@ void gst_queue_chain(GstPad *pad,GstBuffer *buf) {
|
||||||
|
|
||||||
/* we have to lock the queue since we span threads */
|
/* we have to lock the queue since we span threads */
|
||||||
|
|
||||||
DEBUG("queue: %s adding buffer %p %d\n", name, buf, pthread_self());
|
DEBUG("queue: try have queue lock\n");
|
||||||
|
DEBUG("queue: %s adding buffer %p %ld\n", name, buf, pthread_self());
|
||||||
GST_LOCK(queue);
|
GST_LOCK(queue);
|
||||||
DEBUG("queue: have queue lock\n");
|
DEBUG("queue: have queue lock\n");
|
||||||
|
|
||||||
if (GST_BUFFER_FLAG_IS_SET(buf, GST_BUFFER_FLUSH)) {
|
if (GST_BUFFER_FLAG_IS_SET(buf, GST_BUFFER_FLUSH)) {
|
||||||
g_list_foreach(queue->queue, gst_queue_cleanup_buffers, name);
|
g_slist_foreach(queue->queue, gst_queue_cleanup_buffers, name);
|
||||||
g_list_free(queue->queue);
|
g_slist_free(queue->queue);
|
||||||
queue->queue = NULL;
|
queue->queue = NULL;
|
||||||
queue->level_buffers = 0;
|
queue->level_buffers = 0;
|
||||||
}
|
}
|
||||||
|
@ -167,30 +166,20 @@ void gst_queue_chain(GstPad *pad,GstBuffer *buf) {
|
||||||
|
|
||||||
DEBUG("queue: %s: chain %d %p\n", name, queue->level_buffers, buf);
|
DEBUG("queue: %s: chain %d %p\n", name, queue->level_buffers, buf);
|
||||||
|
|
||||||
g_mutex_lock(queue->fulllock);
|
|
||||||
while (queue->level_buffers >= queue->max_buffers) {
|
while (queue->level_buffers >= queue->max_buffers) {
|
||||||
DEBUG("queue: %s waiting %d\n", name, queue->level_buffers);
|
DEBUG("queue: %s waiting %d\n", name, queue->level_buffers);
|
||||||
STATUS("%s: O\n");
|
STATUS("%s: O\n");
|
||||||
GST_UNLOCK(queue);
|
GST_UNLOCK(queue);
|
||||||
|
g_mutex_lock(queue->fulllock);
|
||||||
g_cond_wait(queue->fullcond,queue->fulllock);
|
g_cond_wait(queue->fullcond,queue->fulllock);
|
||||||
|
g_mutex_unlock(queue->fulllock);
|
||||||
GST_LOCK(queue);
|
GST_LOCK(queue);
|
||||||
STATUS("%s: O+\n");
|
STATUS("%s: O+\n");
|
||||||
DEBUG("queue: %s waiting done %d\n", name, queue->level_buffers);
|
DEBUG("queue: %s waiting done %d\n", name, queue->level_buffers);
|
||||||
}
|
}
|
||||||
g_mutex_unlock(queue->fulllock);
|
|
||||||
|
|
||||||
|
/* put the buffer on the tail of the list */
|
||||||
/* put the buffer on the head of the list */
|
queue->queue = g_slist_append(queue->queue,buf);
|
||||||
/* if the queue is NULL, start a new list and make this the tail */
|
|
||||||
if (!queue->queue) {
|
|
||||||
queue->queue = g_list_prepend(queue->queue,buf);
|
|
||||||
// queue->tail = queue->queue;
|
|
||||||
/* otherwise append to the end of the list */
|
|
||||||
} else {
|
|
||||||
// queue->tail = g_list_append(queue->tail,buf);
|
|
||||||
// queue->tail = g_list_next(queue->tail);
|
|
||||||
queue->queue = g_list_append(queue->queue,buf);
|
|
||||||
}
|
|
||||||
STATUS("%s: +\n");
|
STATUS("%s: +\n");
|
||||||
|
|
||||||
/* if we were empty, but aren't any more, signal a condition */
|
/* if we were empty, but aren't any more, signal a condition */
|
||||||
|
@ -202,27 +191,27 @@ void gst_queue_chain(GstPad *pad,GstBuffer *buf) {
|
||||||
GST_UNLOCK(queue);
|
GST_UNLOCK(queue);
|
||||||
|
|
||||||
if (tosignal) {
|
if (tosignal) {
|
||||||
g_mutex_lock(queue->emptylock);
|
|
||||||
STATUS("%s: >\n");
|
STATUS("%s: >\n");
|
||||||
|
g_mutex_lock(queue->emptylock);
|
||||||
g_cond_signal(queue->emptycond);
|
g_cond_signal(queue->emptycond);
|
||||||
STATUS("%s: >>\n");
|
|
||||||
g_mutex_unlock(queue->emptylock);
|
g_mutex_unlock(queue->emptylock);
|
||||||
|
STATUS("%s: >>\n");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void gst_queue_push(GstConnection *connection) {
|
void gst_queue_push(GstConnection *connection) {
|
||||||
GstQueue *queue = GST_QUEUE(connection);
|
GstQueue *queue = GST_QUEUE(connection);
|
||||||
GstBuffer *buf = NULL;
|
GstBuffer *buf = NULL;
|
||||||
GList *front;
|
GSList *front;
|
||||||
gboolean tosignal = FALSE;
|
gboolean tosignal = FALSE;
|
||||||
guchar *name;
|
guchar *name;
|
||||||
|
|
||||||
name = gst_element_get_name(GST_ELEMENT(queue));
|
name = gst_element_get_name(GST_ELEMENT(queue));
|
||||||
|
|
||||||
DEBUG("queue: %s push %d %d %p\n", name, queue->level_buffers, pthread_self(), queue->emptycond);
|
|
||||||
/* have to lock for thread-safety */
|
/* have to lock for thread-safety */
|
||||||
DEBUG("queue: try have queue lock\n");
|
DEBUG("queue: try have queue lock\n");
|
||||||
GST_LOCK(queue);
|
GST_LOCK(queue);
|
||||||
|
DEBUG("queue: %s push %d %ld %p\n", name, queue->level_buffers, pthread_self(), queue->emptycond);
|
||||||
DEBUG("queue: have queue lock\n");
|
DEBUG("queue: have queue lock\n");
|
||||||
|
|
||||||
while (!queue->level_buffers) {
|
while (!queue->level_buffers) {
|
||||||
|
@ -237,8 +226,8 @@ void gst_queue_push(GstConnection *connection) {
|
||||||
|
|
||||||
front = queue->queue;
|
front = queue->queue;
|
||||||
buf = (GstBuffer *)(front->data);
|
buf = (GstBuffer *)(front->data);
|
||||||
queue->queue = g_list_remove_link(queue->queue,front);
|
queue->queue = g_slist_remove_link(queue->queue,front);
|
||||||
g_list_free(front);
|
g_slist_free(front);
|
||||||
queue->level_buffers--;
|
queue->level_buffers--;
|
||||||
STATUS("%s: -\n");
|
STATUS("%s: -\n");
|
||||||
tosignal = queue->level_buffers < queue->max_buffers;
|
tosignal = queue->level_buffers < queue->max_buffers;
|
||||||
|
@ -252,9 +241,9 @@ void gst_queue_push(GstConnection *connection) {
|
||||||
g_mutex_unlock(queue->fulllock);
|
g_mutex_unlock(queue->fulllock);
|
||||||
}
|
}
|
||||||
|
|
||||||
//DEBUG("queue: %s pushing %d %p %p %p\n", name, queue->level_buffers, buf);
|
DEBUG("queue: %s pushing %d %p \n", name, queue->level_buffers, buf);
|
||||||
gst_pad_push(queue->srcpad,buf);
|
gst_pad_push(queue->srcpad,buf);
|
||||||
//DEBUG("queue: %s pushing %d done %p %p\n", name, queue->level_buffers);
|
DEBUG("queue: %s pushing %d done \n", name, queue->level_buffers);
|
||||||
|
|
||||||
/* unlock now */
|
/* unlock now */
|
||||||
}
|
}
|
||||||
|
|
|
@ -55,8 +55,7 @@ struct _GstQueue {
|
||||||
GstPad *srcpad;
|
GstPad *srcpad;
|
||||||
|
|
||||||
/* the queue of buffers we're keeping our grubby hands on */
|
/* the queue of buffers we're keeping our grubby hands on */
|
||||||
GList *queue;
|
GSList *queue;
|
||||||
GList *tail; /* have to keep track of this myself */
|
|
||||||
|
|
||||||
gint level_buffers; /* number of buffers queued here */
|
gint level_buffers; /* number of buffers queued here */
|
||||||
gint max_buffers; /* maximum number of buffers queued here */
|
gint max_buffers; /* maximum number of buffers queued here */
|
||||||
|
|
Loading…
Reference in a new issue